You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/05 13:09:25 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #257: RATIS-1132. Primary and peer should use the same streamId

szetszwo commented on a change in pull request #257:
URL: https://github.com/apache/incubator-ratis/pull/257#discussion_r518032960



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -139,7 +141,7 @@ void close() {
       return stream;
     }
 
-    List<DataStreamOutputRpc> getDataStreamOutputs() {
+    public List<DataStreamOutputRpc> getDataStreamOutputs() {

Review comment:
       This change is not used.

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
##########
@@ -390,6 +391,15 @@ protected void runTestCloseStream(List<RaftServer> raftServers, int bufferSize,
     }
   }
 
+  protected void runTestSameStreamId(int numServers, int bufferSize, int bufferNum) throws Exception {

Review comment:
       Let's combine the new test with the existing runTestDataStream(..).  We actually could change back it to check header for all servers (we changed to check only the primary server since call id was unique).
   ```
   +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
   @@ -458,22 +458,27 @@ abstract class DataStreamBaseTest extends BaseTest {
          Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
        }
        try {
   -      assertHeader(out.getHeader(), dataSize);
   +      for(Server s : servers) {
   +        assertHeader(s, out.getHeader(), dataSize);
   +      }
        } catch (Throwable e) {
          throw new CompletionException(e);
        }
      }
    
   -  void assertHeader(RaftClientRequest header, int dataSize) throws Exception {
   -    final Server server = getPrimaryServer();
   +  void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
        final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
        final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
        final RaftClientRequest writeRequest = stream.getWriteRequest();
        Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
        Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
        Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
   -    Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
        Assert.assertEquals(dataSize, stream.getByteWritten());
   +
   +    final Server primary = getPrimaryServer();
   +    if (server == primary) {
   +      Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
   +    }
      }
   ```
   

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
##########
@@ -70,14 +70,17 @@ public DataStreamClientImpl(
     private long streamOffset = 0;
 
     public DataStreamOutputImpl(RaftGroupId groupId) {
-      final long streamId = RaftClientImpl.nextCallId();
+      this(groupId, RaftClientImpl.nextCallId());
+    }
+
+    public DataStreamOutputImpl(RaftGroupId groupId, long streamId) {
       this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, streamId,
           RaftClientRequest.writeRequestType());
       this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
           ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
     }
 
-    long getStreamId() {
+    public long getStreamId() {

Review comment:
       Use  getHeader().getCallId() in the test to avoid changing this to public.

##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -42,4 +42,7 @@
 
   /** Create a stream to write data to the given group. */
   DataStreamOutput stream(RaftGroupId groupId);
+
+  /** Create a stream to write data to the given group. */
+  DataStreamOutput stream(RaftGroupId groupId, long streamId);

Review comment:
       Similar to DataStreamOutputRpc, let's add a new interface
   ```
   package org.apache.ratis.client;
   
   interface DataStreamRpcApi implements DataStreamApi
   ```
   so that the new method  stream(RaftGroupId groupId, long streamId) is hide from users.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org