You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/09/04 17:45:52 UTC

[mesos] branch master updated (9231d78 -> 16df659)

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 9231d78  Fixed flakiness in an operation reconciliation test.
     new d0349dc  Added a unit test for plugin crash during an agent failover.
     new be6809a  Implicitly authorized `VIEW_STANDALONE_CONTAINER` for SLRPs.
     new 7e82511  Added the `devolve` helper for agent v1 API responses.
     new 5c5103d  Cleaned up residual CSI endpoint sockets for terminated plugins.
     new 16df659  Added MESOS-8429 to the 1.7.0 CHANGELOG.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG                                          |   1 +
 src/authorizer/local/authorizer.cpp                |   6 +-
 src/examples/test_csi_plugin.cpp                   |  11 +-
 src/internal/devolve.cpp                           |   6 +
 src/internal/devolve.hpp                           |   1 +
 src/resource_provider/storage/provider.cpp         | 370 ++++++++++++---------
 src/slave/http.cpp                                 |   2 +-
 .../storage_local_resource_provider_tests.cpp      | 112 +++++++
 8 files changed, 353 insertions(+), 156 deletions(-)


[mesos] 02/05: Implicitly authorized `VIEW_STANDALONE_CONTAINER` for SLRPs.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit be6809a6fb440b3573328e93badee78b7db64848
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Sep 3 14:52:10 2018 -0700

    Implicitly authorized `VIEW_STANDALONE_CONTAINER` for SLRPs.
    
    Review: https://reviews.apache.org/r/68614
---
 src/authorizer/local/authorizer.cpp | 6 ++++--
 src/slave/http.cpp                  | 2 +-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index 3ab1b3b..83944b9 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -1089,7 +1089,8 @@ public:
           (action == authorization::LAUNCH_STANDALONE_CONTAINER ||
            action == authorization::WAIT_STANDALONE_CONTAINER ||
            action == authorization::KILL_STANDALONE_CONTAINER ||
-           action == authorization::REMOVE_STANDALONE_CONTAINER));
+           action == authorization::REMOVE_STANDALONE_CONTAINER ||
+           action == authorization::VIEW_STANDALONE_CONTAINER));
 
     Option<string> subjectPrefix;
     foreach (const Label& claim, subject->claims().labels()) {
@@ -1136,7 +1137,8 @@ public:
       if (action == authorization::LAUNCH_STANDALONE_CONTAINER ||
           action == authorization::WAIT_STANDALONE_CONTAINER ||
           action == authorization::KILL_STANDALONE_CONTAINER ||
-          action == authorization::REMOVE_STANDALONE_CONTAINER) {
+          action == authorization::REMOVE_STANDALONE_CONTAINER ||
+          action == authorization::VIEW_STANDALONE_CONTAINER) {
         return getImplicitResourceProviderObjectApprover(subject, action);
       }
     }
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index d0f6e1c..f8199af 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -2232,7 +2232,7 @@ Future<JSON::Array> Http::__containers(
         }
 
         if (isRootContainerStandalone &&
-            !approvers->approved<VIEW_STANDALONE_CONTAINER>()) {
+            !approvers->approved<VIEW_STANDALONE_CONTAINER>(rootContainerId)) {
           continue;
         }
 


[mesos] 03/05: Added the `devolve` helper for agent v1 API responses.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 7e825116cdd5a17bf04b6d9075debbb173cc74b6
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Sep 3 14:53:14 2018 -0700

    Added the `devolve` helper for agent v1 API responses.
    
    Review: https://reviews.apache.org/r/68615
---
 src/internal/devolve.cpp | 6 ++++++
 src/internal/devolve.hpp | 1 +
 2 files changed, 7 insertions(+)

diff --git a/src/internal/devolve.cpp b/src/internal/devolve.cpp
index 93bd975..a5d9ab9 100644
--- a/src/internal/devolve.cpp
+++ b/src/internal/devolve.cpp
@@ -236,6 +236,12 @@ mesos::agent::Call devolve(const v1::agent::Call& call)
 }
 
 
+mesos::agent::Response devolve(const v1::agent::Response& response)
+{
+  return devolve<mesos::agent::Response>(response);
+}
+
+
 mesos::master::Call devolve(const v1::master::Call& call)
 {
   return devolve<mesos::master::Call>(call);
diff --git a/src/internal/devolve.hpp b/src/internal/devolve.hpp
index 1bc2a32..a1f8d8d 100644
--- a/src/internal/devolve.hpp
+++ b/src/internal/devolve.hpp
@@ -82,6 +82,7 @@ executor::Call devolve(const v1::executor::Call& call);
 executor::Event devolve(const v1::executor::Event& event);
 
 mesos::agent::Call devolve(const v1::agent::Call& call);
+mesos::agent::Response devolve(const v1::agent::Response& response);
 
 mesos::master::Call devolve(const v1::master::Call& call);
 


[mesos] 01/05: Added a unit test for plugin crash during an agent failover.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d0349dc6a71e439f2057fd0211880ddcfd773ce5
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Sat Sep 1 05:05:26 2018 -0700

    Added a unit test for plugin crash during an agent failover.
    
    If a CSI plugin crashes during an agent failover, its residual socket
    file will still exist during SLRP recovery. This test verifies that the
    plugin is properly cleaned up during recovery so that it can be
    restarted.
    
    Review: https://reviews.apache.org/r/68600
---
 src/examples/test_csi_plugin.cpp                   |  11 +-
 .../storage_local_resource_provider_tests.cpp      | 112 +++++++++++++++++++++
 2 files changed, 122 insertions(+), 1 deletion(-)

diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 93301f8..7fa325e 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -964,12 +964,21 @@ int main(int argc, char** argv)
 
       if (error.isSome()) {
         cerr << "Failed to parse item '" << token << "' in 'volumes' flag: "
-             << error->message;
+             << error->message << endl;
         return EXIT_FAILURE;
       }
     }
   }
 
+  // Terminate the plugin if the endpoint socket file already exists to simulate
+  // an `EADDRINUSE` bind error.
+  const string endpointPath = strings::remove("unix://", flags.endpoint);
+  if (os::exists(endpointPath)) {
+    cerr << "Failed to create endpoint '" << endpointPath << "': already exists"
+         << endl;
+    return EXIT_FAILURE;
+  }
+
   unique_ptr<TestCSIPlugin> plugin(new TestCSIPlugin(
       flags.work_dir,
       flags.endpoint,
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 9d0020c..d191783 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -19,6 +19,7 @@
 #include <process/http.hpp>
 #include <process/gtest.hpp>
 #include <process/gmock.hpp>
+#include <process/reap.hpp>
 
 #include <stout/hashmap.hpp>
 #include <stout/uri.hpp>
@@ -63,6 +64,7 @@ using process::Future;
 using process::Owned;
 using process::Promise;
 using process::post;
+using process::reap;
 
 using testing::AtMost;
 using testing::DoAll;
@@ -1311,6 +1313,116 @@ TEST_F(StorageLocalResourceProviderTest, ProfileDisappeared)
 }
 
 
+// This test verifies that the storage local resource provider can
+// recover if the plugin is killed during an agent failover.
+TEST_F(StorageLocalResourceProviderTest, AgentFailoverPluginKilled)
+{
+  setupResourceProviderConfig(Bytes(0), "volume0:4GB");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  slave::Fetcher fetcher(slaveFlags);
+
+  Try<slave::MesosContainerizer*> _containerizer =
+    slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to receive offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing any resource from the resource provider before the
+  //      agent fails over.
+  //   2. One containing any resource from the resource provider after the agent
+  //      recovers from the failover.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> slaveRecoveredOffers;
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+
+  // Get the PID of the plugin container so we can kill it later.
+  Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+  AWAIT_READY(pluginContainers);
+  ASSERT_EQ(1u, pluginContainers->size());
+
+  Future<ContainerStatus> pluginStatus =
+    containerizer->status(*pluginContainers->begin());
+
+  AWAIT_READY(pluginStatus);
+
+  // Terminate the agent to simulate a failover.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave.get()->terminate();
+  slave->reset();
+  containerizer.reset();
+
+  // Kill the plugin container.
+  Future<Option<int>> pluginReaped = reap(pluginStatus->executor_pid());
+  ASSERT_EQ(0, os::kill(pluginStatus->executor_pid(), SIGKILL));
+
+  AWAIT_READY(pluginReaped);
+  ASSERT_SOME(pluginReaped.get());
+  ASSERT_TRUE(WIFSIGNALED(pluginReaped->get()));
+  EXPECT_EQ(SIGKILL, WTERMSIG(pluginReaped->get()));
+
+  // Restart the agent.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&slaveRecoveredOffers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRecoveredOffers);
+}
+
+
 // This test verifies that if an agent is registered with a new ID,
 // the ID of the resource provider would be changed as well, and any
 // created volume becomes a pre-existing volume.


[mesos] 05/05: Added MESOS-8429 to the 1.7.0 CHANGELOG.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 16df659276f713fa861ae6eabcb71a2adb16b437
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Sep 4 10:42:34 2018 -0700

    Added MESOS-8429 to the 1.7.0 CHANGELOG.
---
 CHANGELOG | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG b/CHANGELOG
index 0937c37..2f82122 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -169,6 +169,7 @@ All Resolved Issues:
   * [MESOS-7966] - check for maintenance on agent causes fatal error
   * [MESOS-8128] - Make os::pipe file descriptors O_CLOEXEC.
   * [MESOS-8134] - SlaveTest.ContainersEndpoint is flaky due to getenv crash.
+  * [MESOS-8429] - Clean up endpoint socket if the container daemon is destroyed while waiting.
   * [MESOS-8499] - Change docker health check image to the new nanoserver one
   * [MESOS-8567] - Test UriDiskProfileTest.FetchFromHTTP is flaky.
   * [MESOS-8613] - Test `MasterAllocatorTest/*.TaskFinished` is flaky.


[mesos] 04/05: Cleaned up residual CSI endpoint sockets for terminated plugins.

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5c5103df5f5f5952859c0c27616fc2d950468763
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Sep 3 15:16:34 2018 -0700

    Cleaned up residual CSI endpoint sockets for terminated plugins.
    
    If a CSI plugin crashes during an agent failover, its residual socket
    file will still exist during SLRP recovery, which may in turn cause the
    plugin to fail to restart. This patch cleans up the residual socket
    files to avoid such failures.
    
    Review: https://reviews.apache.org/r/68601
---
 src/resource_provider/storage/provider.cpp | 370 +++++++++++++++++------------
 1 file changed, 218 insertions(+), 152 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 43a3ffc..6475f65 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -48,9 +48,11 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/strings.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -395,7 +397,9 @@ private:
 
   Future<csi::v0::Client> connect(const string& endpoint);
   Future<csi::v0::Client> getService(const ContainerID& containerId);
-  Future<Nothing> killService(const ContainerID& containerId);
+  Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
+  Future<Nothing> waitContainer(const ContainerID& containerId);
+  Future<Nothing> killContainer(const ContainerID& containerId);
 
   Future<Nothing> prepareIdentityService();
   Future<Nothing> prepareControllerService();
@@ -700,103 +704,127 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
 Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
 {
-  Try<list<string>> containerPaths = csi::paths::getContainerPaths(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  if (containerPaths.isError()) {
-    return Failure(
-        "Failed to find plugin containers for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + ": " +
-        containerPaths.error());
-  }
-
-  vector<Future<Nothing>> futures;
-
-  foreach (const string& path, containerPaths.get()) {
-    Try<csi::paths::ContainerPath> containerPath =
-      csi::paths::parseContainerPath(
+  return getContainers()
+    .then(defer(self(), [=](
+        const hashmap<ContainerID, Option<ContainerStatus>>& runningContainers)
+        -> Future<Nothing> {
+      Try<list<string>> containerPaths = csi::paths::getContainerPaths(
           slave::paths::getCsiRootDir(workDir),
-          path);
-
-    if (containerPath.isError()) {
-      return Failure(
-          "Failed to parse container path '" + path + "': " +
-          containerPath.error());
-    }
-
-    CHECK_EQ(info.storage().plugin().type(), containerPath->type);
-    CHECK_EQ(info.storage().plugin().name(), containerPath->name);
-
-    const ContainerID& containerId = containerPath->containerId;
+          info.storage().plugin().type(),
+          info.storage().plugin().name());
 
-    CHECK_SOME(nodeContainerId);
+      if (containerPaths.isError()) {
+        return Failure(
+            "Failed to find plugin containers for CSI plugin type '" +
+            info.storage().plugin().type() + "' and name '" +
+            info.storage().plugin().name() + "': " +
+            containerPaths.error());
+      }
 
-    // Do not kill the up-to-date controller or node container.
-    // Otherwise, kill them and perform cleanups.
-    if (nodeContainerId == containerId ||
-        controllerContainerId == containerId) {
-      const string configPath = csi::paths::getContainerInfoPath(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name(),
-          containerId);
+      vector<Future<Nothing>> futures;
 
-      if (os::exists(configPath)) {
-        Result<CSIPluginContainerInfo> config =
-          slave::state::read<CSIPluginContainerInfo>(configPath);
+      foreach (const string& path, containerPaths.get()) {
+        Try<csi::paths::ContainerPath> containerPath =
+          csi::paths::parseContainerPath(
+              slave::paths::getCsiRootDir(workDir),
+              path);
 
-        if (config.isError()) {
+        if (containerPath.isError()) {
           return Failure(
-              "Failed to read plugin container config from '" +
-              configPath + "': " + config.error());
+              "Failed to parse container path '" + path + "': " +
+              containerPath.error());
         }
 
-        if (config.isSome() &&
-            getCSIPluginContainerInfo(info, containerId) == config.get()) {
-          continue;
-        }
-      }
-    }
+        CHECK_EQ(info.storage().plugin().type(), containerPath->type);
+        CHECK_EQ(info.storage().plugin().name(), containerPath->name);
+
+        const ContainerID& containerId = containerPath->containerId;
 
-    futures.push_back(killService(containerId)
-      .then(defer(self(), [=]() -> Future<Nothing> {
-        Result<string> endpointDir =
-          os::realpath(csi::paths::getEndpointDirSymlinkPath(
+        // NOTE: Since `getContainers` might return containers that are not
+        // actually running, to identify if the container is actually running,
+        // we check if the `executor_pid` field is set as a workaround.
+        bool isRunningContainer = runningContainers.contains(containerId) &&
+          runningContainers.at(containerId).isSome() &&
+          runningContainers.at(containerId)->has_executor_pid();
+
+        // Do not kill the up-to-date running controller or node container.
+        if ((nodeContainerId == containerId ||
+             controllerContainerId == containerId) && isRunningContainer) {
+          const string configPath = csi::paths::getContainerInfoPath(
               slave::paths::getCsiRootDir(workDir),
               info.storage().plugin().type(),
               info.storage().plugin().name(),
-              containerId));
+              containerId);
 
-        if (endpointDir.isSome()) {
-          Try<Nothing> rmdir = os::rmdir(endpointDir.get());
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove endpoint directory '" + endpointDir.get() +
-                "': " + rmdir.error());
+          if (os::exists(configPath)) {
+            Result<CSIPluginContainerInfo> config =
+              slave::state::read<CSIPluginContainerInfo>(configPath);
+
+            if (config.isError()) {
+              return Failure(
+                  "Failed to read plugin container config from '" + configPath +
+                  "': " + config.error());
+            }
+
+            if (config.isSome() &&
+                getCSIPluginContainerInfo(info, containerId) == config.get()) {
+              continue;
+            }
           }
         }
 
-        Try<Nothing> rmdir = os::rmdir(path);
-        if (rmdir.isError()) {
-          return Failure(
-              "Failed to remove plugin container directory '" + path + "': " +
-              rmdir.error());
+        LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
+
+        // Otherwise, kill the container only if it is actually running (i.e.,
+        // not already being destroyed), then wait for the container to be
+        // destroyed before performing the cleanup, whether or not we killed it.
+        Future<Nothing> cleanup = Nothing();
+        if (runningContainers.contains(containerId)) {
+          if (isRunningContainer) {
+            cleanup = killContainer(containerId);
+          }
+          cleanup = cleanup
+            .then(defer(self(), &Self::waitContainer, containerId));
         }
 
-        return Nothing();
-      })));
-  }
+        cleanup = cleanup
+          .then(defer(self(), [=]() -> Future<Nothing> {
+            Result<string> endpointDir =
+              os::realpath(csi::paths::getEndpointDirSymlinkPath(
+                  slave::paths::getCsiRootDir(workDir),
+                  info.storage().plugin().type(),
+                  info.storage().plugin().name(),
+                  containerId));
+
+            if (endpointDir.isSome()) {
+              Try<Nothing> rmdir = os::rmdir(endpointDir.get());
+              if (rmdir.isError()) {
+                return Failure(
+                    "Failed to remove endpoint directory '" +
+                    endpointDir.get() + "': " + rmdir.error());
+              }
+            }
+
+            Try<Nothing> rmdir = os::rmdir(path);
+                if (rmdir.isError()) {
+              return Failure(
+                  "Failed to remove plugin container directory '" + path +
+                  "': " + rmdir.error());
+            }
+
+            return Nothing();
+          }));
+
+        futures.push_back(cleanup);
+      }
 
-  // NOTE: The `Controller` service is supported if the plugin has the
-  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is
-  // supported if the `Controller` service has the
-  // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch
-  // the node plugin to get the plugin capabilities, then decide if we
-  // need to launch the controller plugin and get the node ID.
-  return collect(futures)
+      return collect(futures).then([] { return Nothing(); });
+    }))
+    // NOTE: The `Controller` service is supported if the plugin has the
+    // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
+    // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability.
+    // So we first launch the node plugin to get the plugin capabilities, then
+    // decide if we need to launch the controller plugin and get the node ID.
     .then(defer(self(), &Self::prepareIdentityService))
     .then(defer(self(), &Self::prepareControllerService))
     .then(defer(self(), &Self::prepareNodeService));
@@ -815,7 +843,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
     return Failure(
         "Failed to find volumes for CSI plugin type '" +
         info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + ": " + volumePaths.error());
+        info.storage().plugin().name() + "': " + volumePaths.error());
   }
 
   vector<Future<Nothing>> futures;
@@ -969,9 +997,8 @@ StorageLocalResourceProviderProcess::recoverResourceProviderState()
 
   if (realpath.isError()) {
     return Failure(
-        "Failed to read the latest symlink for resource provider with "
-        "type '" + info.type() + "' and name '" + info.name() + "'"
-        ": " + realpath.error());
+        "Failed to read the latest symlink for resource provider with type '" +
+        info.type() + "' and name '" + info.name() + "': " + realpath.error());
   }
 
   if (realpath.isSome()) {
@@ -1244,7 +1271,7 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
 
         auto err = [](const id::UUID& uuid, const string& message) {
           LOG(ERROR)
-            << "Falied to apply operation (uuid: " << uuid << "): "
+            << "Failed to apply operation (uuid: " << uuid << "): "
             << message;
         };
 
@@ -1911,11 +1938,24 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 
   Option<CSIPluginContainerInfo> config =
     getCSIPluginContainerInfo(info, containerId);
-
   CHECK_SOME(config);
 
-  CommandInfo commandInfo;
+  // We checkpoint the config first to keep track of the plugin container even
+  // if we fail to create its container daemon.
+  const string configPath = csi::paths::getContainerInfoPath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      containerId);
 
+  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
+  if (checkpoint.isError()) {
+    return Failure(
+        "Failed to checkpoint plugin container config to '" + configPath +
+        "': " + checkpoint.error());
+  }
+
+  CommandInfo commandInfo;
   if (config->has_command()) {
     commandInfo.CopyFrom(config->command());
   }
@@ -1940,7 +1980,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
   endpointVar->set_value("unix://" + endpointPath);
 
   ContainerInfo containerInfo;
-
   if (config->has_container()) {
     containerInfo.CopyFrom(config->container());
   } else {
@@ -1986,19 +2025,9 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
       config->resources(),
       containerInfo,
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        CHECK(services.at(containerId)->future().isPending());
-
-        return connect(endpointPath)
-          .then(defer(self(), [=](const csi::v0::Client& client) {
-            services.at(containerId)->set(client);
-            return Nothing();
-          }))
-          .onFailed(defer(self(), [=](const string& failure) {
-            services.at(containerId)->fail(failure);
-          }))
-          .onDiscarded(defer(self(), [=] {
-            services.at(containerId)->discard();
-          }));
+        CHECK(services.at(containerId)->associate(connect(endpointPath)));
+        return services.at(containerId)->future()
+          .then([] { return Nothing(); });
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
         ++metrics.csi_plugin_container_terminations;
@@ -2010,8 +2039,8 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
           Try<Nothing> rm = os::rm(endpointPath);
           if (rm.isError()) {
             return Failure(
-                "Failed to remove endpoint '" + endpointPath +
-                "': " + rm.error());
+                "Failed to remove endpoint '" + endpointPath + "': " +
+                rm.error());
           }
         }
 
@@ -2024,20 +2053,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
         stringify(containerId) + "': " + daemon.error());
   }
 
-  // Checkpoint the plugin container config.
-  const string configPath = csi::paths::getContainerInfoPath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      containerId);
-
-  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
-  if (checkpoint.isError()) {
-    return Failure(
-        "Failed to checkpoint plugin container config to '" + configPath +
-        "': " + checkpoint.error());
-  }
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Container daemon for '" << containerId << "' failed: " << message;
@@ -2053,14 +2068,89 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 }
 
 
-// Kills the specified plugin container and returns a future that waits
-// for it to terminate.
-Future<Nothing> StorageLocalResourceProviderProcess::killService(
+// Lists all running plugin containers for this resource provider.
+// NOTE: This might return containers that are not actually running,
+// e.g., if they are being destroyed.
+Future<hashmap<ContainerID, Option<ContainerStatus>>>
+StorageLocalResourceProviderProcess::getContainers()
+{
+  agent::Call call;
+  call.set_type(agent::Call::GET_CONTAINERS);
+  call.mutable_get_containers()->set_show_nested(false);
+  call.mutable_get_containers()->set_show_standalone(true);
+
+  return http::post(
+      extractParentEndpoint(url),
+      getAuthHeader(authToken) +
+        http::Headers{{"Accept", stringify(contentType)}},
+      serialize(contentType, evolve(call)),
+      stringify(contentType))
+    .then(defer(self(), [=](const http::Response& httpResponse)
+        -> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
+      hashmap<ContainerID, Option<ContainerStatus>> result;
+
+      if (httpResponse.status != http::OK().status) {
+        return Failure(
+            "Failed to get containers: Unexpected response '" +
+            httpResponse.status + "' (" + httpResponse.body + ")");
+      }
+
+      Try<v1::agent::Response> v1Response =
+        deserialize<v1::agent::Response>(contentType, httpResponse.body);
+      if (v1Response.isError()) {
+        return Failure("Failed to get containers: " + v1Response.error());
+      }
+
+      const string prefix = getContainerIdPrefix(info);
+
+      agent::Response response = devolve(v1Response.get());
+      foreach (const agent::Response::GetContainers::Container& container,
+               response.get_containers().containers()) {
+        if (strings::startsWith(container.container_id().value(), prefix)) {
+          result.put(
+              container.container_id(),
+              container.has_container_status()
+                ? container.container_status()
+                : Option<ContainerStatus>::none());
+        }
+      }
+
+      return result;
+    }));
+}
+
+
+// Waits for the specified plugin container to be terminated.
+Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
     const ContainerID& containerId)
 {
-  CHECK(!daemons.contains(containerId));
-  CHECK(!services.contains(containerId));
+  agent::Call call;
+  call.set_type(agent::Call::WAIT_CONTAINER);
+  call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId);
 
+  return http::post(
+      extractParentEndpoint(url),
+      getAuthHeader(authToken),
+      serialize(contentType, evolve(call)),
+      stringify(contentType))
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      if (response.status != http::OK().status &&
+          response.status != http::NotFound().status) {
+        return Failure(
+            "Failed to wait for container '" + stringify(containerId) +
+            "': Unexpected response '" + response.status + "' (" + response.body
+            + ")");
+      }
+
+      return Nothing();
+    });
+}
+
+
+// Kills the specified plugin container.
+Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
+    const ContainerID& containerId)
+{
   agent::Call call;
   call.set_type(agent::Call::KILL_CONTAINER);
   call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);
@@ -2070,41 +2160,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::killService(
       getAuthHeader(authToken),
       serialize(contentType, evolve(call)),
       stringify(contentType))
-    .then(defer(self(), [=](const http::Response& response) -> Future<Nothing> {
-      if (response.status == http::NotFound().status) {
-        return Nothing();
-      }
-
-      if (response.status != http::OK().status) {
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      if (response.status != http::OK().status &&
+          response.status != http::NotFound().status) {
         return Failure(
             "Failed to kill container '" + stringify(containerId) +
             "': Unexpected response '" + response.status + "' (" + response.body
             + ")");
       }
 
-      agent::Call call;
-      call.set_type(agent::Call::WAIT_CONTAINER);
-      call.mutable_wait_container()
-        ->mutable_container_id()->CopyFrom(containerId);
-
-      return http::post(
-          extractParentEndpoint(url),
-          getAuthHeader(authToken),
-          serialize(contentType, evolve(call)),
-          stringify(contentType))
-        .then(defer(self(), [=](
-            const http::Response& response) -> Future<Nothing> {
-          if (response.status != http::OK().status &&
-              response.status != http::NotFound().status) {
-            return Failure(
-                "Failed to wait for container '" + stringify(containerId) +
-                "': Unexpected response '" + response.status + "' (" +
-                response.body + ")");
-          }
-
-          return Nothing();
-        }));
-    }));
+      return Nothing();
+    });
 }
 
 
@@ -3239,8 +3305,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_mount();
 
-      // We only clear the the volume ID and metadata if the destroyed volume is
-      // not a pre-existing volume.
+      // We only clear the volume ID and metadata if the destroyed volume is not
+      // a pre-existing volume.
       if (resource.disk().source().has_profile()) {
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();