You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/02 11:55:17 UTC

[incubator-ratis] branch master updated: RATIS-1174. Test leader change when close stream (#310)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d15a3e4  RATIS-1174. Test leader change when close stream (#310)
d15a3e4 is described below

commit d15a3e452e55fce2062b53658cd3bee14f898a47
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 2 19:55:08 2020 +0800

    RATIS-1174. Test leader change when close stream (#310)
---
 .../datastream/DataStreamAsyncClusterTests.java    | 42 ++++++++++++++++++----
 .../ratis/datastream/DataStreamBaseTest.java       |  2 +-
 .../ratis/datastream/DataStreamTestUtils.java      | 29 +++++++++------
 3 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index 8a19e10..869da26 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -69,20 +69,46 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     RaftServerConfigKeys.Rpc.setTimeoutMax(getProperties(), max);
   }
 
+  @Test
+  public void testMultipleStreamsMultipleServersStepDownLeader() throws Exception {
+    runWithNewCluster(3, this::runTestDataStreamStepDownLeader);
+  }
+
+  void runTestDataStreamStepDownLeader(CLUSTER cluster) throws Exception {
+    runTestDataStream(cluster, true);
+  }
+
   void runTestDataStream(CLUSTER cluster) throws Exception {
+    runTestDataStream(cluster, false);
+  }
+
+  void runTestDataStream(CLUSTER cluster, boolean stepDownLeader) throws Exception {
     RaftTestUtil.waitForLeader(cluster);
 
     final List<CompletableFuture<Long>> futures = new ArrayList<>();
-    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10), executor));
-    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000), executor));
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10, stepDownLeader), executor));
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 10_000, stepDownLeader), executor));
     final long maxIndex = futures.stream()
         .map(CompletableFuture::join)
         .max(Long::compareTo)
         .orElseThrow(IllegalStateException::new);
 
+    if (stepDownLeader) {
+      final RaftPeerId oldLeader = cluster.getLeader().getId();
+      final CompletableFuture<RaftPeerId> changeLeader = futures.get(0).thenApplyAsync(dummy -> {
+        try {
+          return RaftTestUtil.changeLeader(cluster, oldLeader);
+        } catch (Exception e) {
+          throw new CompletionException("Failed to change leader from " + oldLeader, e);
+        }
+      });
+      LOG.info("Changed leader from {} to {}", oldLeader, changeLeader.join());
+    }
+
     // wait for all servers to catch up
     try (RaftClient client = cluster.createClient()) {
-      client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+      RaftClientReply reply = client.async().watch(maxIndex, ReplicationLevel.ALL).join();
+      Assert.assertTrue(reply.isSuccess());
     }
     // assert all streams are linked
     for (RaftServer proxy : cluster.getServers()) {
@@ -95,10 +121,12 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     }
   }
 
-  Long runTestDataStream(CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum) {
+  Long runTestDataStream(
+      CLUSTER cluster, int numClients, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
     final List<CompletableFuture<Long>> futures = new ArrayList<>();
     for (int j = 0; j < numClients; j++) {
-      futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum), executor));
+      futures.add(CompletableFuture.supplyAsync(
+          () -> runTestDataStream(cluster, numStreams, bufferSize, bufferNum, stepDownLeader), executor));
     }
     Assert.assertEquals(numClients, futures.size());
     return futures.stream()
@@ -111,7 +139,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     return cluster.getDivision(primary.getId()).getRaftClient().getId();
   }
 
-  long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum) {
+  long runTestDataStream(CLUSTER cluster, int numStreams, int bufferSize, int bufferNum, boolean stepDownLeader) {
     final Iterable<RaftServer> servers = CollectionUtils.as(cluster.getServers(), s -> s);
     final RaftPeerId leader = cluster.getLeader().getId();
     final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
@@ -121,7 +149,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
       for (int i = 0; i < numStreams; i++) {
         final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
         futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
-            servers, leader, out, bufferSize, bufferNum, primaryClientId).join(), executor));
+            servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
       }
       Assert.assertEquals(numStreams, futures.size());
       return futures.stream()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b8d7dd3..c504d95 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -391,7 +391,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
           CollectionUtils.as(servers, Server::getRaftServer), null, out, bufferSize, bufferNum,
-          getPrimaryClientId()).join();
+          getPrimaryClientId(), null, false).join();
       if (expectedException != null) {
         Assert.assertFalse(clientReply.isSuccess());
         Assert.assertTrue(clientReply.getException().getMessage().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 3131ee7..673eab8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -18,6 +18,8 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -64,6 +66,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
 public interface DataStreamTestUtils {
   Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
@@ -286,22 +289,24 @@ public interface DataStreamTestUtils {
 
   static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
       Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum,
-      ClientId primaryClientId) {
+      ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
     LOG.info("start Stream{}", out.getHeader().getCallId());
     final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
     try {
       for (RaftServer s : servers) {
-        assertHeader(s, out.getHeader(), bytesWritten);
+        assertHeader(s, out.getHeader(), bytesWritten, stepDownLeader);
       }
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
     LOG.info("Stream{}: bytesWritten={}", out.getHeader().getCallId(), bytesWritten);
 
-    return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId));
+    return out.closeAsync().thenCompose(
+        reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, cluster, stepDownLeader));
   }
 
-  static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize) throws Exception {
+  static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader)
+      throws Exception {
     // check header
     Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
 
@@ -315,11 +320,11 @@ public interface DataStreamTestUtils {
     // check writeRequest
     final RaftClientRequest writeRequest = stream.getWriteRequest();
     Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
-    assertRaftClientMessage(header, null, writeRequest, header.getClientId());
+    assertRaftClientMessage(header, null, writeRequest, header.getClientId(), stepDownLeader);
   }
 
   static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
-      long bytesWritten, RaftPeerId leader, ClientId primaryClientId) {
+      long bytesWritten, RaftPeerId leader, ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
     // Test close idempotent
     Assert.assertSame(dataStreamReply, out.closeAsync().join());
     BaseTest.testFailureCase("writeAsync should fail",
@@ -329,7 +334,7 @@ public interface DataStreamTestUtils {
     final DataStreamReplyByteBuffer buffer = (DataStreamReplyByteBuffer) dataStreamReply;
     try {
       final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(buffer.slice());
-      assertRaftClientMessage(out.getHeader(), leader, reply, primaryClientId);
+      assertRaftClientMessage(out.getHeader(), leader, reply, primaryClientId, stepDownLeader);
       if (reply.isSuccess()) {
         final ByteString bytes = reply.getMessage().getContent();
         if (!bytes.equals(MOCK)) {
@@ -344,10 +349,14 @@ public interface DataStreamTestUtils {
   }
 
   static void assertRaftClientMessage(
-      RaftClientMessage expected, RaftPeerId expectedServerId, RaftClientMessage computed, ClientId expectedClientId) {
+      RaftClientMessage expected, RaftPeerId expectedServerId, RaftClientMessage computed, ClientId expectedClientId,
+      boolean stepDownLeader) {
     Assert.assertNotNull(computed);
     Assert.assertEquals(expectedClientId, computed.getClientId());
-    Assert.assertEquals(Optional.ofNullable(expectedServerId).orElseGet(expected::getServerId), computed.getServerId());
+    if (!stepDownLeader) {
+      Assert.assertEquals(
+          Optional.ofNullable(expectedServerId).orElseGet(expected::getServerId), computed.getServerId());
+    }
     Assert.assertEquals(expected.getRaftGroupId(), computed.getRaftGroupId());
   }
 
@@ -379,6 +388,6 @@ public interface DataStreamTestUtils {
 
     final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request),
         RaftServerTestUtil.getRaftLog(division));
-    Assert.assertSame(entryFromStream, entryFromLog);
+    Assert.assertEquals(entryFromStream, entryFromLog);
   }
 }