You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2021/04/12 17:05:02 UTC

[kudu] branch master updated: [mini_cluster] support pausing/resuming a daemon in test minicluster

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0767c0a  [mini_cluster] support pausing/resuming a daemon in test minicluster
0767c0a is described below

commit 0767c0ad1b2d84cdcf958484bfe88adbe15ddf60
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sun Apr 11 23:12:21 2021 -0700

    [mini_cluster] support pausing/resuming a daemon in test minicluster
    
    Added control knobs to call pause/resume a daemon (i.e. master or
    tablet server) via mini_cluster test interface.  Also, added test
    scenarios to cover the newly introduced functionality.
    
    The new functionality is used by a follow-up patch.
    
    Change-Id: I37fad035c80a7161e941d93df87265254552e8f1
    Reviewed-on: http://gerrit.cloudera.org:8080/17304
    Reviewed-by: Grant Henke <gr...@apache.org>
    Tested-by: Alexey Serbin <as...@cloudera.com>
---
 .../java/org/apache/kudu/test/KuduTestHarness.java |  45 ++++--
 .../apache/kudu/test/cluster/MiniKuduCluster.java  |  81 ++++++++++
 .../org/apache/kudu/test/TestMiniKuduCluster.java  |  20 +++
 src/kudu/tools/kudu-tool-test.cc                   | 175 ++++++++++++++++++++-
 src/kudu/tools/tool.proto                          |  18 ++-
 src/kudu/tools/tool_action_test.cc                 |  34 ++++
 6 files changed, 360 insertions(+), 13 deletions(-)

diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
index 805704f..0abb3f4 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/KuduTestHarness.java
@@ -254,6 +254,15 @@ public class KuduTestHarness extends ExternalResource {
   }
 
   /**
+   * Find the host and port of the leader master.
+   * @return the host and port of the leader master
+   * @throws Exception if we are unable to find the leader master
+   */
+  public HostAndPort findLeaderMasterServer() throws Exception {
+    return client.findLeaderMasterServer();
+  }
+
+  /**
    * Helper method to easily kill the leader master.
    *
    * This method is thread-safe.
@@ -265,15 +274,6 @@ public class KuduTestHarness extends ExternalResource {
   }
 
   /**
-   * Find the host and port of the leader master.
-   * @return the host and port of the leader master
-   * @throws Exception if we are unable to find the leader master
-   */
-  public HostAndPort findLeaderMasterServer() throws Exception {
-    return client.findLeaderMasterServer();
-  }
-
-  /**
    * Picks at random a tablet server that serves tablets from the passed table and restarts it.
    * @param table table to query for a TS to restart
    * @throws Exception
@@ -315,6 +315,33 @@ public class KuduTestHarness extends ExternalResource {
   }
 
   /**
+   * Finds and pauses the leader master.
+   * @return the host and port of the paused master
+   * @throws Exception
+   */
+  public HostAndPort pauseLeaderMaster() throws Exception {
+    HostAndPort hp = findLeaderMasterServer();
+    miniCluster.pauseMasterServer(hp);
+    return hp;
+  }
+
+  /**
+   * Pauses the specified master.
+   * @throws Exception
+   */
+  public void pauseMaster(HostAndPort hp) throws Exception {
+    miniCluster.pauseMasterServer(hp);
+  }
+
+  /**
+   * Resumes the specified master.
+   * @throws Exception
+   */
+  public void resumeMaster(HostAndPort hp) throws Exception {
+    miniCluster.resumeMasterServer(hp);
+  }
+
+  /**
    * Return the comma-separated list of "host:port" pairs that describes the master
    * config for this cluster.
    * @return The master config string.
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
index 2dcd781..191816e 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/cluster/MiniKuduCluster.java
@@ -58,6 +58,8 @@ import org.apache.kudu.tools.Tool.GetMastersRequestPB;
 import org.apache.kudu.tools.Tool.GetTServersRequestPB;
 import org.apache.kudu.tools.Tool.KdestroyRequestPB;
 import org.apache.kudu.tools.Tool.KinitRequestPB;
+import org.apache.kudu.tools.Tool.PauseDaemonRequestPB;
+import org.apache.kudu.tools.Tool.ResumeDaemonRequestPB;
 import org.apache.kudu.tools.Tool.SetDaemonFlagRequestPB;
 import org.apache.kudu.tools.Tool.StartClusterRequestPB;
 import org.apache.kudu.tools.Tool.StartDaemonRequestPB;
@@ -90,6 +92,7 @@ public final class MiniKuduCluster implements AutoCloseable {
   private static class DaemonInfo {
     DaemonIdentifierPB id;
     boolean isRunning;
+    boolean isPaused;
   }
 
   // Map of master addresses to daemon information.
@@ -276,6 +279,7 @@ public final class MiniKuduCluster implements AutoCloseable {
       DaemonInfo d = new DaemonInfo();
       d.id = info.getId();
       d.isRunning = true;
+      d.isPaused = false;
       masterServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
     resp = sendRequestToCluster(
@@ -286,6 +290,7 @@ public final class MiniKuduCluster implements AutoCloseable {
       DaemonInfo d = new DaemonInfo();
       d.id = info.getId();
       d.isRunning = true;
+      d.isPaused = false;
       tabletServers.put(ProtobufHelper.hostAndPortFromPB(info.getBoundRpcAddress()), d);
     }
   }
@@ -350,6 +355,44 @@ public final class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
+   * Pauses a master identified by the specified host and port.
+   * Does nothing if the master is already paused.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void pauseMasterServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getMasterServer(hp);
+    if (d.isPaused) {
+      return;
+    }
+    LOG.info("pausing master server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setPauseDaemon(PauseDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isPaused = true;
+  }
+
+  /**
+   * Resumes a master identified by the specified host and port.
+   * Does nothing if the master isn't paused.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void resumeMasterServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getMasterServer(hp);
+    if (!d.isPaused) {
+      return;
+    }
+    LOG.info("resuming master server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setResumeDaemon(ResumeDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isPaused = false;
+  }
+
+  /**
    * Starts a tablet server identified by an host and port.
    * Does nothing if the server was already running.
    *
@@ -388,6 +431,44 @@ public final class MiniKuduCluster implements AutoCloseable {
   }
 
   /**
+   * Pauses a tablet server identified by the specified host and port.
+   * Does nothing if the tablet server is already paused.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void pauseTabletServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getTabletServer(hp);
+    if (d.isPaused) {
+      return;
+    }
+    LOG.info("pausing tablet server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setPauseDaemon(PauseDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isPaused = true;
+  }
+
+  /**
+   * Resumes a tablet server identified by the specified host and port.
+   * Does nothing if the tablet server isn't paused.
+   *
+   * @param hp unique host and port identifying the server
+   * @throws IOException if something went wrong in transit
+   */
+  public void resumeTabletServer(HostAndPort hp) throws IOException {
+    DaemonInfo d = getTabletServer(hp);
+    if (!d.isPaused) {
+      return;
+    }
+    LOG.info("resuming tablet server {}", hp);
+    sendRequestToCluster(ControlShellRequestPB.newBuilder()
+        .setResumeDaemon(ResumeDaemonRequestPB.newBuilder().setId(d.id).build())
+        .build());
+    d.isPaused = false; // clear the flag so a later pause request is not skipped
+  }
+
+  /**
    * Kills all the master servers.
    * Does nothing to the servers that are already dead.
    *
diff --git a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
index d86c1a3..6844fcd 100644
--- a/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
+++ b/java/kudu-test-utils/src/test/java/org/apache/kudu/test/TestMiniKuduCluster.java
@@ -69,6 +69,16 @@ public class TestMiniKuduCluster {
 
         // Test we can reach it.
         testHostPort(masterHostPort, true);
+
+        // Pause master.
+        cluster.pauseMasterServer(masterHostPort);
+        // Pausing master again doesn't do anything.
+        cluster.pauseMasterServer(masterHostPort);
+
+        // Resume master.
+        cluster.resumeMasterServer(masterHostPort);
+        // Resuming master while it's not paused doesn't do anything.
+        cluster.resumeMasterServer(masterHostPort);
       }
 
       {
@@ -83,6 +93,16 @@ public class TestMiniKuduCluster {
         cluster.startTabletServer(tsHostPort);
 
         testHostPort(tsHostPort, true);
+
+        // Pause the first TS.
+        cluster.pauseTabletServer(tsHostPort);
+        // Pausing the TS again doesn't do anything.
+        cluster.pauseTabletServer(tsHostPort);
+
+        // Resume the first TS.
+        cluster.resumeTabletServer(tsHostPort);
+        // Resuming the TS while it's not paused doesn't do anything.
+        cluster.resumeTabletServer(tsHostPort);
       }
     }
   }
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index f8793fc..326a4f4 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -4931,8 +4931,8 @@ INSTANTIATE_TEST_SUITE_P(SerializationModes, ControlShellToolTest,
                                             ::testing::Bool()));
 
 TEST_P(ControlShellToolTest, TestControlShell) {
-  const int kNumMasters = 1;
-  const int kNumTservers = 3;
+  constexpr auto kNumMasters = 1;
+  constexpr auto kNumTservers = 3;
 
   // Create an illegal cluster first, to make sure that the shell continues to
   // work in the event of an error.
@@ -5068,6 +5068,32 @@ TEST_P(ControlShellToolTest, TestControlShell) {
     ASSERT_OK(SendReceive(req, &resp));
   }
 
+  // Try to pause a tserver which isn't running.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_pause_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "the process is not there");
+  }
+
+  // Try to resume a tserver which isn't running.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_resume_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "the process is not there");
+  }
+
   // Restart it.
   {
     ControlShellRequestPB req;
@@ -5076,6 +5102,22 @@ TEST_P(ControlShellToolTest, TestControlShell) {
     ASSERT_OK(SendReceive(req, &resp));
   }
 
+  // Pause it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_pause_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Resume it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_resume_daemon()->mutable_id() = tservers[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
   // Stop a master.
   {
     ControlShellRequestPB req;
@@ -5084,6 +5126,32 @@ TEST_P(ControlShellToolTest, TestControlShell) {
     ASSERT_OK(SendReceive(req, &resp));
   }
 
+  // Try to pause a master which isn't running.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_pause_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "the process is not there");
+  }
+
+  // Try to resume a master which isn't running.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_resume_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsIllegalState()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(), "the process is not there");
+  }
+
   // Restart it.
   {
     ControlShellRequestPB req;
@@ -5092,6 +5160,22 @@ TEST_P(ControlShellToolTest, TestControlShell) {
     ASSERT_OK(SendReceive(req, &resp));
   }
 
+  // Pause it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_pause_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
+  // Resume it.
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    *req.mutable_resume_daemon()->mutable_id() = masters[0].id();
+    ASSERT_OK(SendReceive(req, &resp));
+  }
+
   // Restart some non-existent daemons.
   vector<DaemonIdentifierPB> daemons_to_restart;
   {
@@ -5128,6 +5212,61 @@ TEST_P(ControlShellToolTest, TestControlShell) {
     ASSERT_TRUE(resp.has_error());
   }
 
+  // Attempt to pause/resume some non-existent daemons.
+  {
+    vector<DaemonIdentifierPB> daemons;
+    {
+      // Unknown daemon type.
+      DaemonIdentifierPB id;
+      id.set_type(UNKNOWN_DAEMON);
+      daemons.emplace_back(std::move(id));
+    }
+    {
+      // Tablet server #5.
+      DaemonIdentifierPB id;
+      id.set_type(TSERVER);
+      id.set_index(5);
+      daemons.emplace_back(std::move(id));
+    }
+    {
+      // Tablet server without an index.
+      DaemonIdentifierPB id;
+      id.set_type(TSERVER);
+      daemons.emplace_back(std::move(id));
+    }
+    {
+      // Master without an index.
+      DaemonIdentifierPB id;
+      id.set_type(MASTER);
+      daemons.emplace_back(std::move(id));
+    }
+    {
+      // Master #100.
+      DaemonIdentifierPB id;
+      id.set_type(MASTER);
+      id.set_index(100);
+      daemons.emplace_back(std::move(id));
+    }
+    for (const auto& daemon : daemons) {
+      {
+        ControlShellRequestPB req;
+        ControlShellResponsePB resp;
+        *req.mutable_pause_daemon()->mutable_id() = daemon;
+        ASSERT_OK(proto_->SendMessage(req));
+        ASSERT_OK(proto_->ReceiveMessage(&resp));
+        ASSERT_TRUE(resp.has_error());
+      }
+      {
+        ControlShellRequestPB req;
+        ControlShellResponsePB resp;
+        *req.mutable_resume_daemon()->mutable_id() = daemon;
+        ASSERT_OK(proto_->SendMessage(req));
+        ASSERT_OK(proto_->ReceiveMessage(&resp));
+        ASSERT_TRUE(resp.has_error());
+      }
+    }
+  }
+
   // Stop the cluster.
   {
     ControlShellRequestPB req;
@@ -5208,6 +5347,38 @@ TEST_P(ControlShellToolTest, TestControlShell) {
                         "mini-KDC doesn't support SetFlag()");
   }
 
+  // Try pause KDC: this should fail since mini-KDC doesn't support that (yet).
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    auto* r = req.mutable_pause_daemon();
+    r->mutable_id()->set_index(0);
+    r->mutable_id()->set_type(KDC);
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "mini-KDC doesn't support pausing");
+  }
+
+  // Try resume KDC: this should fail since mini-KDC doesn't support that (yet).
+  {
+    ControlShellRequestPB req;
+    ControlShellResponsePB resp;
+    auto* r = req.mutable_resume_daemon();
+    r->mutable_id()->set_index(0);
+    r->mutable_id()->set_type(KDC);
+    ASSERT_OK(proto_->SendMessage(req));
+    ASSERT_OK(proto_->ReceiveMessage(&resp));
+    ASSERT_TRUE(resp.has_error());
+    auto s = StatusFromPB(resp.error());
+    ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "mini-KDC doesn't support resuming");
+  }
+
   if (enable_kerberos()) {
     // Restart the KDC.
     {
diff --git a/src/kudu/tools/tool.proto b/src/kudu/tools/tool.proto
index 199f11d..d5c60c7 100644
--- a/src/kudu/tools/tool.proto
+++ b/src/kudu/tools/tool.proto
@@ -113,11 +113,23 @@ message StartDaemonRequestPB {
 //
 // No-op for already stopped daemons.
 message StopDaemonRequestPB {
-  // The identifier for the daemon to be stopped. This identifier is unique
+  // The identifier of the daemon to be stopped. This identifier is unique
   // and immutable for the lifetime of the cluster.
   optional DaemonIdentifierPB id = 1;
 }
 
+// Pause a daemon: send SIGSTOP signal to the process.
+message PauseDaemonRequestPB {
+  // The identifier of the daemon to be paused.
+  optional DaemonIdentifierPB id = 1;
+}
+
+// Resume a paused daemon: send SIGCONT signal to the process.
+message ResumeDaemonRequestPB {
+  // The identifier of the daemon to be resumed.
+  optional DaemonIdentifierPB id = 1;
+}
+
 // Daemon information.
 message DaemonInfoPB {
   // Unique identifier of the daemon.
@@ -168,7 +180,7 @@ message KinitRequestPB {
 
 // Call SetFlag() on the specific daemon.
 message SetDaemonFlagRequestPB {
-  // The identifier for the daemon to sent the request to.
+  // The identifier of the daemon to send the request to.
   optional DaemonIdentifierPB id = 1;
   // The name of the flag to set.
   optional string flag = 2;
@@ -211,6 +223,8 @@ message ControlShellRequestPB {
     KdestroyRequestPB kdestroy = 10;
     KinitRequestPB kinit = 11;
     SetDaemonFlagRequestPB set_daemon_flag = 12;
+    PauseDaemonRequestPB pause_daemon = 13;
+    ResumeDaemonRequestPB resume_daemon = 14;
   }
 }
 
diff --git a/src/kudu/tools/tool_action_test.cc b/src/kudu/tools/tool_action_test.cc
index 8e0fab4..11153aa 100644
--- a/src/kudu/tools/tool_action_test.cc
+++ b/src/kudu/tools/tool_action_test.cc
@@ -307,6 +307,40 @@ Status ProcessRequest(const ControlShellRequestPB& req,
       RETURN_NOT_OK((*cluster)->SetFlag(daemon, r.flag(), r.value()));
       break;
     }
+    case ControlShellRequestPB::kPauseDaemon:
+    {
+      const auto& r = req.pause_daemon();
+      if (!r.has_id()) {
+        RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+      }
+      const auto& id = r.id();
+      if (id.type() == DaemonType::KDC) {
+        return Status::InvalidArgument("mini-KDC doesn't support pausing");
+      }
+      ExternalDaemon* daemon;
+      MiniKdc* kdc;
+      RETURN_NOT_OK(FindDaemon(*cluster, id, &daemon, &kdc));
+      DCHECK(daemon);
+      RETURN_NOT_OK(daemon->Pause());
+      break;
+    }
+    case ControlShellRequestPB::kResumeDaemon:
+    {
+      const auto& r = req.resume_daemon();
+      if (!r.has_id()) {
+        RETURN_NOT_OK(Status::InvalidArgument("missing process id"));
+      }
+      const auto& id = r.id();
+      if (id.type() == DaemonType::KDC) {
+        return Status::InvalidArgument("mini-KDC doesn't support resuming");
+      }
+      ExternalDaemon* daemon;
+      MiniKdc* kdc;
+      RETURN_NOT_OK(FindDaemon(*cluster, id, &daemon, &kdc));
+      DCHECK(daemon);
+      RETURN_NOT_OK(daemon->Resume());
+      break;
+    }
     default:
       RETURN_NOT_OK(Status::InvalidArgument("unknown cluster control request"));
   }