You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/09/04 17:43:50 UTC

[mesos] 04/05: Cleaned up residual CSI endpoint sockets for terminated plugins.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch slrp
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 5c5103df5f5f5952859c0c27616fc2d950468763
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Sep 3 15:16:34 2018 -0700

    Cleaned up residual CSI endpoint sockets for terminated plugins.
    
    If a CSI plugin crashes during agent failover, its residual socket
    file will still exist during SLRP recovery, which may in turn prevent
    the plugin from restarting. This patch cleans up the residual socket
    files to avoid such failures.
    
    Review: https://reviews.apache.org/r/68601
---
 src/resource_provider/storage/provider.cpp | 370 +++++++++++++++++------------
 1 file changed, 218 insertions(+), 152 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 43a3ffc..6475f65 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -48,9 +48,11 @@
 
 #include <stout/foreach.hpp>
 #include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
+#include <stout/strings.hpp>
 
 #include <stout/os/realpath.hpp>
 
@@ -395,7 +397,9 @@ private:
 
   Future<csi::v0::Client> connect(const string& endpoint);
   Future<csi::v0::Client> getService(const ContainerID& containerId);
-  Future<Nothing> killService(const ContainerID& containerId);
+  Future<hashmap<ContainerID, Option<ContainerStatus>>> getContainers();
+  Future<Nothing> waitContainer(const ContainerID& containerId);
+  Future<Nothing> killContainer(const ContainerID& containerId);
 
   Future<Nothing> prepareIdentityService();
   Future<Nothing> prepareControllerService();
@@ -700,103 +704,127 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
 Future<Nothing> StorageLocalResourceProviderProcess::recoverServices()
 {
-  Try<list<string>> containerPaths = csi::paths::getContainerPaths(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name());
-
-  if (containerPaths.isError()) {
-    return Failure(
-        "Failed to find plugin containers for CSI plugin type '" +
-        info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + ": " +
-        containerPaths.error());
-  }
-
-  vector<Future<Nothing>> futures;
-
-  foreach (const string& path, containerPaths.get()) {
-    Try<csi::paths::ContainerPath> containerPath =
-      csi::paths::parseContainerPath(
+  return getContainers()
+    .then(defer(self(), [=](
+        const hashmap<ContainerID, Option<ContainerStatus>>& runningContainers)
+        -> Future<Nothing> {
+      Try<list<string>> containerPaths = csi::paths::getContainerPaths(
           slave::paths::getCsiRootDir(workDir),
-          path);
-
-    if (containerPath.isError()) {
-      return Failure(
-          "Failed to parse container path '" + path + "': " +
-          containerPath.error());
-    }
-
-    CHECK_EQ(info.storage().plugin().type(), containerPath->type);
-    CHECK_EQ(info.storage().plugin().name(), containerPath->name);
-
-    const ContainerID& containerId = containerPath->containerId;
+          info.storage().plugin().type(),
+          info.storage().plugin().name());
 
-    CHECK_SOME(nodeContainerId);
+      if (containerPaths.isError()) {
+        return Failure(
+            "Failed to find plugin containers for CSI plugin type '" +
+            info.storage().plugin().type() + "' and name '" +
+            info.storage().plugin().name() + "': " +
+            containerPaths.error());
+      }
 
-    // Do not kill the up-to-date controller or node container.
-    // Otherwise, kill them and perform cleanups.
-    if (nodeContainerId == containerId ||
-        controllerContainerId == containerId) {
-      const string configPath = csi::paths::getContainerInfoPath(
-          slave::paths::getCsiRootDir(workDir),
-          info.storage().plugin().type(),
-          info.storage().plugin().name(),
-          containerId);
+      vector<Future<Nothing>> futures;
 
-      if (os::exists(configPath)) {
-        Result<CSIPluginContainerInfo> config =
-          slave::state::read<CSIPluginContainerInfo>(configPath);
+      foreach (const string& path, containerPaths.get()) {
+        Try<csi::paths::ContainerPath> containerPath =
+          csi::paths::parseContainerPath(
+              slave::paths::getCsiRootDir(workDir),
+              path);
 
-        if (config.isError()) {
+        if (containerPath.isError()) {
           return Failure(
-              "Failed to read plugin container config from '" +
-              configPath + "': " + config.error());
+              "Failed to parse container path '" + path + "': " +
+              containerPath.error());
         }
 
-        if (config.isSome() &&
-            getCSIPluginContainerInfo(info, containerId) == config.get()) {
-          continue;
-        }
-      }
-    }
+        CHECK_EQ(info.storage().plugin().type(), containerPath->type);
+        CHECK_EQ(info.storage().plugin().name(), containerPath->name);
+
+        const ContainerID& containerId = containerPath->containerId;
 
-    futures.push_back(killService(containerId)
-      .then(defer(self(), [=]() -> Future<Nothing> {
-        Result<string> endpointDir =
-          os::realpath(csi::paths::getEndpointDirSymlinkPath(
+        // NOTE: Since `getContainers` might return containers that are not
+        // actually running, as a workaround we treat a container as running
+        // only if its status is present and its `executor_pid` field is set.
+        bool isRunningContainer = runningContainers.contains(containerId) &&
+          runningContainers.at(containerId).isSome() &&
+          runningContainers.at(containerId)->has_executor_pid();
+
+        // Do not kill the up-to-date running controller or node container.
+        if ((nodeContainerId == containerId ||
+             controllerContainerId == containerId) && isRunningContainer) {
+          const string configPath = csi::paths::getContainerInfoPath(
               slave::paths::getCsiRootDir(workDir),
               info.storage().plugin().type(),
               info.storage().plugin().name(),
-              containerId));
+              containerId);
 
-        if (endpointDir.isSome()) {
-          Try<Nothing> rmdir = os::rmdir(endpointDir.get());
-          if (rmdir.isError()) {
-            return Failure(
-                "Failed to remove endpoint directory '" + endpointDir.get() +
-                "': " + rmdir.error());
+          if (os::exists(configPath)) {
+            Result<CSIPluginContainerInfo> config =
+              slave::state::read<CSIPluginContainerInfo>(configPath);
+
+            if (config.isError()) {
+              return Failure(
+                  "Failed to read plugin container config from '" + configPath +
+                  "': " + config.error());
+            }
+
+            if (config.isSome() &&
+                getCSIPluginContainerInfo(info, containerId) == config.get()) {
+              continue;
+            }
           }
         }
 
-        Try<Nothing> rmdir = os::rmdir(path);
-        if (rmdir.isError()) {
-          return Failure(
-              "Failed to remove plugin container directory '" + path + "': " +
-              rmdir.error());
+        LOG(INFO) << "Cleaning up plugin container '" << containerId << "'";
+
+        // Otherwise, kill the container only if it is actually running (i.e.,
+        // not already being destroyed), then wait for the container to be
+        // destroyed before cleaning up, regardless of whether we killed it.
+        Future<Nothing> cleanup = Nothing();
+        if (runningContainers.contains(containerId)) {
+          if (isRunningContainer) {
+            cleanup = killContainer(containerId);
+          }
+          cleanup = cleanup
+            .then(defer(self(), &Self::waitContainer, containerId));
         }
 
-        return Nothing();
-      })));
-  }
+        cleanup = cleanup
+          .then(defer(self(), [=]() -> Future<Nothing> {
+            Result<string> endpointDir =
+              os::realpath(csi::paths::getEndpointDirSymlinkPath(
+                  slave::paths::getCsiRootDir(workDir),
+                  info.storage().plugin().type(),
+                  info.storage().plugin().name(),
+                  containerId));
+
+            if (endpointDir.isSome()) {
+              Try<Nothing> rmdir = os::rmdir(endpointDir.get());
+              if (rmdir.isError()) {
+                return Failure(
+                    "Failed to remove endpoint directory '" +
+                    endpointDir.get() + "': " + rmdir.error());
+              }
+            }
+
+            Try<Nothing> rmdir = os::rmdir(path);
+            if (rmdir.isError()) {
+              return Failure(
+                  "Failed to remove plugin container directory '" + path +
+                  "': " + rmdir.error());
+            }
+
+            return Nothing();
+          }));
+
+        futures.push_back(cleanup);
+      }
 
-  // NOTE: The `Controller` service is supported if the plugin has the
-  // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is
-  // supported if the `Controller` service has the
-  // `PUBLISH_UNPUBLISH_VOLUME` capability. Therefore, we first launch
-  // the node plugin to get the plugin capabilities, then decide if we
-  // need to launch the controller plugin and get the node ID.
-  return collect(futures)
+      return collect(futures).then([] { return Nothing(); });
+    }))
+    // NOTE: The `Controller` service is supported if the plugin has the
+    // `CONTROLLER_SERVICE` capability, and the `NodeGetId` call is supported if
+    // the `Controller` service has the `PUBLISH_UNPUBLISH_VOLUME` capability.
+    // So we first launch the node plugin to get the plugin capabilities, then
+    // decide if we need to launch the controller plugin and get the node ID.
     .then(defer(self(), &Self::prepareIdentityService))
     .then(defer(self(), &Self::prepareControllerService))
     .then(defer(self(), &Self::prepareNodeService));
@@ -815,7 +843,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
     return Failure(
         "Failed to find volumes for CSI plugin type '" +
         info.storage().plugin().type() + "' and name '" +
-        info.storage().plugin().name() + ": " + volumePaths.error());
+        info.storage().plugin().name() + "': " + volumePaths.error());
   }
 
   vector<Future<Nothing>> futures;
@@ -969,9 +997,8 @@ StorageLocalResourceProviderProcess::recoverResourceProviderState()
 
   if (realpath.isError()) {
     return Failure(
-        "Failed to read the latest symlink for resource provider with "
-        "type '" + info.type() + "' and name '" + info.name() + "'"
-        ": " + realpath.error());
+        "Failed to read the latest symlink for resource provider with type '" +
+        info.type() + "' and name '" + info.name() + "': " + realpath.error());
   }
 
   if (realpath.isSome()) {
@@ -1244,7 +1271,7 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
 
         auto err = [](const id::UUID& uuid, const string& message) {
           LOG(ERROR)
-            << "Falied to apply operation (uuid: " << uuid << "): "
+            << "Failed to apply operation (uuid: " << uuid << "): "
             << message;
         };
 
@@ -1911,11 +1938,24 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 
   Option<CSIPluginContainerInfo> config =
     getCSIPluginContainerInfo(info, containerId);
-
   CHECK_SOME(config);
 
-  CommandInfo commandInfo;
+  // We checkpoint the config first to keep track of the plugin container even
+  // if we fail to create its container daemon.
+  const string configPath = csi::paths::getContainerInfoPath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin().type(),
+      info.storage().plugin().name(),
+      containerId);
 
+  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
+  if (checkpoint.isError()) {
+    return Failure(
+        "Failed to checkpoint plugin container config to '" + configPath +
+        "': " + checkpoint.error());
+  }
+
+  CommandInfo commandInfo;
   if (config->has_command()) {
     commandInfo.CopyFrom(config->command());
   }
@@ -1940,7 +1980,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
   endpointVar->set_value("unix://" + endpointPath);
 
   ContainerInfo containerInfo;
-
   if (config->has_container()) {
     containerInfo.CopyFrom(config->container());
   } else {
@@ -1986,19 +2025,9 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
       config->resources(),
       containerInfo,
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
-        CHECK(services.at(containerId)->future().isPending());
-
-        return connect(endpointPath)
-          .then(defer(self(), [=](const csi::v0::Client& client) {
-            services.at(containerId)->set(client);
-            return Nothing();
-          }))
-          .onFailed(defer(self(), [=](const string& failure) {
-            services.at(containerId)->fail(failure);
-          }))
-          .onDiscarded(defer(self(), [=] {
-            services.at(containerId)->discard();
-          }));
+        CHECK(services.at(containerId)->associate(connect(endpointPath)));
+        return services.at(containerId)->future()
+          .then([] { return Nothing(); });
       })),
       std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
         ++metrics.csi_plugin_container_terminations;
@@ -2010,8 +2039,8 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
           Try<Nothing> rm = os::rm(endpointPath);
           if (rm.isError()) {
             return Failure(
-                "Failed to remove endpoint '" + endpointPath +
-                "': " + rm.error());
+                "Failed to remove endpoint '" + endpointPath + "': " +
+                rm.error());
           }
         }
 
@@ -2024,20 +2053,6 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
         stringify(containerId) + "': " + daemon.error());
   }
 
-  // Checkpoint the plugin container config.
-  const string configPath = csi::paths::getContainerInfoPath(
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin().type(),
-      info.storage().plugin().name(),
-      containerId);
-
-  Try<Nothing> checkpoint = slave::state::checkpoint(configPath, config.get());
-  if (checkpoint.isError()) {
-    return Failure(
-        "Failed to checkpoint plugin container config to '" + configPath +
-        "': " + checkpoint.error());
-  }
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Container daemon for '" << containerId << "' failed: " << message;
@@ -2053,14 +2068,89 @@ Future<csi::v0::Client> StorageLocalResourceProviderProcess::getService(
 }
 
 
-// Kills the specified plugin container and returns a future that waits
-// for it to terminate.
-Future<Nothing> StorageLocalResourceProviderProcess::killService(
+// Lists all running plugin containers for this resource provider.
+// NOTE: This might return containers that are not actually running,
+// e.g., if they are being destroyed.
+Future<hashmap<ContainerID, Option<ContainerStatus>>>
+StorageLocalResourceProviderProcess::getContainers()
+{
+  agent::Call call;
+  call.set_type(agent::Call::GET_CONTAINERS);
+  call.mutable_get_containers()->set_show_nested(false);
+  call.mutable_get_containers()->set_show_standalone(true);
+
+  return http::post(
+      extractParentEndpoint(url),
+      getAuthHeader(authToken) +
+        http::Headers{{"Accept", stringify(contentType)}},
+      serialize(contentType, evolve(call)),
+      stringify(contentType))
+    .then(defer(self(), [=](const http::Response& httpResponse)
+        -> Future<hashmap<ContainerID, Option<ContainerStatus>>> {
+      hashmap<ContainerID, Option<ContainerStatus>> result;
+
+      if (httpResponse.status != http::OK().status) {
+        return Failure(
+            "Failed to get containers: Unexpected response '" +
+            httpResponse.status + "' (" + httpResponse.body + ")");
+      }
+
+      Try<v1::agent::Response> v1Response =
+        deserialize<v1::agent::Response>(contentType, httpResponse.body);
+      if (v1Response.isError()) {
+        return Failure("Failed to get containers: " + v1Response.error());
+      }
+
+      const string prefix = getContainerIdPrefix(info);
+
+      agent::Response response = devolve(v1Response.get());
+      foreach (const agent::Response::GetContainers::Container& container,
+               response.get_containers().containers()) {
+        if (strings::startsWith(container.container_id().value(), prefix)) {
+          result.put(
+              container.container_id(),
+              container.has_container_status()
+                ? container.container_status()
+                : Option<ContainerStatus>::none());
+        }
+      }
+
+      return result;
+    }));
+}
+
+
+// Waits for the specified plugin container to be terminated.
+Future<Nothing> StorageLocalResourceProviderProcess::waitContainer(
     const ContainerID& containerId)
 {
-  CHECK(!daemons.contains(containerId));
-  CHECK(!services.contains(containerId));
+  agent::Call call;
+  call.set_type(agent::Call::WAIT_CONTAINER);
+  call.mutable_wait_container()->mutable_container_id()->CopyFrom(containerId);
 
+  return http::post(
+      extractParentEndpoint(url),
+      getAuthHeader(authToken),
+      serialize(contentType, evolve(call)),
+      stringify(contentType))
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      if (response.status != http::OK().status &&
+          response.status != http::NotFound().status) {
+        return Failure(
+            "Failed to wait for container '" + stringify(containerId) +
+            "': Unexpected response '" + response.status + "' (" + response.body
+            + ")");
+      }
+
+      return Nothing();
+    });
+}
+
+
+// Kills the specified plugin container.
+Future<Nothing> StorageLocalResourceProviderProcess::killContainer(
+    const ContainerID& containerId)
+{
   agent::Call call;
   call.set_type(agent::Call::KILL_CONTAINER);
   call.mutable_kill_container()->mutable_container_id()->CopyFrom(containerId);
@@ -2070,41 +2160,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::killService(
       getAuthHeader(authToken),
       serialize(contentType, evolve(call)),
       stringify(contentType))
-    .then(defer(self(), [=](const http::Response& response) -> Future<Nothing> {
-      if (response.status == http::NotFound().status) {
-        return Nothing();
-      }
-
-      if (response.status != http::OK().status) {
+    .then([containerId](const http::Response& response) -> Future<Nothing> {
+      if (response.status != http::OK().status &&
+          response.status != http::NotFound().status) {
         return Failure(
             "Failed to kill container '" + stringify(containerId) +
             "': Unexpected response '" + response.status + "' (" + response.body
             + ")");
       }
 
-      agent::Call call;
-      call.set_type(agent::Call::WAIT_CONTAINER);
-      call.mutable_wait_container()
-        ->mutable_container_id()->CopyFrom(containerId);
-
-      return http::post(
-          extractParentEndpoint(url),
-          getAuthHeader(authToken),
-          serialize(contentType, evolve(call)),
-          stringify(contentType))
-        .then(defer(self(), [=](
-            const http::Response& response) -> Future<Nothing> {
-          if (response.status != http::OK().status &&
-              response.status != http::NotFound().status) {
-            return Failure(
-                "Failed to wait for container '" + stringify(containerId) +
-                "': Unexpected response '" + response.status + "' (" +
-                response.body + ")");
-          }
-
-          return Nothing();
-        }));
-    }));
+      return Nothing();
+    });
 }
 
 
@@ -3239,8 +3305,8 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_mount();
 
-      // We only clear the the volume ID and metadata if the destroyed volume is
-      // not a pre-existing volume.
+      // We only clear the volume ID and metadata if the destroyed volume is not
+      // a pre-existing volume.
       if (resource.disk().source().has_profile()) {
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();