You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/18 02:17:05 UTC

incubator-ratis git commit: RATIS-18. A new leader should start serving client requests only after it commits the first leader-placeholder entry. Contributed by Jing Zhao.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master e1620e804 -> 5565425f0


RATIS-18. A new leader should start serving client requests only after it commits the first leader-placeholder entry. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/5565425f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/5565425f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/5565425f

Branch: refs/heads/master
Commit: 5565425f0e4ebc450051340522702465b37d97c6
Parents: e1620e8
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Feb 17 18:16:38 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Feb 17 18:16:38 2017 -0800

----------------------------------------------------------------------
 .../ratis/protocol/LeaderNotReadyException.java | 34 ++++++++++++
 .../client/HadoopClientRequestSender.java       |  7 ++-
 .../apache/ratis/server/impl/LeaderState.java   | 13 ++++-
 .../ratis/server/impl/RaftServerImpl.java       |  7 +++
 .../java/org/apache/ratis/MiniRaftCluster.java  |  3 ++
 .../impl/DelayLocalExecutionInjection.java      |  6 ++-
 .../impl/RaftReconfigurationBaseTest.java       | 56 +++++++++++++++++++-
 7 files changed, 119 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
new file mode 100644
index 0000000..33f6a4d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * Sent from the server to a client when the server has just become the
+ * current leader but has not yet committed its first (no-op) placeholder log
+ * entry. Until that happens the leader rejects new client requests, because
+ * it cannot yet determine whether a request is merely a retry.
+ */
+public class LeaderNotReadyException extends RaftException {
+  public LeaderNotReadyException() { // uses the default detail message below
+    this("The leader is not ready yet");
+  }
+
+  public LeaderNotReadyException(String msg) { // msg: detail message passed to RaftException
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
index 116be19..918a191 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
@@ -50,9 +50,12 @@ public class HadoopClientRequestSender implements RaftClientRequestSender {
         return proxy.submitClientRequest(request);
       }
     } catch (RemoteException e) {
-      throw e.unwrapRemoteException(StateMachineException.class,
+      throw e.unwrapRemoteException(
+          StateMachineException.class,
           ReconfigurationTimeoutException.class,
-          ReconfigurationInProgressException.class, RaftException.class);
+          ReconfigurationInProgressException.class,
+          RaftException.class,
+          LeaderNotReadyException.class);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index d5d6adc..1750a04 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -53,6 +53,7 @@ import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.Daemon;
 import org.apache.ratis.util.ProtoUtils;
 import org.apache.ratis.util.Timestamp;
@@ -70,6 +71,7 @@ import com.google.common.base.Preconditions;
  */
 public class LeaderState {
   private static final Logger LOG = RaftServerImpl.LOG;
+  public static final String APPEND_PLACEHOLDER = LeaderState.class.getSimpleName() + ".placeholder";
 
   enum StateUpdateEventType {
     STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
@@ -113,6 +115,7 @@ public class LeaderState {
   private final int stagingCatchupGap;
   private final int snapshotChunkMaxSize;
   private final int syncInterval;
+  private final long placeHolderIndex;
 
   LeaderState(RaftServerImpl server, RaftProperties properties) {
     this.server = server;
@@ -137,11 +140,11 @@ public class LeaderState {
     final RaftConfiguration conf = server.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
     final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
-    final long nextIndex = raftLog.getNextIndex();
+    placeHolderIndex = raftLog.getNextIndex();
     senders = new ArrayList<>(others.size());
 
     for (RaftPeer p : others) {
-      FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
+      FollowerInfo f = new FollowerInfo(p, t, placeHolderIndex, true);
       senders.add(server.getFactory().newLogAppender(server, this, f));
     }
     voterLists = divideFollowers(conf);
@@ -156,12 +159,18 @@ public class LeaderState {
         .setTerm(server.getState().getCurrentTerm())
         .setIndex(raftLog.getNextIndex())
         .setNoOp(LeaderNoOp.newBuilder()).build();
+    CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
+        server.getId().toString(), null);
     raftLog.append(placeHolder);
 
     processor.start();
     startSenders();
   }
 
+  boolean isReady() { // ready once this server's lastAppliedIndex reaches the placeholder entry's index
+    return server.getState().getLastAppliedIndex() >= placeHolderIndex;
+  }
+
   private void startSenders() {
     senders.forEach(Thread::start);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7d9e049..152d6a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
 
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.LeaderNotReadyException;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -347,6 +348,12 @@ public class RaftServerImpl implements RaftServer {
       CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
       future.complete(new RaftClientReply(request, exception));
       return future;
+    } else {
+      if (leaderState == null || !leaderState.isReady()) {
+        CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+        future.completeExceptionally(new LeaderNotReadyException());
+        return future;
+      }
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 993d861..682849f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.LeaderState;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerImplUtils;
@@ -51,6 +52,8 @@ public abstract class MiniRaftCluster {
   public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
   public static final DelayLocalExecutionInjection logSyncDelay =
       new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+  public static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
+      new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER);
 
   public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
   public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
index 1818722..6df6176 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
@@ -29,8 +29,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Code {
   private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>();
 
-  public DelayLocalExecutionInjection(String method) {
-    CodeInjectionForTesting.put(method, this);
+  public DelayLocalExecutionInjection(String... methods) { // registers this injection for every given injection-point name
+    for (String method : methods) {
+      CodeInjectionForTesting.put(method, this);
+    }
   }
 
   public void clear() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e07f5cb..5b46af8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Arrays.asList;
+import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay;
 import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
@@ -509,7 +510,6 @@ public abstract class RaftReconfigurationBaseTest {
   @Test
   public void testRevertConfigurationChange() throws Exception {
     LOG.info("Start testRevertConfigurationChange");
-    // originally 3 peers
     final MiniRaftCluster cluster = getCluster(5);
     try {
       cluster.start();
@@ -575,4 +575,58 @@ public abstract class RaftReconfigurationBaseTest {
       cluster.shutdown();
     }
   }
+
+  /**
+   * Delay appending the leader placeholder log entry and check that a retrying
+   * client first receives LeaderNotReadyException and then eventually succeeds.
+   */
+  @Test
+  public void testLeaderNotReadyException() throws Exception {
+    LOG.info("Start testLeaderNotReadyException");
+    final MiniRaftCluster cluster = getCluster(1);
+    final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId(); // single-peer cluster: that peer is the expected leader
+    try {
+      // delay the leader placeholder append by 2s on every peer
+      cluster.getPeers().forEach(
+          peer -> leaderPlaceHolderDelay.setDelayMs(peer.getId().toString(), 2000));
+      cluster.start();
+
+      AtomicBoolean caughtNotReady = new AtomicBoolean(false);
+      AtomicBoolean success = new AtomicBoolean(false);
+      new Thread(() -> { // client thread: resend the same request every 200ms until it succeeds
+        final RaftClient client = cluster.createClient(leaderId);
+        final RaftClientRequestSender sender = client.getRequestSender();
+
+        final RaftClientRequest request = new RaftClientRequest(client.getId(),
+            leaderId, 0, new SimpleMessage("test"));
+        while (!success.get()) {
+          try {
+            RaftClientReply reply = sender.sendRequest(request);
+            success.set(reply.isSuccess());
+          } catch (LeaderNotReadyException e) {
+            LOG.info("Hit LeaderNotReadyException", e);
+            caughtNotReady.set(true);
+          } catch (IOException e) {
+            LOG.info("Hit other IOException", e);
+          }
+          if (!success.get()) {
+            try {
+              Thread.sleep(200);
+            } catch (InterruptedException ignored) { // NOTE(review): interrupt status is dropped; loop exits only on success
+            }
+          }
+        }
+      }).start(); // NOTE(review): thread is never joined/interrupted; it keeps retrying if success never arrives
+
+      RaftTestUtil.waitForLeader(cluster);
+      for (int i = 0; !success.get() && i < 5; i++) { // wait up to ~5s for the client thread to succeed
+        Thread.sleep(1000);
+      }
+      Assert.assertTrue(success.get());
+      Assert.assertTrue(caughtNotReady.get());
+    } finally {
+      leaderPlaceHolderDelay.clear();
+      cluster.shutdown();
+    }
+  }
 }