You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/06/14 09:32:51 UTC

incubator-ratis git commit: RATIS-233. Throw an exception for the delayed requests if the leader is stepping down. Contributed by Kit Hui

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 1b753aeba -> ccc380119


RATIS-233. Throw an exception for the delayed requests if the leader is stepping down.  Contributed by Kit Hui


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ccc38011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ccc38011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ccc38011

Branch: refs/heads/master
Commit: ccc380119edb31631943b91b1f2f0fa70fc3bf86
Parents: 1b753ae
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Jun 14 17:32:10 2018 +0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Jun 14 17:32:10 2018 +0800

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     | 17 ++++++-
 .../ratis/protocol/NotReplicatedException.java  | 46 +++++++++++++++++++
 .../apache/ratis/protocol/RaftClientReply.java  | 17 +++++--
 .../ratis/protocol/RaftClientRequest.java       |  3 +-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  9 +++-
 .../ratis/server/impl/PendingRequest.java       | 12 +++--
 .../ratis/server/impl/PendingRequests.java      | 10 +++++
 .../java/org/apache/ratis/RaftAsyncTests.java   |  8 ++++
 .../java/org/apache/ratis/RaftBasicTests.java   | 47 ++++++++++++++++++++
 9 files changed, 158 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4d858e4..f32d6b9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,6 +26,7 @@ import org.apache.ratis.util.ReflectionUtils;
 import java.util.Arrays;
 
 import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
 import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
 
 public interface ClientProtoUtils {
@@ -163,6 +164,15 @@ public interface ClientProtoUtils {
             .setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace()));
         b.setStateMachineException(smeBuilder.build());
       }
+
+      final NotReplicatedException nre = reply.getNotReplicatedException();
+      if (nre != null) {
+        final NotReplicatedExceptionProto.Builder nreBuilder = NotReplicatedExceptionProto.newBuilder()
+            .setCallId(nre.getCallId())
+            .setReplication(nre.getRequiredReplication())
+            .setLogIndex(nre.getLogIndex());
+        b.setNotReplicatedException(nreBuilder);
+      }
     }
     return b.build();
   }
@@ -186,7 +196,7 @@ public interface ClientProtoUtils {
   static RaftClientReply toRaftClientReply(
       RaftClientReplyProto replyProto) {
     final RaftRpcReplyProto rp = replyProto.getRpcReply();
-    RaftException e = null;
+    final RaftException e;
     if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) {
       NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
       final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
@@ -195,11 +205,16 @@ public interface ClientProtoUtils {
           nleProto.getPeersInConfList());
       e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()),
           suggestedLeader, peers);
+    } else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION) {
+      final NotReplicatedExceptionProto nre = replyProto.getNotReplicatedException();
+      e = new NotReplicatedException(nre.getCallId(), nre.getReplication(), nre.getLogIndex());
     } else if (replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) {
       StateMachineExceptionProto smeProto = replyProto.getStateMachineException();
       e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
           smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
           smeProto.getStacktrace());
+    } else {
+      e = null;
     }
     ClientId clientId = ClientId.valueOf(rp.getRequestorId());
     final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
new file mode 100644
index 0000000..67bda34
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/NotReplicatedException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
+
+public class NotReplicatedException extends RaftException {
+  private final long callId;
+  private final ReplicationLevel requiredReplication;
+  private final long logIndex;
+
+  public NotReplicatedException(long callId, ReplicationLevel requiredReplication, long logIndex) {
+    super("Request with call Id " + callId + " is committed with log index " + logIndex
+        + " but not yet replicated to " + requiredReplication);
+    this.callId = callId;
+    this.requiredReplication = requiredReplication;
+    this.logIndex = logIndex;
+  }
+
+  public long getCallId() {
+    return callId;
+  }
+
+  public ReplicationLevel getRequiredReplication() {
+    return requiredReplication;
+  }
+
+  public long getLogIndex() {
+    return logIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 8254ab4..ba3cdc7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -55,16 +55,15 @@ public class RaftClientReply extends RaftClientMessage {
     this.callId = callId;
     this.message = message;
     this.exception = exception;
+    this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList();
 
     if (exception != null) {
       Preconditions.assertTrue(!success,
           () -> "Inconsistent parameters: success && exception != null: " + this);
-      Preconditions.assertTrue(
-          ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
+      Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
+          NotLeaderException.class, NotReplicatedException.class, StateMachineException.class),
           () -> "Unexpected exception class: " + this);
     }
-
-    this.commitInfos = commitInfos != null? commitInfos: Collections.emptyList();
   }
 
   public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) {
@@ -81,6 +80,11 @@ public class RaftClientReply extends RaftClientMessage {
         request.getCallId(), true, message, null, commitInfos);
   }
 
+  public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
+    this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
+        reply.getCallId(), false, reply.getMessage(), nre, reply.getCommitInfos());
+  }
+
   /**
    * Get the commit information for the entire group.
    * The commit information may be unavailable for exception reply.
@@ -120,6 +124,11 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, NotLeaderException.class);
   }
 
+  /** If this reply has {@link NotReplicatedException}, return it; otherwise return null. */
+  public NotReplicatedException getNotReplicatedException() {
+    return JavaUtils.cast(exception, NotReplicatedException.class);
+  }
+
   /** If this reply has {@link StateMachineException}, return it; otherwise return null. */
   public StateMachineException getStateMachineException() {
     return JavaUtils.cast(exception, StateMachineException.class);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 232c51d..34c96f3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -122,7 +122,8 @@ public class RaftClientRequest extends RaftClientMessage {
     public String toString() {
       switch (typeCase) {
         case WRITE:
-          return "RW";
+          final ReplicationLevel replication = write.getReplication();
+          return "RW" + (replication == ReplicationLevel.MAJORITY? "": "-" + replication);
         case READ:
           return "RO";
         case STALEREAD:

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 3f0baf8..303ec2b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -203,6 +203,12 @@ message NotLeaderExceptionProto {
   repeated RaftPeerProto peersInConf = 2;
 }
 
+message NotReplicatedExceptionProto {
+  uint64 callId = 1;
+  ReplicationLevel replication = 2;
+  uint64 logIndex = 3;
+}
+
 message StateMachineExceptionProto {
   string exceptionClassName = 1;
   string errorMsg = 2;
@@ -215,7 +221,8 @@ message RaftClientReplyProto {
 
   oneof ExceptionDetails {
     NotLeaderExceptionProto notLeaderException = 3;
-    StateMachineExceptionProto stateMachineException = 4;
+    NotReplicatedExceptionProto notReplicatedException = 4;
+    StateMachineExceptionProto stateMachineException = 5;
   }
 
   repeated CommitInfoProto commitInfos = 15;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 10cd95f..4a72197 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -17,10 +17,8 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.protocol.NotLeaderException;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 
@@ -86,6 +84,12 @@ public class PendingRequest implements Comparable<PendingRequest> {
     setReply(delayed);
   }
 
+  synchronized void failDelayedReply() {
+    final RaftClientRequest.Type type = request.getType();
+    final ReplicationLevel replication = type.getWrite().getReplication();
+    setReply(new RaftClientReply(delayed, new NotReplicatedException(request.getCallId(), replication, index)));
+  }
+
   TransactionContext setNotLeaderException(NotLeaderException nle) {
     setReply(new RaftClientReply(getRequest(), nle, null));
     return getEntry();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c02d7c3..2cf1271 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -116,9 +116,18 @@ class PendingRequests {
           polled = q.poll();
           Preconditions.assertTrue(polled == peeked);
         }
+        LOG.debug("{}: complete delay request {}", name, polled);
         polled.completeDelayedReply();
       }
     }
+
+    void failReplies() {
+      synchronized (q) {
+        for(; !q.isEmpty();) {
+          q.poll().failDelayedReply();
+        }
+      }
+    }
   }
 
   private PendingRequest pendingSetConf;
@@ -215,6 +224,7 @@ class PendingRequests {
     if (pendingSetConf != null) {
       pendingSetConf.setNotLeaderException(nle);
     }
+    delayedReplies.failReplies();
   }
 
   void checkDelayedReplies(long allAckedIndex) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 53e0f1f..b297a27 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.ratis.RaftBasicTests.runTestDelayRequestIfLeaderStepDown;
 import static org.apache.ratis.RaftTestUtil.waitForLeader;
 
 public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
@@ -309,4 +310,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     //reset for the other tests
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), oldExpiryTime);
   }
+
+  @Test
+  public void testAsyncDelayRequestIfLeaderStepDown() throws Exception {
+    final CLUSTER cluster = newCluster(5);
+    cluster.start();
+    runTestDelayRequestIfLeaderStepDown(true, cluster, LOG);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ccc38011/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 207a458..8c744ab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.NotReplicatedException;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
@@ -50,6 +51,7 @@ import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -451,4 +453,49 @@ public abstract class RaftBasicTests extends BaseTest {
       Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
     }
   }
+
+  @Test
+  public void testDelayRequestIfLeaderStepDown() throws Exception {
+    runTestDelayRequestIfLeaderStepDown(false, getCluster(), LOG);
+  }
+
+  static void runTestDelayRequestIfLeaderStepDown(boolean async, MiniRaftCluster cluster, Logger LOG) throws Exception {
+    boolean skipfirstserver = false;
+    for (RaftServer s : cluster.getServers()) {
+      if (!skipfirstserver) {
+        skipfirstserver = true;
+        cluster.killServer(s.getId());
+        continue;
+      }
+      cluster.restartServer(s.getId(), false);
+    }
+    final RaftServerImpl leader = waitForLeader(cluster);
+    LOG.info("leader: " + leader.getId() + ", " + cluster.printServers());
+
+    final SimpleMessage message = SimpleMessage.create(1)[0];
+    try (final RaftClient client = cluster.createClientWithLeader()) {
+      final RaftClientReply reply;
+      if (async) {
+        final CompletableFuture<RaftClientReply> f = client.sendAsync(message, ReplicationLevel.ALL);
+        Thread.sleep(1000);
+        RaftTestUtil.changeLeader(cluster, leader.getId());
+
+        reply = f.get();
+      } else {
+        new Thread(() -> {
+          try {
+            Thread.sleep(1000);
+            RaftTestUtil.changeLeader(cluster, leader.getId());
+          } catch (Exception e) {
+            LOG.warn("changeLeader", e);
+          }
+        }).start();
+
+        reply = client.send(message, ReplicationLevel.ALL);
+      }
+      throw reply.getNotReplicatedException();
+    } catch (NotReplicatedException e) {
+      LOG.info("Expected", e);
+    }
+  }
 }