You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 03:45:14 UTC
[incubator-ratis] branch master updated: RATIS-1219. Define an
interface for RaftConfiguration. (#336)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d3e2862 RATIS-1219. Define an interface for RaftConfiguration. (#336)
d3e2862 is described below
commit d3e28626a12c09496eff459b8b63b158ee4d975d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 9 11:45:08 2020 +0800
RATIS-1219. Define an interface for RaftConfiguration. (#336)
---
.../org/apache/ratis/server/RaftConfiguration.java | 47 +++++++++++++++
.../java/org/apache/ratis/server/RaftServer.java | 3 +-
.../ratis/server/impl/ConfigurationManager.java | 20 ++++---
.../apache/ratis/server/impl/LeaderElection.java | 7 ++-
.../apache/ratis/server/impl/LeaderStateImpl.java | 34 ++++++-----
...nfiguration.java => RaftConfigurationImpl.java} | 66 ++++++++++------------
.../apache/ratis/server/impl/RaftServerImpl.java | 14 ++---
.../apache/ratis/server/impl/ServerProtoUtils.java | 13 +++--
.../org/apache/ratis/server/impl/ServerState.java | 12 ++--
.../org/apache/ratis/server/raftlog/RaftLog.java | 2 +-
.../ratis/server/raftlog/RaftLogSequentialOps.java | 2 +-
.../apache/ratis/server/storage/RaftStorage.java | 2 +-
.../server/impl/RaftReconfigurationBaseTest.java | 17 ++++--
.../ratis/server/impl/RaftServerTestUtil.java | 12 ++--
.../ratis/datastream/DataStreamBaseTest.java | 2 +-
15 files changed, 152 insertions(+), 101 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftConfiguration.java
new file mode 100644
index 0000000..2f6cc0f
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+
+import java.util.Collection;
+
+/**
+ * A configuration is a subset of the members in a {@link org.apache.ratis.protocol.RaftGroup}.
+ * The configuration of a cluster may change from time to time.
+ * This interface captures the current configuration and the previous configuration of a cluster.
+ *
+ * The objects implementing this interface are immutable.
+ */
+public interface RaftConfiguration {
+ /**
+ * @return the peer corresponding to the given id;
+ * or return null if the peer is not in this configuration.
+ */
+ RaftPeer getPeer(RaftPeerId id);
+
+ /** @return all the peers in the current configuration and the previous configuration. */
+ Collection<RaftPeer> getAllPeers();
+
+ /** @return all the peers in the current configuration. */
+ Collection<RaftPeer> getCurrentPeers();
+
+ /** @return all the peers in the previous configuration. */
+ Collection<RaftPeer> getPreviousPeers();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 5c309a1..838038a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -22,7 +22,6 @@ import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
@@ -73,7 +72,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftGroup} for this division. */
default RaftGroup getGroup() {
- return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getPeers());
+ return RaftGroup.valueOf(getMemberId().getGroupId(), getRaftConf().getAllPeers());
}
/** @return the current {@link RaftConfiguration} for this division. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index 10f95b7..019047f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
@@ -30,15 +31,15 @@ import java.util.*;
* entries.
*/
public class ConfigurationManager {
- private final RaftConfiguration initialConf;
- private final NavigableMap<Long, RaftConfiguration> configurations = new TreeMap<>();
+ private final RaftConfigurationImpl initialConf;
+ private final NavigableMap<Long, RaftConfigurationImpl> configurations = new TreeMap<>();
/**
* The current raft configuration. If configurations is not empty, should be
* the last entry of the map. Otherwise is initialConf.
*/
- private RaftConfiguration currentConf;
+ private volatile RaftConfigurationImpl currentConf;
- ConfigurationManager(RaftConfiguration initialConf) {
+ ConfigurationManager(RaftConfigurationImpl initialConf) {
this.initialConf = initialConf;
this.currentConf = initialConf;
}
@@ -49,13 +50,17 @@ public class ConfigurationManager {
Preconditions.assertTrue(found.equals(conf));
return;
}
+ addRaftConfigurationImpl(logIndex, (RaftConfigurationImpl) conf);
+ }
+
+ private void addRaftConfigurationImpl(long logIndex, RaftConfigurationImpl conf) {
configurations.put(logIndex, conf);
if (logIndex == configurations.lastEntry().getKey()) {
currentConf = conf;
}
}
- synchronized RaftConfiguration getCurrent() {
+ RaftConfigurationImpl getCurrent() {
return currentConf;
}
@@ -66,9 +71,8 @@ public class ConfigurationManager {
* @return The configuration with largest log index < the given index.
*/
synchronized RaftConfiguration removeConfigurations(long index) {
- SortedMap<Long, RaftConfiguration> toRemove = configurations.tailMap(index);
- for (Iterator<Map.Entry<Long, RaftConfiguration>> iter =
- toRemove.entrySet().iterator(); iter.hasNext();) {
+ // remove all configurations starting at the index
+ for(final Iterator<?> iter = configurations.tailMap(index).entrySet().iterator(); iter.hasNext();) {
iter.next();
iter.remove();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 9db608e..932130b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Daemon;
@@ -205,7 +206,7 @@ class LeaderElection implements Runnable {
while (shouldRun()) {
// one round of requestVotes
final long electionTerm;
- final RaftConfiguration conf;
+ final RaftConfigurationImpl conf;
synchronized (server) {
if (!shouldRun()) {
break;
@@ -274,7 +275,7 @@ class LeaderElection implements Runnable {
Set<RaftPeerId> higherPriorityPeers = new HashSet<>();
int currPriority = conf.getPeer(server.getId()).getPriority();
- Collection<RaftPeer> peers = conf.getPeers();
+ final Collection<RaftPeer> peers = conf.getAllPeers();
for (RaftPeer peer : peers) {
if (peer.getPriority() > currPriority) {
@@ -286,7 +287,7 @@ class LeaderElection implements Runnable {
}
private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
- RaftConfiguration conf, Executor voteExecutor) throws InterruptedException {
+ RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 0352fb9..5441408 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -266,7 +266,7 @@ class LeaderStateImpl implements LeaderState {
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new MessageStreamRequests(server.getMemberId());
- final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
placeHolderIndex = raftLog.getNextIndex();
@@ -430,9 +430,8 @@ class LeaderStateImpl implements LeaderState {
private void applyOldNewConf() {
final ServerState state = server.getState();
- final RaftConfiguration current = server.getRaftConf();
- final RaftConfiguration oldNewConf= stagingState.generateOldNewConf(current,
- state.getLog().getNextIndex());
+ final RaftConfigurationImpl current = state.getRaftConf();
+ final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
// apply the (old, new) configuration to log, and use it as the current conf
long index = state.getLog().append(state.getCurrentTerm(), oldNewConf);
updateConfiguration(index, oldNewConf);
@@ -441,7 +440,7 @@ class LeaderStateImpl implements LeaderState {
notifySenders();
}
- private void updateConfiguration(long logIndex, RaftConfiguration newConf) {
+ private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
voterLists = divideFollowers(newConf);
server.getState().setRaftConf(logIndex, newConf);
}
@@ -503,7 +502,7 @@ class LeaderStateImpl implements LeaderState {
/**
* Update the RpcSender list based on the current configuration
*/
- private void updateSenders(RaftConfiguration conf) {
+ private void updateSenders(RaftConfigurationImpl conf) {
Preconditions.assertTrue(conf.isStable() && !inStagingState());
stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
}
@@ -545,8 +544,8 @@ class LeaderStateImpl implements LeaderState {
private void prepare() {
synchronized (server) {
if (running) {
- final RaftConfiguration conf = server.getRaftConf();
- if (conf.isTransitional() && server.getState().isConfCommitted()) {
+ final ServerState state = server.getState();
+ if (state.getRaftConf().isTransitional() && state.isConfCommitted()) {
// the configuration is in transitional state, and has been committed
// so it is time to generate and replicate (new) conf.
replicateNewConf();
@@ -692,7 +691,7 @@ class LeaderStateImpl implements LeaderState {
private Optional<MinMajorityMax> getMajorityMin(ToLongFunction<FollowerInfo> followerIndex, LongSupplier logIndex) {
final RaftPeerId selfId = server.getId();
- final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfigurationImpl conf = server.getRaftConf();
final List<RaftPeerId> followers = voterLists.get(0);
final boolean includeSelf = conf.containsInConf(selfId);
@@ -762,7 +761,7 @@ class LeaderStateImpl implements LeaderState {
}
private void checkAndUpdateConfiguration(TermIndex[] entriesToCheck) {
- final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfigurationImpl conf = server.getRaftConf();
if (committedConf(entriesToCheck)) {
if (conf.isTransitional()) {
replicateNewConf();
@@ -793,8 +792,8 @@ class LeaderStateImpl implements LeaderState {
* 4) start replicating the log entry
*/
private void replicateNewConf() {
- final RaftConfiguration conf = server.getRaftConf();
- final RaftConfiguration newConf = RaftConfiguration.newBuilder()
+ final RaftConfigurationImpl conf = server.getRaftConf();
+ final RaftConfigurationImpl newConf = RaftConfigurationImpl.newBuilder()
.setConf(conf)
.setLogEntryIndex(raftLog.getNextIndex())
.build();
@@ -843,7 +842,7 @@ class LeaderStateImpl implements LeaderState {
return indices;
}
- private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
+ private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) {
List<List<RaftPeerId>> lists = new ArrayList<>(2);
List<RaftPeerId> listForNew = senders.stream()
.map(LogAppender::getFollowerId)
@@ -865,7 +864,7 @@ class LeaderStateImpl implements LeaderState {
return;
}
- final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfigurationImpl conf = server.getRaftConf();
int leaderPriority = conf.getPeer(server.getId()).getPriority();
for (LogAppender logAppender : senders.getSenders()) {
@@ -927,7 +926,7 @@ class LeaderStateImpl implements LeaderState {
.map(LogAppender::getFollowerId)
.collect(Collectors.toList());
- final RaftConfiguration conf = server.getRaftConf();
+ final RaftConfigurationImpl conf = server.getRaftConf();
if (conf.hasMajority(activePeers, server.getId())) {
// leadership check passed
@@ -971,9 +970,8 @@ class LeaderStateImpl implements LeaderState {
this.newConf = newConf;
}
- RaftConfiguration generateOldNewConf(RaftConfiguration current,
- long logIndex) {
- return RaftConfiguration.newBuilder()
+ RaftConfigurationImpl generateOldNewConf(RaftConfigurationImpl current, long logIndex) {
+ return RaftConfigurationImpl.newBuilder()
.setConf(newConf)
.setOldConf(current)
.setLogEntryIndex(logIndex)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
similarity index 80%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 4f7e3ea..5469ce1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -19,11 +19,15 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.Preconditions;
-import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
/**
@@ -34,14 +38,13 @@ import java.util.stream.Collectors;
*
* The objects of this class are immutable.
*/
-public final class RaftConfiguration {
+final class RaftConfigurationImpl implements RaftConfiguration {
/** Create a {@link Builder}. */
- public static Builder newBuilder() {
+ static Builder newBuilder() {
return new Builder();
}
- /** To build {@link RaftConfiguration} objects. */
- public static final class Builder {
+ static final class Builder {
private PeerConfiguration oldConf;
private PeerConfiguration conf;
private long logEntryIndex = RaftLog.INVALID_LOG_INDEX;
@@ -51,18 +54,18 @@ public final class RaftConfiguration {
private Builder() {}
- public Builder setConf(PeerConfiguration conf) {
+ Builder setConf(PeerConfiguration conf) {
Objects.requireNonNull(conf);
Preconditions.assertTrue(this.conf == null, "conf is already set.");
this.conf = conf;
return this;
}
- public Builder setConf(Iterable<RaftPeer> peers) {
+ Builder setConf(Iterable<RaftPeer> peers) {
return setConf(new PeerConfiguration(peers));
}
- Builder setConf(RaftConfiguration transitionalConf) {
+ Builder setConf(RaftConfigurationImpl transitionalConf) {
Objects.requireNonNull(transitionalConf);
Preconditions.assertTrue(transitionalConf.isTransitional());
@@ -72,18 +75,18 @@ public final class RaftConfiguration {
}
- public Builder setOldConf(PeerConfiguration oldConf) {
+ Builder setOldConf(PeerConfiguration oldConf) {
Objects.requireNonNull(oldConf);
Preconditions.assertTrue(this.oldConf == null, "oldConf is already set.");
this.oldConf = oldConf;
return this;
}
- public Builder setOldConf(Iterable<RaftPeer> oldPeers) {
+ Builder setOldConf(Iterable<RaftPeer> oldPeers) {
return setOldConf(new PeerConfiguration(oldPeers));
}
- Builder setOldConf(RaftConfiguration stableConf) {
+ Builder setOldConf(RaftConfigurationImpl stableConf) {
Objects.requireNonNull(stableConf);
Preconditions.assertTrue(stableConf.isStable());
@@ -92,22 +95,21 @@ public final class RaftConfiguration {
return setOldConf(stableConf.conf);
}
- public Builder setLogEntryIndex(long logEntryIndex) {
+ Builder setLogEntryIndex(long logEntryIndex) {
Preconditions.assertTrue(logEntryIndex != RaftLog.INVALID_LOG_INDEX);
Preconditions.assertTrue(this.logEntryIndex == RaftLog.INVALID_LOG_INDEX, "logEntryIndex is already set.");
this.logEntryIndex = logEntryIndex;
return this;
}
- /** Build a {@link RaftConfiguration}. */
- public RaftConfiguration build() {
+ RaftConfigurationImpl build() {
if (forceTransitional) {
Preconditions.assertTrue(oldConf != null);
}
if (forceStable) {
Preconditions.assertTrue(oldConf == null);
}
- return new RaftConfiguration(conf, oldConf, logEntryIndex);
+ return new RaftConfigurationImpl(conf, oldConf, logEntryIndex);
}
}
@@ -122,7 +124,7 @@ public final class RaftConfiguration {
/** The index of the corresponding log entry for this configuration. */
private final long logEntryIndex;
- private RaftConfiguration(PeerConfiguration conf, PeerConfiguration oldConf,
+ private RaftConfigurationImpl(PeerConfiguration conf, PeerConfiguration oldConf,
long logEntryIndex) {
this.conf = Objects.requireNonNull(conf);
this.oldConf = oldConf;
@@ -151,15 +153,12 @@ public final class RaftConfiguration {
* @return true iff the given peer is contained in conf and,
* if old conf exists, is contained in old conf.
*/
- boolean contains(RaftPeerId peerId) {
+ boolean containsInBothConfs(RaftPeerId peerId) {
return containsInConf(peerId) &&
(oldConf == null || containsInOldConf(peerId));
}
- /**
- * @return the peer corresponding to the given id;
- * or return null if the peer is not in this configuration.
- */
+ @Override
public RaftPeer getPeer(RaftPeerId id) {
if (id == null) {
return null;
@@ -173,8 +172,8 @@ public final class RaftConfiguration {
return null;
}
- /** @return all the peers from the conf, and the old conf if it exists. */
- public Collection<RaftPeer> getPeers() {
+ @Override
+ public Collection<RaftPeer> getAllPeers() {
final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
if (oldConf != null) {
oldConf.getPeers().stream().filter(p -> !peers.contains(p))
@@ -187,7 +186,7 @@ public final class RaftConfiguration {
* @return all the peers other than the given self id from the conf,
* and the old conf if it exists.
*/
- public Collection<RaftPeer> getOtherPeers(RaftPeerId selfId) {
+ Collection<RaftPeer> getOtherPeers(RaftPeerId selfId) {
Collection<RaftPeer> others = conf.getOtherPeers(selfId);
if (oldConf != null) {
oldConf.getOtherPeers(selfId).stream()
@@ -236,20 +235,13 @@ public final class RaftConfiguration {
return peers.stream().filter(p -> !containsInConf(p.getId())).collect(Collectors.toList());
}
- RaftPeer getRandomPeer(RaftPeerId exclusiveId) {
- final List<RaftPeer> peers = conf.getOtherPeers(exclusiveId);
- if (peers.isEmpty()) {
- return null;
- }
- final int index = ThreadLocalRandom.current().nextInt(peers.size());
- return peers.get(index);
- }
-
- Collection<RaftPeer> getPeersInOldConf() {
+ @Override
+ public Collection<RaftPeer> getPreviousPeers() {
return oldConf != null ? oldConf.getPeers() : Collections.emptyList();
}
- Collection<RaftPeer> getPeersInConf() {
+ @Override
+ public Collection<RaftPeer> getCurrentPeers() {
return conf.getPeers();
}
@@ -260,7 +252,7 @@ public final class RaftConfiguration {
} else if (obj == null || obj.getClass() != this.getClass()) {
return false;
}
- final RaftConfiguration that = (RaftConfiguration)obj;
+ final RaftConfigurationImpl that = (RaftConfigurationImpl)obj;
return this.logEntryIndex == that.logEntryIndex
&& Objects.equals(this.conf, that.conf)
&& Objects.equals(this.oldConf, that.oldConf);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 98e94b5..6669216 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -279,8 +279,8 @@ class RaftServerImpl implements RaftServer.Division,
if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
return false;
}
- RaftConfiguration conf = getRaftConf();
- if (conf != null && conf.contains(getId())) {
+ final RaftConfigurationImpl conf = getRaftConf();
+ if (conf != null && conf.containsInBothConfs(getId())) {
LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
startAsFollower();
} else {
@@ -341,7 +341,7 @@ class RaftServerImpl implements RaftServer.Division,
}
@Override
- public RaftConfiguration getRaftConf() {
+ public RaftConfigurationImpl getRaftConf() {
return getState().getRaftConf();
}
@@ -486,7 +486,7 @@ class RaftServerImpl implements RaftServer.Division,
role.getLeaderState().ifPresent(
leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
} else {
- getRaftConf().getPeers().stream()
+ getRaftConf().getAllPeers().stream()
.map(RaftPeer::getId)
.filter(id -> !id.equals(getId()))
.map(commitInfoCache::get)
@@ -620,8 +620,8 @@ class RaftServerImpl implements RaftServer.Division,
// leader, but it is about to step down. set the suggested leader as null.
leaderId = null;
}
- RaftConfiguration conf = getRaftConf();
- Collection<RaftPeer> peers = conf.getPeers();
+ final RaftConfigurationImpl conf = getRaftConf();
+ Collection<RaftPeer> peers = conf.getAllPeers();
return new NotLeaderException(getMemberId(), conf.getPeer(leaderId), peers);
}
@@ -888,7 +888,7 @@ class RaftServerImpl implements RaftServer.Division,
return reply;
}
- final RaftConfiguration current = getRaftConf();
+ final RaftConfigurationImpl current = getRaftConf();
final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
// make sure there is no other raft reconfiguration in progress
if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 7bb58a8..86b6873 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
@@ -178,14 +179,14 @@ public interface ServerProtoUtils {
static RaftConfigurationProto.Builder toRaftConfigurationProto(RaftConfiguration conf) {
return RaftConfigurationProto.newBuilder()
- .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInConf()))
- .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPeersInOldConf()));
+ .addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers()))
+ .addAllOldPeers(ProtoUtils.toRaftPeerProtos(conf.getPreviousPeers()));
}
- static RaftConfiguration toRaftConfiguration(LogEntryProto entry) {
+ static RaftConfigurationImpl toRaftConfiguration(LogEntryProto entry) {
Preconditions.assertTrue(entry.hasConfigurationEntry());
final RaftConfigurationProto proto = entry.getConfigurationEntry();
- final RaftConfiguration.Builder b = RaftConfiguration.newBuilder()
+ final RaftConfigurationImpl.Builder b = RaftConfigurationImpl.newBuilder()
.setConf(ProtoUtils.toRaftPeers(proto.getPeersList()))
.setLogEntryIndex(entry.getIndex());
if (proto.getOldPeersCount() > 0) {
@@ -398,7 +399,7 @@ public interface ServerProtoUtils {
.setDone(done);
// Set term to DEFAULT_TERM as this term is not going to used by installSnapshot to update the RaftConfiguration
final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, DEFAULT_TERM,
- raftConfiguration.getLogEntryIndex());
+ ((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
.setLastRaftConfigurationLogEntryProto(confLogEntryProto)
@@ -415,7 +416,7 @@ public interface ServerProtoUtils {
.setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
// Set term to DEFAULT_TERM as this term is not going to used by installSnapshot to update the RaftConfiguration
final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, DEFAULT_TERM,
- raftConfiguration.getLogEntryIndex());
+ ((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
.setLastRaftConfigurationLogEntryProto(confLogEntryProto)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 9f7e02b..2d62494 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -99,8 +100,7 @@ class ServerState implements Closeable {
throws IOException {
this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
- RaftConfiguration initialConf = RaftConfiguration.newBuilder()
- .setConf(group.getPeers()).build();
+ final RaftConfigurationImpl initialConf = RaftConfigurationImpl.newBuilder().setConf(group.getPeers()).build();
configurationManager = new ConfigurationManager(initialConf);
LOG.info("{}: {}", getMemberId(), configurationManager);
@@ -167,8 +167,8 @@ class ServerState implements Closeable {
sm.initialize(server.getRaftServer(), gid, storage);
// get the raft configuration from raft metafile
RaftConfiguration raftConf = storage.readRaftConfiguration();
- if (raftConf != null) {
- setRaftConf(raftConf.getLogEntryIndex(), raftConf);
+ if (raftConf instanceof RaftConfigurationImpl) {
+ setRaftConf(((RaftConfigurationImpl)raftConf).getLogEntryIndex(), raftConf);
}
}
@@ -197,7 +197,7 @@ class ServerState implements Closeable {
return log;
}
- RaftConfiguration getRaftConf() {
+ RaftConfigurationImpl getRaftConf() {
return configurationManager.getCurrent();
}
@@ -373,7 +373,7 @@ class ServerState implements Closeable {
void setRaftConf(long logIndex, RaftConfiguration conf) {
configurationManager.addConfiguration(logIndex, conf);
- server.getServerRpc().addRaftPeers(conf.getPeers());
+ server.getServerRpc().addRaftPeers(conf.getAllPeers());
LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
LOG.trace("{}: {}", getMemberId(), configurationManager);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index d7c71c5..1e2d11b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -23,7 +23,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
index 85349a1..34a8f89 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java
@@ -19,7 +19,7 @@ package org.apache.ratis.server.raftlog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
index 9edce75..18810f3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorage.java
@@ -19,7 +19,7 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.storage.RaftStorageDirectory.StorageState;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 9c6a226..183284e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -19,18 +19,25 @@ package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster.PeerChanges;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
@@ -43,13 +50,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.List;
import static java.util.Arrays.asList;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
@@ -262,7 +269,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
- Assert.assertFalse(RaftServerTestUtil.getRaftConf(cluster.getLeader()).isTransitional());
+ Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional());
final RaftClientRpc sender = client.getClientRpc();
final SetConfigurationRequest request = cluster.newSetConfigurationRequest(
@@ -406,7 +413,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
TimeUnit.SECONDS.sleep(1);
// the leader cannot generate the (old, new) conf, and it will keep
// bootstrapping the 2 new peers since they have not started yet
- Assert.assertFalse(RaftServerTestUtil.getRaftConf(cluster.getLeader()).isTransitional());
+ Assert.assertFalse(((RaftConfigurationImpl)cluster.getLeader().getRaftConf()).isTransitional());
// only (0) the first conf entry, (1) the 1st setConf entry and (2) a metadata entry
{
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index aba4700..55f650b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.leader.LogAppender;
@@ -72,7 +73,7 @@ public class RaftServerTestUtil {
int numIncluded = 0;
int deadIncluded = 0;
- final RaftConfiguration current = RaftConfiguration.newBuilder()
+ final RaftConfigurationImpl current = RaftConfigurationImpl.newBuilder()
.setConf(peers).setLogEntryIndex(0).build();
for (RaftServer.Division d : cluster.iterateDivisions()) {
final RaftServerImpl server = (RaftServerImpl)d;
@@ -83,15 +84,16 @@ public class RaftServerTestUtil {
}
continue;
}
+ final RaftConfigurationImpl conf = server.getState().getRaftConf();
if (current.containsInConf(server.getId())) {
numIncluded++;
- Assert.assertTrue(server.getRaftConf().isStable());
- Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
+ Assert.assertTrue(conf.isStable());
+ Assert.assertTrue(conf.hasNoChange(peers));
} else if (server.getInfo().isAlive()) {
// The server is successfully removed from the conf
// It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
- Assert.assertTrue(server.getRaftConf().isStable());
- Assert.assertFalse(server.getRaftConf().containsInConf(server.getId()));
+ Assert.assertTrue(conf.isStable());
+ Assert.assertFalse(conf.containsInConf(server.getId()));
}
}
Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 32eb2de..6694db5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -55,7 +55,7 @@ import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
-import org.apache.ratis.server.impl.RaftConfiguration;
+import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.metrics.RaftServerMetrics;