You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/02 18:36:51 UTC

[ratis] branch master updated: RATIS-1540. Support listeners in AdminApi, SetConfigurationRequest and proto. (#614)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a6c7589  RATIS-1540. Support listeners in AdminApi, SetConfigurationRequest and proto. (#614)
a6c7589 is described below

commit a6c75898d2c45ad971e69b7233bb89118fa1eb49
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Thu Mar 3 02:36:25 2022 +0800

    RATIS-1540. Support listeners in AdminApi, SetConfigurationRequest and proto. (#614)
---
 .../main/java/org/apache/ratis/client/api/AdminApi.java | 17 +++++++++++++++--
 .../java/org/apache/ratis/client/impl/AdminImpl.java    |  5 +++--
 .../org/apache/ratis/client/impl/ClientProtoUtils.java  |  4 +++-
 .../apache/ratis/protocol/SetConfigurationRequest.java  | 12 ++++++++++++
 ratis-proto/src/main/proto/Raft.proto                   |  3 +++
 .../org/apache/ratis/server/impl/PeerConfiguration.java |  5 ++++-
 .../apache/ratis/server/impl/RaftConfigurationImpl.java |  4 ++--
 .../org/apache/ratis/server/impl/ServerImplUtils.java   |  9 ++++++---
 .../org/apache/ratis/server/raftlog/LogProtoUtils.java  |  7 ++++---
 .../org/apache/ratis/server/impl/MiniRaftCluster.java   |  3 ++-
 10 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
index 175fd95..706c917 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AdminApi.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -30,14 +31,26 @@ import java.util.List;
  * such as setting raft configuration and transferring leadership.
  */
 public interface AdminApi {
-  /** Set the configuration request to the raft service. */
-  RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf) throws IOException;
+  /** The same as setConfiguration(serversInNewConf, Collections.emptyList()). */
+  default RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf) throws IOException {
+    return setConfiguration(serversInNewConf, Collections.emptyList());
+  }
 
   /** The same as setConfiguration(Arrays.asList(serversInNewConf)). */
   default RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException {
     return setConfiguration(Arrays.asList(serversInNewConf));
   }
 
+  /** Set the configuration request to the raft service. */
+  RaftClientReply setConfiguration(List<RaftPeer> serversInNewConf, List<RaftPeer> listenersInNewConf)
+      throws IOException;
+
+  /** The same as setConfiguration(Arrays.asList(serversInNewConf), Arrays.asList(listenersInNewConf)). */
+  default RaftClientReply setConfiguration(RaftPeer[] serversInNewConf, RaftPeer[] listenersInNewConf)
+      throws IOException {
+    return setConfiguration(Arrays.asList(serversInNewConf), Arrays.asList(listenersInNewConf));
+  }
+
   /** Transfer leadership to the given server.*/
   RaftClientReply transferLeadership(RaftPeerId newLeader, long timeoutMs) throws IOException;
 }
\ No newline at end of file
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
index b555d03..1bb5cee 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AdminImpl.java
@@ -37,14 +37,15 @@ class AdminImpl implements AdminApi {
   }
 
   @Override
-  public RaftClientReply setConfiguration(List<RaftPeer> peersInNewConf) throws IOException {
+  public RaftClientReply setConfiguration(List<RaftPeer> peersInNewConf, List<RaftPeer> listenersInNewConf)
+      throws IOException {
     Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
 
     final long callId = CallId.getAndIncrement();
     // also refresh the rpc proxies for these peers
     client.getClientRpc().addRaftPeers(peersInNewConf);
     return client.io().sendRequestWithRetry(() -> new SetConfigurationRequest(
-        client.getId(), client.getLeaderId(), client.getGroupId(), callId, peersInNewConf));
+        client.getId(), client.getLeaderId(), client.getGroupId(), callId, peersInNewConf, listenersInNewConf));
   }
 
   @Override
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 43ed7a0..3f8fa45 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -501,11 +501,12 @@ public interface ClientProtoUtils {
       SetConfigurationRequestProto p) {
     final RaftRpcRequestProto m = p.getRpcRequest();
     final List<RaftPeer> peers = ProtoUtils.toRaftPeers(p.getPeersList());
+    final List<RaftPeer> listeners = ProtoUtils.toRaftPeers(p.getListenersList());
     return new SetConfigurationRequest(
         ClientId.valueOf(m.getRequestorId()),
         RaftPeerId.valueOf(m.getReplyId()),
         ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
-        p.getRpcRequest().getCallId(), peers);
+        p.getRpcRequest().getCallId(), peers, listeners);
   }
 
   static SetConfigurationRequestProto toSetConfigurationRequestProto(
@@ -513,6 +514,7 @@ public interface ClientProtoUtils {
     return SetConfigurationRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
         .addAllPeers(ProtoUtils.toRaftPeerProtos(request.getPeersInNewConf()))
+        .addAllListeners(ProtoUtils.toRaftPeerProtos(request.getListenersInNewConf()))
         .build();
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 5a8fc21..da551f2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -24,18 +24,30 @@ import java.util.List;
 
 public class SetConfigurationRequest extends RaftClientRequest {
   private final List<RaftPeer> peers;
+  private final List<RaftPeer> listeners;
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, List<RaftPeer> peers) {
+    this(clientId, serverId, groupId, callId, peers, Collections.emptyList());
+  }
+
+  public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
+      RaftGroupId groupId, long callId, List<RaftPeer> peers, List<RaftPeer> listeners) {
     super(clientId, serverId, groupId, callId, true, writeRequestType());
     this.peers = peers != null? Collections.unmodifiableList(peers): Collections.emptyList();
+    this.listeners = listeners !=  null? Collections.unmodifiableList(listeners) : Collections.emptyList();
     Preconditions.assertUnique(this.peers);
+    Preconditions.assertUnique(this.listeners);
   }
 
   public List<RaftPeer> getPeersInNewConf() {
     return peers;
   }
 
+  public List<RaftPeer> getListenersInNewConf() {
+    return listeners;
+  }
+
   @Override
   public String toString() {
     return super.toString() + ", peers:" + getPeersInNewConf();
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d4d8bee..df2c3d8 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -51,6 +51,8 @@ message RaftGroupMemberIdProto {
 message RaftConfigurationProto {
   repeated RaftPeerProto peers = 1; // the peers in the current or new conf
   repeated RaftPeerProto oldPeers = 2; // the peers in the old conf
+  repeated RaftPeerProto listeners = 3;
+  repeated RaftPeerProto oldListeners = 4;
 }
 
 message StateMachineEntryProto {
@@ -406,6 +408,7 @@ message RaftClientReplyProto {
 message SetConfigurationRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
   repeated RaftPeerProto peers = 2;
+  repeated RaftPeerProto listeners = 3;
 }
 
 // transfer leadership request
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index d8e3c7d..530b20c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * The peer configuration of a raft cluster.
@@ -69,7 +70,9 @@ class PeerConfiguration {
 
   PeerConfiguration(Iterable<RaftPeer> peers, Iterable<RaftPeer> listeners) {
     this.peers = newMap(peers, "peers", Collections.emptyMap());
-    this.listeners = newMap(listeners, "listeners", this.peers);
+    this.listeners = Optional.ofNullable(listeners)
+        .map(l -> newMap(listeners, "listeners", this.peers))
+        .orElseGet(Collections::emptyMap);
   }
 
   Collection<RaftPeer> getPeers() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 4bab365..4f6c122 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -88,8 +88,8 @@ final class RaftConfigurationImpl implements RaftConfiguration {
       return this;
     }
 
-    Builder setOldConf(Iterable<RaftPeer> oldPeers) {
-      return setOldConf(new PeerConfiguration(oldPeers));
+    Builder setOldConf(Iterable<RaftPeer> oldPeers, Iterable<RaftPeer> oldListeners) {
+      return setOldConf(new PeerConfiguration(oldPeers, oldListeners));
     }
 
     Builder setOldConf(RaftConfigurationImpl stableConf) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 9bc9918..8bbdfa6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -70,11 +70,14 @@ public final class ServerImplUtils {
     return proxy;
   }
 
-  public static RaftConfiguration newRaftConfiguration(List<RaftPeer> conf, long index, List<RaftPeer> oldConf) {
+  public static RaftConfiguration newRaftConfiguration(List<RaftPeer> conf, List<RaftPeer> listener,
+      long index, List<RaftPeer> oldConf, List<RaftPeer> oldListener) {
     final RaftConfigurationImpl.Builder b = RaftConfigurationImpl.newBuilder()
-        .setConf(conf)
+        .setConf(conf, listener)
         .setLogEntryIndex(index);
-    Optional.ofNullable(oldConf).filter(p -> p.size() > 0).ifPresent(b::setOldConf);
+    if (!oldConf.isEmpty() || !oldListener.isEmpty()) {
+      b.setOldConf(oldConf, oldListener);
+    }
     return b.build();
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 22d206c..849dda5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -200,8 +200,9 @@ public final class LogProtoUtils {
     Preconditions.assertTrue(entry.hasConfigurationEntry());
     final RaftConfigurationProto proto = entry.getConfigurationEntry();
     final List<RaftPeer> conf = ProtoUtils.toRaftPeers(proto.getPeersList());
-    final List<RaftPeer> oldConf = proto.getOldPeersCount() == 0? null
-        : ProtoUtils.toRaftPeers(proto.getOldPeersList());
-    return ServerImplUtils.newRaftConfiguration(conf, entry.getIndex(), oldConf);
+    final List<RaftPeer> listener = ProtoUtils.toRaftPeers(proto.getListenersList());
+    final List<RaftPeer> oldConf = ProtoUtils.toRaftPeers(proto.getOldPeersList());
+    final List<RaftPeer> oldListener = ProtoUtils.toRaftPeers(proto.getOldListenersList());
+    return ServerImplUtils.newRaftConfiguration(conf, listener, entry.getIndex(), oldConf, oldListener);
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 5247d23..6ac14d7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -739,7 +739,8 @@ public abstract class MiniRaftCluster implements Closeable {
   public SetConfigurationRequest newSetConfigurationRequest(
       ClientId clientId, RaftPeerId leaderId,
       RaftPeer... peers) {
-    return new SetConfigurationRequest(clientId, leaderId, getGroupId(), CallId.getDefault(), Arrays.asList(peers));
+    return new SetConfigurationRequest(clientId, leaderId, getGroupId(), CallId.getDefault(),
+        Arrays.asList(peers), Collections.emptyList());
   }
 
   public void setConfiguration(RaftPeer... peers) throws IOException {