You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/02/18 16:32:44 UTC

[ratis] branch master updated: RATIS-1301. Support listener in ratis (#598)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a838ac4  RATIS-1301. Support listener in ratis (#598)
a838ac4 is described below

commit a838ac4bfc1076f54d4a5aa639b0e77259266b4e
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Sat Feb 19 00:32:36 2022 +0800

    RATIS-1301. Support listener in ratis (#598)
---
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../java/org/apache/ratis/server/DivisionInfo.java |  4 ++
 .../org/apache/ratis/server/RaftConfiguration.java | 16 ++++++++
 .../ratis/server/impl/PeerConfiguration.java       | 45 ++++++++++++++++++---
 .../ratis/server/impl/RaftConfigurationImpl.java   | 46 ++++++++++++++++++++--
 5 files changed, 103 insertions(+), 9 deletions(-)

diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f49afcd..d4d8bee 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -263,6 +263,7 @@ enum RaftPeerRole {
   LEADER = 0;
   CANDIDATE = 1;
   FOLLOWER = 2;
+  LISTENER = 3;
 }
 
 message WriteRequestTypeProto {
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
index f815924..e59cba5 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/DivisionInfo.java
@@ -44,6 +44,10 @@ public interface DivisionInfo {
     return getCurrentRole() == RaftPeerRole.LEADER;
   }
 
+  default boolean isListener() {
+    return getCurrentRole() == RaftPeerRole.LISTENER;
+  }
+
   /** Is this server division currently the leader and ready? */
   boolean isLeaderReady();
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
index ffff90c..2708558 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftConfiguration.java
@@ -25,9 +25,16 @@ import java.util.Collection;
 /**
  * A configuration is a subset of the members in a {@link org.apache.ratis.protocol.RaftGroup}.
  * The configuration of a cluster may change from time to time.
+ *
+ * In a configuration,
+ * - the peers are voting members such as LEADER, CANDIDATE and FOLLOWER;
+ * - the listeners are non-voting members.
+ *
  * This class captures the current configuration and the previous configuration of a cluster.
  *
  * The objects of this class are immutable.
+ *
+ * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole
  */
 public interface RaftConfiguration {
   /**
@@ -36,9 +43,18 @@ public interface RaftConfiguration {
    */
   RaftPeer getPeer(RaftPeerId id);
 
+  /**
+   * @return the listener corresponding to the given id;
+   *         or return null if the listener is not in this configuration.
+   */
+  RaftPeer getListener(RaftPeerId id);
+
   /** @return all the peers in the current configuration and the previous configuration. */
   Collection<RaftPeer> getAllPeers();
 
+  /** @return all the listeners in the current configuration and the previous configuration. */
+  Collection<RaftPeer> getAllListeners();
+
   /** @return all the peers in the current configuration. */
   Collection<RaftPeer> getCurrentPeers();
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 60cd33d..d8e3c7d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -35,24 +35,51 @@ import java.util.Objects;
  * The objects of this class are immutable.
  */
 class PeerConfiguration {
+  /**
+   * Peers are voting members such as LEADER, CANDIDATE and FOLLOWER
+   * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole
+   */
   private final Map<RaftPeerId, RaftPeer> peers;
+  /**
+   * Listeners are non-voting members.
+   * @see org.apache.ratis.proto.RaftProtos.RaftPeerRole#LISTENER
+   */
+  private final Map<RaftPeerId, RaftPeer> listeners;
 
-  PeerConfiguration(Iterable<RaftPeer> peers) {
-    Objects.requireNonNull(peers);
-    Map<RaftPeerId, RaftPeer> map = new HashMap<>();
+  static Map<RaftPeerId, RaftPeer> newMap(Iterable<RaftPeer> peers, String name, Map<RaftPeerId, RaftPeer> existing) {
+    Objects.requireNonNull(peers, () -> name + " == null");
+    final Map<RaftPeerId, RaftPeer> map = new HashMap<>();
     for(RaftPeer p : peers) {
+      if (existing.containsKey(p.getId())) {
+        throw new IllegalArgumentException("Failed to initialize " + name
+            + ": Found " + p.getId() + " in existing peers " + existing);
+      }
       final RaftPeer previous = map.putIfAbsent(p.getId(), p);
       if (previous != null) {
-        throw new IllegalArgumentException("Found duplicated ids " + p.getId() + " in peers " + peers);
+        throw new IllegalArgumentException("Failed to initialize " + name
+            + ": Found duplicated ids " + p.getId() + " in " + peers);
       }
     }
-    this.peers = Collections.unmodifiableMap(map);
+    return Collections.unmodifiableMap(map);
+  }
+
+  PeerConfiguration(Iterable<RaftPeer> peers) {
+    this(peers, Collections.emptyList());
+  }
+
+  PeerConfiguration(Iterable<RaftPeer> peers, Iterable<RaftPeer> listeners) {
+    this.peers = newMap(peers, "peers", Collections.emptyMap());
+    this.listeners = newMap(listeners, "listeners", this.peers);
   }
 
   Collection<RaftPeer> getPeers() {
     return Collections.unmodifiableCollection(peers.values());
   }
 
+  Collection<RaftPeer> getListeners() {
+    return Collections.unmodifiableCollection(listeners.values());
+  }
+
   int size() {
     return peers.size();
   }
@@ -66,10 +93,18 @@ class PeerConfiguration {
     return peers.get(id);
   }
 
+  RaftPeer getListener(RaftPeerId id) {
+    return listeners.get(id);
+  }
+
   boolean contains(RaftPeerId id) {
     return peers.containsKey(id);
   }
 
+  boolean containsListener(RaftPeerId id) {
+    return listeners.containsKey(id);
+  }
+
   List<RaftPeer> getOtherPeers(RaftPeerId selfId) {
     List<RaftPeer> others = new ArrayList<>();
     for (Map.Entry<RaftPeerId, RaftPeer> entry : peers.entrySet()) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index dd85078..4bab365 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -28,6 +28,8 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -65,6 +67,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
       return setConf(new PeerConfiguration(peers));
     }
 
+    Builder setConf(Iterable<RaftPeer> peers, Iterable<RaftPeer> listeners) {
+      return setConf(new PeerConfiguration(peers, listeners));
+    }
+
     Builder setConf(RaftConfigurationImpl transitionalConf) {
       Objects.requireNonNull(transitionalConf);
       Preconditions.assertTrue(transitionalConf.isTransitional());
@@ -145,6 +151,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     return conf.contains(peerId);
   }
 
+  boolean containsListenerInConf(RaftPeerId peerId) {
+    return conf.containsListener(peerId);
+  }
+
   boolean isHighestPriority(RaftPeerId peerId) {
     RaftPeer target = getPeer(peerId);
     if (target == null) {
@@ -163,6 +173,10 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     return oldConf != null && oldConf.contains(peerId);
   }
 
+  boolean containsListenerInOldConf(RaftPeerId peerId) {
+    return oldConf != null && oldConf.containsListener(peerId);
+  }
+
   /**
    * @return true iff the given peer is contained in conf and,
    *         if old conf exists, is contained in old conf.
@@ -172,25 +186,49 @@ final class RaftConfigurationImpl implements RaftConfiguration {
         (oldConf == null || containsInOldConf(peerId));
   }
 
+  boolean containsListenerInBothConfs(RaftPeerId peerId) {
+    return containsListenerInConf(peerId) &&
+        (oldConf == null || containsListenerInOldConf(peerId));
+  }
+
   @Override
   public RaftPeer getPeer(RaftPeerId id) {
+    return get(id, (c, peerId) -> c.getPeer(id));
+  }
+
+  @Override
+  public RaftPeer getListener(RaftPeerId id) {
+    return get(id, (c, peerId) -> c.getListener(id));
+  }
+
+  private RaftPeer get(RaftPeerId id, BiFunction<PeerConfiguration, RaftPeerId, RaftPeer> getMethod) {
     if (id == null) {
       return null;
     }
-    RaftPeer peer = conf.getPeer(id);
+    final RaftPeer peer = getMethod.apply(conf, id);
     if (peer != null) {
       return peer;
     } else if (oldConf != null) {
-      return oldConf.getPeer(id);
+      return getMethod.apply(oldConf, id);
     }
     return null;
   }
 
   @Override
   public Collection<RaftPeer> getAllPeers() {
-    final Collection<RaftPeer> peers = new ArrayList<>(conf.getPeers());
+    return getAll(PeerConfiguration::getPeers);
+  }
+
+  @Override
+  public Collection<RaftPeer> getAllListeners() {
+    return getAll(PeerConfiguration::getListeners);
+  }
+
+  private Collection<RaftPeer> getAll(Function<PeerConfiguration, Collection<RaftPeer>> getMethod) {
+    final Collection<RaftPeer> peers = new ArrayList<>(getMethod.apply(conf));
     if (oldConf != null) {
-      oldConf.getPeers().stream().filter(p -> !peers.contains(p))
+      getMethod.apply(oldConf).stream()
+          .filter(p -> !peers.contains(p))
           .forEach(peers::add);
     }
     return peers;