You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/08/23 18:52:17 UTC
incubator-ratis git commit: RATIS-109. Improve the log messages in
RaftServerImpl and the related code.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 704072c5c -> ee262d7e3
RATIS-109. Improve the log messages in RaftServerImpl and the related code.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ee262d7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ee262d7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ee262d7e
Branch: refs/heads/master
Commit: ee262d7e3daefe323900146beb460b20b1a72869
Parents: 704072c
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Aug 23 11:51:42 2017 -0700
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Aug 23 11:51:42 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/ratis/protocol/RaftId.java | 2 +-
.../ratis/protocol/ServerNotReadyException.java | 27 +++++
.../arithmetic/ArithmeticStateMachine.java | 14 ++-
.../ratis/examples/RaftExamplesTestUtil.java | 5 +-
.../examples/arithmetic/TestArithmetic.java | 29 +++--
.../TestRaftStateMachineException.java | 2 +-
.../apache/ratis/server/impl/FollowerState.java | 3 +-
.../ratis/server/impl/LeaderElection.java | 2 +-
.../apache/ratis/server/impl/LeaderState.java | 8 +-
.../ratis/server/impl/PendingRequest.java | 2 +-
.../ratis/server/impl/RaftConfiguration.java | 6 +-
.../ratis/server/impl/RaftServerImpl.java | 110 ++++++++++---------
.../ratis/server/impl/ServerImplUtils.java | 6 +-
.../ratis/server/impl/ServerProtoUtils.java | 17 ++-
.../apache/ratis/server/impl/ServerState.java | 15 +--
.../ratis/server/impl/StateMachineUpdater.java | 11 +-
.../apache/ratis/server/protocol/TermIndex.java | 10 ++
.../ratis/server/storage/RaftLogWorker.java | 2 +-
.../statemachine/SimpleStateMachineStorage.java | 1 +
.../java/org/apache/ratis/MiniRaftCluster.java | 11 +-
.../impl/RaftReconfigurationBaseTest.java | 7 +-
21 files changed, 185 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
index ebf9f75..0846856 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftId.java
@@ -31,7 +31,7 @@ public abstract class RaftId {
private static void checkLength(int length, String name) {
Preconditions.assertTrue(length == BYTE_LENGTH,
- " = %s != BYTE_LENGTH = %s", name, length, BYTE_LENGTH);
+ "%s = %s != BYTE_LENGTH = %s", name, length, BYTE_LENGTH);
}
private static UUID toUuid(ByteString bytes) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java
new file mode 100644
index 0000000..80307e6
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerNotReadyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * The server is not ready yet.
+ */
+public class ServerNotReadyException extends RaftException {
+ public ServerNotReadyException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index a4199b6..cf61df1 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -88,8 +88,8 @@ public class ArithmeticStateMachine extends BaseStateMachine {
last = latestTermIndex.get();
}
- File snapshotFile = new File(SimpleStateMachineStorage.getSnapshotFileName(
- last.getTerm(), last.getIndex()));
+ final File snapshotFile = storage.getSnapshotFile(last.getTerm(), last.getIndex());
+ LOG.info("Taking a snapshot to file {}", snapshotFile);
try(final ObjectOutputStream out = new ObjectOutputStream(
new BufferedOutputStream(new FileOutputStream(snapshotFile)))) {
@@ -107,12 +107,16 @@ public class ArithmeticStateMachine extends BaseStateMachine {
}
private long load(SingleFileSnapshotInfo snapshot, boolean reload) throws IOException {
- if (snapshot == null || !snapshot.getFile().getPath().toFile().exists()) {
- LOG.warn("The snapshot file {} does not exist", snapshot);
+ if (snapshot == null) {
+ LOG.warn("The snapshot info is null.");
+ return RaftServerConstants.INVALID_LOG_INDEX;
+ }
+ final File snapshotFile = snapshot.getFile().getPath().toFile();
+ if (!snapshotFile.exists()) {
+ LOG.warn("The snapshot file {} does not exist for snapshot {}", snapshotFile, snapshot);
return RaftServerConstants.INVALID_LOG_INDEX;
}
- File snapshotFile =snapshot.getFile().getPath().toFile();
final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
try(final AutoCloseableLock writeLock = writeLock();
final ObjectInputStream in = new ObjectInputStream(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
index 7804353..786ac48 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/RaftExamplesTestUtil.java
@@ -80,10 +80,11 @@ public class RaftExamplesTestUtil {
}
public static <S extends StateMachine> Collection<Object[]> getMiniRaftClusters(
- Class<S> stateMachineClass, Class<?>... clusterClasses) throws IOException {
+ Class<S> stateMachineClass, int clusterSize, Class<?>... clusterClasses)
+ throws IOException {
final RaftProperties prop = new RaftProperties();
prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
stateMachineClass, StateMachine.class);
- return getMiniRaftClusters(prop, 3, clusterClasses);
+ return getMiniRaftClusters(prop, clusterSize, clusterClasses);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
index 6a16316..feff88f 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmetic.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.examples.arithmetic;
-
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
@@ -26,8 +25,8 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.examples.RaftExamplesTestUtil;
import org.apache.ratis.examples.arithmetic.expression.*;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -38,13 +37,13 @@ import java.util.Collection;
@RunWith(Parameterized.class)
public class TestArithmetic extends BaseTest {
- static {
+ {
LogUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.ALL);
}
@Parameterized.Parameters
public static Collection<Object[]> data() throws IOException {
- return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class);
+ return RaftExamplesTestUtil.getMiniRaftClusters(ArithmeticStateMachine.class, 3);
}
@Parameterized.Parameter
@@ -53,9 +52,20 @@ public class TestArithmetic extends BaseTest {
@Test
public void testPythagorean() throws Exception {
cluster.start();
- RaftTestUtil.waitForLeader(cluster);
- final RaftPeerId leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient(leaderId);
+ try {
+ RaftTestUtil.waitForLeader(cluster);
+ try (final RaftClient client = cluster.createClient()) {
+ runTestPythagorean(client, 3, 100);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ public static void runTestPythagorean(
+ RaftClient client, int start, int count) throws IOException {
+ Preconditions.assertTrue(count > 0, () -> "count = " + count + " <= 0");
+ Preconditions.assertTrue(start >= 2, () -> "start = " + start + " < 2");
final Variable a = new Variable("a");
final Variable b = new Variable("b");
@@ -70,7 +80,8 @@ public class TestArithmetic extends BaseTest {
final AssignmentMessage nullB = new AssignmentMessage(b, NullValue.getInstance());
final AssignmentMessage nullC = new AssignmentMessage(c, NullValue.getInstance());
- for(int n = 3; n < 100; n += 2) {
+ final int end = start + 2*count;
+ for(int n = (start & 1) == 0? start + 1: start; n < end; n += 2) {
int n2 = n*n;
int half_n2 = n2/2;
@@ -93,8 +104,6 @@ public class TestArithmetic extends BaseTest {
r = client.send(nullC);
assertRaftClientReply(r, null);
}
- client.close();
- cluster.shutdown();
}
static void assertRaftClientReply(RaftClientReply reply, Double expected) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index f893cd3..d84a369 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -73,7 +73,7 @@ public class TestRaftStateMachineException extends BaseTest {
@Parameterized.Parameters
public static Collection<Object[]> data() throws IOException {
return RaftExamplesTestUtil.getMiniRaftClusters(
- StateMachineWithException.class);
+ StateMachineWithException.class, 3);
}
@Parameterized.Parameter
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 1e57fa2..0a44e2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -20,12 +20,13 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Used when the peer is a follower. Used to track the election timeout.
*/
class FollowerState extends Daemon {
- static final Logger LOG = RaftServerImpl.LOG;
+ static final Logger LOG = LoggerFactory.getLogger(FollowerState.class);
private final RaftServerImpl server;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 8b8e4ff..8c49005 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -227,7 +227,7 @@ class LeaderElection extends Daemon {
}
}
} catch(ExecutionException e) {
- LOG.info("Got exception when requesting votes: " + e);
+ LOG.info("{} got exception when requesting votes: {}", server.getId(), e);
LOG.trace("TRACE", e);
exceptions.add(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 313a3bb..e711774 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -182,7 +182,7 @@ public class LeaderState {
try {
pendingRequests.sendNotLeaderResponses();
} catch (IOException e) {
- LOG.warn("Caught exception in sendNotLeaderResponses", e);
+ LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e);
}
}
@@ -549,12 +549,6 @@ public class LeaderState {
return lists;
}
- PendingRequest returnNoConfChange(SetConfigurationRequest r) {
- PendingRequest pending = new PendingRequest(r);
- pending.setSuccessReply(null);
- return pending;
- }
-
void replyPendingRequest(long logIndex, RaftClientReply reply) {
pendingRequests.replyPendingRequest(logIndex, reply);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 59f870e..f1909d4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -27,7 +27,7 @@ import org.apache.ratis.util.Preconditions;
import java.util.concurrent.CompletableFuture;
public class PendingRequest implements Comparable<PendingRequest> {
- private final Long index;
+ private final long index;
private final RaftClientRequest request;
private final TransactionContext entry;
private final CompletableFuture<RaftClientReply> future;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
index 6ce7ecd..0034f4e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
@@ -156,6 +156,10 @@ public class RaftConfiguration {
return oldConf != null && oldConf.contains(peerId);
}
+ /**
+ * @return true iff the given peer is contained in conf and,
+ * if old conf exists, is contained in old conf.
+ */
boolean contains(RaftPeerId peerId) {
return containsInConf(peerId) &&
(oldConf == null || containsInOldConf(peerId));
@@ -211,7 +215,7 @@ public class RaftConfiguration {
@Override
public String toString() {
- return conf + (oldConf != null ? "old:" + oldConf : "");
+ return conf + ", old=" + oldConf;
}
boolean hasNoChange(RaftPeer[] newMembers) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 9d7788e..9e9215b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -91,6 +91,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy,
RaftProperties properties) throws IOException {
+ LOG.debug("new RaftServerImpl {}, {}", id , group);
this.groupId = group.getGroupId();
this.lifeCycle = new LifeCycle(id);
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS);
@@ -150,15 +151,20 @@ public class RaftServerImpl implements RaftServerProtocol,
return proxy.getServerRpc();
}
+ private void setRole(Role newRole) {
+ LOG.debug("{} changes role from {} to {}", getId(), this.role, newRole);
+ this.role = newRole;
+ }
+
void start() {
lifeCycle.transition(STARTING);
state.start();
RaftConfiguration conf = getRaftConf();
if (conf != null && conf.contains(getId())) {
- LOG.debug("{} starts as a follower", getId());
+ LOG.debug("{} starts as a follower, conf={}", getId(), conf);
startAsFollower();
} else {
- LOG.debug("{} starts with initializing state", getId());
+ LOG.debug("{} starts with initializing state, conf={}", getId(), conf);
startInitializing();
}
registerMBean();
@@ -181,10 +187,8 @@ public class RaftServerImpl implements RaftServerProtocol,
* The peer belongs to the current configuration, should start as a follower
*/
private void startAsFollower() {
- role = Role.FOLLOWER;
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
-
+ setRole(Role.FOLLOWER);
+ startHeartbeatMonitor();
lifeCycle.transition(RUNNING);
}
@@ -194,7 +198,7 @@ public class RaftServerImpl implements RaftServerProtocol,
* start election.
*/
private void startInitializing() {
- role = Role.FOLLOWER;
+ setRole(Role.FOLLOWER);
// do not start heartbeatMonitoring
}
@@ -223,7 +227,7 @@ public class RaftServerImpl implements RaftServerProtocol,
LOG.warn("Failed to shutdown election daemon for " + getId(), ignored);
}
try{
- shutdownLeaderState();
+ shutdownLeaderState(true);
} catch (Exception ignored) {
LOG.warn("Failed to shutdown leader state monitor for " + getId(), ignored);
}
@@ -262,25 +266,16 @@ public class RaftServerImpl implements RaftServerProtocol,
synchronized boolean changeToFollower(long newTerm, boolean sync)
throws IOException {
final Role old = role;
- role = Role.FOLLOWER;
-
- boolean metadataUpdated = false;
- if (newTerm > state.getCurrentTerm()) {
- state.setCurrentTerm(newTerm);
- state.resetLeaderAndVotedFor();
- metadataUpdated = true;
- }
-
- if (old == Role.LEADER) {
- assert leaderState != null;
- shutdownLeaderState();
- } else if (old == Role.CANDIDATE) {
- shutdownElectionDaemon();
- }
+ final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
if (old != Role.FOLLOWER) {
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
+ setRole(Role.FOLLOWER);
+ if (old == Role.LEADER) {
+ shutdownLeaderState(false);
+ } else if (old == Role.CANDIDATE) {
+ shutdownElectionDaemon();
+ }
+ startHeartbeatMonitor();
}
if (metadataUpdated && sync) {
@@ -289,12 +284,15 @@ public class RaftServerImpl implements RaftServerProtocol,
return metadataUpdated;
}
- private synchronized void shutdownLeaderState() {
- final LeaderState leader = leaderState;
- if (leader != null) {
- leader.stop();
+ private synchronized void shutdownLeaderState(boolean allowNull) {
+ if (leaderState == null) {
+ if (!allowNull) {
+ throw new NullPointerException("leaderState == null");
+ }
+ } else {
+ leaderState.stop();
+ leaderState = null;
}
- leaderState = null;
// TODO: make sure that StateMachineUpdater has applied all transactions that have context
}
@@ -310,13 +308,20 @@ public class RaftServerImpl implements RaftServerProtocol,
synchronized void changeToLeader() {
Preconditions.assertTrue(isCandidate());
shutdownElectionDaemon();
- role = Role.LEADER;
+ setRole(Role.LEADER);
state.becomeLeader();
// start sending AppendEntries RPC to followers
leaderState = new LeaderState(this, getProxy().getProperties());
leaderState.start();
}
+ private void startHeartbeatMonitor() {
+ Preconditions.assertTrue(heartbeatMonitor == null, "heartbeatMonitor != null");
+ LOG.debug("{} starts heartbeatMonitor", getId());
+ heartbeatMonitor = new FollowerState(this);
+ heartbeatMonitor.start();
+ }
+
private void shutdownHeartbeatMonitor() {
final FollowerState hm = heartbeatMonitor;
if (hm != null) {
@@ -329,7 +334,7 @@ public class RaftServerImpl implements RaftServerProtocol,
synchronized void changeToCandidate() {
Preconditions.assertTrue(isFollower());
shutdownHeartbeatMonitor();
- role = Role.CANDIDATE;
+ setRole(Role.CANDIDATE);
// start election
electionDaemon = new LeaderElection(this);
electionDaemon.start();
@@ -387,9 +392,9 @@ public class RaftServerImpl implements RaftServerProtocol,
peers.toArray(new RaftPeer[peers.size()]));
}
- private void assertLifeCycleState(LifeCycle.State... expected) throws IOException {
- lifeCycle.assertCurrentState((n, c) -> new IOException("Server " + n
- + " is not " + Arrays.asList(expected) + ": current state is " + c),
+ private void assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
+ lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n
+ + " is not " + Arrays.toString(expected) + ": current state is " + c),
expected);
}
@@ -534,9 +539,10 @@ public class RaftServerImpl implements RaftServerProtocol,
"Reconfiguration is already in progress: " + current);
}
- // return true if the new configuration is the same with the current one
+ // return success with a null message if the new conf is the same as the current
if (current.hasNoChange(peersInNewConf)) {
- pending = leaderState.returnNoConfChange(request);
+ pending = new PendingRequest(request);
+ pending.setSuccessReply(null);
return pending.getFuture();
}
@@ -574,18 +580,20 @@ public class RaftServerImpl implements RaftServerProtocol,
@Override
public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
throws IOException {
- final RaftPeerId candidateId =
- RaftPeerId.valueOf(r.getServerRequest().getRequestorId());
- return requestVote(candidateId, r.getCandidateTerm(),
+ final RaftRpcRequestProto request = r.getServerRequest();
+ return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+ r.getCandidateTerm(),
ServerProtoUtils.toTermIndex(r.getCandidateLastEntry()));
}
- private RequestVoteReplyProto requestVote(RaftPeerId candidateId,
+ private RequestVoteReplyProto requestVote(
+ RaftPeerId candidateId, RaftGroupId candidateGroupId,
long candidateTerm, TermIndex candidateLastEntry) throws IOException {
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
- LOG.debug("{}: receive requestVote({}, {}, {})",
- getId(), candidateId, candidateTerm, candidateLastEntry);
+ LOG.debug("{}: receive requestVote({}, {}, {}, {})",
+ getId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertLifeCycleState(RUNNING);
boolean voteGranted = false;
@@ -655,11 +663,13 @@ public class RaftServerImpl implements RaftServerProtocol,
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
// TODO avoid converting list to array
+ final RaftRpcRequestProto request = r.getServerRequest();
final LogEntryProto[] entries = r.getEntriesList()
.toArray(new LogEntryProto[r.getEntriesCount()]);
final TermIndex previous = r.hasPreviousLog() ?
ServerProtoUtils.toTermIndex(r.getPreviousLog()) : null;
- return appendEntries(RaftPeerId.valueOf(r.getServerRequest().getRequestorId()),
+ return appendEntries(RaftPeerId.valueOf(request.getRequestorId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
entries);
}
@@ -676,14 +686,15 @@ public class RaftServerImpl implements RaftServerProtocol,
}
}
- private AppendEntriesReplyProto appendEntries(RaftPeerId leaderId, long leaderTerm,
+ private AppendEntriesReplyProto appendEntries(
+ RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
TermIndex previous, long leaderCommit, boolean initializing,
LogEntryProto... entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
final boolean isHeartbeat = entries.length == 0;
logAppendEntries(isHeartbeat,
- () -> getId() + ": receive appendEntries(" + leaderId + ", "
+ () -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", "
+ leaderTerm + ", " + previous + ", " + leaderCommit + ", "
+ initializing + ServerProtoUtils.toString(entries));
@@ -713,8 +724,7 @@ public class RaftServerImpl implements RaftServerProtocol,
state.setLeader(leaderId);
if (!initializing && lifeCycle.compareAndTransition(STARTING, RUNNING)) {
- heartbeatMonitor = new FollowerState(this);
- heartbeatMonitor.start();
+ startHeartbeatMonitor();
}
if (lifeCycle.getCurrentState() == RUNNING) {
heartbeatMonitor.updateLastRpcTime(true);
@@ -780,8 +790,8 @@ public class RaftServerImpl implements RaftServerProtocol,
@Override
public InstallSnapshotReplyProto installSnapshot(
InstallSnapshotRequestProto request) throws IOException {
- final RaftPeerId leaderId = RaftPeerId.valueOf(
- request.getServerRequest().getRequestorId());
+ final RaftRpcRequestProto r = request.getServerRequest();
+ final RaftPeerId leaderId = RaftPeerId.valueOf(r.getRequestorId());
CodeInjectionForTesting.execute(INSTALL_SNAPSHOT, getId(),
leaderId, request);
LOG.debug("{}: receive installSnapshot({})", getId(), request);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 3c617f1..bcbab9a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -81,13 +81,9 @@ public class ServerImplUtils {
return Long.hashCode(term) ^ Long.hashCode(index);
}
- private static String toString(long n) {
- return n < 0 ? "~" : "" + n;
- }
-
@Override
public String toString() {
- return "(t:" + toString(term) + ", i:" + toString(index) + ")";
+ return TermIndex.toString(term, index);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 845a6ca..193ac83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -24,9 +24,11 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.ratis.client.impl.ClientProtoUtils;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*;
import org.apache.ratis.util.ProtoUtils;
@@ -50,11 +52,22 @@ public class ServerProtoUtils {
TermIndex.newTermIndex(entry.getTerm(), entry.getIndex());
}
+ public static String toTermIndexString(LogEntryProto entry) {
+ return TermIndex.toString(entry.getTerm(), entry.getIndex());
+ }
+
+ private static String toLogEntryString(LogEntryProto entry) {
+ final ByteString clientId = entry.getClientId();
+ return toTermIndexString(entry) + entry.getLogEntryBodyCase()
+ + ", " + (clientId.isEmpty()? "<empty clientId>": new ClientId(clientId))
+ + ", callId=" + entry.getCallId();
+ }
+
public static String toString(LogEntryProto... entries) {
return entries == null? "null"
: entries.length == 0 ? "[]"
- : entries.length == 1? "" + toTermIndex(entries[0])
- : "" + Arrays.stream(entries).map(ServerProtoUtils::toTermIndex)
+ : entries.length == 1? toLogEntryString(entries[0])
+ : "" + Arrays.stream(entries).map(ServerProtoUtils::toLogEntryString)
.collect(Collectors.toList());
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 5db3509..e83a931 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -165,8 +165,14 @@ public class ServerState implements Closeable {
return currentTerm;
}
- void setCurrentTerm(long term) {
- currentTerm = term;
+ boolean updateCurrentTerm(long newTerm) {
+ if (newTerm > currentTerm) {
+ currentTerm = newTerm;
+ votedFor = null;
+ leaderId = null;
+ return true;
+ }
+ return false;
}
RaftPeerId getLeaderId() {
@@ -190,11 +196,6 @@ public class ServerState implements Closeable {
this.log.writeMetadata(currentTerm, votedFor);
}
- void resetLeaderAndVotedFor() {
- votedFor = null;
- leaderId = null;
- }
-
/**
* Vote for a candidate and update the local state.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index b9ed6a5..7ed6731 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -137,13 +137,18 @@ class StateMachineUpdater implements Runnable {
}
while (lastAppliedIndex < committedIndex) {
- final LogEntryProto next = raftLog.get(lastAppliedIndex + 1);
+ final long nextIndex = lastAppliedIndex + 1;
+ final LogEntryProto next = raftLog.get(nextIndex);
if (next != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: applying nextIndex={}, nextLog={}",
+ this, nextIndex, ServerProtoUtils.toString(next));
+ }
server.applyLogToStateMachine(next);
- lastAppliedIndex++;
+ lastAppliedIndex = nextIndex;
} else {
LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
- this, lastAppliedIndex + 1, state);
+ this, nextIndex, state);
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
index a16110f..fed14a3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/protocol/TermIndex.java
@@ -19,6 +19,8 @@ package org.apache.ratis.server.protocol;
import org.apache.ratis.server.impl.ServerImplUtils;
+import java.util.function.LongFunction;
+
/** The term and the log index defined in the Raft consensus algorithm. */
public interface TermIndex extends Comparable<TermIndex> {
TermIndex[] EMPTY_TERMINDEX_ARRAY = {};
@@ -33,6 +35,14 @@ public interface TermIndex extends Comparable<TermIndex> {
static TermIndex newTermIndex(long term, long index) {
return ServerImplUtils.newTermIndex(term, index);
}
+
+ LongFunction<String> LONG_TO_STRING = n -> n >= 0L? String.valueOf(n): "~";
+
+ /** @return a string representing the given term and index. */
+ static String toString(long term, long index) {
+ return String.format("(t:%s, i:%s)",
+ LONG_TO_STRING.apply(term), LONG_TO_STRING.apply(index));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
index 1dc8ae1..0a90d91 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogWorker.java
@@ -44,7 +44,7 @@ import org.slf4j.LoggerFactory;
* raft peer.
*/
class RaftLogWorker implements Runnable {
- static final Logger LOG = RaftServerImpl.LOG;
+ static final Logger LOG = LoggerFactory.getLogger(RaftLogWorker.class);
/**
* The task queue accessed by rpc handler threads and the io worker thread.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
index 5a34271..05c5337 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/SimpleStateMachineStorage.java
@@ -52,6 +52,7 @@ public class SimpleStateMachineStorage implements StateMachineStorage {
private volatile SingleFileSnapshotInfo currentSnapshot = null;
+ @Override
public void init(RaftStorage raftStorage) throws IOException {
this.raftStorage = raftStorage;
this.smDir = raftStorage.getStorageDir().getStateMachineDir();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index f003127..31d16fe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -28,7 +28,8 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.storage.MemoryRaftLog;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.statemachine.BaseStateMachine;
@@ -49,10 +50,6 @@ import java.util.stream.StreamSupport;
public abstract class MiniRaftCluster {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
- public static final DelayLocalExecutionInjection logSyncDelay =
- new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
- public static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
- new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER);
public static final String CLASS_NAME = MiniRaftCluster.class.getSimpleName();
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
@@ -421,6 +418,10 @@ public abstract class MiniRaftCluster {
return group;
}
+ public RaftClient createClient() {
+ return createClient(null, group);
+ }
+
public RaftClient createClient(RaftPeerId leaderId) {
return createClient(leaderId, group);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ee262d7e/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index f1d640a..34ade52 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -45,8 +45,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
-import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay;
-import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
@@ -57,6 +55,11 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
+ private static final DelayLocalExecutionInjection logSyncDelay =
+ new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
+ private static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
+ new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER);
+
protected static final RaftProperties prop = new RaftProperties();
private static final ClientId clientId = ClientId.createId();