You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/02/21 19:16:36 UTC
[ratis] branch master updated: RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4bb35f022 RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)
4bb35f022 is described below
commit 4bb35f022688105dedec483aca6fe77f84a75f3e
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 22 03:16:30 2023 +0800
RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)
---
.../shell/cli/sh/command/AbstractRatisCommand.java | 4 +
.../shell/cli/sh/election/TransferCommand.java | 101 ++++++++++++++-------
.../cli/sh/ElectionCommandIntegrationTest.java | 36 ++++++++
3 files changed, 106 insertions(+), 35 deletions(-)
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
index a39dad6b9..8e92d9ace 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
@@ -151,6 +151,10 @@ public abstract class AbstractRatisCommand implements Command {
.addOption(GROUPID_OPTION_NAME, true, "Raft group id");
}
+ protected PrintStream getPrintStream() {
+ return printStream;
+ }
+
protected void printf(String format, Object... args) {
printStream.printf(format, args);
}
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index a1de4dbe3..b5180548c 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -23,14 +23,16 @@ import org.apache.commons.cli.Options;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.shell.cli.RaftUtils;
import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
import org.apache.ratis.shell.cli.sh.command.Context;
+import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
*/
public class TransferCommand extends AbstractRatisCommand {
public static final String ADDRESS_OPTION_NAME = "address";
+ public static final String TIMEOUT_OPTION_NAME = "timeout";
/**
* @param context command context
*/
@@ -55,54 +58,76 @@ public class TransferCommand extends AbstractRatisCommand {
super.run(cl);
String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+ // TODO: The default timeout should be 0, which lets the server decide (based on the election timeout).
+ // However, the request can occasionally time out while the transfer is still in progress,
+ // i.e. a request timeout does not mean that the leadership transfer has failed.
+ // Currently, Ratis shell reports success or failure based solely on the result of the request,
+ // so we set a larger default timeout here (3s).
+ final TimeDuration timeoutDefault = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+ // The default timeout for legacy mode matches the legacy command (version 2.4.x and older).
+ final TimeDuration timeoutLegacy = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+ final Optional<TimeDuration> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :
+ Optional.of(TimeDuration.valueOf(cl.getOptionValue(TIMEOUT_OPTION_NAME), TimeUnit.SECONDS));
- RaftPeerId newLeaderId = null;
- // update priorities to enable transfer
- List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
- for (RaftPeer peer : getRaftGroup().getPeers()) {
- peersWithNewPriorities.add(
- RaftPeer.newBuilder(peer)
- .setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
- .build()
- );
- if (peer.getAddress().equals(strAddr)) {
- newLeaderId = peer.getId();
- }
- }
- if (newLeaderId == null) {
+ final int highestPriority = getRaftGroup().getPeers().stream()
+ .mapToInt(RaftPeer::getPriority).max().orElse(0);
+ RaftPeer newLeader = getRaftGroup().getPeers().stream()
+ .filter(peer -> peer.getAddress().equals(strAddr)).findAny().orElse(null);
+ if (newLeader == null) {
+ printf("Peer with address %s not found.", strAddr);
return -2;
}
try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
- String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
- .collect(Collectors.joining(", ")) + "]";
- printf("Applying new peer state before transferring leadership: %n%s%n", stringPeers);
- RaftClientReply setConfigurationReply =
- client.admin().setConfiguration(peersWithNewPriorities);
- processReply(setConfigurationReply,
- () -> "failed to set priorities before initiating election");
// transfer leadership
- printf("Transferring leadership to server with address <%s> %n", strAddr);
- try {
- Thread.sleep(3_000);
- RaftClientReply transferLeadershipReply =
- client.admin().transferLeadership(newLeaderId, 60_000);
- processReply(transferLeadershipReply, () -> "election failed");
- } catch (Throwable t) {
- printf("caught an error when executing transfer: %s%n", t.getMessage());
- return -1;
+ if (!tryTransfer(client, newLeader, highestPriority, timeout.orElse(timeoutDefault))) {
+ // legacy mode, transfer leadership by setting priority.
+ tryTransfer(client, newLeader, highestPriority + 1, timeout.orElse(timeoutLegacy));
}
- println("Transferring leadership initiated");
+ } catch (Throwable t) {
+ printf("Failed to transfer to peer %s with address %s: ", newLeader.getId(), newLeader.getAddress());
+ t.printStackTrace(getPrintStream());
+ return -1;
}
return 0;
}
+ private boolean tryTransfer(RaftClient client, RaftPeer newLeader, int highestPriority, TimeDuration timeout)
+ throws IOException {
+ printf("Transferring leadership to peer %s with address %s%n", newLeader.getId(), newLeader.getAddress());
+ try {
+ // Lift the new leader to the highest priority if its current priority is lower.
+ if (newLeader.getPriority() < highestPriority) {
+ setPriority(client, newLeader, highestPriority);
+ }
+ RaftClientReply transferLeadershipReply =
+ client.admin().transferLeadership(newLeader.getId(), timeout.toLong(TimeUnit.MILLISECONDS));
+ processReply(transferLeadershipReply, () -> "election failed");
+ } catch (TransferLeadershipException tle) {
+ if (tle.getMessage().contains("it does not has highest priority")) {
+ return false;
+ }
+ throw tle;
+ }
+ println("Transferring leadership initiated");
+ return true;
+ }
+
+ private void setPriority(RaftClient client, RaftPeer target, int priority) throws IOException {
+ printf("Changing priority of peer %s with address %s to %d%n", target.getId(), target.getAddress(), priority);
+ List<RaftPeer> peers = getRaftGroup().getPeers().stream()
+ .map(peer -> peer == target ? RaftPeer.newBuilder(peer).setPriority(priority).build() : peer)
+ .collect(Collectors.toList());
+ RaftClientReply reply = client.admin().setConfiguration(peers);
+ processReply(reply, () -> "Failed to set master priorities");
+ }
+
@Override
public String getUsage() {
return String.format("%s -%s <HOSTNAME:PORT>"
+ " -%s <PEER0_HOST:PEER0_PORT,PEER1_HOST:PEER1_PORT,PEER2_HOST:PEER2_PORT>"
- + " [-%s <RAFT_GROUP_ID>]",
+ + " [-%s <RAFT_GROUP_ID>] [-%s <TIMEOUT_IN_SECONDS>]",
getCommandName(), ADDRESS_OPTION_NAME, PEER_OPTION_NAME,
- GROUPID_OPTION_NAME);
+ GROUPID_OPTION_NAME, TIMEOUT_OPTION_NAME);
}
@Override
@@ -119,6 +144,12 @@ public class TransferCommand extends AbstractRatisCommand {
.required()
.desc("Server address that will take over as leader")
.build()
+ ).addOption(
+ Option.builder()
+ .option(TIMEOUT_OPTION_NAME)
+ .hasArg()
+ .desc("Timeout for transfer leadership to complete (in seconds)")
+ .build()
);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
index cea1b17e2..e708153aa 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
@@ -73,6 +73,42 @@ public abstract class ElectionCommandIntegrationTest <CLUSTER extends MiniRaftCl
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionTransferCommand", LOG);
}
+ @Test
+ public void testElectionTransferCommandToHigherPriority() throws Exception {
+ runWithNewCluster(NUM_SERVERS, this::runTestElectionTransferCommandToHigherPriority);
+ }
+
+ void runTestElectionTransferCommandToHigherPriority(MiniRaftCluster cluster) throws Exception {
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final String address = getClusterAddress(cluster);
+
+ RaftServer.Division newLeader = cluster.getFollowers().get(0);
+ final StringPrintStream out = new StringPrintStream();
+ RatisShell shell = new RatisShell(out.getPrintStream());
+ Assert.assertTrue(cluster.getFollowers().contains(newLeader));
+
+ // set current leader's priority to 2
+ int ret = shell.run("peer", "setPriority", "-peers", address, "-addressPriority",
+ leader.getPeer().getAddress()+ "|" + 2);
+ Assert.assertEquals(0, ret);
+
+ // transfer to new leader will set its priority to 2 (with timeout 1s)
+ ret = shell.run("election", "transfer", "-peers", address, "-address",
+ newLeader.getPeer().getAddress(), "-timeout", "1");
+ Assert.assertEquals(0, ret);
+
+ JavaUtils.attempt(() -> Assert.assertEquals(cluster.getLeader().getId(), newLeader.getId()),
+ 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionTransferLeaderCommand", LOG);
+
+ // verify that priorities of new leader and old leader are both 2
+ ret = shell.run("group", "info", "-peers", address);
+ Assert.assertEquals(0 , ret);
+ String expected = String.format("\"%s\"%n priority: %d", newLeader.getPeer().getAddress(), 2);
+ String expected2 = String.format("\"%s\"%n priority: %d", leader.getPeer().getAddress(), 2);
+ Assert.assertTrue(out.toString().contains(expected));
+ Assert.assertTrue(out.toString().contains(expected2));
+ }
+
@Test
public void testElectionPauseResumeCommand() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestElectionPauseResumeCommand);