You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ji...@apache.org on 2017/02/18 02:17:05 UTC
incubator-ratis git commit: RATIS-18. A new leader should start
serving client requests only after it commits the first leader-placeholder
entry. Contributed by Jing Zhao.
Repository: incubator-ratis
Updated Branches:
refs/heads/master e1620e804 -> 5565425f0
RATIS-18. A new leader should start serving client requests only after it commits the first leader-placeholder entry. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/5565425f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/5565425f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/5565425f
Branch: refs/heads/master
Commit: 5565425f0e4ebc450051340522702465b37d97c6
Parents: e1620e8
Author: Jing Zhao <ji...@apache.org>
Authored: Fri Feb 17 18:16:38 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Fri Feb 17 18:16:38 2017 -0800
----------------------------------------------------------------------
.../ratis/protocol/LeaderNotReadyException.java | 34 ++++++++++++
.../client/HadoopClientRequestSender.java | 7 ++-
.../apache/ratis/server/impl/LeaderState.java | 13 ++++-
.../ratis/server/impl/RaftServerImpl.java | 7 +++
.../java/org/apache/ratis/MiniRaftCluster.java | 3 ++
.../impl/DelayLocalExecutionInjection.java | 6 ++-
.../impl/RaftReconfigurationBaseTest.java | 56 +++++++++++++++++++-
7 files changed, 119 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
new file mode 100644
index 0000000..33f6a4d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * This exception is sent from the server to a client. The server has just
+ * become the current leader, but has not committed its first place-holder
+ * log entry yet. Thus the leader cannot accept any new client requests since
+ * it cannot determine whether a request is just a retry.
+ */
+public class LeaderNotReadyException extends RaftException {
+ public LeaderNotReadyException() {
+ this("The leader is not ready yet");
+ }
+
+ public LeaderNotReadyException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
index 116be19..918a191 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/client/HadoopClientRequestSender.java
@@ -50,9 +50,12 @@ public class HadoopClientRequestSender implements RaftClientRequestSender {
return proxy.submitClientRequest(request);
}
} catch (RemoteException e) {
- throw e.unwrapRemoteException(StateMachineException.class,
+ throw e.unwrapRemoteException(
+ StateMachineException.class,
ReconfigurationTimeoutException.class,
- ReconfigurationInProgressException.class, RaftException.class);
+ ReconfigurationInProgressException.class,
+ RaftException.class,
+ LeaderNotReadyException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index d5d6adc..1750a04 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -53,6 +53,7 @@ import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.Timestamp;
@@ -70,6 +71,7 @@ import com.google.common.base.Preconditions;
*/
public class LeaderState {
private static final Logger LOG = RaftServerImpl.LOG;
+ public static final String APPEND_PLACEHOLDER = LeaderState.class.getSimpleName() + ".placeholder";
enum StateUpdateEventType {
STEPDOWN, UPDATECOMMIT, STAGINGPROGRESS
@@ -113,6 +115,7 @@ public class LeaderState {
private final int stagingCatchupGap;
private final int snapshotChunkMaxSize;
private final int syncInterval;
+ private final long placeHolderIndex;
LeaderState(RaftServerImpl server, RaftProperties properties) {
this.server = server;
@@ -137,11 +140,11 @@ public class LeaderState {
final RaftConfiguration conf = server.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
final Timestamp t = new Timestamp().addTimeMs(-server.getMaxTimeoutMs());
- final long nextIndex = raftLog.getNextIndex();
+ placeHolderIndex = raftLog.getNextIndex();
senders = new ArrayList<>(others.size());
for (RaftPeer p : others) {
- FollowerInfo f = new FollowerInfo(p, t, nextIndex, true);
+ FollowerInfo f = new FollowerInfo(p, t, placeHolderIndex, true);
senders.add(server.getFactory().newLogAppender(server, this, f));
}
voterLists = divideFollowers(conf);
@@ -156,12 +159,18 @@ public class LeaderState {
.setTerm(server.getState().getCurrentTerm())
.setIndex(raftLog.getNextIndex())
.setNoOp(LeaderNoOp.newBuilder()).build();
+ CodeInjectionForTesting.execute(APPEND_PLACEHOLDER,
+ server.getId().toString(), null);
raftLog.append(placeHolder);
processor.start();
startSenders();
}
+ boolean isReady() {
+ return server.getState().getLastAppliedIndex() >= placeHolderIndex;
+ }
+
private void startSenders() {
senders.forEach(Thread::start);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7d9e049..152d6a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.LeaderNotReadyException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -347,6 +348,12 @@ public class RaftServerImpl implements RaftServer {
CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
future.complete(new RaftClientReply(request, exception));
return future;
+ } else {
+ if (leaderState == null || !leaderState.isReady()) {
+ CompletableFuture<RaftClientReply> future = new CompletableFuture<>();
+ future.completeExceptionally(new LeaderNotReadyException());
+ return future;
+ }
}
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 993d861..682849f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerImplUtils;
@@ -51,6 +52,8 @@ public abstract class MiniRaftCluster {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
public static final DelayLocalExecutionInjection logSyncDelay =
new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+ public static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
+ new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER);
public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
index 1818722..6df6176 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/DelayLocalExecutionInjection.java
@@ -29,8 +29,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DelayLocalExecutionInjection implements CodeInjectionForTesting.Code {
private final Map<String, AtomicInteger> delays = new ConcurrentHashMap<>();
- public DelayLocalExecutionInjection(String method) {
- CodeInjectionForTesting.put(method, this);
+ public DelayLocalExecutionInjection(String... methods) {
+ for (String method : methods) {
+ CodeInjectionForTesting.put(method, this);
+ }
}
public void clear() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/5565425f/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e07f5cb..5b46af8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
+import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay;
import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
@@ -509,7 +510,6 @@ public abstract class RaftReconfigurationBaseTest {
@Test
public void testRevertConfigurationChange() throws Exception {
LOG.info("Start testRevertConfigurationChange");
- // originally 3 peers
final MiniRaftCluster cluster = getCluster(5);
try {
cluster.start();
@@ -575,4 +575,58 @@ public abstract class RaftReconfigurationBaseTest {
cluster.shutdown();
}
}
+
+ /**
+ * Delay the commit of the leader placeholder log entry and see if the client
+ * can correctly receive and handle the LeaderNotReadyException.
+ */
+ @Test
+ public void testLeaderNotReadyException() throws Exception {
+ LOG.info("Start testLeaderNotReadyException");
+ final MiniRaftCluster cluster = getCluster(1);
+ final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
+ try {
+ // delay 1s for each logSync call
+ cluster.getPeers().forEach(
+ peer -> leaderPlaceHolderDelay.setDelayMs(peer.getId().toString(), 2000));
+ cluster.start();
+
+ AtomicBoolean caughtNotReady = new AtomicBoolean(false);
+ AtomicBoolean success = new AtomicBoolean(false);
+ new Thread(() -> {
+ final RaftClient client = cluster.createClient(leaderId);
+ final RaftClientRequestSender sender = client.getRequestSender();
+
+ final RaftClientRequest request = new RaftClientRequest(client.getId(),
+ leaderId, 0, new SimpleMessage("test"));
+ while (!success.get()) {
+ try {
+ RaftClientReply reply = sender.sendRequest(request);
+ success.set(reply.isSuccess());
+ } catch (LeaderNotReadyException e) {
+ LOG.info("Hit LeaderNotReadyException", e);
+ caughtNotReady.set(true);
+ } catch (IOException e) {
+ LOG.info("Hit other IOException", e);
+ }
+ if (!success.get()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ }).start();
+
+ RaftTestUtil.waitForLeader(cluster);
+ for (int i = 0; !success.get() && i < 5; i++) {
+ Thread.sleep(1000);
+ }
+ Assert.assertTrue(success.get());
+ Assert.assertTrue(caughtNotReady.get());
+ } finally {
+ leaderPlaceHolderDelay.clear();
+ cluster.shutdown();
+ }
+ }
}