Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/23 10:29:53 UTC

[GitHub] [incubator-ratis] szetszwo opened a new pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

szetszwo opened a new pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294


   See https://issues.apache.org/jira/browse/RATIS-1171


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294#discussion_r528721708



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
##########
@@ -76,18 +80,34 @@ static ByteString bytesWritten2ByteString(long bytesWritten) {
 
     @Override
     public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
-      final SingleDataStream s = getSingleDataStream(ClientInvocationId.valueOf(entry.getStateMachineLogEntry()));
-      s.setLogEntry(entry);
+      LOG.info("link {}", stream);
+      if (stream == null) {
+        return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+      }
+      ((SingleDataStream)stream).setLogEntry(entry);
       return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {

Review comment:
       Yes.  Otherwise, the reply won't have a message.
   
   I just found that there are bugs in both the code and the tests.  The two bugs cancel each other out (a double negative), which is why the tests passed.  I will fix them.
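
As a rough illustration of the null handling in the link hunk quoted above, here is a minimal JDK-only sketch. The class NullStreamLinkSketch, the nested Stream type, and the String log entry are hypothetical stand-ins, not Ratis types; the real code uses DataStream, LogEntryProto and JavaUtils.completeExceptionally.

```java
import java.util.concurrent.CompletableFuture;

public class NullStreamLinkSketch {
  /** Hypothetical stand-in for a stream that remembers the log entry linked to it. */
  static class Stream {
    String logEntry;
  }

  /**
   * Links the entry to the stream.
   * A null stream is reported through a failed future instead of a thrown exception,
   * so the caller sees the error only when it inspects or joins the future.
   */
  static CompletableFuture<Void> link(Stream stream, String entry) {
    if (stream == null) {
      final CompletableFuture<Void> failed = new CompletableFuture<>();
      failed.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
      return failed;
    }
    stream.logEntry = entry;
    return CompletableFuture.completedFuture(null);
  }

  public static void main(String[] args) {
    final Stream s = new Stream();
    System.out.println("linked ok: " + !link(s, "entry-1").isCompletedExceptionally());
    System.out.println("null stream failed: " + link(null, "entry-2").isCompletedExceptionally());
  }
}
```

Because the failure travels inside the returned future, a caller that never joins or checks it will not notice the error.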







[GitHub] [incubator-ratis] runzhiwang closed pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294


   





[GitHub] [incubator-ratis] szetszwo closed pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
szetszwo closed pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294


   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294#discussion_r529119150



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
##########
@@ -101,7 +101,7 @@ static DataStreamReplyByteBuffer decodeDataStreamReplyByteBuffer(ByteBuf buf) {
         .map(header -> checkHeader(header, buf))
         .map(header -> DataStreamReplyByteBuffer.newBuilder()
             .setDataStreamReplyHeader(header)
-            .setBuffer(decodeData(buf, header, ByteBuf::nioBuffer))
+            .setBuffer(decodeData(buf, header, b -> b.copy().nioBuffer()))

Review comment:
       @szetszwo Sorry, I missed this. Why do we need b.copy()?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294#discussion_r528638515



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
##########
@@ -76,18 +80,34 @@ static ByteString bytesWritten2ByteString(long bytesWritten) {
 
     @Override
     public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
-      final SingleDataStream s = getSingleDataStream(ClientInvocationId.valueOf(entry.getStateMachineLogEntry()));
-      s.setLogEntry(entry);
+      LOG.info("link {}", stream);
+      if (stream == null) {
+        return JavaUtils.completeExceptionally(new IllegalStateException("Null stream: entry=" + entry));
+      }
+      ((SingleDataStream)stream).setLogEntry(entry);
       return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {

Review comment:
       @szetszwo Hi, sorry, why do we need to override applyTransaction? It seems that no test needs this.







[GitHub] [incubator-ratis] runzhiwang merged pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
runzhiwang merged pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294


   





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #294: RATIS-1171. Allow null for the stream parameter in StateMachine.DataApi.link

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #294:
URL: https://github.com/apache/incubator-ratis/pull/294#discussion_r528763724



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
##########
@@ -50,32 +57,57 @@ public void testMultipleStreamsMultipleServers() throws Exception {
 
   void runTestDataStream(CLUSTER cluster) throws Exception {
     RaftTestUtil.waitForLeader(cluster);
-    final List<CompletableFuture<Void>> futures = new ArrayList<>();
-    futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor));
-    futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor));
-    futures.forEach(CompletableFuture::join);
+
+    final List<CompletableFuture<Long>> futures = new ArrayList<>();
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor));
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor));
+    final long maxIndex = futures.stream()
+        .map(CompletableFuture::join)
+        .max(Long::compareTo)
+        .orElseThrow(IllegalStateException::new);
+
+    // wait for all servers to catch up
+    try (RaftClient client = cluster.createClient()) {
+      client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+    }
+    // assert all streams are linked
+    for (RaftServerProxy proxy : cluster.getServers()) {
+      final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+      final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine();
+      for (SingleDataStream s : stateMachine.getStreams()) {
+        Assert.assertNotNull(s.getLogEntry());
+      }
+    }
   }
 
-  void runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) {
-    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+  Long runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) {
+    final List<CompletableFuture<Long>> futures = new ArrayList<>();
     for (int j = 0; j < numClients; j++) {
-      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
+      futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
     }
     Assert.assertEquals(numClients, futures.size());
-    futures.forEach(CompletableFuture::join);
+    return futures.stream()
+        .map(CompletableFuture::join)
+        .max(Long::compareTo)
+        .orElseThrow(IllegalStateException::new);
   }
 
-  void runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) {
+  long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) {
     final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s);
-    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    final RaftPeerId leader = cluster.getLeader().getId();
+    final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
     try(RaftClient client = cluster.createClient()) {
       for (int i = 0; i < numStreams; i++) {
         final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
-        futures.add(CompletableFuture.runAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
-            servers, out, bufferSize, bufferNum), executor));
+        futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
+            servers, leader, out, bufferSize, bufferNum).join(), executor));

Review comment:
       @runzhiwang, FYI, this is the bug in the test: join() was missing, so the test could pass even when the returned CompletableFuture<RaftClientReply> completed exceptionally.  In fact, it always completed exceptionally because applyTransaction was not overridden.
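
The effect of the missing join() can be reproduced with the JDK alone. The sketch below is only an illustration under assumptions: MissingJoinSketch and failingReply are hypothetical names, and failingReply stands in for a call that returns a future which completes exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class MissingJoinSketch {
  /** Hypothetical stand-in for a call whose returned future completes exceptionally. */
  static CompletableFuture<String> failingReply() {
    final CompletableFuture<String> f = new CompletableFuture<>();
    f.completeExceptionally(new IllegalStateException("no reply message"));
    return f;
  }

  /** Returns true if joining the given future surfaces a failure. */
  static boolean joinFails(CompletableFuture<?> f) {
    try {
      f.join();
      return false;
    } catch (CompletionException e) {
      return true;
    }
  }

  /** Without join(): the inner future is discarded, so the outer one completes normally. */
  static boolean failureSeenWithoutJoin() {
    final CompletableFuture<Void> outer = CompletableFuture.runAsync(() -> failingReply());
    return joinFails(outer);
  }

  /** With join(): the inner failure is rethrown inside the task and fails the outer future. */
  static boolean failureSeenWithJoin() {
    final CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> failingReply().join());
    return joinFails(outer);
  }

  public static void main(String[] args) {
    System.out.println("without join, failure seen: " + failureSeenWithoutJoin());
    System.out.println("with join, failure seen: " + failureSeenWithJoin());
  }
}
```

In this sketch the first variant reports no failure and the second one does, which matches the behavior described in the comment: the outer future only reflects the inner result once the task joins it.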



