You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/22 23:36:15 UTC
[incubator-ratis] branch master updated: RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 83c613c RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)
83c613c is described below
commit 83c613c6c2b287ef7767629081e1a1d024766d01
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 23 07:36:05 2020 +0800
RATIS-1169. Move the DataStream tests from TestDataStreamNetty to DataStreamClusterTests. (#293)
---
.../org/apache/ratis/util/CollectionUtils.java | 13 ++++
.../java/org/apache/ratis/MiniRaftCluster.java | 12 ++--
.../datastream/DataStreamAsyncClusterTests.java | 83 ++++++++++++++++++++++
.../ratis/datastream/DataStreamBaseTest.java | 80 ++-------------------
.../ratis/datastream/DataStreamTestUtils.java | 36 ++++++++++
.../TestNettyDataStreamWithGrpcCluster.java | 2 +-
...Netty.java => TestNettyDataStreamWithMock.java} | 14 +---
7 files changed, 147 insertions(+), 93 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index ce78f52..39f932c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -68,6 +68,19 @@ public interface CollectionUtils {
return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size));
}
+ /** @return a randomly picked element. */
+ static <T> T random(Collection<T> elements) {
+ if (elements == null || elements.isEmpty()) {
+ return null;
+ }
+
+ final Iterator<T> i = elements.iterator();
+ for(int n = ThreadLocalRandom.current().nextInt(elements.size()); n > 0; n--) {
+ i.next();
+ }
+ return i.next();
+ }
+
static <INPUT, OUTPUT> Iterable<OUTPUT> as(
Iterable<INPUT> iteration, Function<INPUT, OUTPUT> converter) {
return () -> new Iterator<OUTPUT>() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index dff7640..395ad10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -62,7 +62,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -439,15 +438,15 @@ public abstract class MiniRaftCluster implements Closeable {
startServers(newServers);
if (!startNewPeer) {
// start and then close, in order to bind the port
- newServers.forEach(p -> p.close());
+ newServers.forEach(RaftServerProxy::close);
}
final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
- final RaftPeer[] np = newPeers.toArray(new RaftPeer[newPeers.size()]);
+ final RaftPeer[] np = newPeers.toArray(RaftPeer.emptyArray());
newPeers.addAll(group.getPeers());
- RaftPeer[] p = newPeers.toArray(new RaftPeer[newPeers.size()]);
+ RaftPeer[] p = newPeers.toArray(RaftPeer.emptyArray());
group = RaftGroup.valueOf(group.getGroupId(), p);
- return new PeerChanges(p, np, new RaftPeer[0]);
+ return new PeerChanges(p, np, RaftPeer.emptyArray());
}
static void startServers(Iterable<? extends RaftServer> servers) throws IOException {
@@ -707,6 +706,9 @@ public abstract class MiniRaftCluster implements Closeable {
}
public RaftClient createClient(RaftPeerId leaderId, RaftGroup group, RetryPolicy retryPolicy, RaftPeer primaryServer) {
+ if (primaryServer == null) {
+ primaryServer = CollectionUtils.random(group.getPeers());
+ }
RaftClient.Builder builder = RaftClient.newBuilder()
.setRaftGroup(group)
.setLeaderId(leaderId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
new file mode 100644
index 0000000..28aa30b
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.datastream;
+
+import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.util.CollectionUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluster>
+ extends DataStreamClusterTests<CLUSTER> {
+ final Executor executor = Executors.newFixedThreadPool(16);
+
+ @Test
+ public void testMultipleStreamsSingleServer() throws Exception {
+ runWithNewCluster(1, this::runTestDataStream);
+ }
+
+ @Test
+ public void testMultipleStreamsMultipleServers() throws Exception {
+ runWithNewCluster(3, this::runTestDataStream);
+ }
+
+ void runTestDataStream(CLUSTER cluster) throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor));
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor));
+ futures.forEach(CompletableFuture::join);
+ }
+
+ void runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) {
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (int j = 0; j < numClients; j++) {
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
+ }
+ Assert.assertEquals(numClients, futures.size());
+ futures.forEach(CompletableFuture::join);
+ }
+
+ void runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) {
+ final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s);
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ try(RaftClient client = cluster.createClient()) {
+ for (int i = 0; i < numStreams; i++) {
+ final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
+ futures.add(CompletableFuture.runAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
+ servers, out, bufferSize, bufferNum), executor));
+ }
+ Assert.assertEquals(numStreams, futures.size());
+ futures.forEach(CompletableFuture::join);
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 113fcb6..c08e1bf 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -25,7 +25,6 @@ import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamTestUtils.DataChannel;
import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
-import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
@@ -58,6 +57,7 @@ import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
+import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
@@ -68,11 +68,8 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import java.util.stream.Collectors;
abstract class DataStreamBaseTest extends BaseTest {
@@ -120,8 +117,8 @@ abstract class DataStreamBaseTest extends BaseTest {
return peer;
}
- MyDivision getDivision(RaftGroupId groupId) throws IOException {
- return (MyDivision) raftServer.getDivision(groupId);
+ RaftServer getRaftServer() {
+ return raftServer;
}
void start() {
@@ -141,7 +138,6 @@ abstract class DataStreamBaseTest extends BaseTest {
private List<Server> servers;
private RaftGroup raftGroup;
- private final Executor executor = Executors.newFixedThreadPool(16);
Server getPrimaryServer() {
return servers.get(0);
@@ -348,41 +344,6 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- void runTestDataStream(int numServers) throws Exception {
- try {
- setup(numServers);
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(5, 10, 1_000_000, 10), executor));
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(2, 20, 1_000, 10_000), executor));
- futures.forEach(CompletableFuture::join);
- } finally {
- shutdown();
- }
- }
-
- void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) {
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- for (int j = 0; j < numClients; j++) {
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(numStreams, bufferSize, bufferNum), executor));
- }
- Assert.assertEquals(numClients, futures.size());
- futures.forEach(CompletableFuture::join);
- }
-
- void runTestDataStream(int numStreams, int bufferSize, int bufferNum) {
- final List<CompletableFuture<Void>> futures = new ArrayList<>();
- try(RaftClient client = newRaftClientForDataStream()) {
- for (int i = 0; i < numStreams; i++) {
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
- (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum), executor));
- }
- Assert.assertEquals(numStreams, futures.size());
- futures.forEach(CompletableFuture::join);
- } catch (IOException e) {
- throw new CompletionException(e);
- }
- }
-
void runTestMockCluster(ClientId clientId, int bufferSize, int bufferNum,
Exception expectedException, Exception headerException)
throws IOException {
@@ -397,7 +358,8 @@ abstract class DataStreamBaseTest extends BaseTest {
return;
}
- final RaftClientReply clientReply = runTestDataStream(out, bufferSize, bufferNum).join();
+ final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
+ CollectionUtils.as(servers, Server::getRaftServer), out, bufferSize, bufferNum).join();
if (expectedException != null) {
Assert.assertFalse(clientReply.isSuccess());
Assert.assertTrue(clientReply.getException().getMessage().contains(
@@ -407,36 +369,4 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
}
-
- CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
- LOG.info("start Stream{}", out.getHeader().getCallId());
- final int bytesWritten = DataStreamTestUtils.writeAndAssertReplies(out, bufferSize, bufferNum);
- try {
- for (Server s : servers) {
- assertHeader(s, out.getHeader(), bytesWritten);
- }
- } catch (Throwable e) {
- throw new CompletionException(e);
- }
-
- return out.closeAsync().thenCompose(reply -> DataStreamTestUtils.assertCloseReply(out, reply, bytesWritten));
- }
-
- void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
- // check header
- Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
- Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
-
- // check stream
- final MyDivision d = server.getDivision(header.getRaftGroupId());
- final SingleDataStream stream = d.getStateMachine().getSingleDataStream(header);
- final DataChannel channel = d.getStateMachine().getSingleDataStream(header).getWritableByteChannel();
- Assert.assertEquals(dataSize, channel.getBytesWritten());
- Assert.assertEquals(dataSize, channel.getForcedPosition());
-
- // check writeRequest
- final RaftClientRequest writeRequest = stream.getWriteRequest();
- Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
- DataStreamTestUtils.assertRaftClientMessage(header, writeRequest);
- }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index a834cd8..d8bd383 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.RaftClientMessage;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.StateMachineDataChannel;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -38,6 +39,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -49,6 +51,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
public interface DataStreamTestUtils {
+ Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
+
ByteString MOCK = ByteString.copyFromUtf8("mock");
int MODULUS = 23;
@@ -198,6 +202,38 @@ public interface DataStreamTestUtils {
return dataSize;
}
+ static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
+ Iterable<RaftServer> servers, DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+ LOG.info("start Stream{}", out.getHeader().getCallId());
+ final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
+ try {
+ for (RaftServer s : servers) {
+ assertHeader(s, out.getHeader(), bytesWritten);
+ }
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ }
+
+ return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten));
+ }
+
+ static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize) throws Exception {
+ // check header
+ Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
+
+ // check stream
+ final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) server.getDivision(header.getRaftGroupId()).getStateMachine();
+ final SingleDataStream stream = stateMachine.getSingleDataStream(header);
+ final DataChannel channel = stream.getWritableByteChannel();
+ Assert.assertEquals(dataSize, channel.getBytesWritten());
+ Assert.assertEquals(dataSize, channel.getForcedPosition());
+
+ // check writeRequest
+ final RaftClientRequest writeRequest = stream.getWriteRequest();
+ Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
+ assertRaftClientMessage(header, writeRequest);
+ }
+
static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
long bytesWritten) {
// Test close idempotent
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
index ce5b25d..13121c3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
@@ -18,6 +18,6 @@
package org.apache.ratis.datastream;
public class TestNettyDataStreamWithGrpcCluster
- extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
+ extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
similarity index 96%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index e9ce41e..16426a7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestDataStreamNetty extends DataStreamBaseTest {
+public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
static RaftPeer newRaftPeer(RaftServer server) {
final InetSocketAddress rpc = NetUtils.createLocalServerAddress();
final int dataStreamPort = NettyConfigKeys.DataStream.port(server.getProperties());
@@ -74,16 +74,6 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
return super.newRaftServer(peer, p);
}
- @Test
- public void testDataStreamSingleServer() throws Exception {
- runTestDataStream(1);
- }
-
- @Test
- public void testDataStreamMultipleServer() throws Exception {
- runTestDataStream(3);
- }
-
private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException,
Exception submitException) throws Exception {
testMockCluster(leaderIndex, numServers, leaderException, submitException, null);
@@ -146,7 +136,7 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
Exception expectedException, Exception headerException) throws Exception {
try {
final List<RaftPeer> peers = raftServers.stream()
- .map(TestDataStreamNetty::newRaftPeer)
+ .map(TestNettyDataStreamWithMock::newRaftPeer)
.collect(Collectors.toList());
setup(groupId, peers, raftServers);
runTestMockCluster(clientId, bufferSize, bufferNum, expectedException, headerException);