You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/22 23:36:15 UTC

[incubator-ratis] branch master updated: RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 83c613c  RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)
83c613c is described below

commit 83c613c6c2b287ef7767629081e1a1d024766d01
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 23 07:36:05 2020 +0800

    RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)
---
 .../org/apache/ratis/util/CollectionUtils.java     | 13 ++++
 .../java/org/apache/ratis/MiniRaftCluster.java     | 12 ++--
 .../datastream/DataStreamAsyncClusterTests.java    | 83 ++++++++++++++++++++++
 .../ratis/datastream/DataStreamBaseTest.java       | 80 ++-------------------
 .../ratis/datastream/DataStreamTestUtils.java      | 36 ++++++++++
 .../TestNettyDataStreamWithGrpcCluster.java        |  2 +-
 ...Netty.java => TestNettyDataStreamWithMock.java} | 14 +---
 7 files changed, 147 insertions(+), 93 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index ce78f52..39f932c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -68,6 +68,19 @@ public interface CollectionUtils {
     return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size));
   }
 
+  /** @return a randomly picked element of {@code elements}, or null if {@code elements} is null or empty. */
+  static <T> T random(Collection<T> elements) {
+    if (elements == null || elements.isEmpty()) {
+      return null; // nothing to pick from
+    }
+
+    final Iterator<T> i = elements.iterator();
+    for(int n = ThreadLocalRandom.current().nextInt(elements.size()); n > 0; n--) { // n is a random index in [0, size)
+      i.next(); // skip the n elements that precede the chosen index
+    }
+    return i.next(); // the element at index n in the collection's iteration order
+  }
+
   static <INPUT, OUTPUT> Iterable<OUTPUT> as(
       Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
     return () -> new Iterator<OUTPUT>() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index dff7640..395ad10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -62,7 +62,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -439,15 +438,15 @@ public abstract class MiniRaftCluster implements Closeable {
     startServers(newServers);
     if (!startNewPeer) {
       // start and then close, in order to bind the port
-      newServers.forEach(p -> p.close());
+      newServers.forEach(RaftServerProxy::close);
     }
 
     final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
-    final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    final RaftPeer[] np = newPeers.toArray(RaftPeer.emptyArray());
     newPeers.addAll(group.getPeers());
-    RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+    RaftPeer[] p = newPeers.toArray(RaftPeer.emptyArray());
     group = RaftGroup.valueOf(group.getGroupId(), p);
-    return new PeerChanges(p, np, new RaftPeer[0]);
+    return new PeerChanges(p, np, RaftPeer.emptyArray());
   }
 
   static void startServers(Iterable<? extends RaftServer> servers) throws IOException {
@@ -707,6 +706,9 @@ public abstract class MiniRaftCluster implements Closeable {
   }
 
   public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy, RaftPeer primaryServer) {
+    if (primaryServer == null) {
+      primaryServer = CollectionUtils.random(group.getPeers());
+    }
     RaftClient.Builder builder = RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leaderId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
new file mode 100644
index 0000000..28aa30b
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.CollectionUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluster>
+    extends DataStreamClusterTests<CLUSTER> { // tests that drive many data streams concurrently from several clients
+  final Executor executor = Executors.newFixedThreadPool(16); // shared by all async tasks below; NOTE(review): never shut down in the code shown here
+
+  @Test
+  public void testMultipleStreamsSingleServer() throws Exception {
+    runWithNewCluster(1, this::runTestDataStream); // presumably 1 is the number of servers -- confirm against runWithNewCluster
+  }
+
+  @Test
+  public void testMultipleStreamsMultipleServers() throws Exception {
+    runWithNewCluster(3, this::runTestDataStream); // presumably 3 is the number of servers -- confirm against runWithNewCluster
+  }
+
+  void runTestDataStream(CLUSTER cluster) throws Exception { // runs two workloads of different shapes at the same time
+    RaftTestUtil.waitForLeader(cluster);
+    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor)); // numClients=5, numStreams=10, bufferSize=1_000_000, bufferNum=10
+    futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor)); // numClients=2, numStreams=20, bufferSize=1_000, bufferNum=10_000
+    futures.forEach(CompletableFuture::join); // wait for both workloads; a failed task is rethrown by join as CompletionException
+  }
+
+  void runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) { // starts numClients client tasks and waits for all of them
+    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int j = 0; j < numClients; j++) { // one async task per client
+      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
+    }
+    Assert.assertEquals(numClients, futures.size());
+    futures.forEach(CompletableFuture::join); // NOTE(review): when called from an executor task, this join holds one of the 16 pool threads while waiting
+  }
+
+  void runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) { // one client writing numStreams streams concurrently
+    final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s); // view the cluster's servers as Iterable<RaftServer>
+    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    try(RaftClient client = cluster.createClient()) {
+      for (int i = 0; i < numStreams; i++) {
+        final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream(); // the stream is opened on the calling thread
+        futures.add(CompletableFuture.runAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
+            servers, out, bufferSize, bufferNum).join(), executor)); // join so that the close-reply assertions are part of this task
+      }
+      Assert.assertEquals(numStreams, futures.size());
+      futures.forEach(CompletableFuture::join); // every stream is written, closed and checked before the client is closed
+    } catch (IOException e) {
+      throw new CompletionException(e); // unchecked wrapper, so this method can be used as a Runnable lambda body
+    }
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 113fcb6..c08e1bf 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -25,7 +25,6 @@ import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamTestUtils.DataChannel;
 import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
-import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -58,6 +57,7 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerFactory;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
@@ -68,11 +68,8 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 abstract class DataStreamBaseTest extends BaseTest {
@@ -120,8 +117,8 @@ abstract class DataStreamBaseTest extends BaseTest {
       return peer;
     }
 
-    MyDivision getDivision(RaftGroupId groupId) throws IOException {
-      return (MyDivision) raftServer.getDivision(groupId);
+    RaftServer getRaftServer() { // accessor: hands out the raftServer held by this object
+      return raftServer;
+    }
 
     void start() {
@@ -141,7 +138,6 @@ abstract class DataStreamBaseTest extends BaseTest {
 
   private List<Server> servers;
   private RaftGroup raftGroup;
-  private final Executor executor = Executors.newFixedThreadPool(16);
 
   Server getPrimaryServer() {
     return servers.get(0);
@@ -348,41 +344,6 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  void runTestDataStream(int numServers) throws Exception {
-    try {
-      setup(numServers);
-      final List<CompletableFuture<Void>> futures = new ArrayList<>();
-      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(5, 10, 1_000_000, 10), executor));
-      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(2, 20, 1_000, 10_000), executor));
-      futures.forEach(CompletableFuture::join);
-    } finally {
-      shutdown();
-    }
-  }
-
-  void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) {
-    final List<CompletableFuture<Void>> futures = new ArrayList<>();
-    for (int j = 0; j < numClients; j++) {
-      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(numStreams, bufferSize, bufferNum), executor));
-    }
-    Assert.assertEquals(numClients, futures.size());
-    futures.forEach(CompletableFuture::join);
-  }
-
-  void runTestDataStream(int numStreams, int bufferSize, int bufferNum) {
-    final List<CompletableFuture<Void>> futures = new ArrayList<>();
-    try(RaftClient client = newRaftClientForDataStream()) {
-      for (int i = 0; i < numStreams; i++) {
-        futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
-            (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum), executor));
-      }
-      Assert.assertEquals(numStreams, futures.size());
-      futures.forEach(CompletableFuture::join);
-    } catch (IOException e) {
-      throw new CompletionException(e);
-    }
-  }
-
   void runTestMockCluster(ClientId clientId, int bufferSize, int bufferNum,
       Exception expectedException, Exception headerException)
       throws IOException {
@@ -397,7 +358,8 @@ abstract class DataStreamBaseTest extends BaseTest {
         return;
       }
 
-      final RaftClientReply clientReply = runTestDataStream(out, bufferSize, bufferNum).join();
+      final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
+          CollectionUtils.as(servers, Server::getRaftServer), out, bufferSize, bufferNum).join();
       if (expectedException != null) {
         Assert.assertFalse(clientReply.isSuccess());
         Assert.assertTrue(clientReply.getException().getMessage().contains(
@@ -407,36 +369,4 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
     }
   }
-
-  CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
-    LOG.info("start Stream{}", out.getHeader().getCallId());
-    final int bytesWritten = DataStreamTestUtils.writeAndAssertReplies(out, bufferSize, bufferNum);
-    try {
-      for (Server s : servers) {
-        assertHeader(s, out.getHeader(), bytesWritten);
-      }
-    } catch (Throwable e) {
-      throw new CompletionException(e);
-    }
-
-    return out.closeAsync().thenCompose(reply -> DataStreamTestUtils.assertCloseReply(out, reply, bytesWritten));
-  }
-
-  void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
-    // check header
-    Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
-    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
-
-    // check stream
-    final MyDivision d = server.getDivision(header.getRaftGroupId());
-    final SingleDataStream stream = d.getStateMachine().getSingleDataStream(header);
-    final DataChannel channel = d.getStateMachine().getSingleDataStream(header).getWritableByteChannel();
-    Assert.assertEquals(dataSize, channel.getBytesWritten());
-    Assert.assertEquals(dataSize, channel.getForcedPosition());
-
-    // check writeRequest
-    final RaftClientRequest writeRequest = stream.getWriteRequest();
-    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
-    DataStreamTestUtils.assertRaftClientMessage(header, writeRequest);
-  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index a834cd8..d8bd383 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftClientMessage;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -38,6 +39,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
 import org.junit.Assert;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -49,6 +51,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 
 public interface DataStreamTestUtils {
+  Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
+
   ByteString MOCK = ByteString.copyFromUtf8("mock");
   int MODULUS = 23;
 
@@ -198,6 +202,38 @@ public interface DataStreamTestUtils {
     return dataSize;
   }
 
+  static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies( // write to out, check each server, then close out asynchronously
+      Iterable<RaftServer> servers, DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+    LOG.info("start Stream{}", out.getHeader().getCallId());
+    final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum); // the returned count is the expected size in the checks below
+    try {
+      for (RaftServer s : servers) { // every given server is checked against the same header and size
+        assertHeader(s, out.getHeader(), bytesWritten);
+      }
+    } catch (Throwable e) { // Throwable also covers AssertionError from the Assert calls
+      throw new CompletionException(e); // rethrown unchecked, with the original failure as the cause
+    }
+
+    return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten)); // callers must wait on this future to observe the close-reply checks
+  }
+
+  static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize) throws Exception { // asserts what server recorded for header's stream
+    // check header: it must have the data stream request type
+    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
+
+    // check stream: look up the stream for this header in the state machine of the header's group
+    final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) server.getDivision(header.getRaftGroupId()).getStateMachine(); // the cast fails unless the division uses MultiDataStreamStateMachine
+    final SingleDataStream stream = stateMachine.getSingleDataStream(header);
+    final DataChannel channel = stream.getWritableByteChannel();
+    Assert.assertEquals(dataSize, channel.getBytesWritten()); // bytes written must equal the expected size
+    Assert.assertEquals(dataSize, channel.getForcedPosition()); // the forced position must equal the expected size as well
+
+    // check writeRequest: it must also be a data stream request and pass assertRaftClientMessage against the header
+    final RaftClientRequest writeRequest = stream.getWriteRequest();
+    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
+    assertRaftClientMessage(header, writeRequest);
+  }
+
   static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
       long bytesWritten) {
     // Test close idempotent
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
index ce5b25d..13121c3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
@@ -18,6 +18,6 @@
 package org.apache.ratis.datastream;
 
 public class TestNettyDataStreamWithGrpcCluster
-    extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
+    extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
     implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
similarity index 96%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index e9ce41e..16426a7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestDataStreamNetty extends DataStreamBaseTest {
+public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
   static RaftPeer newRaftPeer(RaftServer server) {
     final InetSocketAddress rpc = NetUtils.createLocalServerAddress();
     final int dataStreamPort = NettyConfigKeys.DataStream.port(server.getProperties());
@@ -74,16 +74,6 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
     return super.newRaftServer(peer, p);
   }
 
-  @Test
-  public void testDataStreamSingleServer() throws Exception {
-    runTestDataStream(1);
-  }
-
-  @Test
-  public void testDataStreamMultipleServer() throws Exception {
-    runTestDataStream(3);
-  }
-
   private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException,
       Exception submitException) throws Exception {
     testMockCluster(leaderIndex, numServers, leaderException, submitException, null);
@@ -146,7 +136,7 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
       Exception expectedException, Exception headerException) throws Exception {
     try {
       final List<RaftPeer> peers = raftServers.stream()
-          .map(TestDataStreamNetty::newRaftPeer)
+          .map(TestNettyDataStreamWithMock::newRaftPeer)
           .collect(Collectors.toList());
       setup(groupId, peers, raftServers);
       runTestMockCluster(clientId, bufferSize, bufferNum, expectedException, headerException);