You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/06 11:33:37 UTC

[incubator-ratis] branch master updated: RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 189ee94  RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)
189ee94 is described below

commit 189ee94471afdef8bd6c8b1fb419309230bfaa24
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 6 19:32:44 2020 +0800

    RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)
    
    * RATIS-1134. Remove DataStreamApi.stream(RaftGroupId).
    
    * Fixed checkstyle and changed TestDataStreamNetty slightly.
---
 .../org/apache/ratis/client/api/DataStreamApi.java |  7 +-----
 .../ratis/client/impl/DataStreamClientImpl.java    | 27 +++++++++-------------
 .../impl/DataStreamPacketByteBuffer.java           |  4 ++--
 .../ratis/datastream/DataStreamBaseTest.java       |  6 ++---
 .../ratis/datastream/TestDataStreamDisabled.java   |  1 -
 .../ratis/datastream/TestDataStreamNetty.java      |  4 ++--
 6 files changed, 19 insertions(+), 30 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 8b07fdb..3383567 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,8 +17,6 @@
  */
 package org.apache.ratis.client.api;
 
-import org.apache.ratis.protocol.RaftGroupId;
-
 /**
  * Stream data asynchronously to all the servers in the {@link org.apache.ratis.protocol.RaftGroup}.
  * Clients may stream data to the nearest server and then the server will forward the data to the other servers.
@@ -37,9 +35,6 @@ import org.apache.ratis.protocol.RaftGroupId;
  * but {@link MessageStreamApi} streams messages only to the leader.
  */
 public interface DataStreamApi {
-  /** Create a stream to write data to the default group. */
+  /** Create a stream to write data. */
   DataStreamOutput stream();
-
-  /** Create a stream to write data to the given group. */
-  DataStreamOutput stream(RaftGroupId groupId);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index bf54fb9..6c07374 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -25,13 +25,13 @@ import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -75,35 +75,35 @@ public class DataStreamClientImpl implements DataStreamClient {
           ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
     }
 
-    long getStreamId() {
-      return header.getCallId();
+    private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer buffer) {
+      return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset, buffer, type);
+    }
+
+    private CompletableFuture<DataStreamReply> send(Type type) {
+      return send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
     }
 
     // send to the attached dataStreamClientRpc
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
-      final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(getStreamId(), streamOffset, buf,
-          Type.STREAM_DATA);
+      final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
       streamOffset += buf.remaining();
       return f;
     }
 
     @Override
     public CompletableFuture<DataStreamReply> closeAsync() {
-      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
-          Type.STREAM_CLOSE);
+      return send(Type.STREAM_CLOSE);
     }
 
     @Override
     public CompletableFuture<DataStreamReply> closeForwardAsync() {
-      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
-          Type.STREAM_CLOSE_FORWARD);
+      return send(Type.STREAM_CLOSE_FORWARD);
     }
 
     @Override
     public CompletableFuture<DataStreamReply> startTransactionAsync() {
-      return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
-          Type.START_TRANSACTION);
+      return send(Type.START_TRANSACTION);
     }
 
     public RaftClientRequest getHeader() {
@@ -123,11 +123,6 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   @Override
   public DataStreamOutputRpc stream() {
-    return stream(groupId);
-  }
-
-  @Override
-  public DataStreamOutputRpc stream(RaftGroupId gid) {
     RaftClientRequest request = new RaftClientRequest(
         clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
     return new DataStreamOutputImpl(request);
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index fcb420d..0caa77e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -24,13 +24,13 @@ import java.nio.ByteBuffer;
  * Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link ByteBuffer}.
  */
 public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
-  private static final ByteBuffer EMPTY = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+  public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
 
   private final ByteBuffer buffer;
 
   public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
     super(streamId, streamOffset, type);
-    this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
+    this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
   }
 
   @Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 51c6bb7..4d50d7f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -397,8 +397,8 @@ abstract class DataStreamBaseTest extends BaseTest {
         final RaftClient client = newRaftClientForDataStream();
         clients.add(client);
         for (int i = 0; i < numStreams; i++) {
-          futures.add(CompletableFuture.runAsync(
-              () -> runTestDataStream((DataStreamOutputImpl) client.getDataStreamApi().stream(raftGroup.getGroupId()), bufferSize, bufferNum)));
+          futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
+              (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum)));
         }
       }
       Assert.assertEquals(numClients*numStreams, futures.size());
@@ -413,7 +413,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   private void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
-      DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream(raftGroup.getGroupId());
+      final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
       runTestDataStream(out, bufferSize, bufferNum);
       DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) out.closeAsync().join();
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 0350b2d..7444c12 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -20,7 +20,6 @@ package org.apache.ratis.datastream;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.DisabledDataStreamClientFactory;
 import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.impl.DataStreamClientImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.junit.Before;
 import org.junit.Rule;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 83decef..4b13599 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -58,13 +58,13 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
 
   @Test
   public void testDataStreamSingleServer() throws Exception {
-    runTestDataStream(1, 2, 20, 1_000_000, 10);
+    runTestDataStream(1, 5, 10, 1_000_000, 10);
     runTestDataStream(1, 2, 20, 1_000, 10_000);
   }
 
   @Test
   public void testDataStreamMultipleServer() throws Exception {
-    runTestDataStream(3, 2, 20, 1_000_000, 100);
+    runTestDataStream(3, 5, 10, 1_000_000, 10);
     runTestDataStream(3, 2, 20, 1_000, 10_000);
   }