You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/02/15 06:49:40 UTC
incubator-ratis git commit: RATIS-205. Return commit information to
client.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 3e69b13a2 -> 4104860b9
RATIS-205. Return commit information to client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/4104860b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/4104860b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/4104860b
Branch: refs/heads/master
Commit: 4104860b9a1b79b997df4e39c3c70c7008422da4
Parents: 3e69b13
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Feb 15 14:49:00 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Feb 15 14:49:00 2018 +0800
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 10 ++-
.../java/org/apache/ratis/protocol/Message.java | 4 +
.../apache/ratis/protocol/RaftClientReply.java | 46 +++++++---
.../org/apache/ratis/protocol/RaftGroup.java | 31 ++++---
.../ratis/protocol/ServerInformationReply.java | 30 +++----
.../java/org/apache/ratis/util/ProtoUtils.java | 23 +++++
.../ratis/grpc/server/GRpcLogAppender.java | 2 +
ratis-proto-shaded/src/main/proto/Raft.proto | 20 +++--
.../ratis/server/impl/CommitInfoCache.java | 52 +++++++++++
.../apache/ratis/server/impl/FollowerInfo.java | 21 ++++-
.../apache/ratis/server/impl/LeaderState.java | 15 ++++
.../apache/ratis/server/impl/LogAppender.java | 20 +++--
.../ratis/server/impl/PendingRequest.java | 6 +-
.../ratis/server/impl/PendingRequests.java | 2 +-
.../ratis/server/impl/RaftServerImpl.java | 90 ++++++++++++--------
.../ratis/server/impl/RaftServerProxy.java | 26 ++----
.../ratis/server/impl/ServerProtoUtils.java | 9 +-
.../apache/ratis/server/impl/ServerState.java | 37 ++++----
.../apache/ratis/server/storage/RaftLog.java | 5 +-
.../org/apache/ratis/RaftExceptionBaseTest.java | 2 +-
.../server/impl/ServerInformationBaseTest.java | 74 ++++++++++++++--
21 files changed, 374 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 861dba6..9148633 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -22,7 +22,6 @@ import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
-import org.apache.ratis.util.StringUtils;
import java.util.Arrays;
@@ -109,6 +108,7 @@ public interface ClientProtoUtils {
if (reply.getMessage() != null) {
b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
}
+ ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i));
final NotLeaderException nle = reply.getNotLeaderException();
final StateMachineException sme;
@@ -146,6 +146,7 @@ public interface ClientProtoUtils {
if (reply.getRaftGroupId() != null) {
b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup()));
}
+ ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i));
}
return b.build();
}
@@ -172,7 +173,8 @@ public interface ClientProtoUtils {
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
groupId, rp.getCallId(), rp.getSuccess(),
- toMessage(replyProto.getMessage()), e);
+ toMessage(replyProto.getMessage()), e,
+ replyProto.getCommitInfosList());
}
static ServerInformationReply toServerInformationReply(
@@ -182,8 +184,8 @@ public interface ClientProtoUtils {
final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup());
return new ServerInformationReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
- groupId, rp.getCallId(), rp.getSuccess(), null,
- null, raftGroup);
+ groupId, rp.getCallId(), rp.getSuccess(),
+ replyProto.getCommitInfosList(), raftGroup);
}
static StateMachineException wrapStateMachineException(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 4efd29c..9e8198e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -47,6 +47,10 @@ public interface Message {
return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
}
+ static Message valueOf(String string) {
+ return valueOf(ByteString.copyFromUtf8(string), () -> "Message:" + string);
+ }
+
Message EMPTY = valueOf(ByteString.EMPTY);
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 77a987d..af64d66 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -17,10 +17,16 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReflectionUtils;
+import java.util.Collection;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
/**
* Reply from server to client
*/
@@ -36,9 +42,13 @@ public class RaftClientReply extends RaftClientMessage {
private final RaftException exception;
private final Message message;
- public RaftClientReply(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, boolean success, Message message,
- RaftException exception) {
+ /** The commit information when the reply is created. */
+ private final Collection<CommitInfoProto> commitInfos;
+
+ public RaftClientReply(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+ long callId, boolean success, Message message, RaftException exception,
+ Collection<CommitInfoProto> commitInfos) {
super(clientId, serverId, groupId);
this.success = success;
this.callId = callId;
@@ -52,17 +62,32 @@ public class RaftClientReply extends RaftClientMessage {
ReflectionUtils.isInstance(exception, NotLeaderException.class, StateMachineException.class),
() -> "Unexpected exception class: " + this);
}
+
+ this.commitInfos = commitInfos;
}
- public RaftClientReply(RaftClientRequest request,
- RaftException exception) {
+ public RaftClientReply(RaftClientRequest request, RaftException exception, Collection<CommitInfoProto> commitInfos) {
this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
- request.getCallId(), false, null, exception);
+ request.getCallId(), false, null, exception, commitInfos);
+ }
+
+ public RaftClientReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos) {
+ this(request, (Message) null, commitInfos);
}
- public RaftClientReply(RaftClientRequest request, Message message) {
+ public RaftClientReply(RaftClientRequest request, Message message, Collection<CommitInfoProto> commitInfos) {
this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
- request.getCallId(), true, message, null);
+ request.getCallId(), true, message, null, commitInfos);
+ }
+
+ /**
+ * Get the commit information for the entire group.
+ * The commit information may be unavailable for exception reply.
+ *
+ * @return the commit information if it is available; otherwise, return null.
+ */
+ public Collection<CommitInfoProto> getCommitInfos() {
+ return commitInfos;
}
@Override
@@ -76,8 +101,9 @@ public class RaftClientReply extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", cid=" + getCallId()
- + ", success? " + isSuccess() + ", exception=" + exception;
+ return super.toString() + ", cid=" + getCallId() + ", "
+ + (isSuccess()? "SUCCESS": "FAILED " + exception)
+ + ", commits" + ProtoUtils.toString(commitInfos);
}
public boolean isSuccess() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
index f4a16c4..c119e32 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftGroup.java
@@ -17,8 +17,6 @@
*/
package org.apache.ratis.protocol;
-import org.apache.ratis.util.Preconditions;
-
import java.util.*;
/**
@@ -26,7 +24,7 @@ import java.util.*;
* peers.
*/
public class RaftGroup {
- private static RaftGroup EMPTY_GROUP = new RaftGroup(RaftGroupId.emptyGroupId(), Collections.emptyList());
+ private static RaftGroup EMPTY_GROUP = new RaftGroup(RaftGroupId.emptyGroupId());
public static RaftGroup emptyGroup() {
return EMPTY_GROUP;
@@ -35,32 +33,43 @@ public class RaftGroup {
/** UTF-8 string as id */
private final RaftGroupId groupId;
/** The group of raft peers */
- private final List<RaftPeer> peers;
+ private final Map<RaftPeerId, RaftPeer> peers;
public RaftGroup(RaftGroupId groupId) {
this(groupId, Collections.emptyList());
}
- public RaftGroup(RaftGroupId groupId, RaftPeer[] peers) {
+ public RaftGroup(RaftGroupId groupId, RaftPeer... peers) {
this(groupId, Arrays.asList(peers));
}
public RaftGroup(RaftGroupId groupId, Collection<RaftPeer> peers) {
- Preconditions.assertTrue(peers != null);
- this.groupId = groupId;
- this.peers = Collections.unmodifiableList(new ArrayList<>(peers));
+ this.groupId = Objects.requireNonNull(groupId, "groupId == null");
+
+ if (peers == null || peers.isEmpty()) {
+ this.peers = Collections.emptyMap();
+ } else {
+ final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
+ peers.stream().forEach(p -> map.put(p.getId(), p));
+ this.peers = Collections.unmodifiableMap(map);
+ }
}
public RaftGroupId getGroupId() {
return groupId;
}
- public List<RaftPeer> getPeers() {
- return peers;
+ public Collection<RaftPeer> getPeers() {
+ return peers.values();
+ }
+
+ /** @return the peer with the given id if it is in this group; otherwise, return null. */
+ public RaftPeer getPeer(RaftPeerId id) {
+ return peers.get(Objects.requireNonNull(id, "id == null"));
}
@Override
public String toString() {
- return groupId + ":" + peers;
+ return groupId + ":" + peers.values();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
index d4257e1..d06b251 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java
@@ -17,34 +17,30 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+
+import java.util.Collection;
+
/**
* The response of server information request. Sent from server to client.
- *
- * TODO : currently, only information returned is the info of the group the
- * server belongs to.
*/
public class ServerInformationReply extends RaftClientReply {
- RaftGroup group;
+ private final RaftGroup group;
- public ServerInformationReply(RaftClientRequest request, Message message,
- RaftGroup group) {
- super(request, message);
+ public ServerInformationReply(
+ RaftClientRequest request, Collection<CommitInfoProto> commitInfos, RaftGroup group) {
+ super(request, commitInfos);
this.group = group;
}
- public ServerInformationReply(RaftClientRequest request,
- RaftException ex) {
- super(request, ex);
+ public ServerInformationReply(
+ ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+ long callId, boolean success, Collection<CommitInfoProto> commitInfos, RaftGroup group) {
+ super(clientId, serverId, groupId, callId, success, null, null, commitInfos);
+ this.group = group;
}
public RaftGroup getGroup() {
return group;
}
-
- public ServerInformationReply(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, boolean success, Message message,
- RaftException exception, RaftGroup group) {
- super(clientId, serverId, groupId, callId, success, message, exception);
- this.group = group;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 37a538d..d3b8fcf 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -29,6 +29,8 @@ import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
public interface ProtoUtils {
static ByteString writeObject2ByteString(Object obj) {
@@ -124,6 +126,27 @@ public interface ProtoUtils {
.addAllPeers(toRaftPeerProtos(group.getPeers()));
}
+ static CommitInfoProto toCommitInfoProto(RaftPeer peer, long commitIndex) {
+ return CommitInfoProto.newBuilder()
+ .setServer(toRaftPeerProto(peer))
+ .setCommitIndex(commitIndex)
+ .build();
+ }
+
+ static void addCommitInfos(Collection<CommitInfoProto> commitInfos, Consumer<CommitInfoProto> adder) {
+ if (commitInfos != null && !commitInfos.isEmpty()) {
+ commitInfos.stream().forEach(i -> adder.accept(i));
+ }
+ }
+
+ static String toString(CommitInfoProto proto) {
+ return RaftPeerId.valueOf(proto.getServer().getId()) + ":c" + proto.getCommitIndex();
+ }
+
+ static String toString(Collection<CommitInfoProto> protos) {
+ return protos.stream().map(ProtoUtils::toString).collect(Collectors.toList()).toString();
+ }
+
static boolean isConfigurationLogEntry(LogEntryProto entry) {
return entry.getLogEntryBodyCase() ==
LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index cb13561..19389bd 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -263,6 +263,8 @@ public class GRpcLogAppender extends LogAppender {
private void onSuccess(AppendEntriesReplyProto reply) {
AppendEntriesRequestProto request = pendingRequests.poll();
+ updateCommitIndex(request.getLeaderCommit());
+
final long replyNextIndex = reply.getNextIndex();
Objects.requireNonNull(request,
() -> "Got reply with next index " + replyNextIndex
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index fea6bd0..fa823ab 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -118,6 +118,11 @@ message RequestVoteReplyProto {
bool shouldShutdown = 3;
}
+message CommitInfoProto {
+ RaftPeerProto server = 1;
+ uint64 commitIndex = 2;
+}
+
message AppendEntriesRequestProto {
RaftRpcRequestProto serverRequest = 1;
uint64 leaderTerm = 2;
@@ -125,6 +130,8 @@ message AppendEntriesRequestProto {
repeated LogEntryProto entries = 4;
uint64 leaderCommit = 5;
bool initializing = 6;
+
+ repeated CommitInfoProto commitInfos = 15;
}
message AppendEntriesReplyProto {
@@ -189,11 +196,8 @@ message RaftClientReplyProto {
NotLeaderExceptionProto notLeaderException = 3;
StateMachineExceptionProto stateMachineException = 4;
}
-}
-message ServerInformationReplyProto {
- RaftRpcReplyProto rpcReply = 1;
- RaftGroupProto group = 2;
+ repeated CommitInfoProto commitInfos = 15;
}
// setConfiguration request
@@ -211,4 +215,10 @@ message ReinitializeRequestProto {
// server info request
message ServerInformationRequestProto {
RaftRpcRequestProto rpcRequest = 1;
-}
\ No newline at end of file
+}
+
+message ServerInformationReplyProto {
+ RaftRpcReplyProto rpcReply = 1;
+ RaftGroupProto group = 2;
+ repeated CommitInfoProto commitInfos = 3;
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
new file mode 100644
index 0000000..e74ff9c
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/CommitInfoCache.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.util.ProtoUtils;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/** Caching the commit information. */
+class CommitInfoCache {
+ private final ConcurrentMap<RaftPeerId, CommitInfoProto> map = new ConcurrentHashMap<>();
+
+ CommitInfoProto get(RaftPeerId id) {
+ return map.get(id);
+ }
+
+ CommitInfoProto update(RaftPeer peer, long newCommitIndex) {
+ Objects.requireNonNull(peer, "peer == null");
+ return map.compute(peer.getId(), (id, old) ->
+ old == null || newCommitIndex > old.getCommitIndex()? ProtoUtils.toCommitInfoProto(peer, newCommitIndex): old);
+ }
+
+ CommitInfoProto update(CommitInfoProto newInfo) {
+ return map.compute(RaftPeerId.valueOf(newInfo.getServer().getId()),
+ (id, old) -> old == null || newInfo.getCommitIndex() > old.getCommitIndex()? newInfo: old);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + map.values();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index f72e037..246b9df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -17,18 +17,21 @@
*/
package org.apache.ratis.server.impl;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
public class FollowerInfo {
private final RaftPeer peer;
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private long nextIndex;
private final AtomicLong matchIndex;
+ private final AtomicLong commitIndex = new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
private volatile boolean attendVote;
FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
@@ -49,6 +52,18 @@ public class FollowerInfo {
return matchIndex.get();
}
+ /** @return the commit index acked by the follower. */
+ long getCommitIndex() {
+ return commitIndex.get();
+ }
+
+ boolean updateCommitIndex(long newCommitIndex) {
+ final long old = commitIndex.getAndUpdate(oldCommitIndex -> newCommitIndex);
+ Preconditions.assertTrue(newCommitIndex >= old,
+ () -> "newCommitIndex = " + newCommitIndex + " < old = " + old);
+ return old != newCommitIndex;
+ }
+
public synchronized long getNextIndex() {
return nextIndex;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index a3974d5..840df08 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -22,6 +22,8 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftLog;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.LeaderNoOp;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.TransactionContext;
@@ -258,6 +260,19 @@ public class LeaderState {
server.getState().setRaftConf(logIndex, newConf);
}
+ void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
+ senders.stream().map(LogAppender::getFollower)
+ .map(f -> cache.update(f.getPeer(), f.getCommitIndex()))
+ .forEach(protos::add);
+ }
+
+ AppendEntriesRequestProto newAppendEntriesRequestProto(
+ RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries, boolean initializing) {
+ return ServerProtoUtils.toAppendEntriesRequestProto(server.getId(), targetId,
+ server.getGroupId(), currentTerm, entries, raftLog.getLastCommittedIndex(),
+ initializing, previous, server.getCommitInfos());
+ }
+
/**
* After receiving a setConfiguration request, the leader should update its
* RpcSender list.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 556ba83..ed4e843 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
import org.apache.ratis.server.protocol.TermIndex;
@@ -57,7 +58,6 @@ public class LogAppender extends Daemon {
private final int maxBufferSize;
private final boolean batchSending;
private final LogEntryBuffer buffer;
- private final long leaderTerm;
private final int snapshotChunkMaxSize;
private volatile boolean sending = true;
@@ -74,7 +74,6 @@ public class LogAppender extends Daemon {
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
this.buffer = new LogEntryBuffer();
- this.leaderTerm = server.getState().getCurrentTerm();
}
@Override
@@ -107,6 +106,10 @@ public class LogAppender extends Daemon {
return follower;
}
+ RaftPeerId getFollowerId() {
+ return getFollower().getPeer().getId();
+ }
+
/**
* A buffer for log entries with size limitation.
*/
@@ -135,9 +138,8 @@ public class LogAppender extends Daemon {
}
AppendEntriesRequestProto getAppendRequest(TermIndex previous) {
- final AppendEntriesRequestProto request = server
- .createAppendEntriesRequest(leaderTerm, follower.getPeer().getId(),
- previous, buf, !follower.isAttendingVote());
+ final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
+ getFollowerId(), previous, buf, !follower.isAttendingVote());
buf.clear();
totalSize = 0;
return request;
@@ -209,10 +211,10 @@ public class LogAppender extends Daemon {
}
follower.updateLastRpcSendTime();
- final AppendEntriesReplyProto r = server.getServerRpc()
- .appendEntries(request);
+ final AppendEntriesReplyProto r = server.getServerRpc().appendEntries(request);
follower.updateLastRpcResponseTime();
+ updateCommitIndex(request.getLeaderCommit());
return r;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
@@ -227,6 +229,10 @@ public class LogAppender extends Daemon {
return null;
}
+ protected void updateCommitIndex(long commitIndex) {
+ follower.updateCommitIndex(commitIndex);
+ }
+
protected class SnapshotRequestIter
implements Iterable<InstallSnapshotRequestProto> {
private final SnapshotInfo snapshot;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index 95731c5..b63cd01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -71,14 +71,10 @@ public class PendingRequest implements Comparable<PendingRequest> {
}
TransactionContext setNotLeaderException(NotLeaderException nle) {
- setReply(new RaftClientReply(getRequest(), nle));
+ setReply(new RaftClientReply(getRequest(), nle, null));
return getEntry();
}
- void setSuccessReply(Message message) {
- setReply(new RaftClientReply(getRequest(), message));
- }
-
@Override
public int compareTo(PendingRequest that) {
return Long.compare(this.index, that.index);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 1dead9d..c9c66a5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -113,7 +113,7 @@ class PendingRequests {
if (pendingSetConf != null) {
// for setConfiguration we do not need to wait for statemachine. send back
// reply after it's committed.
- pendingSetConf.setSuccessReply(null);
+ pendingSetConf.setReply(new RaftClientReply(pendingSetConf.getRequest(), server.getCommitInfos()));
pendingSetConf = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 423837b..37cd16f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -40,12 +40,8 @@ import org.slf4j.LoggerFactory;
import javax.management.ObjectName;
import java.io.IOException;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
+import java.util.concurrent.*;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -89,6 +85,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
private volatile LeaderState leaderState;
private final RetryCache retryCache;
+ private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final RaftServerJmxAdapter jmxAdapter;
@@ -102,7 +99,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
"max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
this.proxy = proxy;
- this.state = new ServerState(id, group, properties, this, proxy.getStateMachine());
+
+ final RaftPeer peer = new RaftPeer(id, proxy.getServerRpc().getInetSocketAddress());
+ this.state = new ServerState(peer, group, properties, this, proxy.getStateMachine());
this.retryCache = initRetryCache(properties);
this.jmxAdapter = new RaftServerJmxAdapter();
@@ -340,6 +339,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
heartbeatMonitor = null;
}
+ Collection<CommitInfoProto> getCommitInfos() {
+ final List<CommitInfoProto> infos = new ArrayList<>();
+ // add the commit info of this server
+ infos.add(state.updateCommitInfo(commitInfoCache));
+
+ // add the commit infos of other servers
+ if (isLeader()) {
+ Optional.of(leaderState).ifPresent(
+ leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
+ } else {
+ getRaftConf().getPeers().stream()
+ .filter(p -> !p.getId().equals(state.getSelfId()))
+ .map(RaftPeer::getId)
+ .map(commitInfoCache::get)
+ .filter(i -> i != null)
+ .forEach(infos::add);
+ }
+ return infos;
+ }
+
+ ServerInformationReply getServerInformation(ServerInformatonRequest request) {
+ final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers());
+ return new ServerInformationReply(request, getCommitInfos(), group);
+ }
+
synchronized void changeToCandidate() {
Preconditions.assertTrue(isFollower());
shutdownHeartbeatMonitor();
@@ -368,7 +392,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (!isLeader()) {
NotLeaderException exception = generateNotLeaderException();
- final RaftClientReply reply = new RaftClientReply(request, exception);
+ final RaftClientReply reply = new RaftClientReply(request, exception, getCommitInfos());
return RetryCache.failWithReply(reply, entry);
} else if (leaderState == null || !leaderState.isReady()) {
RetryCache.CacheEntry cacheEntry = retryCache.get(request.getClientId(), request.getCallId());
@@ -435,7 +459,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend stage.
// Return the exception in a RaftClientReply.
- RaftClientReply exceptionReply = new RaftClientReply(request, e);
+ RaftClientReply exceptionReply = new RaftClientReply(request, e, getCommitInfos());
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}
@@ -463,7 +487,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
return stateMachine.query(request.getMessage())
- .thenApply(r -> new RaftClientReply(request, r));
+ .thenApply(r -> new RaftClientReply(request, r, getCommitInfos()));
}
// query the retry cache
@@ -482,7 +506,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
TransactionContext context = stateMachine.startTransaction(request);
if (context.getException() != null) {
RaftClientReply exceptionReply = new RaftClientReply(request,
- new StateMachineException(getId(), context.getException()));
+ new StateMachineException(getId(), context.getException()), getCommitInfos());
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}
@@ -495,15 +519,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return waitForReply(getId(), request, submitClientRequestAsync(request));
}
- static RaftClientReply waitForReply(RaftPeerId id,
+ RaftClientReply waitForReply(RaftPeerId id,
RaftClientRequest request, CompletableFuture<RaftClientReply> future)
throws IOException {
- return waitForReply(id, request, future, RaftClientReply::new);
+ return waitForReply(id, request, future, e -> new RaftClientReply(request, e, getCommitInfos()));
}
static <REPLY extends RaftClientReply> REPLY waitForReply(
RaftPeerId id, RaftClientRequest request, CompletableFuture<REPLY> future,
- BiFunction<RaftClientRequest, RaftException, REPLY> exceptionReply)
+ Function<RaftException, REPLY> exceptionReply)
throws IOException {
try {
return future.get();
@@ -518,10 +542,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
if (cause instanceof NotLeaderException ||
cause instanceof StateMachineException) {
- return exceptionReply.apply(request, (RaftException) cause);
- } else {
- throw IOUtils.asIOException(cause);
+ final REPLY reply = exceptionReply.apply((RaftException) cause);
+ if (reply != null) {
+ return reply;
+ }
}
+ throw IOUtils.asIOException(cause);
}
}
@@ -556,8 +582,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final RaftConfiguration current = getRaftConf();
// make sure there is no other raft reconfiguration in progress
- if (!current.isStable() || leaderState.inStagingState() ||
- !state.isCurrentConfCommitted()) {
+ if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
throw new ReconfigurationInProgressException(
"Reconfiguration is already in progress: " + current);
}
@@ -565,7 +590,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// return success with a null message if the new conf is the same as the current
if (current.hasNoChange(peersInNewConf)) {
pending = new PendingRequest(request);
- pending.setSuccessReply(null);
+ pending.setReply(new RaftClientReply(request, getCommitInfos()));
return pending.getFuture();
}
@@ -709,7 +734,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return appendEntriesAsync(RaftPeerId.valueOf(request.getRequestorId()),
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
r.getLeaderTerm(), previous, r.getLeaderCommit(), r.getInitializing(),
- entries);
+ r.getCommitInfosList(), entries);
}
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -727,14 +752,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
RaftPeerId leaderId, RaftGroupId leaderGroupId, long leaderTerm,
TermIndex previous, long leaderCommit, boolean initializing,
- LogEntryProto... entries) throws IOException {
+ List<CommitInfoProto> commitInfos, LogEntryProto... entries) throws IOException {
CodeInjectionForTesting.execute(APPEND_ENTRIES, getId(),
leaderId, leaderTerm, previous, leaderCommit, initializing, entries);
final boolean isHeartbeat = entries.length == 0;
logAppendEntries(isHeartbeat,
() -> getId() + ": receive appendEntries(" + leaderId + ", " + leaderGroupId + ", "
- + leaderTerm + ", " + previous + ", " + leaderCommit + ", "
- + initializing + ServerProtoUtils.toString(entries));
+ + leaderTerm + ", " + previous + ", " + leaderCommit + ", " + initializing
+ + ", commits" + ProtoUtils.toString(commitInfos)
+ + ", entries: " + ServerProtoUtils.toString(entries));
assertLifeCycleState(STARTING, RUNNING);
assertGroup(leaderId, leaderGroupId);
@@ -792,6 +818,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
state.updateConfiguration(entries);
state.updateStatemachine(leaderCommit, currentTerm);
+
+ commitInfos.stream().forEach(c -> commitInfoCache.update(c));
}
if (entries.length > 0) {
CodeInjectionForTesting.execute(RaftLog.LOG_SYNC, getId(), null);
@@ -894,14 +922,6 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
currentTerm, request.getRequestIndex(), InstallSnapshotResult.SUCCESS);
}
- AppendEntriesRequestProto createAppendEntriesRequest(long leaderTerm,
- RaftPeerId targetId, TermIndex previous, List<LogEntryProto> entries,
- boolean initializing) {
- return ServerProtoUtils.toAppendEntriesRequestProto(getId(), targetId, groupId,
- leaderTerm, entries, state.getLog().getLastCommittedIndex(),
- initializing, previous);
- }
-
synchronized InstallSnapshotRequestProto createInstallSnapshotRequest(
RaftPeerId targetId, String requestId, int requestIndex,
SnapshotInfo snapshot, List<FileChunkProto> chunks, boolean done) {
@@ -947,12 +967,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return stateMachineFuture.whenComplete((reply, exception) -> {
final RaftClientReply r;
if (exception == null) {
- r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null);
+ r = new RaftClientReply(clientId, serverId, groupId, callId, true, reply, null, getCommitInfos());
} else {
// the exception is coming from the state machine. wrap it into the
// reply as a StateMachineException
final StateMachineException e = new StateMachineException(getId(), exception);
- r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e);
+ r = new RaftClientReply(clientId, serverId, groupId, callId, false, null, e, getCommitInfos());
}
// update retry cache
cacheEntry.updateResult(r);
@@ -1011,7 +1031,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, logEntry.getCallId());
if (cacheEntry != null) {
final RaftClientReply reply = new RaftClientReply(clientId, getId(), getGroupId(),
- logEntry.getCallId(), false, null, generateNotLeaderException());
+ logEntry.getCallId(), false, null, generateNotLeaderException(), getCommitInfos());
cacheEntry.failWithReply(reply);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 071f65c..760cf70 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -161,7 +160,8 @@ public class RaftServerProxy implements RaftServer {
@Override
public RaftClientReply reinitialize(ReinitializeRequest request) throws IOException {
- return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request));
+ return RaftServerImpl.waitForReply(getId(), request, reinitializeAsync(request),
+ e -> new RaftClientReply(request, e, null));
}
@Override
@@ -187,13 +187,13 @@ public class RaftServerProxy implements RaftServer {
"Failed to reinitialize, request=" + request, ioe);
impl.completeExceptionally(new IOException(
"Server " + getId() + " is not initialized.", re));
- return new RaftClientReply(request, re);
+ return new RaftClientReply(request, re, null);
}
getServerRpc().addPeers(request.getGroup().getPeers());
newImpl.start();
impl.complete(newImpl);
- return new RaftClientReply(request, (Message) null);
+ return new RaftClientReply(request, newImpl.getCommitInfos());
} finally {
reinitializeRequest.set(null);
}
@@ -204,25 +204,13 @@ public class RaftServerProxy implements RaftServer {
public ServerInformationReply getInfo(ServerInformatonRequest request)
throws IOException {
return RaftServerImpl.waitForReply(getId(), request, getInfoAsync(request),
- ServerInformationReply::new);
+ r -> null);
}
@Override
public CompletableFuture<ServerInformationReply> getInfoAsync(
- ServerInformatonRequest request) throws IOException {
- return CompletableFuture.supplyAsync(() -> {
- try {
- RaftServerImpl server = impl.get();
- Collection<RaftPeer> peers = server.getRaftConf().getPeers();
- RaftGroupId groupId = server.getGroupId();
- RaftGroup group = new RaftGroup(groupId, peers);
- return new ServerInformationReply(request, null, group);
- } catch (Exception e) {
- final RaftException re = new RaftException(
- "Failed to get info, request=" + request, e);
- return new ServerInformationReply(request, re);
- }
- });
+ ServerInformatonRequest request) {
+ return impl.thenApply(server -> server.getServerInformation(request));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 3e44c76..cfdf4cf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -21,20 +21,18 @@ import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.ratis.client.impl.ClientProtoUtils;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.*;
import org.apache.ratis.util.ProtoUtils;
-
/** Server proto utilities for internal use. */
public class ServerProtoUtils {
public static TermIndex toTermIndex(TermIndexProto p) {
@@ -187,7 +185,7 @@ public class ServerProtoUtils {
public static AppendEntriesRequestProto toAppendEntriesRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long leaderTerm,
List<LogEntryProto> entries, long leaderCommit, boolean initializing,
- TermIndex previous) {
+ TermIndex previous, Collection<CommitInfoProto> commitInfos) {
final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId))
@@ -201,6 +199,7 @@ public class ServerProtoUtils {
if (previous != null) {
b.setPreviousLog(toTermIndexProto(previous));
}
+ ProtoUtils.addCommitInfos(commitInfos, i -> b.addCommitInfos(i));
return b.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b50393e..6354e73 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -18,13 +18,11 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.*;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.shaded.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.SnapshotInfo;
@@ -46,6 +44,7 @@ import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBod
*/
public class ServerState implements Closeable {
private final RaftPeerId selfId;
+ private final RaftPeer peer;
private final RaftServerImpl server;
/** Raft log */
private final RaftLog log;
@@ -79,10 +78,11 @@ public class ServerState implements Closeable {
*/
private TermIndex latestInstalledSnapshot;
- ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
+ ServerState(RaftPeer peer, RaftGroup group, RaftProperties prop,
RaftServerImpl server, StateMachine stateMachine)
throws IOException {
- this.selfId = id;
+ this.selfId = peer.getId();
+ this.peer = peer;
this.server = server;
RaftConfiguration initialConf = RaftConfiguration.newBuilder()
.setConf(group.getPeers()).build();
@@ -91,14 +91,14 @@ public class ServerState implements Closeable {
final File dir = RaftServerConfigKeys.storageDir(prop);
storage = new RaftStorage(new File(dir, group.getGroupId().toString()),
RaftServerConstants.StartupOption.REGULAR);
- snapshotManager = new SnapshotManager(storage, id);
+ snapshotManager = new SnapshotManager(storage, peer.getId());
long lastApplied = initStatemachine(stateMachine, prop);
leaderId = null;
// we cannot apply log entries to the state machine in this step, since we
// do not know whether the local log entries have been committed.
- log = initLog(id, prop, lastApplied, entry -> {
+ log = initLog(peer.getId(), prop, lastApplied, entry -> {
if (entry.getLogEntryBodyCase() == CONFIGURATIONENTRY) {
configurationManager.addConfiguration(entry.getIndex(),
ServerProtoUtils.toRaftConfiguration(entry.getIndex(),
@@ -163,6 +163,10 @@ public class ServerState implements Closeable {
return this.selfId;
}
+ CommitInfoProto updateCommitInfo(CommitInfoCache cache) {
+ return cache.update(peer, log.getLastCommittedIndex());
+ }
+
public long getCurrentTerm() {
return currentTerm;
}
@@ -304,15 +308,16 @@ public class ServerState implements Closeable {
}
}
- void updateStatemachine(long majorityIndex, long currentTerm) {
- log.updateLastCommitted(majorityIndex, currentTerm);
- stateMachineUpdater.notifyUpdater();
+ boolean updateStatemachine(long majorityIndex, long currentTerm) {
+ if (log.updateLastCommitted(majorityIndex, currentTerm)) {
+ stateMachineUpdater.notifyUpdater();
+ return true;
+ }
+ return false;
}
- void reloadStateMachine(long lastIndexInSnapshot, long currentTerm)
- throws IOException {
+ void reloadStateMachine(long lastIndexInSnapshot, long currentTerm) {
log.updateLastCommitted(lastIndexInSnapshot, currentTerm);
-
stateMachineUpdater.reloadStateMachine();
}
@@ -351,8 +356,4 @@ public class ServerState implements Closeable {
public long getLastAppliedIndex() {
return stateMachineUpdater.getLastAppliedIndex();
}
-
- boolean isCurrentConfCommitted() {
- return getRaftConf().getLogEntryIndex() <= getLog().getLastCommittedIndex();
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 8edb8a1..91f3b41 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -80,8 +80,9 @@ public abstract class RaftLog implements Closeable {
* Update the last committed index.
* @param majorityIndex the index that has achieved majority.
* @param currentTerm the current term.
+ * @return true if update is applied; otherwise, return false, i.e. no update required.
*/
- public void updateLastCommitted(long majorityIndex, long currentTerm) {
+ public boolean updateLastCommitted(long majorityIndex, long currentTerm) {
try(AutoCloseableLock writeLock = writeLock()) {
if (lastCommitted.get() < majorityIndex) {
// Only update last committed index for current term. See ยง5.4.2 in
@@ -90,9 +91,11 @@ public abstract class RaftLog implements Closeable {
if (entry != null && entry.getTerm() == currentTerm) {
LOG.debug("{}: Updating lastCommitted to {}", selfId, majorityIndex);
lastCommitted.set(majorityIndex);
+ return true;
}
}
}
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index e69f35d..f90012e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -187,7 +187,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
GroupMismatchException.class);
testFailureCase("reinitialize(..) with client group being different from the server group",
- () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().get(0).getId()),
+ () -> client.reinitialize(anotherGroup, clusterGroup.getPeers().iterator().next().getId()),
GroupMismatchException.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/4104860b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
index c22d4c5..2862e0d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerInformationBaseTest.java
@@ -20,17 +20,17 @@ package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.ServerInformationReply;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.util.LogUtils;
+import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
+import java.util.Collection;
import java.util.List;
+
import static org.apache.ratis.util.Preconditions.assertTrue;
public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
@@ -43,7 +43,7 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
@Test
public void testServerInformation() throws Exception {
- runTest(5);
+ runTest(3);
}
private void runTest(int num) throws Exception {
@@ -66,6 +66,62 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
assertTrue(sameGroup(group, info.getGroup()));
}
}
+
+ final int numMessages = 5;
+ final long maxCommit;
+ {
+ // send some messages and get max commit from the last reply
+ final RaftClientReply reply = sendMessages(numMessages, cluster);
+ maxCommit = reply.getCommitInfos().stream().mapToLong(CommitInfoProto::getCommitIndex).max().getAsLong();
+ }
+ // kill a follower
+ final RaftPeerId killedFollower = cluster.getFollowers().iterator().next().getId();
+ cluster.killServer(killedFollower);
+ {
+ // send more messages and check last reply
+ final RaftClientReply reply = sendMessages(numMessages, cluster);
+ for(CommitInfoProto i : reply.getCommitInfos()) {
+ if (RaftPeerId.valueOf(i.getServer().getId()).equals(killedFollower)) {
+ Assert.assertTrue(i.getCommitIndex() <= maxCommit);
+ } else {
+ Assert.assertTrue(i.getCommitIndex() > maxCommit);
+ }
+ }
+ }
+
+ // check serverInformation
+ for(RaftPeer peer : peers) {
+ if (peer.getId().equals(killedFollower)) {
+ continue;
+ }
+ try(final RaftClient client = cluster.createClient(peer.getId())) {
+ RaftClientReply reply = client.serverInformation(peer.getId());
+ assertTrue(reply instanceof ServerInformationReply);
+ ServerInformationReply info = (ServerInformationReply)reply;
+ assertTrue(sameGroup(group, info.getGroup()));
+ for(CommitInfoProto i : info.getCommitInfos()) {
+ if (RaftPeerId.valueOf(i.getServer().getId()).equals(killedFollower)) {
+ Assert.assertTrue(i.getCommitIndex() <= maxCommit);
+ } else {
+ Assert.assertTrue(i.getCommitIndex() > maxCommit);
+ }
+ }
+ }
+ }
+
+ cluster.shutdown();
+ }
+
+ RaftClientReply sendMessages(int n, MiniRaftCluster cluster) throws Exception {
+ LOG.info("sendMessages: " + n);
+ final RaftPeerId leader = RaftTestUtil.waitForLeader(cluster).getId();
+ RaftClientReply reply = null;
+ try(final RaftClient client = cluster.createClient(leader)) {
+ for(int i = 0; i < n; i++) {
+ reply = client.send(Message.valueOf("m" + i));
+ }
+ }
+ return reply;
}
private boolean sameGroup(RaftGroup expected, RaftGroup given) {
@@ -73,8 +129,8 @@ public abstract class ServerInformationBaseTest<CLUSTER extends MiniRaftCluster>
given.getGroupId().toString())) {
return false;
}
- List<RaftPeer> expectedPeers = expected.getPeers();
- List<RaftPeer> givenPeers = given.getPeers();
+ Collection<RaftPeer> expectedPeers = expected.getPeers();
+ Collection<RaftPeer> givenPeers = given.getPeers();
if (expectedPeers.size() != givenPeers.size()) {
return false;
}