You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/02/21 19:16:36 UTC

[ratis] branch master updated: RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bb35f022 RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)
4bb35f022 is described below

commit 4bb35f022688105dedec483aca6fe77f84a75f3e
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 22 03:16:30 2023 +0800

    RATIS-1769. Avoid changing priorities in TransferCommand unless necessary (#808)
---
 .../shell/cli/sh/command/AbstractRatisCommand.java |   4 +
 .../shell/cli/sh/election/TransferCommand.java     | 101 ++++++++++++++-------
 .../cli/sh/ElectionCommandIntegrationTest.java     |  36 ++++++++
 3 files changed, 106 insertions(+), 35 deletions(-)

diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
index a39dad6b9..8e92d9ace 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/command/AbstractRatisCommand.java
@@ -151,6 +151,10 @@ public abstract class AbstractRatisCommand implements Command {
             .addOption(GROUPID_OPTION_NAME, true, "Raft group id");
   }
 
+  protected PrintStream getPrintStream() {
+    return printStream;
+  }
+
   protected void printf(String format, Object... args) {
     printStream.printf(format, args);
   }
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index a1de4dbe3..b5180548c 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -23,14 +23,16 @@ import org.apache.commons.cli.Options;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.shell.cli.RaftUtils;
 import org.apache.ratis.shell.cli.sh.command.AbstractRatisCommand;
 import org.apache.ratis.shell.cli.sh.command.Context;
+import org.apache.ratis.util.TimeDuration;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -38,6 +40,7 @@ import java.util.stream.Collectors;
  */
 public class TransferCommand extends AbstractRatisCommand {
   public static final String ADDRESS_OPTION_NAME = "address";
+  public static final String TIMEOUT_OPTION_NAME = "timeout";
   /**
    * @param context command context
    */
@@ -55,54 +58,76 @@ public class TransferCommand extends AbstractRatisCommand {
     super.run(cl);
 
     String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
+    // TODO: Default timeout should be set to 0, which means let server decide (based on election timeout).
+    //       However, occasionally the request could time out too quickly while the transfer is in progress.
+    //       i.e. request timeout doesn't mean transfer leadership has failed.
+    //       Currently, Ratis shell returns merely based on the result of the request.
+    //       So we set a larger default timeout here (3s).
+    final TimeDuration timeoutDefault = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+    // Default timeout for legacy mode matches the legacy command (version 2.4.x and older).
+    final TimeDuration timeoutLegacy = TimeDuration.valueOf(60, TimeUnit.SECONDS);
+    final Optional<TimeDuration> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :
+        Optional.of(TimeDuration.valueOf(cl.getOptionValue(TIMEOUT_OPTION_NAME), TimeUnit.SECONDS));
 
-    RaftPeerId newLeaderId = null;
-    // update priorities to enable transfer
-    List<RaftPeer> peersWithNewPriorities = new ArrayList<>();
-    for (RaftPeer peer : getRaftGroup().getPeers()) {
-      peersWithNewPriorities.add(
-          RaftPeer.newBuilder(peer)
-              .setPriority(peer.getAddress().equals(strAddr) ? 2 : 1)
-              .build()
-      );
-      if (peer.getAddress().equals(strAddr)) {
-        newLeaderId = peer.getId();
-      }
-    }
-    if (newLeaderId == null) {
+    final int highestPriority = getRaftGroup().getPeers().stream()
+        .mapToInt(RaftPeer::getPriority).max().orElse(0);
+    RaftPeer newLeader = getRaftGroup().getPeers().stream()
+        .filter(peer -> peer.getAddress().equals(strAddr)).findAny().orElse(null);
+    if (newLeader == null) {
+      printf("Peer with address %s not found.", strAddr);
       return -2;
     }
     try (RaftClient client = RaftUtils.createClient(getRaftGroup())) {
-      String stringPeers = "[" + peersWithNewPriorities.stream().map(RaftPeer::toString)
-          .collect(Collectors.joining(", ")) + "]";
-      printf("Applying new peer state before transferring leadership: %n%s%n", stringPeers);
-      RaftClientReply setConfigurationReply =
-          client.admin().setConfiguration(peersWithNewPriorities);
-      processReply(setConfigurationReply,
-          () -> "failed to set priorities before initiating election");
       // transfer leadership
-      printf("Transferring leadership to server with address <%s> %n", strAddr);
-      try {
-        Thread.sleep(3_000);
-        RaftClientReply transferLeadershipReply =
-            client.admin().transferLeadership(newLeaderId, 60_000);
-        processReply(transferLeadershipReply, () -> "election failed");
-      } catch (Throwable t) {
-        printf("caught an error when executing transfer: %s%n", t.getMessage());
-        return -1;
+      if (!tryTransfer(client, newLeader, highestPriority, timeout.orElse(timeoutDefault))) {
+        // legacy mode, transfer leadership by setting priority.
+        tryTransfer(client, newLeader, highestPriority + 1, timeout.orElse(timeoutLegacy));
       }
-      println("Transferring leadership initiated");
+    } catch (Throwable t) {
+      printf("Failed to transfer to peer %s with address %s: ", newLeader.getId(), newLeader.getAddress());
+      t.printStackTrace(getPrintStream());
+      return -1;
     }
     return 0;
   }
 
+  private boolean tryTransfer(RaftClient client, RaftPeer newLeader, int highestPriority, TimeDuration timeout)
+      throws IOException {
+    printf("Transferring leadership to peer %s with address %s%n", newLeader.getId(), newLeader.getAddress());
+    try {
+      // Lift the new leader to the highest priority if it is currently below it.
+      if (newLeader.getPriority() < highestPriority) {
+        setPriority(client, newLeader, highestPriority);
+      }
+      RaftClientReply transferLeadershipReply =
+          client.admin().transferLeadership(newLeader.getId(), timeout.toLong(TimeUnit.MILLISECONDS));
+      processReply(transferLeadershipReply, () -> "election failed");
+    } catch (TransferLeadershipException tle) {
+      if (tle.getMessage().contains("it does not has highest priority")) {
+        return false;
+      }
+      throw tle;
+    }
+    println("Transferring leadership initiated");
+    return true;
+  }
+
+  private void setPriority(RaftClient client, RaftPeer target, int priority) throws IOException {
+    printf("Changing priority of peer %s with address %s to %d%n", target.getId(), target.getAddress(), priority);
+    List<RaftPeer> peers = getRaftGroup().getPeers().stream()
+        .map(peer -> peer == target ? RaftPeer.newBuilder(peer).setPriority(priority).build() : peer)
+        .collect(Collectors.toList());
+    RaftClientReply reply = client.admin().setConfiguration(peers);
+    processReply(reply, () -> "Failed to set master priorities");
+  }
+
   @Override
   public String getUsage() {
     return String.format("%s -%s <HOSTNAME:PORT>"
             + " -%s <PEER0_HOST:PEER0_PORT,PEER1_HOST:PEER1_PORT,PEER2_HOST:PEER2_PORT>"
-            + " [-%s <RAFT_GROUP_ID>]",
+            + " [-%s <RAFT_GROUP_ID>] [-%s <TIMEOUT_IN_SECONDS>]",
         getCommandName(), ADDRESS_OPTION_NAME, PEER_OPTION_NAME,
-        GROUPID_OPTION_NAME);
+        GROUPID_OPTION_NAME, TIMEOUT_OPTION_NAME);
   }
 
   @Override
@@ -119,6 +144,12 @@ public class TransferCommand extends AbstractRatisCommand {
             .required()
             .desc("Server address that will take over as leader")
             .build()
+    ).addOption(
+        Option.builder()
+            .option(TIMEOUT_OPTION_NAME)
+            .hasArg()
+            .desc("Timeout for transfer leadership to complete (in seconds)")
+            .build()
     );
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
index cea1b17e2..e708153aa 100644
--- a/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/shell/cli/sh/ElectionCommandIntegrationTest.java
@@ -73,6 +73,42 @@ public abstract class ElectionCommandIntegrationTest <CLUSTER extends MiniRaftCl
     }, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionTransferCommand", LOG);
   }
 
+  @Test
+  public void testElectionTransferCommandToHigherPriority() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::runTestElectionTransferCommandToHigherPriority);
+  }
+
+  void runTestElectionTransferCommandToHigherPriority(MiniRaftCluster cluster) throws Exception {
+    final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+    final String address = getClusterAddress(cluster);
+
+    RaftServer.Division newLeader = cluster.getFollowers().get(0);
+    final StringPrintStream out = new StringPrintStream();
+    RatisShell shell = new RatisShell(out.getPrintStream());
+    Assert.assertTrue(cluster.getFollowers().contains(newLeader));
+
+    // set current leader's priority to 2
+    int ret = shell.run("peer", "setPriority", "-peers", address, "-addressPriority",
+        leader.getPeer().getAddress()+ "|" + 2);
+    Assert.assertEquals(0, ret);
+
+    // transfer to new leader will set its priority to 2 (with timeout 1s)
+    ret = shell.run("election", "transfer", "-peers", address, "-address",
+        newLeader.getPeer().getAddress(), "-timeout", "1");
+    Assert.assertEquals(0, ret);
+
+    JavaUtils.attempt(() -> Assert.assertEquals(cluster.getLeader().getId(), newLeader.getId()),
+        10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionTransferLeaderCommand", LOG);
+
+    // verify that priorities of new leader and old leader are both 2
+    ret = shell.run("group", "info", "-peers", address);
+    Assert.assertEquals(0 , ret);
+    String expected = String.format("\"%s\"%n  priority: %d", newLeader.getPeer().getAddress(), 2);
+    String expected2 = String.format("\"%s\"%n  priority: %d", leader.getPeer().getAddress(), 2);
+    Assert.assertTrue(out.toString().contains(expected));
+    Assert.assertTrue(out.toString().contains(expected2));
+  }
+
   @Test
   public void testElectionPauseResumeCommand() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestElectionPauseResumeCommand);