You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/02 11:55:17 UTC
[incubator-ratis] branch master updated: RATIS-1174. Test leader
change when close stream (#310)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d15a3e4 RATIS-1174. Test leader change when close stream (#310)
d15a3e4 is described below
commit d15a3e452e55fce2062b53658cd3bee14f898a47
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 2 19:55:08 2020 +0800
RATIS-1174. Test leader change when close stream (#310)
---
.../datastream/DataStreamAsyncClusterTests.java | 42 ++++++++++++++++++----
.../ratis/datastream/DataStreamBaseTest.java | 2 +-
.../ratis/datastream/DataStreamTestUtils.java | 29 +++++++++------
3 files changed, 55 insertions(+), 18 deletions(-)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 8a19e10..869da26 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -69,20 +69,46 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
}
+ @Test
+ public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
+ runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
+ }
+
+ void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
+ runTestDataStream(cluster, true);
+ }
+
void runTestDataStream(CLUSTER cluster) throws Exception {
+ runTestDataStream(cluster, false);
+ }
+
+ void runTestDataStream(CLUSTER cluster, boolean stepDownLeader) throws Exception {
RaftTestUtil.waitForLeader(cluster);
final List<CompletableFuture<Long>> futures = new ArrayList<>();
- futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor));
- futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor));
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10, stepDownLeader), executor));
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000, stepDownLeader), executor));
final long maxIndex = futures.stream()
.map(CompletableFuture::join)
.max(Long::compareTo)
.orElseThrow(IllegalStateException::new);
+ if (stepDownLeader) {
+ final RaftPeerId oldLeader = cluster.getLeader().getId();
+ final CompletableFuture<RaftPeerId> changeLeader = futures.get(0).thenApplyAsync(dummy -> {
+ try {
+ return RaftTestUtil.changeLeader(cluster, oldLeader);
+ } catch (Exception e) {
+ throw new CompletionException("Failed to change leader from " + oldLeader, e);
+ }
+ });
+ LOG.info("Changed leader from {} to {}", oldLeader, changeLeader.join());
+ }
+
// wait for all servers to catch up
try (RaftClient client = cluster.createClient()) {
- client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+ RaftClientReply reply = client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+ Assert.assertTrue(reply.isSuccess());
}
// assert all streams are linked
for (RaftServer proxy : cluster.getServers()) {
@@ -95,10 +121,12 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
}
}
- Long runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) {
+ Long runTestDataStream(
+ CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
final List<CompletableFuture<Long>> futures = new ArrayList<>();
for (int j = 0; j < numClients; j++) {
- futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
+ futures.add(CompletableFuture.supplyAsync(
+ () -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum, stepDownLeader), executor));
}
Assert.assertEquals(numClients, futures.size());
return futures.stream()
@@ -111,7 +139,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
return cluster.getDivision(primary.getId()).getRaftClient().getId();
}
- long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) {
+ long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s);
final RaftPeerId leader = cluster.getLeader().getId();
final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
@@ -121,7 +149,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
for (int i = 0; i < numStreams; i++) {
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
- servers, leader, out, bufferSize, bufferNum, primaryClientId).join(), executor));
+ servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
}
Assert.assertEquals(numStreams, futures.size());
return futures.stream()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b8d7dd3..c504d95 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -391,7 +391,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
CollectionUtils.as(servers, Server::getRaftServer), null, out, bufferSize, bufferNum,
- getPrimaryClientId()).join();
+ getPrimaryClientId(), null, false).join();
if (expectedException != null) {
Assert.assertFalse(clientReply.isSuccess());
Assert.assertTrue(clientReply.getException().getMessage().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 3131ee7..673eab8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -18,6 +18,8 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -64,6 +66,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
public interface DataStreamTestUtils {
Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
@@ -286,22 +289,24 @@ public interface DataStreamTestUtils {
static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum,
- ClientId primaryClientId) {
+ ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
LOG.info("start Stream{}", out.getHeader().getCallId());
final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
try {
for (RaftServer s : servers) {
- assertHeader(s, out.getHeader(), bytesWritten);
+ assertHeader(s, out.getHeader(), bytesWritten, stepDownLeader);
}
} catch (Throwable e) {
throw new CompletionException(e);
}
LOG.info("Stream{}: bytesWritten={}", out.getHeader().getCallId(), bytesWritten);
- return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId));
+ return out.closeAsync().thenCompose(
+ reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, cluster, stepDownLeader));
}
- static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize) throws Exception {
+ static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader)
+ throws Exception {
// check header
Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
@@ -315,11 +320,11 @@ public interface DataStreamTestUtils {
// check writeRequest
final RaftClientRequest writeRequest = stream.getWriteRequest();
Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
- assertRaftClientMessage(header, null, writeRequest, header.getClientId());
+ assertRaftClientMessage(header, null, writeRequest, header.getClientId(), stepDownLeader);
}
static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
- long bytesWritten, RaftPeerId leader, ClientId primaryClientId) {
+ long bytesWritten, RaftPeerId leader, ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
// Test close idempotent
Assert.assertSame(dataStreamReply, out.closeAsync().join());
BaseTest.testFailureCase("writeAsync should fail",
@@ -329,7 +334,7 @@ public interface DataStreamTestUtils {
final DataStreamReplyByteBuffer buffer = (DataStreamReplyByteBuffer) dataStreamReply;
try {
final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(buffer.slice());
- assertRaftClientMessage(out.getHeader(), leader, reply, primaryClientId);
+ assertRaftClientMessage(out.getHeader(), leader, reply, primaryClientId, stepDownLeader);
if (reply.isSuccess()) {
final ByteString bytes = reply.getMessage().getContent();
if (!bytes.equals(MOCK)) {
@@ -344,10 +349,14 @@ public interface DataStreamTestUtils {
}
static void assertRaftClientMessage(
- RaftClientMessage expected, RaftPeerId expectedServerId, RaftClientMessage computed, ClientId expectedClientId) {
+ RaftClientMessage expected, RaftPeerId expectedServerId, RaftClientMessage computed, ClientId expectedClientId,
+ boolean stepDownLeader) {
Assert.assertNotNull(computed);
Assert.assertEquals(expectedClientId, computed.getClientId());
- Assert.assertEquals(Optional.ofNullable(expectedServerId).orElseGet(expected::getServerId), computed.getServerId());
+ if (!stepDownLeader) {
+ Assert.assertEquals(
+ Optional.ofNullable(expectedServerId).orElseGet(expected::getServerId), computed.getServerId());
+ }
Assert.assertEquals(expected.getRaftGroupId(), computed.getRaftGroupId());
}
@@ -379,6 +388,6 @@ public interface DataStreamTestUtils {
final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request),
RaftServerTestUtil.getRaftLog(division));
- Assert.assertSame(entryFromStream, entryFromLog);
+ Assert.assertEquals(entryFromStream, entryFromLog);
}
}