Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/11 20:40:19 UTC

[GitHub] [incubator-ratis] amaliujia opened a new pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

amaliujia opened a new pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272


   ## What changes were proposed in this pull request?
   
   Add E2E test for Ratis streaming based on MiniRaftCluster
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1130
   
   ## How was this patch tested?
   
   UT


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521883718



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();
+
+      // send data
+      final int halfBufferSize = bufferSize / 2;
+      int dataSize = 0;
+      for(int i = 0; i < bufferNum; i++) {
+        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+        sizes.add(size);
+
+        final ByteBuffer bf = initBuffer(dataSize, size);
+        futures.add(dataStreamOutputRpc.writeAsync(bf));
+        dataSize += size;
+      }
+
+      for (int i = 0; i < futures.size(); i++) {

Review comment:
       These joins should be removed.
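
The comment does not say what should replace the joins, and the loop body is cut off in the excerpt above. One common alternative, shown here only as a sketch, is to combine the collected futures with `CompletableFuture.allOf` and wait once. The class `AllOfSketch` and its methods are hypothetical and use plain `Long` results rather than Ratis `DataStreamReply` objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
  /** Returns a future that yields the sum of all results once every input future is done. */
  static CompletableFuture<Long> totalBytes(List<CompletableFuture<Long>> futures) {
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        // allOf has completed here, so each join() returns immediately.
        .thenApply(ignored -> futures.stream().mapToLong(CompletableFuture::join).sum());
  }

  /** Starts four asynchronous "writes" of sizes 1..4 and waits for them exactly once. */
  static long demo() {
    final List<CompletableFuture<Long>> futures = new ArrayList<>();
    for (long size = 1; size <= 4; size++) {
      final long s = size;
      futures.add(CompletableFuture.supplyAsync(() -> s));
    }
    return totalBytes(futures).join();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

With this shape the test blocks in a single place, and a failure in any write surfaces from that one `join()`.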







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522508404



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       We need the existing test case. We need both the unit test and the integration test.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521858513



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -36,5 +38,5 @@
  */
 public interface DataStreamApi {
   /** Create a stream to write data. */
-  DataStreamOutput stream();
+  DataStreamOutputRpc stream();

Review comment:
       DataStreamApi is user API.  We want to hide the RPC API from users.  So, please don't make this change.
   
   We may cast it to DataStreamOutputRpc in the test.
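
A minimal sketch of the suggestion above: the user-facing API keeps returning the narrow type, and only the test casts down to the RPC-level type. All names here (`StreamOutput`, `StreamOutputRpc`, `StreamApi`, `StreamApiSketch`, `isHeaderSent`) are hypothetical stand-ins, not the real Ratis interfaces.

```java
// Hypothetical stand-ins for the interfaces discussed above; not the real Ratis types.
interface StreamOutput {                          // user-facing API
  void write(byte[] data);
}

interface StreamOutputRpc extends StreamOutput {  // internal RPC-level API
  boolean isHeaderSent();
}

interface StreamApi {
  /** Create a stream to write data; only the user-facing type is exposed. */
  StreamOutput stream();
}

final class StreamApiSketch {
  /** An implementation whose returned object also implements the RPC-level interface. */
  static StreamApi newApi() {
    return () -> new StreamOutputRpc() {
      @Override public void write(byte[] data) { /* no-op in this sketch */ }
      @Override public boolean isHeaderSent() { return true; }
    };
  }

  /** What a test could do: cast down to reach the internal API. */
  static boolean headerSentViaCast() {
    final StreamOutput out = newApi().stream();
    final StreamOutputRpc rpc = (StreamOutputRpc) out;
    return rpc.isHeaderSent();
  }

  public static void main(String[] args) {
    System.out.println(headerSentViaCast());
  }
}
```

The cast ties the test to an implementation detail, which is the trade-off being accepted in exchange for keeping the RPC type out of the public signature.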

##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
##########
@@ -33,6 +34,11 @@ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
       @Override
       public void start() {}
 
+      @Override
+      public InetSocketAddress getInetSocketAddress() {
+        return InetSocketAddress.createUnresolved("0.0.0.0", 0);

Review comment:
       Is it okay to return null?
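
For context on the question above, this sketch shows what the placeholder from the diff looks like to a caller, using only the standard `java.net.InetSocketAddress` API. Whether `null` would be acceptable instead depends on how callers use the value, which this thread does not show. The class name `UnresolvedAddressSketch` is hypothetical.

```java
import java.net.InetSocketAddress;

final class UnresolvedAddressSketch {
  /** Same call as in the diff: an unresolved address with host "0.0.0.0" and port 0. */
  static InetSocketAddress placeholder() {
    return InetSocketAddress.createUnresolved("0.0.0.0", 0);
  }

  public static void main(String[] args) {
    final InetSocketAddress a = placeholder();
    System.out.println(a.isUnresolved());   // no lookup was attempted
    System.out.println(a.getAddress());     // no InetAddress is attached
    System.out.println(a.getHostString());  // the literal host string
    System.out.println(a.getPort());
  }
}
```

A caller can detect the placeholder through `isUnresolved()` without a null check, whereas returning `null` would require every caller to guard against it.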

##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTestUtils.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+
+public class DataStreamTestUtils {
+    public static final int MODULUS = 23;
+
+    public static byte pos2byte(int pos) {
+        return (byte) ('A' + pos%MODULUS);
+    }
+
+    public static ByteBuffer initBuffer(int offset, int size) {
+        final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+        final int length = buffer.capacity();
+        buffer.position(0).limit(length);
+        for (int j = 0; j < length; j++) {
+            buffer.put(pos2byte(offset + j));
+        }
+        buffer.flip();
+        Assert.assertEquals(length, buffer.remaining());
+        return buffer;
+    }

Review comment:
       2-space indentations please.
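
As an illustration of what the helper in the diff above produces, the following sketch reproduces `pos2byte` and `initBuffer` with 2-space indentation and adds a check of the resulting byte pattern. The `matchesPattern` helper and the class name `BufferPatternSketch` are assumptions added for illustration, and a heap buffer is used in place of the direct buffer from the diff.

```java
import java.nio.ByteBuffer;

final class BufferPatternSketch {
  static final int MODULUS = 23;

  /** Maps a stream position to a byte cycling through 'A'..'W'. */
  static byte pos2byte(int pos) {
    return (byte) ('A' + pos % MODULUS);
  }

  /** Fills a buffer of the given size with the pattern starting at offset. */
  static ByteBuffer initBuffer(int offset, int size) {
    final ByteBuffer buffer = ByteBuffer.allocate(size);  // heap buffer for this sketch
    for (int j = 0; j < size; j++) {
      buffer.put(pos2byte(offset + j));
    }
    buffer.flip();  // make the written bytes readable
    return buffer;
  }

  /** Hypothetical helper: checks the remaining bytes against the pattern at offset. */
  static boolean matchesPattern(ByteBuffer buffer, int offset) {
    final ByteBuffer view = buffer.duplicate();  // leave the caller's position untouched
    for (int j = 0; view.hasRemaining(); j++) {
      if (view.get() != pos2byte(offset + j)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(matchesPattern(initBuffer(5, 100), 5));
  }
}
```

Because each byte depends only on its absolute position, a receiver can verify data from any offset without keeping a copy of what was sent.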

##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest

Review comment:
       2-space indentation please.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Why could the stream be null?







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-727297362


   @runzhiwang CI has passed.





[GitHub] [incubator-ratis] runzhiwang commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-726414647


   > @runzhiwang yes I agree. What I meant was this PR will be updated and CI will rerun. If there are problems at that time I will take a look. Right now as this PR is not finalized I will focus on code changes first.
   
   Sure, thanks





[GitHub] [incubator-ratis] runzhiwang commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-727335434


   @szetszwo Could you help review it again?





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521891899



##########
File path: ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
##########
@@ -59,6 +74,7 @@ protected RaftServerProxy newRaftServer(
       RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
       RaftProperties properties) throws IOException {
     NettyConfigKeys.Server.setPort(properties, getPort(id, group));
+    NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));

Review comment:
       What's the relationship between these properties and the properties in the following code? Why do we need to call setPort twice?
   ![image](https://user-images.githubusercontent.com/51938049/98909630-0be7d280-24fd-11eb-88bc-0e2c58ba4a14.png)
   
   







[GitHub] [incubator-ratis] runzhiwang commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-725947503


   @amaliujia I think this PR is related to the failed CI.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521897365



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       MultiDataStreamStateMachine is static so that we can use it directly without refactoring.  If it is desirable, we may do the refactoring later.
   
   > ... I found that without this NULL check, this code will hit a NPE and the program will hang there. ...
   
   The NULL check hides the problem but does not fix it.  To fix the problem, we should return the NPE to the client.  As mentioned previously, we need to work on the exception handling once the basic feature has been completed.
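
A minimal sketch of returning the failure to the caller instead of masking it, using only `java.util.concurrent.CompletableFuture`. The `writeTo` and `writeAsync` signatures here are hypothetical stand-ins with a `ByteArrayOutputStream` in place of a Ratis `DataStream`; this is not the Ratis server code.

```java
import java.io.ByteArrayOutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionPropagationSketch {
  /** Hypothetical stand-in for a server-side write; a null stream triggers the NPE. */
  static long writeTo(byte[] data, ByteArrayOutputStream stream) {
    stream.write(data, 0, data.length);  // throws NullPointerException if stream is null
    return data.length;
  }

  /** Runs the write and reports success or failure through the returned future. */
  static CompletableFuture<Long> writeAsync(byte[] data, ByteArrayOutputStream stream) {
    final CompletableFuture<Long> reply = new CompletableFuture<>();
    try {
      reply.complete(writeTo(data, stream));
    } catch (RuntimeException e) {
      reply.completeExceptionally(e);  // the caller sees the failure instead of waiting
    }
    return reply;
  }

  /** Caller side: join() rethrows the failure wrapped in a CompletionException. */
  static String describe(CompletableFuture<Long> reply) {
    try {
      return "wrote " + reply.join();
    } catch (CompletionException e) {
      return "failed: " + e.getCause().getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(describe(writeAsync(new byte[] {1, 2, 3}, new ByteArrayOutputStream())));
    System.out.println(describe(writeAsync(new byte[] {1}, null)));
  }
}
```

In a real RPC path the exception would have to be serialized into the reply sent over the wire; the sketch only shows the in-process shape of the idea.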

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       If the test coverage of the MiniRaftCluster test becomes a superset of TestDataStreamNetty, we should remove TestDataStreamNetty.  This is an engineering process.  We start with TestDataStreamNetty and keep adding features.  Our goal is to make Streaming work with a real cluster.
   
   Let's keep both tests for a moment and review if we should remove TestDataStreamNetty later on.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521878561



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Yeah, that is what I was thinking in https://issues.apache.org/jira/browse/RATIS-1130?focusedCommentId=17228841&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17228841
   
   Sure, I can move `MultiDataStreamStateMachine` to make it reusable. Do you prefer that I do that in another PR (as this PR is pretty large now), or should I just add that refactoring to this PR?
   
   Also, adding a `NULL` check is still useful. I found that without this NULL check, this code will hit an NPE and the program will hang there. If we don't add this NULL check, then at least we should investigate the NPE and work out why the program hangs.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522455442



##########
File path: ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
##########
@@ -59,6 +74,7 @@ protected RaftServerProxy newRaftServer(
       RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
       RaftProperties properties) throws IOException {
     NettyConfigKeys.Server.setPort(properties, getPort(id, group));
+    NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));

Review comment:
       In `TestDataStreamNetty`, MiniRaftCluster is not used. This line sets up the port for MiniRaftCluster.
   
   That is why I asked above whether we want to merge the current tests so that they only use MiniRaftCluster.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521878561



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Yeah that is what I was thinking in https://issues.apache.org/jira/browse/RATIS-1130?focusedCommentId=17228841&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17228841
   
   Sure I can move `MultiDataStreamStateMachine` to make it reusable. Do you prefer I do that in another PR (as this PR is pretty large now) or just add that refactoring in this PR?
   
   Also, adding a `NULL` check is still useful. I found that without this NULL check, this code will hit an NPE and the program will hang there. If we don't add this NULL check, then at least we should investigate the NPE and work out why the program hangs (not for this specific change, but in general: if a user passes in a null, the program hangs, which is hard to debug).
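
The thread does not diagnose why the program hangs. One common way an exception turns into a hang, shown here purely as an assumption, is a handler that throws before completing the reply future, so the waiting side never gets an answer. The sketch also shows a bounded wait that turns the hang into a visible timeout. The class `HangSketch` and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HangSketch {
  /** Models a handler that hits an NPE before completing the reply. */
  static CompletableFuture<String> lostReply() {
    final CompletableFuture<String> reply = new CompletableFuture<>();
    try {
      final Object stream = null;
      stream.hashCode();      // throws NullPointerException
      reply.complete("ok");   // never reached
    } catch (NullPointerException e) {
      // Swallowed: nothing completes the reply, so an unbounded join() would block forever.
    }
    return reply;
  }

  /** Waits with a deadline so a lost reply shows up as a timeout rather than a hang. */
  static String waitBriefly(CompletableFuture<String> reply) {
    try {
      return reply.get(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timed out";
    } catch (ExecutionException e) {
      return "failed";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // preserve the interrupt status
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    System.out.println(waitBriefly(lostReply()));
  }
}
```

A timeout only makes the symptom visible; completing the future exceptionally at the point of failure is what lets the caller see the actual cause.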







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522541008



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       I will hold off on whether we need the `NULL` check for now because it is not related to the test itself.
   
   
   Back to whether we need `MultiDataStreamStateMachine` for this MiniRaftCluster test, here is my question:
   
   If we enable `MultiDataStreamStateMachine` for the MiniRaftCluster test, what is the difference between this MiniRaftCluster test and the existing `TestDataStreamNetty`? In terms of test coverage, the MiniRaftCluster test will be a superset of `TestDataStreamNetty`, so why would we want to keep both?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521891899



##########
File path: ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
##########
@@ -59,6 +74,7 @@ protected RaftServerProxy newRaftServer(
       RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
       RaftProperties properties) throws IOException {
     NettyConfigKeys.Server.setPort(properties, getPort(id, group));
+    NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));

Review comment:
       What's the relationship between these properties and the properties in the following code? Why do we need to call setPort twice?
   ![image](https://user-images.githubusercontent.com/51938049/98908844-fcb45500-24fb-11eb-8be3-24957f4beb7b.png)
   







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-725654537


   R: @szetszwo @runzhiwang 





[GitHub] [incubator-ratis] amaliujia closed pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia closed pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272


   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522356920



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {

Review comment:
       Agreed. But I would prefer to add that support in another PR.
   
   I am trying to make this PR one that has a working MiniRaftCluster with a simple test to prove it. Then we can add more tests afterwards.







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-726382209


   @runzhiwang 
   Tests pass locally, so the failed CI might not be related to this PR. If there are still issues after a rerun, I will definitely take a deeper look.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r523403654



##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();

Review comment:
       futures is not used.  Please remove it.

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);

Review comment:
       This can be removed since leader is never null.

##########
File path: ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
##########
@@ -269,6 +283,19 @@ public MiniRaftCluster initServers() {
     return this;
   }
 
+  private void initDataStreamServer() {
+    LOG.info("Setting up data stream servers");
+    for (RaftServerProxy serverProxy : servers.values()) {
+      serverProxy.getDataStreamServerRpc().addRaftPeers(getOtherRaftPeers(serverProxy.getId()));
+    }
+  }
+
+  private Collection<RaftPeer> getOtherRaftPeers(RaftPeerId id) {
+    HashMap peerMap = new HashMap<>(peers);
+    peerMap.remove(id);
+    return peerMap.values();

Review comment:
       Replace it with the following.
   ```suggestion
       return peers.values().stream().filter(p -> !p.getId().equals(id)).collect(Collectors.toList());
   ```
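
   To illustrate the suggestion above, here is a minimal, self-contained sketch. The `Peer` class and the `String` ids are hypothetical stand-ins for `RaftPeer` and `RaftPeerId`, not the Ratis types. It shows that filtering the values avoids both the raw `HashMap` copy and the mutation of that copy.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the peer map in MiniRaftCluster; not the Ratis types.
class OtherPeersSketch {
  /** Minimal stand-in for RaftPeer: only the id matters here. */
  static final class Peer {
    private final String id;

    Peer(String id) {
      this.id = id;
    }

    String getId() {
      return id;
    }
  }

  /** Returns every peer except the one with the given id; the map is neither copied nor modified. */
  static List<Peer> getOtherPeers(Map<String, Peer> peers, String id) {
    return peers.values().stream()
        .filter(p -> !p.getId().equals(id))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    final Map<String, Peer> peers = new LinkedHashMap<>();
    for (String id : Arrays.asList("s0", "s1", "s2")) {
      peers.put(id, new Peer(id));
    }
    // Collect the ids of all peers other than s1.
    final List<String> others = getOtherPeers(peers, "s1").stream()
        .map(Peer::getId)
        .collect(Collectors.toList());
    System.out.println(others);
  }
}
```

   The stream version also returns an independent list, whereas `peerMap.values()` is a view backed by the temporary map.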

##########
File path: ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+
+public class TestDataStreamWithNettyMiniRaftCluster extends DataStreamTests<MiniRaftClusterWithNetty>
+    implements MiniRaftClusterWithNetty.FactoryGet {
+  {
+    RaftConfigKeys.DataStream.setType(getProperties(), SupportedDataStreamType.NETTY);
+    setStateMachine(MultiDataStreamStateMachine.class);

Review comment:
       Move this to DataStreamTests since all subclasses need it.
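
   As a rough sketch of why this works: an instance initializer declared in an abstract base class runs for every subclass instance, before the subclass's own initializers. The class names and the string "settings" below are hypothetical stand-ins, not the Ratis test classes or configuration calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for DataStreamTests and its transport-specific subclass.
class InitializerOrderSketch {
  abstract static class BaseStreamTests {
    final List<String> setup = new ArrayList<>();

    // Shared setup lives in the base class initializer, so every subclass gets it.
    {
      setup.add("stateMachine=shared");
    }
  }

  static class NettyStreamTests extends BaseStreamTests {
    // Only transport-specific configuration stays in the subclass.
    {
      setup.add("dataStreamType=NETTY");
    }
  }

  public static void main(String[] args) {
    // The base initializer runs first, then the subclass initializer.
    System.out.println(new NettyStreamTests().setup);
  }
}
```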







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521870154



##########
File path: ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
##########
@@ -36,5 +38,5 @@
  */
 public interface DataStreamApi {
   /** Create a stream to write data. */
-  DataStreamOutput stream();
+  DataStreamOutputRpc stream();

Review comment:
       Got it. Changed this back.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521878561



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Yeah, that is what I was thinking in https://issues.apache.org/jira/browse/RATIS-1130?focusedCommentId=17228841&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17228841
   
   Sure, I can move `MultiDataStreamStateMachine` to make it reusable. Would you prefer that I do that in another PR (as this PR is pretty large now), or just add the refactoring to this PR?
   
   Also, adding a null check is still useful. I found that without it, this code hits an NPE and the program hangs there. If we don't add the null check, then at least we should investigate the NPE and find out why the program hangs.
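
   One plausible way an NPE can turn into a hang, sketched below under assumptions: if a handler throws before it completes the reply future, a caller that joins on that future waits forever. This is only an illustration of the general pattern. The handler names are hypothetical, and the thread does not confirm that this is the actual cause in `NettyServerStreamRpc`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical handlers; not the Ratis server code.
class NpeHangSketch {
  /** An exception thrown before complete() leaves the reply pending forever. */
  static CompletableFuture<Long> handleUnsafe(ExecutorService executor, byte[] data) {
    final CompletableFuture<Long> reply = new CompletableFuture<>();
    // data.length throws an NPE when data is null, so complete() is never reached.
    executor.execute(() -> reply.complete((long) data.length));
    return reply;
  }

  /** Propagates the failure to the caller instead of dropping it. */
  static CompletableFuture<Long> handleSafe(ExecutorService executor, byte[] data) {
    final CompletableFuture<Long> reply = new CompletableFuture<>();
    executor.execute(() -> {
      try {
        reply.complete((long) data.length);
      } catch (RuntimeException e) {
        reply.completeExceptionally(e);
      }
    });
    return reply;
  }

  /** Waits briefly and reports "ok", "timeout", "interrupted", or "failed: <exception name>". */
  static String outcome(CompletableFuture<Long> future) {
    try {
      future.get(200, TimeUnit.MILLISECONDS);
      return "ok";
    } catch (TimeoutException e) {
      return "timeout";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      // A plain join() here would block indefinitely; the timeout makes that visible.
      System.out.println(outcome(handleUnsafe(executor, null)));
      System.out.println(outcome(handleSafe(executor, null)));
    } finally {
      executor.shutdown();
    }
  }
}
```

   Under this assumption, the alternative to a null check is to make sure every failure path completes the reply exceptionally, so the client sees an error rather than a hang.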







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521871398



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       We should override DataApi.stream(RaftClientRequest request) in the test.  Otherwise, how could it test the correctness?  We may use MultiDataStreamStateMachine in DataStreamBaseTest.
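
   A minimal sketch of what such a test-side override could hand out, assuming the state machine exposes its stream as a `WritableByteChannel` (as `getWritableByteChannel()` in the diff suggests). The channel below counts bytes and checks them against a known pattern. The class names, `pattern`, and `newBuffer` are hypothetical helpers, not Ratis or `DataStreamTestUtils` APIs.

```java
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// Hypothetical test helper; not the MultiDataStreamStateMachine implementation.
class RecordingChannelSketch {
  /** Deterministic byte expected at a given stream offset. */
  static byte pattern(long offset) {
    return (byte) ('a' + offset % 26);
  }

  /** Builds a buffer of the given size whose content continues the pattern at offset. */
  static ByteBuffer newBuffer(long offset, int size) {
    final ByteBuffer buf = ByteBuffer.allocate(size);
    for (int i = 0; i < size; i++) {
      buf.put(pattern(offset + i));
    }
    buf.flip();
    return buf;
  }

  /** Channel that verifies every byte against the pattern and counts what it received. */
  static final class CheckingChannel implements WritableByteChannel {
    private long bytesWritten = 0;
    private boolean open = true;

    @Override
    public int write(ByteBuffer src) {
      final int n = src.remaining();
      while (src.hasRemaining()) {
        final byte actual = src.get();
        if (actual != pattern(bytesWritten)) {
          throw new IllegalStateException("Unexpected byte at offset " + bytesWritten);
        }
        bytesWritten++;
      }
      return n;
    }

    @Override
    public boolean isOpen() {
      return open;
    }

    @Override
    public void close() {
      open = false;
    }

    long getBytesWritten() {
      return bytesWritten;
    }
  }

  public static void main(String[] args) {
    final CheckingChannel channel = new CheckingChannel();
    long offset = 0;
    for (int size : new int[] {5, 7, 11}) {
      offset += channel.write(newBuffer(offset, size));
    }
    // The test can now assert that the server saw exactly what the client sent.
    System.out.println(channel.getBytesWritten() == offset);
  }
}
```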







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522358037



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Another thought: with MiniRaftCluster-based tests, do we still need the existing test cases? Can we migrate all tests to be MiniRaftCluster-based?







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-727172408


   @szetszwo @runzhiwang 
   
   Comments addressed. Can you take a look?





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r523397236



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       Done







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521884028



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();
+
+      // send data
+      final int halfBufferSize = bufferSize / 2;
+      int dataSize = 0;
+      for(int i = 0; i < bufferNum; i++) {
+        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+        sizes.add(size);

Review comment:
       `sizes` is never read; it can be removed.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521863203



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
##########
@@ -33,6 +34,11 @@ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
       @Override
       public void start() {}
 
+      @Override
+      public InetSocketAddress getInetSocketAddress() {
+        return InetSocketAddress.createUnresolved("0.0.0.0", 0);

Review comment:
       Hmm, good point. Let me try.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521883501



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();
+
+      // send data
+      final int halfBufferSize = bufferSize / 2;
+      int dataSize = 0;
+      for(int i = 0; i < bufferNum; i++) {
+        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+        sizes.add(size);
+
+        final ByteBuffer bf = initBuffer(dataSize, size);
+        futures.add(dataStreamOutputRpc.writeAsync(bf));
+        dataSize += size;
+      }
+
+      for (int i = 0; i < futures.size(); i++) {
+        futures.get(i).join();
+      }
+
+      // send close
+      dataStreamOutputRpc.closeAsync().join();
+
+      // send start transaction
+      dataStreamOutputRpc.startTransactionAsync().join();

Review comment:
       This should be removed; closeAsync already includes startTransactionAsync.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521870042



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest

Review comment:
       Done.
   
   I updated my IDE config again to match the 2-space indentation. I have three devices, so I always mix up which IDE is using which style.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r522511203



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       I do not think we need to add a null check. If a null occurs in the test, we should change the test to avoid it. Please think about it.











[GitHub] [incubator-ratis] amaliujia edited a comment on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia edited a comment on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-726414311


   @runzhiwang Yes, I agree. What I meant was that this PR will be updated and CI will rerun. If there are problems at that time, I will take a look. Right now, as this PR is not finalized, I will focus on the code changes first.





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521862965



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       In the current test I am using `BaseStateMachine`, in which the stream API returns:
   
   https://github.com/apache/incubator-ratis/blob/f91ef63809b21ed65445f4a60d8005ce0a1af8ca/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java#L91







[GitHub] [incubator-ratis] szetszwo merged pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272


   





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521885595



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {

Review comment:
       This should also test multiple clients and multiple streams.







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521884144



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();

Review comment:
       This is not needed.







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-726414311


   @runzhiwang Yes, agreed. What I meant was that this PR will be updated and CI will rerun. If there are problems at that point, I will take a look. Since this PR is not finalized yet, I will focus on the code changes first.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521883718



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();
+
+      // send data
+      final int halfBufferSize = bufferSize / 2;
+      int dataSize = 0;
+      for(int i = 0; i < bufferNum; i++) {
+        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+        sizes.add(size);
+
+        final ByteBuffer bf = initBuffer(dataSize, size);
+        futures.add(dataStreamOutputRpc.writeAsync(bf));
+        dataSize += size;
+      }
+
+      for (int i = 0; i < futures.size(); i++) {

Review comment:
       These `join()` calls should be removed. Once `closeAsync().join()` returns, these futures have already completed.
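
A minimal sketch of the ordering argument above, assuming the close future completes only after every earlier write future. `SketchOutput` is a hypothetical stand-in modelled with `CompletableFuture.allOf`; it is not the Ratis `DataStreamOutputRpc`, and whether the real client gives this guarantee is the reviewer's claim rather than something shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class OrderedCloseSketch {
  private OrderedCloseSketch() {}

  // Hypothetical output whose close future depends on all previously issued writes.
  static final class SketchOutput {
    private final List<CompletableFuture<Integer>> writes = new ArrayList<>();

    CompletableFuture<Integer> writeAsync(int size) {
      final CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> size);
      writes.add(f);
      return f;
    }

    // Assumption: close completes only after every earlier write has completed.
    CompletableFuture<Void> closeAsync() {
      return CompletableFuture.allOf(writes.toArray(new CompletableFuture[0]));
    }
  }

  // Issues numWrites writes, joins only on close, then reports whether all writes are done.
  static boolean allWritesDoneAfterClose(int numWrites) {
    final SketchOutput out = new SketchOutput();
    final List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < numWrites; i++) {
      futures.add(out.writeAsync(i));
    }
    out.closeAsync().join();
    return futures.stream().allMatch(CompletableFuture::isDone);
  }

  public static void main(String[] args) {
    System.out.println(allWritesDoneAfterClose(10));
  }
}
```

Under that assumption, joining each write future before the close adds no extra synchronization; the futures can still be inspected afterwards to assert on their replies.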







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521878967



##########
File path: ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
##########
@@ -47,6 +48,20 @@ public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
     }
   }
 
+  public interface FactoryGetWithDataStreamEnabled extends Factory.Get<MiniRaftClusterWithNetty> {

Review comment:
       Try the following code and delete `FactoryGetWithDataStreamEnabled`:
   ```
   public class TestDataStreamWithNetty extends DataStreamTests<MiniRaftClusterWithNetty>
       implements MiniRaftClusterWithNetty.FactoryGet {
     {
       RaftConfigKeys.DataStream.setType(getProperties(), SupportedDataStreamType.NETTY);
     }
   }
   ```
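
The suggestion above relies on a Java instance initializer block, which runs each time the test class is constructed, before the constructor body. A self-contained sketch of that pattern follows; `SketchFactoryGet`, `SketchStreamTests`, `SketchNettyStreamTest`, and the property key are hypothetical names, and the shared `Properties` object is only an assumption about how a factory interface might expose configuration.

```java
import java.util.Properties;

// Hypothetical factory interface that exposes one shared configuration object.
interface SketchFactoryGet {
  String TYPE_KEY = "sketch.datastream.type";
  Properties SHARED = new Properties();

  default Properties getProperties() {
    return SHARED;
  }

  // Reads the configured type, falling back to DISABLED when nothing was set.
  default String dataStreamType() {
    return getProperties().getProperty(TYPE_KEY, "DISABLED");
  }
}

// Hypothetical base class standing in for the shared test logic.
abstract class SketchStreamTests {
}

final class SketchNettyStreamTest extends SketchStreamTests implements SketchFactoryGet {
  {
    // Instance initializer: configures the shared properties on construction,
    // so no extra factory interface is needed just to flip one setting.
    getProperties().setProperty(TYPE_KEY, "NETTY");
  }

  public static void main(String[] args) {
    System.out.println(new SketchNettyStreamTest().dataStreamType());
  }
}
```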







[GitHub] [incubator-ratis] amaliujia commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-725874161


   @szetszwo comment addressed.





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521869712



##########
File path: ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
##########
@@ -33,6 +34,11 @@ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
       @Override
       public void start() {}
 
+      @Override
+      public InetSocketAddress getInetSocketAddress() {
+        return InetSocketAddress.createUnresolved("0.0.0.0", 0);

Review comment:
       `null` works. I have updated it to use `null`.

##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTestUtils.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import java.nio.ByteBuffer;
+import org.junit.Assert;
+
+public class DataStreamTestUtils {
+    public static final int MODULUS = 23;
+
+    public static byte pos2byte(int pos) {
+        return (byte) ('A' + pos%MODULUS);
+    }
+
+    public static ByteBuffer initBuffer(int offset, int size) {
+        final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
+        final int length = buffer.capacity();
+        buffer.position(0).limit(length);
+        for (int j = 0; j < length; j++) {
+            buffer.put(pos2byte(offset + j));
+        }
+        buffer.flip();
+        Assert.assertEquals(length, buffer.remaining());
+        return buffer;
+    }

Review comment:
       Done. 
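
Because `initBuffer` fills each buffer with a deterministic pattern derived from the stream offset, a receiver could in principle check the bytes it gets against the same pattern. A minimal sketch of such a check follows; `matchesPattern` is a hypothetical helper that is not part of this PR, and the sketch uses a heap buffer rather than a direct one purely for simplicity.

```java
import java.nio.ByteBuffer;

final class PatternCheckSketch {
  static final int MODULUS = 23;

  private PatternCheckSketch() {}

  // Same mapping as in the quoted test utility: position -> byte in 'A'..'W'.
  static byte pos2byte(int pos) {
    return (byte) ('A' + pos % MODULUS);
  }

  // Fills a buffer of the given size with the pattern starting at the given stream offset.
  static ByteBuffer initBuffer(int offset, int size) {
    final ByteBuffer buffer = ByteBuffer.allocate(size);
    for (int j = 0; j < size; j++) {
      buffer.put(pos2byte(offset + j));
    }
    buffer.flip();
    return buffer;
  }

  // Hypothetical check: do the remaining bytes follow the pattern for this offset?
  // Works on a duplicate so the caller's position and limit are left unchanged.
  static boolean matchesPattern(ByteBuffer data, int offset) {
    final ByteBuffer view = data.duplicate();
    for (int j = 0; view.hasRemaining(); j++) {
      if (view.get() != pos2byte(offset + j)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    final ByteBuffer buffer = initBuffer(5, 100);
    System.out.println(matchesPattern(buffer, 5));
    System.out.println(matchesPattern(buffer, 6));
  }
}
```

Note that the pattern repeats every `MODULUS` positions, so a check like this cannot distinguish offsets that differ by a multiple of 23.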







[GitHub] [incubator-ratis] runzhiwang commented on pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#issuecomment-726411389


   > @runzhiwang
   > Tests pass locally, so the failed CI might not be related to this PR. If there are still issues after a rerun, I will definitely take a deeper look.
   
   @amaliujia We now merge a PR once it passes CI.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521884028



##########
File path: ratis-server/src/test/java/org/apache/ratis/DataStreamTests.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import static org.apache.ratis.DataStreamTestUtils.initBuffer;
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+import org.apache.ratis.client.DataStreamOutputRpc;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+    implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  public static final int NUM_SERVERS = 3;
+  // TODO: change bufferSize and bufferNum configurable
+  private static int bufferSize = 1_000_000;
+  private static int bufferNum =  10;
+
+  @Test
+  public void testStreamWrites() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testStreamWrites);
+  }
+
+  void testStreamWrites(CLUSTER cluster) throws Exception {
+    final RaftServerImpl leader = waitForLeader(cluster);
+    Assert.assertTrue(leader != null);
+
+    final RaftGroup raftGroup = cluster.getGroup();
+    final Collection<RaftPeer> peers = raftGroup.getPeers();
+    Assert.assertEquals(NUM_SERVERS, peers.size());
+    RaftPeer raftPeer = peers.iterator().next();
+
+    try (RaftClient client = cluster.createClient(raftPeer)) {
+      // send header
+      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
+      final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+      final List<Integer> sizes = new ArrayList<>();
+
+      dataStreamOutputRpc.getHeaderFuture().join();
+
+      // send data
+      final int halfBufferSize = bufferSize / 2;
+      int dataSize = 0;
+      for(int i = 0; i < bufferNum; i++) {
+        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
+        sizes.add(size);

Review comment:
       Why is `sizes` needed?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #272: RATIS-1130. Add E2E test for Ratis streaming based on MiniRaftCluster

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #272:
URL: https://github.com/apache/incubator-ratis/pull/272#discussion_r521862965



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -326,8 +327,11 @@ private StreamInfo newStreamInfo(ByteBuf buf) {
   }
 
   static long writeTo(ByteBuf buf, DataStream stream) {
-    final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
+    if (stream == null) {

Review comment:
       In the current test I am using `BaseStateMachine`, in which `stream(request)` returns:
   
   https://github.com/apache/incubator-ratis/blob/f91ef63809b21ed65445f4a60d8005ce0a1af8ca/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java#L91



