You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/09/04 17:43:47 UTC

[mesos] 01/05: Added a unit test for plugin crash during an agent failover.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch slrp
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d0349dc6a71e439f2057fd0211880ddcfd773ce5
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Sat Sep 1 05:05:26 2018 -0700

    Added a unit test for plugin crash during an agent failover.
    
    If a CSI plugin crashes during an agent failover, its residual socket
    file will still exist during SLRP recovery. This test verifies that the
    plugin is properly cleaned up during recovery so that it can be
    restarted.
    
    Review: https://reviews.apache.org/r/68600
---
 src/examples/test_csi_plugin.cpp                   |  11 +-
 .../storage_local_resource_provider_tests.cpp      | 112 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 93301f8..7fa325e 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -964,12 +964,21 @@ int main(int argc, char** argv)
 
       if (error.isSome()) {
         cerr << "Failed to parse item '" << token << "' in 'volumes' flag: "
-             << error->message;
+             << error->message << endl;
         return EXIT_FAILURE;
       }
     }
   }
 
+  // Terminate the plugin if the endpoint socket file already exists to simulate
+  // an `EADDRINUSE` bind error.
+  const string endpointPath = strings::remove("unix://", flags.endpoint);
+  if (os::exists(endpointPath)) {
+    cerr << "Failed to create endpoint '" << endpointPath << "': already exists"
+         << endl;
+    return EXIT_FAILURE;
+  }
+
   unique_ptr<TestCSIPlugin> plugin(new TestCSIPlugin(
       flags.work_dir,
       flags.endpoint,
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 9d0020c..d191783 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -19,6 +19,7 @@
 #include <process/http.hpp>
 #include <process/gtest.hpp>
 #include <process/gmock.hpp>
+#include <process/reap.hpp>
 
 #include <stout/hashmap.hpp>
 #include <stout/uri.hpp>
@@ -63,6 +64,7 @@ using process::Future;
 using process::Owned;
 using process::Promise;
 using process::post;
+using process::reap;
 
 using testing::AtMost;
 using testing::DoAll;
@@ -1311,6 +1313,116 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
 }
 
 
+// This test verifies that the storage local resource provider can
+// recover if the plugin is killed during an agent failover.
+TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
+{
+  setupResourceProviderConfig(Bytes(0), "volume0:4GB");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  slave::Fetcher fetcher(slaveFlags);
+
+  Try<slave::MesosContainerizer*> _containerizer =
+    slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to receive offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing any resource from the resource provider before the
+  //      agent fails over.
+  //   2. One containing any resource from the resource provider after the agent
+  //      recovers from the failover.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> slaveRecoveredOffers;
+
+  // We use the following filter to decline offers that do not have the
+  // wanted resources, refusing them for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+
+  // Get the PID of the plugin container so we can kill it later.
+  Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+  AWAIT_READY(pluginContainers);
+  ASSERT_EQ(1u, pluginContainers->size());
+
+  Future<ContainerStatus> pluginStatus =
+    containerizer->status(*pluginContainers->begin());
+
+  AWAIT_READY(pluginStatus);
+
+  // Terminate the agent to simulate a failover.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave.get()->terminate();
+  slave->reset();
+  containerizer.reset();
+
+  // Kill the plugin container.
+  Future<Option<int>> pluginReaped = reap(pluginStatus->executor_pid());
+  ASSERT_EQ(0, os::kill(pluginStatus->executor_pid(), SIGKILL));
+
+  AWAIT_READY(pluginReaped);
+  ASSERT_SOME(pluginReaped.get());
+  ASSERT_TRUE(WIFSIGNALED(pluginReaped->get()));
+  EXPECT_EQ(SIGKILL, WTERMSIG(pluginReaped->get()));
+
+  // Restart the agent.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&slaveRecoveredOffers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRecoveredOffers);
+}
+
+
 // This test verifies that if an agent is registered with a new ID,
 // the ID of the resource provider would be changed as well, and any
 // created volume becomes a pre-existing volume.