You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/18 16:32:44 UTC
[ratis] branch master updated: RATIS-1301. Support listener in ratis (#598)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a838ac4 RATIS-1301. Support listener in ratis (#598)
a838ac4 is described below
commit a838ac4bfc1076f54d4a5aa639b0e77259266b4e
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Sat Feb 19 00:32:36 2022 +0800
RATIS-1301. Support listener in ratis (#598)
---
ratis-proto/src/main/proto/Raft.proto | 1 +
.../java/org/apache/ratis/server/DivisionInfo.java | 4 ++
.../org/apache/ratis/server/RaftConfiguration.java | 16 ++++++++
.../ratis/server/impl/PeerConfiguration.java | 45 ++++++++++++++++++---
.../ratis/server/impl/RaftConfigurationImpl.java | 46 ++++++++++++++++++++--
5 files changed, 103 insertions(+), 9 deletions(-)
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f49afcd..d4d8bee 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -263,6 +263,7 @@ enum RaftPeerRole {
LEADER = 0;
CANDIDATE = 1;
FOLLOWER = 2;
+ LISTENER = 3;
}
message WriteRequestTypeProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
index f815924..e59cba5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
@@ -44,6 +44,10 @@ public interface DivisionInfo {
return getCurrentRole() == RaftPeerRole.LEADER;
}
+ default boolean isListener() {
+ return getCurrentRole() == RaftPeerRole.LISTENER;
+ }
+
/** Is this server division currently the leader and ready? */
boolean isLeaderReady();
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
index ffff90c..2708558 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
@@ -25,9 +25,16 @@ import java.util.Collection;
/**
* A configuration is a subset of the members in a {@link org.apache.ratis.protocol.RaftGroup}.
* The configuration of a cluster may change from time to time.
+ *
+ * In a configuration,
+ * - the peers are voting members such as LEADER, CANDIDATE and FOLLOWER;
+ * - the listeners are non-voting members.
+ *
* This class captures the current configuration and the previous configuration of a cluster.
*
* The objects of this class are immutable.
+ *
+ * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole
*/
public interface RaftConfiguration {
/**
@@ -36,9 +43,18 @@ public interface RaftConfiguration {
*/
RaftPeer getPeer(RaftPeerId id);
+ /**
+ * @return the listener corresponding to the given id,
+ * or null if this configuration contains no listener with that id.
+ */
+ RaftPeer getListener(RaftPeerId id);
+
/** @return all the peers in the current configuration and the previous configuration. */
Collection<RaftPeer> getAllPeers();
+ /** @return all the listeners in the current configuration and the previous configuration. */
+ Collection<RaftPeer> getAllListeners();
+
/** @return all the peers in the current configuration. */
Collection<RaftPeer> getCurrentPeers();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 60cd33d..d8e3c7d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -35,24 +35,51 @@ import java.util.Objects;
* The objects of this class are immutable.
*/
class PeerConfiguration {
+ /**
+ * Peers are voting members such as LEADER, CANDIDATE and FOLLOWER.
+ * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole
+ */
private final Map<RaftPeerId, RaftPeer> peers;
+ /**
+ * Listeners are non-voting members.
+ * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole#LISTENER
+ */
+ private final Map<RaftPeerId, RaftPeer> listeners;
- PeerConfiguration(Iterable<RaftPeer> peers) {
- Objects.requireNonNull(peers);
- Map<RaftPeerId, RaftPeer> map = new HashMap<>();
+ static Map<RaftPeerId, RaftPeer> newMap(Iterable<RaftPeer> peers, String name, Map<RaftPeerId, RaftPeer> existing) {
+ Objects.requireNonNull(peers, () -> name + " == null");
+ final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
for(RaftPeer p : peers) {
+ if (existing.containsKey(p.getId())) {
+ throw new IllegalArgumentException("Failed to initialize " + name
+ + ": Found " + p.getId() + " in existing peers " + existing);
+ }
final RaftPeer previous = map.putIfAbsent(p.getId(), p);
if (previous != null) {
- throw new IllegalArgumentException("Found duplicated ids " + p.getId() + " in peers " + peers);
+ throw new IllegalArgumentException("Failed to initialize " + name
+ + ": Found duplicated ids " + p.getId() + " in " + peers);
}
}
- this.peers = Collections.unmodifiableMap(map);
+ return Collections.unmodifiableMap(map);
+ }
+
+ PeerConfiguration(Iterable<RaftPeer> peers) {
+ this(peers, Collections.emptyList());
+ }
+
+ PeerConfiguration(Iterable<RaftPeer> peers, Iterable<RaftPeer> listeners) {
+ this.peers = newMap(peers, "peers", Collections.emptyMap());
+ this.listeners = newMap(listeners, "listeners", this.peers);
}
Collection<RaftPeer> getPeers() {
return Collections.unmodifiableCollection(peers.values());
}
+ Collection<RaftPeer> getListeners() {
+ return Collections.unmodifiableCollection(listeners.values());
+ }
+
int size() {
return peers.size();
}
@@ -66,10 +93,18 @@ class PeerConfiguration {
return peers.get(id);
}
+ RaftPeer getListener(RaftPeerId id) {
+ return listeners.get(id);
+ }
+
boolean contains(RaftPeerId id) {
return peers.containsKey(id);
}
+ boolean containsListener(RaftPeerId id) {
+ return listeners.containsKey(id);
+ }
+
List<RaftPeer> getOtherPeers(RaftPeerId selfId) {
List<RaftPeer> others = new ArrayList<>();
for (Map.Entry<RaftPeerId, RaftPeer> entry : peers.entrySet()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index dd85078..4bab365 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -28,6 +28,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -65,6 +67,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
return setConf(new PeerConfiguration(peers));
}
+ Builder setConf(Iterable<RaftPeer> peers, Iterable<RaftPeer> listeners) {
+ return setConf(new PeerConfiguration(peers, listeners));
+ }
+
Builder setConf(RaftConfigurationImpl transitionalConf) {
Objects.requireNonNull(transitionalConf);
Preconditions.assertTrue(transitionalConf.isTransitional());
@@ -145,6 +151,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
return conf.contains(peerId);
}
+ boolean containsListenerInConf(RaftPeerId peerId) {
+ return conf.containsListener(peerId);
+ }
+
boolean isHighestPriority(RaftPeerId peerId) {
RaftPeer target = getPeer(peerId);
if (target == null) {
@@ -163,6 +173,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
return oldConf != null && oldConf.contains(peerId);
}
+ boolean containsListenerInOldConf(RaftPeerId peerId) {
+ return oldConf != null && oldConf.containsListener(peerId);
+ }
+
/**
* @return true iff the given peer is contained in conf and,
* if old conf exists, is contained in old conf.
@@ -172,25 +186,49 @@ final class RaftConfigurationImpl implements RaftConfiguration {
(oldConf == null || containsInOldConf(peerId));
}
+ boolean containsListenerInBothConfs(RaftPeerId peerId) {
+ return containsListenerInConf(peerId) &&
+ (oldConf == null || containsListenerInOldConf(peerId));
+ }
+
@Override
public RaftPeer getPeer(RaftPeerId id) {
+ return get(id, (c, peerId) -> c.getPeer(id));
+ }
+
+ @Override
+ public RaftPeer getListener(RaftPeerId id) {
+ return get(id, (c, peerId) -> c.getListener(id));
+ }
+
+ private RaftPeer get(RaftPeerId id, BiFunction<PeerConfiguration, RaftPeerId, RaftPeer> getMethod) {
if (id == null) {
return null;
}
- RaftPeer peer = conf.getPeer(id);
+ final RaftPeer peer = getMethod.apply(conf, id);
if (peer != null) {
return peer;
} else if (oldConf != null) {
- return oldConf.getPeer(id);
+ return getMethod.apply(oldConf, id);
}
return null;
}
@Override
public Collection<RaftPeer> getAllPeers() {
- final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+ return getAll(PeerConfiguration::getPeers);
+ }
+
+ @Override
+ public Collection<RaftPeer> getAllListeners() {
+ return getAll(PeerConfiguration::getListeners);
+ }
+
+ private Collection<RaftPeer> getAll(Function<PeerConfiguration, Collection<RaftPeer>> getMethod) {
+ final Collection<RaftPeer> peers = new ArrayList<>(getMethod.apply(conf));
if (oldConf != null) {
- oldConf.getPeers().stream().filter(p -> !peers.contains(p))
+ getMethod.apply(oldConf).stream()
+ .filter(p -> !peers.contains(p))
.forEach(peers::add);
}
return peers;