You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/31 08:37:43 UTC
[ratis] branch master updated: RATIS-1826: Listener will change to follower when using ratis shell (#867)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f70bf4fef RATIS-1826: Listener will change to follower when using ratis shell (#867)
f70bf4fef is described below
commit f70bf4fef0c919978408ceaf5ae569ff164e6136
Author: qian0817 <qi...@gmail.com>
AuthorDate: Fri Mar 31 16:37:37 2023 +0800
RATIS-1826: Listener will change to follower when using ratis shell (#867)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 3 ++-
.../org/apache/ratis/protocol/GroupInfoReply.java | 19 ++++++++++++----
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/RaftServerImpl.java | 5 ++++-
.../apache/ratis/server/raftlog/LogProtoUtils.java | 2 +-
.../shell/cli/sh/command/AbstractRatisCommand.java | 19 ++++++++++++++++
.../shell/cli/sh/election/TransferCommand.java | 6 ++++--
.../apache/ratis/shell/cli/sh/peer/AddCommand.java | 8 +++++--
.../ratis/shell/cli/sh/peer/RemoveCommand.java | 10 ++++++---
.../shell/cli/sh/peer/SetPriorityCommand.java | 25 ++++++++++------------
10 files changed, 70 insertions(+), 28 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index b3baabf9c..b5b1d8abe 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -539,7 +539,8 @@ public interface ClientProtoUtils {
replyProto.getCommitInfosList(),
ProtoUtils.toRaftGroup(replyProto.getGroup()),
replyProto.getRole(),
- replyProto.getIsRaftStorageHealthy());
+ replyProto.getIsRaftStorageHealthy(),
+ replyProto.hasConf()? replyProto.getConf(): null);
}
static Message toMessage(final ClientMessageEntryProto p) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 946bf2389..632fa6529 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -17,10 +17,12 @@
*/
package org.apache.ratis.protocol;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
import java.util.Collection;
+import java.util.Optional;
/**
* The response of server information request. Sent from server to client.
@@ -30,21 +32,26 @@ public class GroupInfoReply extends RaftClientReply {
private final RaftGroup group;
private final RoleInfoProto roleInfoProto;
private final boolean isRaftStorageHealthy;
+ private final RaftConfigurationProto conf;
public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto> commitInfos,
- RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy) {
- this(request.getClientId(), request.getServerId(), request.getRaftGroupId(), request.getCallId(), commitInfos,
- group, roleInfoProto, isRaftStorageHealthy);
+ RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
+ RaftConfigurationProto conf) {
+ this(request.getClientId(), request.getServerId(), request.getRaftGroupId(),
+ request.getCallId(), commitInfos,
+ group, roleInfoProto, isRaftStorageHealthy, conf);
}
@SuppressWarnings("parameternumber")
public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
Collection<CommitInfoProto> commitInfos,
- RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy) {
+ RaftGroup group, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy,
+ RaftConfigurationProto conf) {
super(clientId, serverId, groupId, callId, true, null, null, 0L, commitInfos);
this.group = group;
this.roleInfoProto = roleInfoProto;
this.isRaftStorageHealthy = isRaftStorageHealthy;
+ this.conf = conf;
}
public RaftGroup getGroup() {
@@ -58,4 +65,8 @@ public class GroupInfoReply extends RaftClientReply {
public boolean isRaftStorageHealthy() {
return isRaftStorageHealthy;
}
+
+ public Optional<RaftConfigurationProto> getConf() {
+ return Optional.ofNullable(conf);
+ }
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index b83c375c6..b8680051f 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -551,4 +551,5 @@ message GroupInfoReplyProto {
RoleInfoProto role = 3;
bool isRaftStorageHealthy = 4;
repeated CommitInfoProto commitInfos = 5;
+ RaftConfigurationProto conf = 6;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4a65c3b80..90641b73e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -596,7 +596,10 @@ class RaftServerImpl implements RaftServer.Division,
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
- return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(), dir.isHealthy());
+ final RaftConfigurationProto conf =
+ LogProtoUtils.toRaftConfigurationProtoBuilder(getRaftConf()).build();
+ return new GroupInfoReply(request, getCommitInfos(), getGroup(), getRoleInfoProto(),
+ dir.isHealthy(), conf);
}
RoleInfoProto getRoleInfoProto() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index dd8c67dc8..d84c35eb0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -83,7 +83,7 @@ public final class LogProtoUtils {
.build();
}
- private static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) {
+ public static RaftConfigurationProto.Builder toRaftConfigurationProtoBuilder(RaftConfiguration conf) {
return RaftConfigurationProto.newBuilder()
.addAllPeers(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers()))
.addAllListeners(ProtoUtils.toRaftPeerProtos(conf.getCurrentPeers(RaftPeerRole.LISTENER)))
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
index 8e92d9ace..74fcbae3d 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
@@ -25,10 +25,12 @@ import org.apache.ratis.shell.cli.RaftUtils;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.function.CheckedFunction;
import java.io.IOException;
@@ -38,6 +40,7 @@ import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* The base class for all the ratis shell {@link Command} classes.
@@ -215,4 +218,20 @@ public abstract class AbstractRatisCommand implements Command {
}
return ids;
}
+
+ protected Stream<RaftPeer> getPeerStream(RaftPeerRole role) {
+ final RaftConfigurationProto conf = groupInfoReply.getConf().orElse(null);
+ if (conf == null) {
+ // Assume all peers are followers in order preserve the pre-listener behaviors.
+ return role == RaftPeerRole.FOLLOWER ? getRaftGroup().getPeers().stream() : Stream.empty();
+ }
+ final Set<RaftPeer> targets = (role == RaftPeerRole.LISTENER ? conf.getListenersList() : conf.getPeersList())
+ .stream()
+ .map(ProtoUtils::toRaftPeer)
+ .collect(Collectors.toSet());
+ return getRaftGroup()
+ .getPeers()
+ .stream()
+ .filter(targets::contains);
+ }
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index 7dba5ae9e..c71d7f89f 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
@@ -110,10 +111,11 @@ public class TransferCommand extends AbstractRatisCommand {
private void setPriority(RaftClient client, RaftPeer target, int priority) throws IOException {
printf("Changing priority of peer %s with address %s to %d%n", target.getId(), target.getAddress(), priority);
- List<RaftPeer> peers = getRaftGroup().getPeers().stream()
+ final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER)
.map(peer -> peer == target ? RaftPeer.newBuilder(peer).setPriority(priority).build() : peer)
.collect(Collectors.toList());
- RaftClientReply reply = client.admin().setConfiguration(peers);
+ final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList());
+ RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
processReply(reply, () -> "Failed to set master priorities");
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
index 317233531..3c65bb12d 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/AddCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -80,15 +81,18 @@ public class AddCommand extends AbstractRatisCommand {
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- final Stream<RaftPeer> remaining = getRaftGroup().getPeers().stream();
+ final Stream<RaftPeer> remaining = getPeerStream(RaftPeerRole.FOLLOWER);
final Stream<RaftPeer> adding = ids.stream().map(raftPeerId -> RaftPeer.newBuilder()
.setId(raftPeerId)
.setAddress(peersInfo.get(raftPeerId))
.setPriority(0)
.build());
final List<RaftPeer> peers = Stream.concat(remaining, adding).collect(Collectors.toList());
+ final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER)
+ .collect(Collectors.toList());
System.out.println("New peer list: " + peers);
- RaftClientReply reply = client.admin().setConfiguration(peers);
+ System.out.println("New listener list: " + listeners);
+ RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
processReply(reply, () -> "Failed to change raft peer");
}
return 0;
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
index d8cc76272..591851607 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/RemoveCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -66,10 +67,13 @@ public class RemoveCommand extends AbstractRatisCommand {
"Both " + PEER_ID_OPTION_NAME + " and " + ADDRESS_OPTION_NAME + " options are missing.");
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- final List<RaftPeer> remaining = getRaftGroup().getPeers().stream()
+ final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER)
.filter(raftPeer -> !ids.contains(raftPeer.getId())).collect(Collectors.toList());
- System.out.println("New peer list: " + remaining);
- RaftClientReply reply = client.admin().setConfiguration(remaining);
+ final List<RaftPeer> listeners = getPeerStream(RaftPeerRole.LISTENER)
+ .filter(raftPeer -> !ids.contains(raftPeer.getId())).collect(Collectors.toList());
+ System.out.println("New peer list: " + peers);
+ System.out.println("New listener list: " + listeners);
+ final RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
processReply(reply, () -> "Failed to change raft peer");
}
return 0;
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
index 43206f87b..01e81f3c3 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/peer/SetPriorityCommand.java
@@ -21,6 +21,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.shell.cli.RaftUtils;
@@ -28,10 +29,10 @@ import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class SetPriorityCommand extends AbstractRatisCommand {
@@ -55,7 +56,7 @@ public class SetPriorityCommand extends AbstractRatisCommand {
Map<String, Integer> addressPriorityMap = new HashMap<>();
for (String optionValue : cl.getOptionValues(PEER_WITH_NEW_PRIORITY_OPTION_NAME)) {
String[] str = optionValue.split("[|]");
- if(str.length < 2) {
+ if (str.length < 2) {
println("The format of the parameter is wrong");
return -1;
}
@@ -63,18 +64,14 @@ public class SetPriorityCommand extends AbstractRatisCommand {
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- List<RaftPeer> peers = new ArrayList<>();
- for (RaftPeer peer : getRaftGroup().getPeers()) {
- if (!addressPriorityMap.containsKey(peer.getAddress())) {
- peers.add(RaftPeer.newBuilder(peer).build());
- } else {
- peers.add(RaftPeer.newBuilder(peer)
- .setPriority(addressPriorityMap.get(peer.getAddress()))
- .build()
- );
- }
- }
- RaftClientReply reply = client.admin().setConfiguration(peers);
+ final List<RaftPeer> peers = getPeerStream(RaftPeerRole.FOLLOWER).map(peer -> {
+ final Integer newPriority = addressPriorityMap.get(peer.getAddress());
+ final int priority = newPriority != null ? newPriority : peer.getPriority();
+ return RaftPeer.newBuilder(peer).setPriority(priority).build();
+ }).collect(Collectors.toList());
+ final List<RaftPeer> listeners =
+ getPeerStream(RaftPeerRole.LISTENER).collect(Collectors.toList());
+ RaftClientReply reply = client.admin().setConfiguration(peers, listeners);
processReply(reply, () -> "Failed to set master priorities ");
}
return 0;