You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:31 UTC

[01/12] mesos git commit: Refactored `UriVolumeProfileAdaptor` and `VolumeProfileTest`.

Repository: mesos
Updated Branches:
  refs/heads/master 884226e3b -> 4feb36706


Refactored `UriVolumeProfileAdaptor` and `VolumeProfileTest`.

This patch moves `csi/uri_volume_profile.proto` to
`resource_provider/volume_profile.proto` and makes it a Mesos-internal
protobuf, to avoid loading the same protobuf definitions twice when the
`UriVolumeProfileAdaptor` module is loaded.

Review: https://reviews.apache.org/r/64725/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4feb3670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4feb3670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4feb3670

Branch: refs/heads/master
Commit: 4feb36706cd6a751e82378cd9d7a93928ccf7f19
Parents: edf8b70
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 14:32:24 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/Makefile.am                                |  30 ++--
 src/csi/uri_volume_profile.proto               |  43 ------
 src/csi/utils.cpp                              |  41 ++++++
 src/csi/utils.hpp                              |   3 +
 src/resource_provider/uri_volume_profile.cpp   | 147 +-------------------
 src/resource_provider/uri_volume_profile.hpp   |  24 +---
 src/resource_provider/volume_profile.proto     |  43 ++++++
 src/resource_provider/volume_profile_utils.cpp | 121 ++++++++++++++++
 src/resource_provider/volume_profile_utils.hpp |  48 +++++++
 src/slave/flags.cpp                            |   2 +
 src/slave/flags.hpp                            |   2 +
 src/slave/slave.cpp                            |   4 +
 src/slave/slave.hpp                            |   7 +-
 src/tests/volume_profile_tests.cpp             | 108 ++++++++++----
 14 files changed, 378 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 6b18015..33677df 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -382,6 +382,10 @@ CXX_PROTOS +=								\
   slave/containerizer/mesos/isolators/network/cni/spec.pb.h
 
 if ENABLE_GRPC
+CXX_PROTOS +=								\
+  resource_provider/volume_profile.pb.cc				\
+  resource_provider/volume_profile.pb.h
+
 CXX_CSI_PROTOS =							\
   ../include/csi/csi.grpc.pb.cc						\
   ../include/csi/csi.grpc.pb.h						\
@@ -772,10 +776,14 @@ nodist_quota_HEADERS =							\
 resourceproviderdir = $(pkgincludedir)/resource_provider
 
 resourceprovider_HEADERS =						\
-  $(top_srcdir)/include/mesos/resource_provider/volume_profile.hpp	\
   $(top_srcdir)/include/mesos/resource_provider/resource_provider.hpp	\
   $(top_srcdir)/include/mesos/resource_provider/resource_provider.proto
 
+if ENABLE_GRPC
+resourceprovider_HEADERS +=						\
+  $(top_srcdir)/include/mesos/resource_provider/volume_profile.hpp
+endif
+
 nodist_resourceprovider_HEADERS =					\
   ../include/mesos/resource_provider/resource_provider.pb.h
 
@@ -1104,11 +1112,6 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   zookeeper/zookeeper.cpp						\
   zookeeper/group.cpp
 
-if ENABLE_GRPC
-libmesos_no_3rdparty_la_SOURCES +=					\
-  resource_provider/volume_profile.cpp
-endif
-
 libmesos_no_3rdparty_la_SOURCES +=					\
   authentication/cram_md5/authenticatee.hpp				\
   authentication/cram_md5/authenticator.hpp				\
@@ -1468,6 +1471,12 @@ libmesos_no_3rdparty_la_SOURCES +=					\
 endif
 
 if ENABLE_GRPC
+libmesos_no_3rdparty_la_SOURCES +=					\
+  resource_provider/volume_profile.cpp					\
+  resource_provider/volume_profile.proto				\
+  resource_provider/volume_profile_utils.cpp				\
+  resource_provider/volume_profile_utils.hpp
+
 if OS_LINUX
 libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/storage/provider.cpp				\
@@ -2336,10 +2345,10 @@ libload_qos_controller_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS)
 if ENABLE_GRPC
 pkgmodule_LTLIBRARIES += liburi_volume_profile.la
 liburi_volume_profile_la_SOURCES =				\
-  csi/uri_volume_profile.pb.cc					\
-  csi/uri_volume_profile.pb.h					\
   resource_provider/uri_volume_profile.cpp			\
-  resource_provider/uri_volume_profile.hpp
+  resource_provider/uri_volume_profile.hpp			\
+  resource_provider/volume_profile.pb.h				\
+  resource_provider/volume_profile_utils.hpp
 liburi_volume_profile_la_CPPFLAGS = $(MESOS_CPPFLAGS)
 liburi_volume_profile_la_LDFLAGS = $(MESOS_MODULE_LDFLAGS)
 endif
@@ -2646,7 +2655,8 @@ if ENABLE_GRPC
 mesos_tests_SOURCES +=						\
   tests/csi_client_tests.cpp					\
   tests/mock_csi_plugin.cpp					\
-  tests/mock_csi_plugin.hpp
+  tests/mock_csi_plugin.hpp					\
+  tests/volume_profile_tests.cpp
 
 if OS_LINUX
 mesos_tests_SOURCES +=						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/csi/uri_volume_profile.proto
----------------------------------------------------------------------
diff --git a/src/csi/uri_volume_profile.proto b/src/csi/uri_volume_profile.proto
deleted file mode 100644
index a3dfc5d..0000000
--- a/src/csi/uri_volume_profile.proto
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-import "csi.proto";
-
-package mesos.csi;
-
-
-message UriVolumeProfileMapping {
-  message CSIManifest {
-    // Capabilities used for creating, publishing, and validating volumes.
-    // This field is REQUIRED.
-    //
-    // NOTE: The name of this field is plural because some CSI requests
-    // support multiple capabilities. However, Mesos currently does not
-    // support this.
-    .csi.VolumeCapability volume_capabilities = 1;
-
-    // Parameters passed to the CSI CreateVolume RPC.
-    // This field is OPTIONAL.
-    map<string, string> create_parameters = 2;
-  }
-
-  // Each map entry associates a profile name (type string) with the CSI
-  // capabilities and parameters used to make specific CSI requests.
-  // This field is OPTIONAL.
-  map<string, CSIManifest> profile_matrix = 1;
-}

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/csi/utils.cpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.cpp b/src/csi/utils.cpp
index bae4654..9e04357 100644
--- a/src/csi/utils.cpp
+++ b/src/csi/utils.cpp
@@ -52,6 +52,50 @@ bool operator==(const Version& left, const Version& right)
 }
 
 
+// Returns true iff both capabilities have the same access type, the same
+// mount `fs_type` and (ordered) `mount_flags`, and the same access mode.
+bool operator==(const VolumeCapability& left, const VolumeCapability& right)
+{
+  // NOTE: This enumeration is set when `block` or `mount` are set and
+  // covers the case where neither are set.
+  if (left.access_type_case() != right.access_type_case()) {
+    return false;
+  }
+
+  // NOTE: No need to check `block` for equality as that object is empty.
+
+  if (left.has_mount()) {
+    if (left.mount().fs_type() != right.mount().fs_type()) {
+      return false;
+    }
+
+    if (left.mount().mount_flags_size() != right.mount().mount_flags_size()) {
+      return false;
+    }
+
+    // NOTE: Ordering may or may not matter for these flags, but this helper
+    // only checks for complete equality.
+    for (int i = 0; i < left.mount().mount_flags_size(); i++) {
+      if (left.mount().mount_flags(i) != right.mount().mount_flags(i)) {
+        return false;
+      }
+    }
+  }
+
+  if (left.has_access_mode() != right.has_access_mode()) {
+    return false;
+  }
+
+  if (left.has_access_mode()) {
+    if (left.access_mode().mode() != right.access_mode().mode()) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+
 bool operator!=(const Version& left, const Version& right)
 {
   return !(left == right);

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/csi/utils.hpp
----------------------------------------------------------------------
diff --git a/src/csi/utils.hpp b/src/csi/utils.hpp
index b49fa59..2dd61bc 100644
--- a/src/csi/utils.hpp
+++ b/src/csi/utils.hpp
@@ -43,6 +43,9 @@ bool operator==(
 bool operator==(const Version& left, const Version& right);
 
 
+bool operator==(const VolumeCapability& left, const VolumeCapability& right);
+
+
 bool operator!=(const Version& left, const Version& right);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/resource_provider/uri_volume_profile.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/uri_volume_profile.cpp b/src/resource_provider/uri_volume_profile.cpp
index 9dc0e6c..9c5bcb9 100644
--- a/src/resource_provider/uri_volume_profile.cpp
+++ b/src/resource_provider/uri_volume_profile.cpp
@@ -14,12 +14,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "resource_provider/uri_volume_profile.hpp"
+
 #include <map>
 #include <string>
 #include <tuple>
 
-#include <google/protobuf/util/json_util.h>
-
 #include <mesos/mesos.hpp>
 
 #include <mesos/module/volume_profile.hpp>
@@ -42,8 +42,9 @@
 #include <stout/strings.hpp>
 
 #include <csi/spec.hpp>
+#include <csi/utils.hpp>
 
-#include "resource_provider/uri_volume_profile.hpp"
+#include "resource_provider/volume_profile_utils.hpp"
 
 using namespace mesos;
 using namespace process;
@@ -54,52 +55,7 @@ using std::tuple;
 
 using google::protobuf::Map;
 
-using mesos::csi::UriVolumeProfileMapping;
-
-
-namespace csi {
-
-bool operator==(const VolumeCapability& left, const VolumeCapability& right) {
-  // NOTE: This enumeration is set when `block` or `mount` are set and
-  // covers the case where neither are set.
-  if (left.access_type_case() != right.access_type_case()) {
-    return false;
-  }
-
-  // NOTE: No need to check `block` for equality as that object is empty.
-
-  if (left.has_mount()) {
-    if (left.mount().fs_type() != right.mount().fs_type()) {
-      return false;
-    }
-
-    if (left.mount().mount_flags_size() != right.mount().mount_flags_size()) {
-      return false;
-    }
-
-    // NOTE: Ordering may or may not matter for these flags, but this helper
-    // only checks for complete equality.
-    for (int i = 0; i < left.mount().mount_flags_size(); i++) {
-      if (left.mount().mount_flags(i) != right.mount().mount_flags(i)) {
-        return false;
-      }
-    }
-  }
-
-  if (left.has_access_mode() != right.has_access_mode()) {
-    return false;
-  }
-
-  if (left.has_access_mode()) {
-    if (left.access_mode().mode() != right.access_mode().mode()) {
-      return false;
-    }
-  }
-
-  return true;
-}
-
-} // namespace csi {
+using mesos::resource_provider::VolumeProfileMapping;
 
 namespace mesos {
 namespace internal {
@@ -234,7 +190,7 @@ void UriVolumeProfileAdaptorProcess::poll()
 void UriVolumeProfileAdaptorProcess::_poll(const Try<string>& fetched)
 {
   if (fetched.isSome()) {
-    Try<UriVolumeProfileMapping> parsed = parse(fetched.get());
+    Try<VolumeProfileMapping> parsed = parseVolumeProfileMapping(fetched.get());
 
     if (parsed.isSome()) {
       notify(parsed.get());
@@ -254,7 +210,7 @@ void UriVolumeProfileAdaptorProcess::_poll(const Try<string>& fetched)
 
 
 void UriVolumeProfileAdaptorProcess::notify(
-    const UriVolumeProfileMapping& parsed)
+    const VolumeProfileMapping& parsed)
 {
   bool hasErrors = false;
 
@@ -327,95 +283,6 @@ void UriVolumeProfileAdaptorProcess::notify(
     << " total profiles";
 }
 
-
-Try<UriVolumeProfileMapping> UriVolumeProfileAdaptorProcess::parse(
-    const string& data)
-{
-  // Use Google's JSON utility function to parse the JSON string.
-  UriVolumeProfileMapping output;
-  google::protobuf::util::JsonParseOptions options;
-  options.ignore_unknown_fields = true;
-
-  google::protobuf::util::Status status =
-    google::protobuf::util::JsonStringToMessage(data, &output, options);
-
-  if (!status.ok()) {
-    return Error(
-        "Failed to parse UriVolumeProfileMapping message: "
-        + status.ToString());
-  }
-
-  Option<Error> validation = validate(output);
-  if (validation.isSome()) {
-    return Error(
-      "Fetched profile mapping failed validation with: " + validation->message);
-  }
-
-  return output;
-}
-
-
-Option<Error> UriVolumeProfileAdaptorProcess::validate(
-    const UriVolumeProfileMapping& mapping)
-{
-  auto iterator = mapping.profile_matrix().begin();
-  while (iterator != mapping.profile_matrix().end()) {
-    if (!iterator->second.has_volume_capabilities()) {
-      return Error(
-          "Profile '" + iterator->first + "' is missing the required field "
-          + "'volume_capabilities");
-    }
-
-    Option<Error> capabilities =
-      validate(iterator->second.volume_capabilities());
-
-    if (capabilities.isSome()) {
-      return Error(
-          "Profile '" + iterator->first + "' VolumeCapabilities are invalid: "
-          + capabilities->message);
-    }
-
-    // NOTE: The `create_parameters` field is optional and needs no further
-    // validation after parsing.
-
-    iterator++;
-  }
-
-  return None();
-}
-
-
-Option<Error> UriVolumeProfileAdaptorProcess::validate(
-    const csi::VolumeCapability& capability)
-{
-  if (capability.has_mount()) {
-    // The total size of this repeated field may not exceed 4 KB.
-    //
-    // TODO(josephw): The specification does not state how this maximum
-    // size is calculated. So this check is conservative and does not
-    // include padding or array separators in the size calculation.
-    size_t size = 0;
-    foreach (const string& flag, capability.mount().mount_flags()) {
-      size += flag.size();
-    }
-
-    if (Bytes(size) > Kilobytes(4)) {
-      return Error("Size of 'mount_flags' may not exceed 4 KB");
-    }
-  }
-
-  if (!capability.has_access_mode()) {
-    return Error("'access_mode' is a required field");
-  }
-
-  if (capability.access_mode().mode() ==
-      csi::VolumeCapability::AccessMode::UNKNOWN) {
-    return Error("'access_mode.mode' is unknown or not set");
-  }
-
-  return None();
-}
-
 } // namespace profile {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/resource_provider/uri_volume_profile.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/uri_volume_profile.hpp b/src/resource_provider/uri_volume_profile.hpp
index e6588f1..11c1727 100644
--- a/src/resource_provider/uri_volume_profile.hpp
+++ b/src/resource_provider/uri_volume_profile.hpp
@@ -38,8 +38,7 @@
 
 #include <csi/spec.hpp>
 
-// ONLY USEFUL AFTER RUNNING PROTOC.
-#include "csi/uri_volume_profile.pb.h"
+#include "resource_provider/volume_profile_utils.hpp"
 
 namespace mesos {
 namespace internal {
@@ -221,26 +220,9 @@ private:
   // of the module:
   //   * All known profiles must be included in the updated set.
   //   * All properties of known profiles must match those in the updated set.
-  void notify(const csi::UriVolumeProfileMapping& parsed);
-
-public:
-  // Helper for parsing a string as the expected data format.
-  // See the example string in the `--uri` help text for more details.
-  //
-  // NOTE: This method is public for testing purposes only.
-  static Try<csi::UriVolumeProfileMapping>
-    parse(const std::string& data);
+  void notify(const resource_provider::VolumeProfileMapping& parsed);
 
 private:
-  // Checks the fields inside a `UriVolumeProfileMapping` according to the
-  // comments above the protobuf.
-  static Option<Error> validate(
-      const csi::UriVolumeProfileMapping& mapping);
-
-  // Checks the fields inside a `VolumeCapability` according to the
-  // comments above the protobuf.
-  static Option<Error> validate(const csi::VolumeCapability& capability);
-
   Flags flags;
 
   // The last fetched profile mapping.
@@ -248,7 +230,7 @@ private:
   // Once added, profiles cannot be changed either.
   //
   // TODO(josephw): Consider persisting this mapping across agent restarts.
-  std::map<std::string, mesos::VolumeProfileAdaptor::ProfileInfo> data;
+  std::map<std::string, VolumeProfileAdaptor::ProfileInfo> data;
 
   // Convenience set of the keys in `data` above.
   // This module does not filter based on `CSIPluginInfo::type`, so this

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/resource_provider/volume_profile.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/volume_profile.proto b/src/resource_provider/volume_profile.proto
new file mode 100644
index 0000000..c0628ec
--- /dev/null
+++ b/src/resource_provider/volume_profile.proto
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+import "csi.proto";
+
+package mesos.resource_provider;
+
+
+message VolumeProfileMapping {
+  message CSIManifest {
+    // Capabilities used for creating, publishing, and validating volumes.
+    // This field is REQUIRED.
+    //
+    // NOTE: The name of this field is plural because some CSI requests
+    // support multiple capabilities. However, Mesos currently does not
+    // support this.
+    .csi.VolumeCapability volume_capabilities = 1;
+
+    // Parameters passed to the CSI CreateVolume RPC.
+    // This field is OPTIONAL.
+    map<string, string> create_parameters = 2;
+  }
+
+  // Each map entry associates a profile name (type string) with the CSI
+  // capabilities and parameters used to make specific CSI requests.
+  // This field is OPTIONAL.
+  map<string, CSIManifest> profile_matrix = 1;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/resource_provider/volume_profile_utils.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/volume_profile_utils.cpp b/src/resource_provider/volume_profile_utils.cpp
new file mode 100644
index 0000000..ec8b1f4
--- /dev/null
+++ b/src/resource_provider/volume_profile_utils.cpp
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/volume_profile_utils.hpp"
+
+#include <google/protobuf/util/json_util.h>
+
+#include <stout/bytes.hpp>
+#include <stout/foreach.hpp>
+
+using std::string;
+
+using mesos::resource_provider::VolumeProfileMapping;
+
+namespace mesos {
+namespace internal {
+namespace profile {
+
+Try<VolumeProfileMapping> parseVolumeProfileMapping(
+    const string& data)
+{
+  // Parse with protobuf's JSON utility; unknown fields are ignored.
+  VolumeProfileMapping output;
+  google::protobuf::util::JsonParseOptions options;
+  options.ignore_unknown_fields = true;
+
+  google::protobuf::util::Status status =
+    google::protobuf::util::JsonStringToMessage(data, &output, options);
+
+  if (!status.ok()) {
+    return Error(
+        "Failed to parse VolumeProfileMapping message: "
+        + status.ToString());
+  }
+
+  Option<Error> validation = validate(output);
+  if (validation.isSome()) {
+    return Error(
+      "Fetched profile mapping failed validation with: " + validation->message);
+  }
+
+  return output;
+}
+
+
+Option<Error> validate(const VolumeProfileMapping& mapping)
+{
+  auto iterator = mapping.profile_matrix().begin();
+  while (iterator != mapping.profile_matrix().end()) {
+    if (!iterator->second.has_volume_capabilities()) {
+      return Error(
+          "Profile '" + iterator->first + "' is missing the required field "
+          + "'volume_capabilities'");
+    }
+
+    Option<Error> capabilities =
+      validate(iterator->second.volume_capabilities());
+
+    if (capabilities.isSome()) {
+      return Error(
+          "Profile '" + iterator->first + "' VolumeCapabilities are invalid: "
+          + capabilities->message);
+    }
+
+    // NOTE: The `create_parameters` field is optional and needs no further
+    // validation after parsing.
+
+    iterator++;
+  }
+
+  return None();
+}
+
+
+// TODO(chhsiao): Move this to CSI validation implementation file.
+Option<Error> validate(const csi::VolumeCapability& capability)
+{
+  if (capability.has_mount()) {
+    // The total size of this repeated field may not exceed 4 KB.
+    //
+    // TODO(josephw): The specification does not state how this maximum
+    // size is calculated. So this check is conservative and does not
+    // include padding or array separators in the size calculation.
+    size_t size = 0;
+    foreach (const string& flag, capability.mount().mount_flags()) {
+      size += flag.size();
+    }
+
+    if (Bytes(size) > Kilobytes(4)) {
+      return Error("Size of 'mount_flags' may not exceed 4 KB");
+    }
+  }
+
+  if (!capability.has_access_mode()) {
+    return Error("'access_mode' is a required field");
+  }
+
+  if (capability.access_mode().mode() ==
+      csi::VolumeCapability::AccessMode::UNKNOWN) {
+    return Error("'access_mode.mode' is unknown or not set");
+  }
+
+  return None();
+}
+
+} // namespace profile {
+} // namespace internal {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/resource_provider/volume_profile_utils.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/volume_profile_utils.hpp b/src/resource_provider/volume_profile_utils.hpp
new file mode 100644
index 0000000..29bdf44
--- /dev/null
+++ b/src/resource_provider/volume_profile_utils.hpp
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_VOLUME_PROFILE_UTILS_HPP__
+#define __RESOURCE_PROVIDER_VOLUME_PROFILE_UTILS_HPP__
+
+#include <string>
+
+#include <stout/error.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include "resource_provider/volume_profile.pb.h"
+
+namespace mesos {
+namespace internal {
+namespace profile {
+
+// Parses a JSON string into a `VolumeProfileMapping` and validates it.
+Try<resource_provider::VolumeProfileMapping> parseVolumeProfileMapping(
+    const std::string& data);
+
+
+// Checks the fields inside a `VolumeProfileMapping` according to the
+// comments above the protobuf.
+Option<Error> validate(const resource_provider::VolumeProfileMapping& mapping);
+
+
+// Checks the fields inside a `VolumeCapability` according to the
+// comments above the protobuf.
+Option<Error> validate(const csi::VolumeCapability& capability);
+
+} // namespace profile {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_VOLUME_PROFILE_UTILS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index 69f37fe..c789e7d 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -111,6 +111,7 @@ mesos::internal::slave::Flags::Flags()
       "  \"name\": \"lvm\"\n"
       "}");
 
+#ifdef ENABLE_GRPC
   add(&Flags::volume_profile_adaptor,
       "volume_profile_adaptor",
       "The name of the volume profile adaptor module that storage resource\n"
@@ -119,6 +120,7 @@ mesos::internal::slave::Flags::Flags()
       "If this flag is not specified, the default behavior for storage\n"
       "resource providers is to only expose resources for pre-existing\n"
       "volumes and not publish RAW volumes.");
+#endif
 
   add(&Flags::isolation,
       "isolation",

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index d5f95fe..1cf5272 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -46,7 +46,9 @@ public:
   bool hostname_lookup;
   Option<std::string> resources;
   Option<std::string> resource_provider_config_dir;
+#ifdef ENABLE_GRPC
   Option<std::string> volume_profile_adaptor;
+#endif
   std::string isolation;
   std::string launcher;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 0093561..55dea08 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -36,6 +36,10 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#ifdef ENABLE_GRPC
+#include <mesos/resource_provider/volume_profile.hpp>
+#endif
+
 #include <process/after.hpp>
 #include <process/async.hpp>
 #include <process/check.hpp>

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c1322a3..9e2663e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -40,10 +40,6 @@
 
 #include <mesos/module/authenticatee.hpp>
 
-#ifdef ENABLE_GRPC
-#include <mesos/resource_provider/volume_profile.hpp>
-#endif
-
 #include <mesos/slave/containerizer.hpp>
 #include <mesos/slave/qos_controller.hpp>
 #include <mesos/slave/resource_estimator.hpp>
@@ -102,6 +98,7 @@ namespace mesos {
 
 // Forward declarations.
 class Authorizer;
+class VolumeProfileAdaptor;
 
 namespace internal {
 namespace slave {
@@ -728,9 +725,7 @@ private:
 
   mesos::slave::QoSController* qosController;
 
-#ifdef ENABLE_GRPC
   std::shared_ptr<VolumeProfileAdaptor> volumeProfileAdaptor;
-#endif
 
   mesos::SecretGenerator* secretGenerator;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4feb3670/src/tests/volume_profile_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/volume_profile_tests.cpp b/src/tests/volume_profile_tests.cpp
index 41b43cb..2f5018f 100644
--- a/src/tests/volume_profile_tests.cpp
+++ b/src/tests/volume_profile_tests.cpp
@@ -19,6 +19,8 @@
 #include <tuple>
 #include <vector>
 
+#include <mesos/module/volume_profile.hpp>
+
 #include <mesos/resource_provider/volume_profile.hpp>
 
 #include <process/clock.hpp>
@@ -36,7 +38,10 @@
 
 #include <stout/os/write.hpp>
 
+#include "module/manager.hpp"
+
 #include "resource_provider/uri_volume_profile.hpp"
+#include "resource_provider/volume_profile_utils.hpp"
 
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
@@ -44,8 +49,6 @@
 
 using namespace process;
 
-using mesos::VolumeProfileAdaptor;
-
 using std::map;
 using std::string;
 using std::tuple;
@@ -53,7 +56,7 @@ using std::vector;
 
 using google::protobuf::Map;
 
-using mesos::csi::UriVolumeProfileMapping;
+using mesos::resource_provider::VolumeProfileMapping;
 
 using testing::_;
 using testing::DoAll;
@@ -63,8 +66,49 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
+constexpr char URI_VOLUME_PROFILE_ADAPTOR_NAME[] =
+  "org_apache_mesos_UriVolumeProfileAdaptor";
+
+
+// Test fixture that loads the `UriVolumeProfileAdaptor` module library
+// before each test and unloads all of its modules afterwards.
+class UriVolumeProfileTest : public MesosTest
+{
+public:
+  virtual void SetUp()
+  {
+    MesosTest::SetUp();
+
+    string libraryPath = getModulePath("uri_volume_profile");
+
+    Modules::Library* library = modules.add_libraries();
+    library->set_name("uri_volume_profile");
+    library->set_file(libraryPath);
+
+    Modules::Library::Module* module = library->add_modules();
+    module->set_name(URI_VOLUME_PROFILE_ADAPTOR_NAME);
+
+    ASSERT_SOME(modules::ModuleManager::load(modules));
+  }
+
+  virtual void TearDown()
+  {
+    foreach (const Modules::Library& library, modules.libraries()) {
+      foreach (const Modules::Library::Module& module, library.modules()) {
+        if (module.has_name()) {
+          // NOTE: `EXPECT` rather than `ASSERT`, so that one failed unload
+          // does not skip the remaining unloads or `MesosTest::TearDown()`.
+          EXPECT_SOME(modules::ModuleManager::unload(module.name()));
+        }
+      }
+    }
+
+    MesosTest::TearDown();
+  }
 
-class UriVolumeProfileTest : public MesosTest {};
+protected:
+  Modules modules;
+};
 
 
 // Exercises the volume profile map parsing method with the example found
@@ -89,8 +129,8 @@ TEST_F(UriVolumeProfileTest, ParseExample)
       }
     })~";
 
-  Try<UriVolumeProfileMapping> parsed =
-    mesos::internal::profile::UriVolumeProfileAdaptorProcess::parse(example);
+  Try<VolumeProfileMapping> parsed =
+    mesos::internal::profile::parseVolumeProfileMapping(example);
   ASSERT_SOME(parsed);
 
   const string key = "my-profile";
@@ -226,9 +266,8 @@ TEST_F(UriVolumeProfileTest, ParseInvalids)
 
   hashset<string> errors;
   for (size_t i = 0; i < examples.size(); i++) {
-    Try<UriVolumeProfileMapping> parsed =
-      mesos::internal::profile::UriVolumeProfileAdaptorProcess::parse(
-          examples[i]);
+    Try<VolumeProfileMapping> parsed =
+      mesos::internal::profile::parseVolumeProfileMapping(examples[i]);
 
     ASSERT_ERROR(parsed) << examples[i];
     ASSERT_EQ(0u, errors.count(parsed.error())) << parsed.error();
@@ -262,21 +301,34 @@ TEST_F(UriVolumeProfileTest, FetchFromFile)
   const Duration pollInterval = Seconds(10);
   const string csiPluginType = "ignored";
 
-  mesos::internal::profile::Flags flags;
-  flags.uri = Path(profileFile);
-  flags.poll_interval = pollInterval;
+  Parameters params;
+
+  Parameter* pollIntervalFlag = params.add_parameter();
+  pollIntervalFlag->set_key("poll_interval");
+  pollIntervalFlag->set_value(stringify(pollInterval));
+
+  // NOTE: We cannot use the `file://` URI to sepcify the file location,
+  // otherwise the file contents will be prematurely read. Therefore, we
+  // specify the absolute path of the file in the `uri` flag.
+  Parameter* uriFlag = params.add_parameter();
+  uriFlag->set_key("uri");
+  uriFlag->set_value(profileFile);
 
   // Create the module before we've written anything to the file.
   // This means the first poll will fail, so the module believes there
   // are no profiles at the moment.
-  mesos::internal::profile::UriVolumeProfileAdaptor module(flags);
+  Try<VolumeProfileAdaptor*> module =
+    modules::ModuleManager::create<VolumeProfileAdaptor>(
+        URI_VOLUME_PROFILE_ADAPTOR_NAME,
+        params);
+  ASSERT_SOME(module);
 
   // Start watching for updates.
   // By the time this returns, we'll know that the first poll has finished
   // because when the module reads from file, it does so immediately upon
   // being initialized.
   Future<hashset<string>> future =
-    module.watch(hashset<string>::EMPTY, csiPluginType);
+    module.get()->watch(hashset<string>::EMPTY, csiPluginType);
 
   // Write the single profile to the file.
   ASSERT_SOME(os::write(profileFile, contents));
@@ -290,7 +342,7 @@ TEST_F(UriVolumeProfileTest, FetchFromFile)
 
   // Translate the profile name into the profile mapping.
   Future<VolumeProfileAdaptor::ProfileInfo> mapping =
-    module.translate(profileName, csiPluginType);
+    module.get()->translate(profileName, csiPluginType);
 
   AWAIT_ASSERT_READY(mapping);
   ASSERT_TRUE(mapping.get().capability.has_block());
@@ -402,30 +454,36 @@ TEST_F(UriVolumeProfileTest, FetchFromHTTP)
     .WillOnce(DoAll(FutureSatisfy(&secondCall), Return(http::OK(contents2))))
     .WillOnce(Return(http::OK(contents3)));
 
-  mesos::internal::profile::Flags flags;
-  flags.poll_interval = pollInterval;
+  Parameters params;
+
+  Parameter* pollIntervalFlag = params.add_parameter();
+  pollIntervalFlag->set_key("poll_interval");
+  pollIntervalFlag->set_value(stringify(pollInterval));
 
-  // NOTE: Although we use the `Path` class here, this URI is not actually
-  // a path. The `Path` class is purely used so that `file://` type URIs are
-  // do not result in prematurely reading the file contents.
-  flags.uri = Path(stringify(process::http::URL(
+  Parameter* uriFlag = params.add_parameter();
+  uriFlag->set_key("uri");
+  uriFlag->set_value(stringify(process::http::URL(
       "http",
       process::address().ip,
       process::address().port,
       server.process->self().id + "/profiles")));
 
-  mesos::internal::profile::UriVolumeProfileAdaptor module(flags);
+  Try<VolumeProfileAdaptor*> module =
+    modules::ModuleManager::create<VolumeProfileAdaptor>(
+        URI_VOLUME_PROFILE_ADAPTOR_NAME,
+        params);
+  ASSERT_SOME(module);
 
   // Wait for the first HTTP poll to complete.
   Future<hashset<string>> future =
-    module.watch(hashset<string>::EMPTY, csiPluginType);
+    module.get()->watch(hashset<string>::EMPTY, csiPluginType);
 
   AWAIT_ASSERT_READY(future);
   ASSERT_EQ(1u, future->size());
   EXPECT_EQ("profile", *(future->begin()));
 
   // Start watching for an update to the list of profiles.
-  future = module.watch({"profile"}, csiPluginType);
+  future = module.get()->watch({"profile"}, csiPluginType);
 
   // Trigger the second HTTP poll.
   Clock::advance(pollInterval);
@@ -433,7 +491,7 @@ TEST_F(UriVolumeProfileTest, FetchFromHTTP)
 
   // Dispatch a call to the module, which ensures that the polling has actually
   // completed (not just the HTTP call).
-  AWAIT_ASSERT_READY(module.translate("profile", csiPluginType));
+  AWAIT_ASSERT_READY(module.get()->translate("profile", csiPluginType));
 
   // We don't expect the module to notify watcher(s) because the server's
   // response is considered invalid (the module does not allow profiles


[05/12] mesos git commit: Printed out the source profile and ID for a disk resource.

Posted by ji...@apache.org.
Printed out the source profile and ID for a disk resource.

`Resource::DiskInfo::Source` is printed out in the following format:

  TYPE(id,profile):root

If the source has neither an ID nor a profile, then "(id,profile)" will
be skipped. If either the ID or the profile is missing, the missing part
will be replaced with an empty string. If the source has no root path
(only MOUNT and PATH sources can have one), ":root" will be skipped.

Review: https://reviews.apache.org/r/64591/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b36152d6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b36152d6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b36152d6

Branch: refs/heads/master
Commit: b36152d6f2b6c07b6cf6cce641a8514b60913ff4
Parents: 5894f86
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:03 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/common/resources.cpp | 26 ++++++++++++++++++--------
 src/v1/resources.cpp     | 26 ++++++++++++++++++--------
 2 files changed, 36 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b36152d6/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 2774372..919a03b 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -2087,17 +2087,27 @@ ostream& operator<<(ostream& stream, const Resource::DiskInfo::Source& source)
 {
   switch (source.type()) {
     case Resource::DiskInfo::Source::MOUNT:
-      return stream << "MOUNT"
-                    << (source.mount().has_root() ? ":" + source.mount().root()
-                                                  : "");
+      return stream
+        << "MOUNT"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "")
+        << (source.mount().has_root() ? ":" + source.mount().root() : "");
     case Resource::DiskInfo::Source::PATH:
-      return stream << "PATH"
-                    << (source.path().has_root() ? ":" + source.path().root()
-                                                 : "");
+      return stream
+        << "PATH"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "")
+        << (source.path().has_root() ? ":" + source.path().root() : "");
     case Resource::DiskInfo::Source::BLOCK:
-      return stream << "BLOCK";
+      return stream
+        << "BLOCK"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "");
     case Resource::DiskInfo::Source::RAW:
-      return stream << "RAW";
+      return stream
+        << "RAW"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "");
     case Resource::DiskInfo::Source::UNKNOWN:
       return stream << "UNKNOWN";
   }

http://git-wip-us.apache.org/repos/asf/mesos/blob/b36152d6/src/v1/resources.cpp
----------------------------------------------------------------------
diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp
index 43d9b0f..365ffa7 100644
--- a/src/v1/resources.cpp
+++ b/src/v1/resources.cpp
@@ -2118,17 +2118,27 @@ ostream& operator<<(ostream& stream, const Resource::DiskInfo::Source& source)
 {
   switch (source.type()) {
     case Resource::DiskInfo::Source::MOUNT:
-      return stream << "MOUNT"
-                    << (source.mount().has_root() ? ":" + source.mount().root()
-                                                  : "");
+      return stream
+        << "MOUNT"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "")
+        << (source.mount().has_root() ? ":" + source.mount().root() : "");
     case Resource::DiskInfo::Source::PATH:
-      return stream << "PATH"
-                    << (source.path().has_root() ? ":" + source.path().root()
-                                                 : "");
+      return stream
+        << "PATH"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "")
+        << (source.path().has_root() ? ":" + source.path().root() : "");
     case Resource::DiskInfo::Source::BLOCK:
-      return stream << "BLOCK";
+      return stream
+        << "BLOCK"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "");
     case Resource::DiskInfo::Source::RAW:
-      return stream << "RAW";
+      return stream
+        << "RAW"
+        << ((source.has_id() || source.has_profile())
+              ? "(" + source.id() + "," + source.profile() + ")" : "");
     case Resource::DiskInfo::Source::UNKNOWN:
       return stream << "UNKNOWN";
   }


[09/12] mesos git commit: Fixed unit tests for volume profile module integration.

Posted by ji...@apache.org.
Fixed unit tests for volume profile module integration.

This patch loads `liburi_volume_profile` dynamically and uses it in the
unit tests of storage local resource providers.

NOTE: `volume_profile_tests.cpp` is removed from compilation to avoid
loading `uri_volume_profile.proto` twice.

Review: https://reviews.apache.org/r/64659/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cafd388e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cafd388e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cafd388e

Branch: refs/heads/master
Commit: cafd388e34e1f51606f1ca61e113dea0da2db78c
Parents: 0f3bdd2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:14 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |  5 +-
 ...agent_resource_provider_config_api_tests.cpp | 44 +++++++------
 .../storage_local_resource_provider_tests.cpp   | 65 ++++++++++++++++++++
 3 files changed, 90 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/cafd388e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 4623cfc..6b18015 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -2644,12 +2644,9 @@ endif
 
 if ENABLE_GRPC
 mesos_tests_SOURCES +=						\
-  csi/uri_volume_profile.pb.cc					\
-  resource_provider/uri_volume_profile.cpp			\
   tests/csi_client_tests.cpp					\
   tests/mock_csi_plugin.cpp					\
-  tests/mock_csi_plugin.hpp					\
-  tests/volume_profile_tests.cpp
+  tests/mock_csi_plugin.hpp
 
 if OS_LINUX
 mesos_tests_SOURCES +=						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/cafd388e/src/tests/agent_resource_provider_config_api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/agent_resource_provider_config_api_tests.cpp b/src/tests/agent_resource_provider_config_api_tests.cpp
index 3c68b73..f2486cb 100644
--- a/src/tests/agent_resource_provider_config_api_tests.cpp
+++ b/src/tests/agent_resource_provider_config_api_tests.cpp
@@ -69,10 +69,11 @@ public:
     ASSERT_SOME(os::mkdir(resourceProviderConfigDir));
   }
 
-  ResourceProviderInfo createResourceProviderInfo(const Bytes& capacity)
+  ResourceProviderInfo createResourceProviderInfo(const string& volumes)
   {
-    const string testCsiPluginWorkDir = path::join(sandbox.get(), "test");
-    CHECK_SOME(os::mkdir(testCsiPluginWorkDir));
+    Try<string> testCsiPluginWorkDir =
+      os::mkdtemp(path::join(sandbox.get(), "plugin_XXXXXX"));
+    CHECK_SOME(testCsiPluginWorkDir);
 
     string testCsiPluginPath =
       path::join(tests::flags.build_dir, "src", "test-csi-plugin");
@@ -91,7 +92,7 @@ public:
           "storage": {
             "plugin": {
               "type": "org.apache.mesos.csi.test",
-              "name": "slrp_test",
+              "name": "plugin",
               "containers": [
                 {
                   "services": [
@@ -103,8 +104,9 @@ public:
                     "value": "%s",
                     "arguments": [
                       "%s",
-                      "--available_capacity=%s",
-                      "--work_dir=%s"
+                      "--available_capacity=0B",
+                      "--work_dir=%s",
+                      "--volumes=%s"
                     ]
                   }
                 }
@@ -115,8 +117,8 @@ public:
         )~",
         testCsiPluginPath,
         testCsiPluginPath,
-        stringify(capacity),
-        testCsiPluginWorkDir);
+        testCsiPluginWorkDir.get(),
+        volumes);
 
     CHECK_SOME(resourceProviderConfig);
 
@@ -270,7 +272,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Add)
   driver.start();
 
   // Add a new resource provider.
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(4));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(
       http::OK().status,
@@ -334,7 +336,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_AddConflict)
   const string configPath = path::join(resourceProviderConfigDir, "test.json");
   ASSERT_SOME(os::write(
       configPath,
-      stringify(JSON::protobuf(createResourceProviderInfo(Gigabytes(4))))));
+      stringify(JSON::protobuf(createResourceProviderInfo("volume1:4GB")))));
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
@@ -344,7 +346,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_AddConflict)
 
   AWAIT_READY(slaveRegisteredMessage);
 
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(2));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:2GB");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(
       http::Conflict().status,
@@ -406,7 +408,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Update)
   const string configPath = path::join(resourceProviderConfigDir, "test.json");
   ASSERT_SOME(os::write(
       configPath,
-      stringify(JSON::protobuf(createResourceProviderInfo(Gigabytes(4))))));
+      stringify(JSON::protobuf(createResourceProviderInfo("volume1:4GB")))));
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
@@ -449,7 +451,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Update)
   // Wait for an offer having the old provider resource.
   AWAIT_READY(oldOffers);
 
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(2));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:2GB");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(
       http::OK().status,
@@ -521,7 +523,7 @@ TEST_P(AgentResourceProviderConfigApiTest, UpdateNotFound)
 
   AWAIT_READY(slaveRegisteredMessage);
 
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(4));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(
       http::NotFound().status,
@@ -568,7 +570,7 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Remove)
 
   // Generate a pre-existing config.
   const string configPath = path::join(resourceProviderConfigDir, "test.json");
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(4));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
   ASSERT_SOME(os::write(configPath, stringify(JSON::protobuf(info))));
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
@@ -600,9 +602,10 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Remove)
       std::bind(&Resources::hasResourceProvider, lambda::_1))))
     .WillOnce(FutureArg<1>(&oldOffers));
 
-  // TODO(chhsiao): Wait for an rescinded offer once we implemented the
-  // logic to send `UpdateSlaveMessage` upon removal of a resource
-  // provider.
+  Future<OfferID> rescinded;
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillOnce(FutureArg<1>(&rescinded));
 
   driver.start();
 
@@ -617,7 +620,8 @@ TEST_P(AgentResourceProviderConfigApiTest, ROOT_Remove)
   // Check that the existing config is removed.
   EXPECT_FALSE(os::exists(configPath));
 
-  // TODO(chhsiao): Wait for the old offer to be rescinded.
+  // Wait for the old offer to be rescinded.
+  AWAIT_READY(rescinded);
 }
 
 
@@ -657,7 +661,7 @@ TEST_P(AgentResourceProviderConfigApiTest, RemoveNotFound)
 
   AWAIT_READY(slaveRegisteredMessage);
 
-  ResourceProviderInfo info = createResourceProviderInfo(Gigabytes(4));
+  ResourceProviderInfo info = createResourceProviderInfo("volume1:4GB");
 
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(
       http::NotFound().status,

http://git-wip-us.apache.org/repos/asf/mesos/blob/cafd388e/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 1f11825..da0c2cc 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -19,6 +19,9 @@
 #include <process/gmock.hpp>
 
 #include <stout/hashmap.hpp>
+#include <stout/uri.hpp>
+
+#include "module/manager.hpp"
 
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
@@ -39,6 +42,10 @@ namespace mesos {
 namespace internal {
 namespace tests {
 
+constexpr char URI_VOLUME_PROFILE_ADAPTOR_NAME[] =
+  "org_apache_mesos_UriVolumeProfileAdaptor";
+
+
 class StorageLocalResourceProviderTest : public MesosTest
 {
 public:
@@ -103,10 +110,62 @@ public:
     ASSERT_SOME(os::write(
         path::join(resourceProviderConfigDir, "test.json"),
         resourceProviderConfig.get()));
+
+    uriVolumeProfileConfigPath =
+      path::join(sandbox.get(), "volume_profiles.json");
+
+    ASSERT_SOME(os::write(
+        uriVolumeProfileConfigPath,
+        R"~(
+        {
+          "profile_matrix": {
+            "default" : {
+              "volume_capabilities" : {
+                "mount" : {},
+                "access_mode" : { "mode" : "SINGLE_NODE_WRITER" }
+              }
+            }
+          }
+        }
+        )~"));
+  }
+
+  virtual void TearDown()
+  {
+    // Unload modules.
+    foreach (const Modules::Library& library, modules.libraries()) {
+      foreach (const Modules::Library::Module& module, library.modules()) {
+        if (module.has_name()) {
+          ASSERT_SOME(modules::ModuleManager::unload(module.name()));
+        }
+      }
+    }
+
+    MesosTest::TearDown();
+  }
+
+  void loadUriVolumeProfileModule()
+  {
+    string libraryPath = getModulePath("uri_volume_profile");
+
+    Modules::Library* library = modules.add_libraries();
+    library->set_name("uri_volume_profile");
+    library->set_file(libraryPath);
+
+    Modules::Library::Module* module = library->add_modules();
+    module->set_name(URI_VOLUME_PROFILE_ADAPTOR_NAME);
+
+    Parameter* parameter = module->add_parameters();
+    parameter->set_key("uri");
+    parameter->set_value(uriVolumeProfileConfigPath);
+
+    ASSERT_SOME(modules::ModuleManager::load(modules));
   }
 
 protected:
+  Modules modules;
   string resourceProviderConfigDir;
+  string uriVolumeProfileConfigPath;
 };
 
 
@@ -115,6 +174,8 @@ protected:
 // that uses the test CSI plugin.
 TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
 {
+  loadUriVolumeProfileModule();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -140,6 +201,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
   }
 
   flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.volume_profile_adaptor = URI_VOLUME_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
@@ -292,6 +354,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
 // plugin, then destroy the volume while it is published.
 TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
 {
+  loadUriVolumeProfileModule();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -317,6 +381,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
   }
 
   flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.volume_profile_adaptor = URI_VOLUME_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);


[08/12] mesos git commit: Updated logging for storage local resource provider.

Posted by ji...@apache.org.
Updated logging for storage local resource provider.

Review: https://reviews.apache.org/r/64664/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a36e2ec0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a36e2ec0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a36e2ec0

Branch: refs/heads/master
Commit: a36e2ec0d968b5ab48d14a9260151eeb8ade1c26
Parents: 6965084
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:21 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/manager.cpp          |  4 ++-
 src/resource_provider/storage/provider.cpp | 40 ++++++++++++++-----------
 2 files changed, 25 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a36e2ec0/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 2a167aa..e3fcb64 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -671,7 +671,9 @@ void ResourceProviderManagerProcess::updateState(
 
   LOG(INFO)
     << "Received UPDATE_STATE call with resources '" << update.resources()
-    << "' from resource provider " << resourceProvider->info.id();
+    << "' and " << offerOperations.size()
+    << " offer operations from resource provider "
+    << resourceProvider->info.id();
 
   ResourceProviderMessage::UpdateState updateState{
       resourceProvider->info,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a36e2ec0/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a6e4a0f..ab8c711 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -1132,8 +1132,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStatusUpdates()
 
           auto die = [=](const string& message) {
             LOG(ERROR)
-              << "Failed to update status of offer operation with UUID " << uuid
-              << ": " << message;
+              << "Failed to update status of offer operation (uuid: " << uuid
+              << "): " << message;
             fatal();
           };
 
@@ -1157,7 +1157,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStatusUpdates()
 
         auto err = [](const id::UUID& uuid, const string& message) {
           LOG(ERROR)
-            << "Falied to apply offer operation with UUID " << uuid << ": "
+            << "Falied to apply offer operation (uuid: " << uuid << "): "
             << message;
         };
 
@@ -1356,8 +1356,8 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
   CHECK_SOME(uuid);
 
   LOG(INFO)
-    << "Received " << operation.info().type() << " operation with UUID "
-    << uuid.get();
+    << "Received " << operation.info().type() << " operation '"
+    << operation.info().id() << "' (uuid: " << uuid.get() << ")";
 
   CHECK(!offerOperations.contains(uuid.get()));
   offerOperations[uuid.get()] = protobuf::createOfferOperation(
@@ -1396,8 +1396,7 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
 
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
-      << "Failed to apply offer operation with UUID " << uuid << ": "
-      << message;
+      << "Failed to apply offer operation (uuid: " << uuid << "): " << message;
   };
 
   result
@@ -1559,8 +1558,8 @@ void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
 
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
-      << "Failed to acknowledge status update for offer operation with UUID "
-      << uuid << ": " << message;
+      << "Failed to acknowledge status update for offer operation (uuid: "
+      << uuid << "): " << message;
   };
 
   // NOTE: It is possible that an incoming acknowledgement races with an
@@ -1628,8 +1627,8 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations(
 
     auto die = [=](const string& message) {
       LOG(ERROR)
-        << "Failed to update status of offer operation with UUID " << uuid.get()
-        << ": " << message;
+        << "Failed to update status of offer operation (uuid: " << uuid.get()
+        << "): " << message;
       fatal();
     };
 
@@ -2636,11 +2635,11 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
         LOG(INFO)
           << "Applying conversion from '" << conversions->at(0).consumed
           << "' to '" << conversions->at(0).converted
-          << "' for offer operation with UUID " << operationUuid;
+          << "' for offer operation (uuid: " << operationUuid << ")";
       } else {
         LOG(ERROR)
-          << "Failed to apply offer operation with UUID " << operationUuid
-          << ": " << conversions.error();
+          << "Failed to apply offer operation (uuid: " << operationUuid
+          << "): " << conversions.error();
       }
 
       promise->associate(
@@ -2949,8 +2948,8 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
 
   auto die = [=](const string& message) {
     LOG(ERROR)
-      << "Failed to update status of offer operation with UUID "
-      << operationUuid << ": " << message;
+      << "Failed to update status of offer operation (uuid: " << operationUuid
+      << "): " << message;
     fatal();
   };
 
@@ -3027,6 +3026,11 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
     update->add_operations()->CopyFrom(operation);
   }
 
+  LOG(INFO)
+    << "Sending UPDATE_STATE call with resources '" << totalResources
+    << "' and " << update->operations_size()
+    << " offer operations to agent " << slaveId;
+
   auto err = [](const ResourceProviderID& id, const string& message) {
     LOG(ERROR)
       << "Failed to update state for resource provider " << id << ": "
@@ -3061,8 +3065,8 @@ void StorageLocalResourceProviderProcess::sendOfferOperationStatusUpdate(
 
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
-      << "Failed to send status update for offer operation with UUID " << uuid
-      << ": " << message;
+      << "Failed to send status update for offer operation (uuid: " << uuid
+      << "): " << message;
   };
 
   Try<id::UUID> uuid =


[10/12] mesos git commit: Dropping instead of failing offer operations with mismatched versions.

Posted by ji...@apache.org.
Dropping instead of failing offer operations with mismatched versions.

Review: https://reviews.apache.org/r/64692/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/db4a6a55
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/db4a6a55
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/db4a6a55

Branch: refs/heads/master
Commit: db4a6a55b1a05c42e8a89f03035db709192c1c07
Parents: a36e2ec
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:26 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 137 +++++++++++++++---------
 1 file changed, 86 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/db4a6a55/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index ab8c711..772952e 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -382,6 +382,11 @@ private:
   Future<Resources> getCapacities();
 
   Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
+  void dropOfferOperation(
+      const id::UUID& operationUuid,
+      const Option<FrameworkID>& frameworkId,
+      const Option<OfferOperationID>& operationId,
+      const string& message);
 
   Future<vector<ResourceConversion>> applyCreateVolumeOrBlock(
       const Resource& resource,
@@ -1359,47 +1364,58 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
     << "Received " << operation.info().type() << " operation '"
     << operation.info().id() << "' (uuid: " << uuid.get() << ")";
 
-  CHECK(!offerOperations.contains(uuid.get()));
-  offerOperations[uuid.get()] = protobuf::createOfferOperation(
-      operation.info(),
-      protobuf::createOfferOperationStatus(
-          OFFER_OPERATION_PENDING,
-          operation.info().has_id()
-            ? operation.info().id() : Option<OfferOperationID>::none()),
-      operation.has_framework_id()
-        ? operation.framework_id() : Option<FrameworkID>::none(),
-      slaveId,
-      uuid.get());
+  Option<FrameworkID> frameworkId = operation.has_framework_id()
+    ? operation.framework_id() : Option<FrameworkID>::none();
+  Option<OfferOperationID> operationId = operation.info().has_id()
+    ? operation.info().id() : Option<OfferOperationID>::none();
 
-  checkpointResourceProviderState();
+  if (state == SUBSCRIBED) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
+        "Cannot apply offer operation in SUBSCRIBED state");
+  }
 
-  Future<Nothing> result;
+  if (reconciling) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
+        "Cannot apply offer operation when reconciling storage pools");
+  }
 
   Try<id::UUID> operationVersion =
     id::UUID::fromBytes(operation.resource_version_uuid().value());
-
   CHECK_SOME(operationVersion);
 
-  if (state == SUBSCRIBED) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
-        "Cannot apply offer operation in SUBSCRIBED state"));
-  } else if (reconciling) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
-        "Cannot apply offer operation when reconciling storage pools"));
-  } else if (operationVersion.get() != resourceVersion) {
-    result = updateOfferOperationStatus(uuid.get(), Error(
+  if (operationVersion.get() != resourceVersion) {
+    return dropOfferOperation(
+        uuid.get(),
+        frameworkId,
+        operationId,
         "Mismatched resource version " + stringify(operationVersion.get()) +
-        " (expected: " + stringify(resourceVersion) + ")"));
-  } else {
-    result = _applyOfferOperation(uuid.get());
+        " (expected: " + stringify(resourceVersion) + ")");
   }
 
+  CHECK(!offerOperations.contains(uuid.get()));
+  offerOperations[uuid.get()] = protobuf::createOfferOperation(
+      operation.info(),
+      protobuf::createOfferOperationStatus(
+          OFFER_OPERATION_PENDING,
+          operationId),
+      frameworkId,
+      slaveId,
+      uuid.get());
+
+  checkpointResourceProviderState();
+
   auto err = [](const id::UUID& uuid, const string& message) {
     LOG(ERROR)
       << "Failed to apply offer operation (uuid: " << uuid << "): " << message;
   };
 
-  result
+  _applyOfferOperation(uuid.get())
     .onFailed(std::bind(err, uuid.get(), lambda::_1))
     .onDiscarded(std::bind(err, uuid.get(), "future discarded"));
 }
@@ -1612,29 +1628,11 @@ void StorageLocalResourceProviderProcess::reconcileOfferOperations(
       continue;
     }
 
-    OfferOperationStatusUpdate update =
-      protobuf::createOfferOperationStatusUpdate(
-          uuid.get(),
-          protobuf::createOfferOperationStatus(
-              OFFER_OPERATION_DROPPED,
-              None(),
-              None(),
-              None(),
-              id::UUID::random()),
-          None(),
-          None(),
-          slaveId);
-
-    auto die = [=](const string& message) {
-      LOG(ERROR)
-        << "Failed to update status of offer operation (uuid: " << uuid.get()
-        << "): " << message;
-      fatal();
-    };
-
-    statusUpdateManager.update(std::move(update), false)
-      .onFailed(defer(self(), std::bind(die, lambda::_1)))
-      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+    dropOfferOperation(
+        uuid.get(),
+        None(),
+        None(),
+        "Unknown offer operation");
   }
 }
 
@@ -2551,7 +2549,7 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 }
 
 
-// Applies the offer operation. Conventional operations will be
+// Applies the offer operation. Speculative operations will be
 // synchronously applied. Do nothing if the operation is already in a
 // terminal state.
 Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
@@ -2570,7 +2568,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::UNRESERVE:
     case Offer::Operation::CREATE:
     case Offer::Operation::DESTROY: {
-      // Synchronously apply the conventional operations to ensure that
+      // Synchronously apply the speculative operations to ensure that
       // its result is reflected in the total resources before any of
       // its succeeding operations is applied.
       return updateOfferOperationStatus(
@@ -2660,6 +2658,43 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
 }
 
 
+// Sends `OFFER_OPERATION_DROPPED` without checkpointing the status of
+// the offer operation.
+void StorageLocalResourceProviderProcess::dropOfferOperation(
+    const id::UUID& operationUuid,
+    const Option<FrameworkID>& frameworkId,
+    const Option<OfferOperationID>& operationId,
+    const string& message)
+{
+  LOG(WARNING)
+    << "Dropping offer operation (uuid: " << operationUuid << "): " << message;
+
+  OfferOperationStatusUpdate update =
+    protobuf::createOfferOperationStatusUpdate(
+       operationUuid,
+       protobuf::createOfferOperationStatus(
+           OFFER_OPERATION_DROPPED,
+           operationId,
+           message,
+           None(),
+           id::UUID::random()),
+       None(),
+       frameworkId,
+       slaveId);
+
+  auto die = [=](const string& message) {
+    LOG(ERROR)
+      << "Failed to update status of offer operation (uuid: " << operationUuid
+      << "): " << message;
+    fatal();
+  };
+
+  statusUpdateManager.update(std::move(update), false)
+    .onFailed(defer(self(), std::bind(die, lambda::_1)))
+    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+}
+
+
 Future<vector<ResourceConversion>>
 StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     const Resource& resource,


[04/12] mesos git commit: Fixed a corner case for pre-existing volumes created by old RPs.

Posted by ji...@apache.org.
Fixed a corner case for pre-existing volumes created by old RPs.

When an agent fails over and registers as a new agent, the RP is
registered as a new RP as well, but it may pick up the checkpointed
states of volumes created by the old RP as pre-existing volumes. We
should use the checkpointed capabilities for these volumes instead of
using the default mount or block capability.

Review: https://reviews.apache.org/r/64621/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5894f863
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5894f863
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5894f863

Branch: refs/heads/master
Commit: 5894f8632cb0072c7b24ac8181dc852d083e2263
Parents: 884226e
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:24:59 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 89 ++++++++++++++++---------
 1 file changed, 56 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5894f863/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 79d7f60..a103494 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -445,9 +445,6 @@ private:
   LinkedHashMap<id::UUID, OfferOperation> offerOperations;
   Resources totalResources;
   id::UUID resourceVersion;
-
-  // We maintain the state of a CSI volume if and only if its
-  // corresponding resource is not RAW.
   hashmap<string, VolumeData> volumes;
 };
 
@@ -2246,6 +2243,9 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
     const Option<Labels>& metadata,
     const csi::VolumeCapability& capability)
 {
+  // NOTE: This can only be called for newly discovered volumes.
+  CHECK(!volumes.contains(volumeId));
+
   return getService(controllerContainerId)
     .then(defer(self(), [=](csi::Client client) {
       google::protobuf::Map<string, string> volumeAttributes;
@@ -2270,21 +2270,13 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
                 "': " + response.message());
           }
 
-          if (volumes.contains(volumeId)) {
-            // The resource provider failed over after the last
-            // `ValidateVolumeCapability` call, but before the offer
-            // operation status was checkpointed.
-            CHECK_EQ(csi::state::VolumeState::CREATED,
-                     volumes.at(volumeId).state.state());
-          } else {
-            csi::state::VolumeState volumeState;
-            volumeState.set_state(csi::state::VolumeState::CREATED);
-            volumeState.mutable_volume_capability()->CopyFrom(capability);
-            *volumeState.mutable_volume_attributes() = volumeAttributes;
+          csi::state::VolumeState volumeState;
+          volumeState.set_state(csi::state::VolumeState::CREATED);
+          volumeState.mutable_volume_capability()->CopyFrom(capability);
+          *volumeState.mutable_volume_attributes() = volumeAttributes;
 
-            volumes.put(volumeId, std::move(volumeState));
-            checkpointVolumeState(volumeId);
-          }
+          volumes.put(volumeId, std::move(volumeState));
+          checkpointVolumeState(volumeId);
 
           return volumeId;
         }));
@@ -2455,8 +2447,13 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
   // For 1, we check if its profile is mount or block capable, then
   // call `CreateVolume` with the operation UUID as the name (so that
   // the same volume will be returned when recovering from a failover).
-  // For 2, we call `ValidateVolumeCapabilities` with a default mount or
-  // block capability.
+  //
+  // For 2, there are two scenarios:
+  //   a. If the volume has a checkpointed state (because it was created
+  //      by a previous resource provider), we simply check if its
+  //      checkpointed capability supports the conversion.
+  //   b. If the volume is newly discovered, `ValidateVolumeCapabilities`
+  //      is called with a default mount or block capability.
   CHECK_NE(resource.disk().source().has_profile(),
            resource.disk().source().has_id());
 
@@ -2466,6 +2463,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     case Resource::DiskInfo::Source::PATH:
     case Resource::DiskInfo::Source::MOUNT: {
       if (resource.disk().source().has_profile()) {
+        CHECK(profiles.contains(resource.disk().source().profile()));
         if (!profiles.at(resource.disk().source().profile())
                .capability.has_mount()) {
           return Failure(
@@ -2481,18 +2479,31 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
             profiles.at(resource.disk().source().profile()));
       } else {
-        // No need to call `ValidateVolumeCapabilities` sequentially
-        // since the volume is not used and thus not in `volumes` yet.
-        created = validateCapability(
-            resource.disk().source().id(),
-            resource.disk().source().has_metadata()
-              ? resource.disk().source().metadata() : Option<Labels>::none(),
-            defaultMountCapability);
+        const string& volumeId = resource.disk().source().id();
+
+        if (volumes.contains(volumeId)) {
+          if (!volumes.at(volumeId).state.volume_capability().has_mount()) {
+            return Failure(
+                "Volume '" + volumeId + "' cannot be converted to a " +
+                stringify(type) + " disk resource");
+          }
+
+          created = volumeId;
+        } else {
+          // No need to call `ValidateVolumeCapabilities` sequentially
+          // since the volume is not used and thus not in `volumes` yet.
+          created = validateCapability(
+              volumeId,
+              resource.disk().source().has_metadata()
+                ? resource.disk().source().metadata() : Option<Labels>::none(),
+              defaultMountCapability);
+        }
       }
       break;
     }
     case Resource::DiskInfo::Source::BLOCK: {
       if (resource.disk().source().has_profile()) {
+        CHECK(profiles.contains(resource.disk().source().profile()));
         if (!profiles.at(resource.disk().source().profile())
                .capability.has_block()) {
           return Failure(
@@ -2508,13 +2519,25 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
             profiles.at(resource.disk().source().profile()));
       } else {
-        // No need to call `ValidateVolumeCapabilities` sequentially
-        // since the volume is not used and thus not in `volumes` yet.
-        created = validateCapability(
-            resource.disk().source().id(),
-            resource.disk().source().has_metadata()
-              ? resource.disk().source().metadata() : Option<Labels>::none(),
-            defaultBlockCapability);
+        const string& volumeId = resource.disk().source().id();
+
+        if (volumes.contains(volumeId)) {
+          if (!volumes.at(volumeId).state.volume_capability().has_block()) {
+            return Failure(
+                "Volume '" + volumeId + "' cannot be converted to a " +
+                stringify(type) + " disk resource");
+          }
+
+          created = volumeId;
+        } else {
+          // No need to call `ValidateVolumeCapabilities` sequentially
+          // since the volume is not used and thus not in `volumes` yet.
+          created = validateCapability(
+              volumeId,
+              resource.disk().source().has_metadata()
+                ? resource.disk().source().metadata() : Option<Labels>::none(),
+              defaultBlockCapability);
+        }
       }
       break;
     }


[03/12] mesos git commit: Fixed the calculation of available capacity test CSI plugin.

Posted by ji...@apache.org.
Fixed the calculation of available capacity in the test CSI plugin.

The size of existing volumes should be subtracted from the
available capacity. Otherwise, the total capacity will keep increasing
every time the plugin restarts.

Review: https://reviews.apache.org/r/64665/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69650845
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69650845
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69650845

Branch: refs/heads/master
Commit: 69650845fa9bd88926d37fc7a2b5e40fbeb89b54
Parents: cafd388
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:19 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/examples/test_csi_plugin.cpp | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/69650845/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index 742aea9..f6b2c98 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -79,7 +79,7 @@ public:
     add(&Flags::available_capacity,
         "available_capacity",
         "The available disk capacity managed by the plugin, in addition\n"
-        "to the pre-existing volumes.");
+        "to the pre-existing volumes specified in the --volumes flag.");
 
     add(&Flags::volumes,
         "volumes",
@@ -123,6 +123,11 @@ public:
 
       CHECK(!volumes.contains(volume->id));
       volumes.put(volume->id, volume.get());
+
+      if (!_volumes.contains(volume->id)) {
+        CHECK_GE(availableCapacity, volume->size);
+        availableCapacity -= volume->size;
+      }
     }
 
     foreachpair (const string& name, const Bytes& capacity, _volumes) {


[02/12] mesos git commit: Added unit tests for storage local resource provider recovery.

Posted by ji...@apache.org.
Added unit tests for storage local resource provider recovery.

Review: https://reviews.apache.org/r/64596/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/78cc4e94
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/78cc4e94
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/78cc4e94

Branch: refs/heads/master
Commit: 78cc4e944209556b73461b7b6bbbbe47b43b4032
Parents: 098f9d8
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:33 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 430 +++++++++++++++++++
 1 file changed, 430 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/78cc4e94/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index da0c2cc..516e56a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -36,6 +36,7 @@ using process::Future;
 using process::Owned;
 
 using testing::Args;
+using testing::AtMost;
 using testing::Sequence;
 
 namespace mesos {
@@ -349,6 +350,202 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
 }
 
 
+// This test verifies that, after recovery, a framework can destroy a
+// new volume created from the storage pool of a storage local resource
+// provider that uses the test CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
+{
+  loadUriVolumeProfileModule();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.volume_profile_adaptor = URI_VOLUME_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise offer operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing a MOUNT disk resource after the agent recovers
+  //      from a failover.
+  //   4. One containing a RAW disk resource after `DESTROY_VOLUME`.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> agentRecoveredOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  // We are only interested in storage pools and volumes created from
+  // them, which have a "default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "default" &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers))
+    .WillOnce(FutureArg<1>(&agentRecoveredOffers));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .Times(AtMost(1));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(agentRecoveredOffers);
+  ASSERT_FALSE(agentRecoveredOffers->empty());
+
+  // Destroy the created volume.
+  driver.acceptOffers(
+      {agentRecoveredOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      filters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  Option<Resource> destroyed;
+
+  foreach (const Resource& resource, volumeDestroyedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      destroyed = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(destroyed);
+  ASSERT_FALSE(destroyed->disk().source().has_id());
+  ASSERT_FALSE(destroyed->disk().source().has_metadata());
+  ASSERT_FALSE(destroyed->disk().source().has_mount());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
 // This test verifies that a framework can launch a task using a created
 // volume from a storage local resource provider that uses the test CSI
 // plugin, then destroy the volume while it is published.
@@ -568,6 +765,239 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
 }
 
 
+// This test verifies that, after recovery, a framework can destroy a
+// volume that was created from a storage pool of a storage local
+// resource provider and used by a task.
+TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
+{
+  loadUriVolumeProfileModule();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER
+  };
+
+  flags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    flags.agent_features->add_capabilities()->set_type(type);
+  }
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.volume_profile_adaptor = URI_VOLUME_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise offer operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (the default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing the same MOUNT disk resource after `CREATE`,
+  //      `LAUNCH` and `DESTROY`.
+  //   4. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> taskFinishedOffers;
+  Future<vector<Offer>> agentRecoveredOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  // We are only interested in storage pools and volumes created from
+  // them, which have a "default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "default" &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  EXPECT_CALL(sched, offerRescinded(_, _))
+    .Times(AtMost(1));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      filters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Put a file into the volume.
+  ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+  // Create a persistent volume on the CSI volume, then launch a task to
+  // use the persistent volume.
+  Resource persistentVolume = volume.get();
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&taskFinishedOffers))
+    .WillOnce(FutureArg<1>(&agentRecoveredOffers));
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           volumeCreatedOffers->at(0).slave_id(),
+           persistentVolume,
+           createCommandInfo("test -f " + path::join("volume", "file")))})},
+      filters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+  AWAIT_READY(taskFinishedOffers);
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(agentRecoveredOffers);
+  ASSERT_FALSE(agentRecoveredOffers->empty());
+
+  // Destroy the persistent volume and the CSI volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.acceptOffers(
+      {agentRecoveredOffers->at(0).id()},
+      {DESTROY(persistentVolume),
+       DESTROY_VOLUME(volume.get())},
+      filters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
 // This test verifies that a framework can convert pre-existing volumes
 // from a storage local resource provider that uses the test CSI plugin
 // into mount or block volumes.


[12/12] mesos git commit: Added logging for container daemon.

Posted by ji...@apache.org.
Added logging for container daemon.

Review: https://reviews.apache.org/r/64711/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/098f9d8c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/098f9d8c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/098f9d8c

Branch: refs/heads/master
Commit: 098f9d8cddbdd357c501dde043a28783718b6e37
Parents: db4a6a5
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:30 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/slave/container_daemon.cpp | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/098f9d8c/src/slave/container_daemon.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp
index 2e6c748..d74fa51 100644
--- a/src/slave/container_daemon.cpp
+++ b/src/slave/container_daemon.cpp
@@ -159,6 +159,10 @@ void ContainerDaemonProcess::initialize()
 
 void ContainerDaemonProcess::launchContainer()
 {
+  LOG(INFO)
+    << "Launching container '" << launchCall.launch_container().container_id()
+    << "'";
+
   http::post(
       agentUrl,
       getAuthHeader(authToken),
@@ -179,9 +183,18 @@ void ContainerDaemonProcess::launchContainer()
     }))
     .onReady(defer(self(), &Self::waitContainer))
     .onFailed(defer(self(), [=](const string& failure) {
+      LOG(ERROR)
+        << "Failed to launch container '"
+        << launchCall.launch_container().container_id() << "': " << failure;
+
       terminated.fail(failure);
     }))
     .onDiscarded(defer(self(), [=] {
+      LOG(ERROR)
+        << "Failed to launch container '"
+        << launchCall.launch_container().container_id()
+        << "': future discarded";
+
       terminated.discard();
     }));
 }
@@ -189,6 +202,10 @@ void ContainerDaemonProcess::launchContainer()
 
 void ContainerDaemonProcess::waitContainer()
 {
+  LOG(INFO)
+    << "Waiting for container '" << waitCall.wait_container().container_id()
+    << "'";
+
   http::post(
       agentUrl,
       getAuthHeader(authToken),
@@ -208,9 +225,17 @@ void ContainerDaemonProcess::waitContainer()
     }))
     .onReady(defer(self(), &Self::launchContainer))
     .onFailed(defer(self(), [=](const string& failure) {
+      LOG(ERROR)
+        << "Failed to wait for container '"
+        << waitCall.wait_container().container_id() << "': " << failure;
+
       terminated.fail(failure);
     }))
     .onDiscarded(defer(self(), [=] {
+      LOG(ERROR)
+        << "Failed to wait for container '"
+        << waitCall.wait_container().container_id() << "': future discarded";
+
       terminated.discard();
     }));
 }


[07/12] mesos git commit: Modified SLRP to use the VolumeProfileAdaptor module.

Posted by ji...@apache.org.
Modified SLRP to use the VolumeProfileAdaptor module.

This changes the Storage Local Resource Provider's source of profile
information from a single hard-coded "default" profile to a
VolumeProfileAdaptor module.

This patch is based on https://reviews.apache.org/r/64616.

Review: https://reviews.apache.org/r/64658/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0f3bdd2e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0f3bdd2e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0f3bdd2e

Branch: refs/heads/master
Commit: 0f3bdd2ee12574111d4ba2938a6d70fd0a3ee4dc
Parents: 28bf089
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:10 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 476 ++++++++++++++++++------
 1 file changed, 356 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0f3bdd2e/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 8510a31..a6e4a0f 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -297,7 +297,9 @@ public:
       slaveId(_slaveId),
       authToken(_authToken),
       strict(_strict),
-      resourceVersion(id::UUID::random())
+      reconciling(false),
+      resourceVersion(id::UUID::random()),
+      offerOperationSequence("offer-operation-sequence")
   {
     volumeProfileAdaptor = VolumeProfileAdaptor::getAdaptor();
     CHECK_NOTNULL(volumeProfileAdaptor.get());
@@ -314,12 +316,6 @@ public:
   void received(const Event& event);
 
 private:
-  struct ProfileData
-  {
-    csi::VolumeCapability capability;
-    google::protobuf::Map<string, string> parameters;
-  };
-
   struct VolumeData
   {
     VolumeData(const csi::state::VolumeState& _state)
@@ -338,14 +334,22 @@ private:
   Future<Nothing> recover();
   Future<Nothing> recoverServices();
   Future<Nothing> recoverVolumes();
-  Future<Nothing> recoverResources();
-  Future<Nothing> recoverStatusUpdates();
+  Future<Nothing> recoverResourceProviderState();
+  Future<Nothing> recoverProfiles();
   void doReliableRegistration();
   Future<Nothing> reconcileResourceProviderState();
+  Future<Nothing> reconcileStatusUpdates();
   ResourceConversion reconcileResources(
       const Resources& checkpointed,
       const Resources& discovered);
 
+  // Helper for updating the profiles mapping upon receiving an updated
+  // set of profiles from the VolumeProfileAdaptor module.
+  Future<Nothing> updateProfiles();
+
+  // Reconcile the storage pools upon profile updates.
+  Future<Nothing> reconcileProfileUpdates();
+
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
   void applyOfferOperation(const Event::ApplyOfferOperation& operation);
@@ -368,7 +372,7 @@ private:
   Future<string> createVolume(
       const string& name,
       const Bytes& capacity,
-      const ProfileData& profile);
+      const VolumeProfileAdaptor::ProfileInfo& profileInfo);
   Future<Nothing> deleteVolume(const string& volumeId, bool preExisting);
   Future<string> validateCapability(
       const string& volumeId,
@@ -424,11 +428,19 @@ private:
   csi::VolumeCapability defaultMountCapability;
   csi::VolumeCapability defaultBlockCapability;
   string bootId;
-  hashmap<string, ProfileData> profiles;
   process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
   OfferOperationStatusUpdateManager statusUpdateManager;
 
+  // The mapping of known profiles fetched from the VolumeProfileAdaptor.
+  hashmap<string, VolumeProfileAdaptor::ProfileInfo> profileInfos;
+
+  // The last set of profile names fetched from the VolumeProfileAdaptor.
+  hashset<string> knownProfiles;
+
+  // True if a reconcilition of storage pools is happening.
+  bool reconciling;
+
   ContainerID controllerContainerId;
   ContainerID nodeContainerId;
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
@@ -449,6 +461,11 @@ private:
   Resources totalResources;
   id::UUID resourceVersion;
   hashmap<string, VolumeData> volumes;
+
+  // We maintain a sequence to keep track of ongoing volume/block
+  // creation or destroy. These operations will not be sequentialized
+  // through the sequence. It is simply used to wait for them to finish.
+  Sequence offerOperationSequence;
 };
 
 
@@ -456,6 +473,8 @@ void StorageLocalResourceProviderProcess::connected()
 {
   CHECK_EQ(DISCONNECTED, state);
 
+  LOG(INFO) << "Connected to resource provider manager";
+
   state = CONNECTED;
 
   doReliableRegistration();
@@ -469,6 +488,8 @@ void StorageLocalResourceProviderProcess::disconnected()
   LOG(INFO) << "Disconnected from resource provider manager";
 
   state = DISCONNECTED;
+
+  statusUpdateManager.pause();
 }
 
 
@@ -557,12 +578,6 @@ void StorageLocalResourceProviderProcess::initialize()
     }
   }
 
-  // TODO(chhsiao): Use the volume profile module.
-  ProfileData& defaultProfile = profiles["default"];
-  defaultProfile.capability.mutable_mount();
-  defaultProfile.capability.mutable_access_mode()
-    ->set_mode(csi::VolumeCapability::AccessMode::SINGLE_NODE_WRITER);
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Failed to recover resource provider with type '" << info.type()
@@ -594,10 +609,43 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
   return recoverServices()
     .then(defer(self(), &Self::recoverVolumes))
-    .then(defer(self(), &Self::recoverResources))
+    .then(defer(self(), &Self::recoverResourceProviderState))
+    .then(defer(self(), &Self::recoverProfiles))
     .then(defer(self(), [=]() -> Future<Nothing> {
+      LOG(INFO)
+        << "Finished recovery for resource provider with type '" << info.type()
+        << "' and name '" << info.name();
+
       state = DISCONNECTED;
 
+      statusUpdateManager.pause();
+
+      auto err = [](const string& message) {
+        LOG(ERROR)
+          << "Failed to watch for VolumeprofileAdaptor: " << message;
+      };
+
+      // Start watching the VolumeProfileAdaptor.
+      // TODO(chhsiao): Consider retrying with backoff.
+      loop(
+          self(),
+          [=] {
+            return volumeProfileAdaptor->watch(
+                knownProfiles,
+                info.storage().plugin().type())
+              .then(defer(self(), [=](const hashset<string>& profiles) {
+                // Save the returned set of profiles so that we
+                // can watch the module for changes to it.
+                knownProfiles = profiles;
+
+                return updateProfiles()
+                  .then(defer(self(), &Self::reconcileProfileUpdates));
+              }));
+          },
+          [](Nothing) -> ControlFlow<Nothing> { return Continue(); })
+        .onFailed(std::bind(err, lambda::_1))
+        .onDiscarded(std::bind(err, "future discarded"));
+
       driver.reset(new Driver(
           Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
           contentType,
@@ -814,7 +862,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverResources()
+Future<Nothing>
+StorageLocalResourceProviderProcess::recoverResourceProviderState()
 {
   // Recover the resource provider ID and state from the latest
   // symlink. If the symlink does not exist, this is a new resource
@@ -864,7 +913,134 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverResources()
   return Nothing();
 }
 
-Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
+
+// NOTE: Currently we need to recover profiles for replaying pending
+// `CREATE_VOLUME` or `CREATE_BLOCK` operations after failover. Consider
+// either checkpointing the required profiles for these calls, or
+// checkpointing CSI volume states by volume names instead of IDs.
+Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles()
+{
+  // Rebuild the set of required profiles from the checkpointed storage
+  // pools (i.e., RAW resources that have no volume ID). We do not need
+  // to resolve profiles for resoures that have volume IDs, since their
+  // volume capabilities are already checkpointed.
+  hashset<string> requiredProfiles;
+  foreach (const Resource& resource, totalResources) {
+    if (!resource.disk().source().has_id()) {
+      requiredProfiles.insert(resource.disk().source().profile());
+    }
+  }
+
+  // If no pending offer operation uses any profile, there is no need
+  // to recover any profile. Watching the VolumeProfileAdaptor will be
+  // initiated later.
+  if (requiredProfiles.empty()) {
+    return Nothing();
+  }
+
+  LOG(INFO)
+    << "Waiting for VolumeProfileAdaptor to recover profiles: "
+    << stringify(requiredProfiles);
+
+  // The VolumeProfileAdapter module must at lest have knowledge of
+  // the required profiles. Because the module is initialized separately
+  // from this resource provider, we must watch the module until all
+  // required profiles have been recovered.
+  return loop(
+      self(),
+      [=] {
+        return volumeProfileAdaptor->watch(
+            knownProfiles,
+            info.storage().plugin().type());
+      },
+      [=](const hashset<string>& profiles) -> ControlFlow<Nothing> {
+        // Save the returned set of profiles so that we can watch the
+        // module for changes to it, both in this loop and after
+        // recovery completes.
+        knownProfiles = profiles;
+
+        foreach (const string& profile, requiredProfiles) {
+          if (!knownProfiles.contains(profile)) {
+            return Continue();
+          }
+        }
+
+        return Break();
+      })
+    .then(defer(self(), &Self::updateProfiles));
+}
+
+
+void StorageLocalResourceProviderProcess::doReliableRegistration()
+{
+  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
+    return;
+  }
+
+  CHECK_EQ(CONNECTED, state);
+
+  Call call;
+  call.set_type(Call::SUBSCRIBE);
+
+  Call::Subscribe* subscribe = call.mutable_subscribe();
+  subscribe->mutable_resource_provider_info()->CopyFrom(info);
+
+  auto err = [](const ResourceProviderInfo& info, const string& message) {
+    LOG(ERROR)
+      << "Failed to subscribe resource provider with type '" << info.type()
+      << "' and name '" << info.name() << "': " << message;
+  };
+
+  driver->send(evolve(call))
+    .onFailed(std::bind(err, info, lambda::_1))
+    .onDiscarded(std::bind(err, info, "future discarded"));
+
+  // TODO(chhsiao): Consider doing an exponential backoff.
+  delay(Seconds(1), self(), &Self::doReliableRegistration);
+}
+
+
+Future<Nothing>
+StorageLocalResourceProviderProcess::reconcileResourceProviderState()
+{
+  return reconcileStatusUpdates()
+    .then(defer(self(), [=] {
+      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
+        .then(defer(self(), [=](const list<Resources>& discovered) {
+          ResourceConversion conversion = reconcileResources(
+              totalResources,
+              accumulate(discovered.begin(), discovered.end(), Resources()));
+
+          Try<Resources> result = totalResources.apply(conversion);
+          CHECK_SOME(result);
+
+          if (result.get() != totalResources) {
+            LOG(INFO)
+              << "Removing '" << conversion.consumed << "' and adding '"
+              << conversion.converted << "' to the total resources";
+
+            totalResources = result.get();
+            checkpointResourceProviderState();
+          }
+
+          // NOTE: Since this is the first `UPDATE_STATE` call of the
+          // current subscription, there must be no racing speculative
+          // operation, thus no need to update the resource version.
+          sendResourceProviderStateUpdate();
+          statusUpdateManager.resume();
+
+          LOG(INFO)
+            << "Resource provider " << info.id() << " is in READY state";
+
+          state = READY;
+
+          return Nothing();
+        }));
+    }));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileStatusUpdates()
 {
   CHECK(info.has_id());
 
@@ -878,8 +1054,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
           resourceProviderDir,
           lambda::_1));
 
-  statusUpdateManager.pause();
-
   Try<list<string>> operationPaths = slave::paths::getOfferOperationPaths(
       slave::paths::getResourceProviderPath(
           metaDir, slaveId, info.type(), info.name(), info.id()));
@@ -999,65 +1173,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
 }
 
 
-void StorageLocalResourceProviderProcess::doReliableRegistration()
-{
-  if (state == DISCONNECTED || state == SUBSCRIBED || state == READY) {
-    return;
-  }
-
-  CHECK_EQ(CONNECTED, state);
-
-  Call call;
-  call.set_type(Call::SUBSCRIBE);
-
-  Call::Subscribe* subscribe = call.mutable_subscribe();
-  subscribe->mutable_resource_provider_info()->CopyFrom(info);
-
-  auto err = [](const ResourceProviderInfo& info, const string& message) {
-    LOG(ERROR)
-      << "Failed to subscribe resource provider with type '" << info.type()
-      << "' and name '" << info.name() << "': " << message;
-  };
-
-  driver->send(evolve(call))
-    .onFailed(std::bind(err, info, lambda::_1))
-    .onDiscarded(std::bind(err, info, "future discarded"));
-
-  // TODO(chhsiao): Consider doing an exponential backoff.
-  delay(Seconds(1), self(), &Self::doReliableRegistration);
-}
-
-
-Future<Nothing>
-StorageLocalResourceProviderProcess::reconcileResourceProviderState()
-{
-  return recoverStatusUpdates()
-    .then(defer(self(), [=] {
-      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
-        .then(defer(self(), [=](const list<Resources>& discovered) {
-          ResourceConversion conversion = reconcileResources(
-              totalResources,
-              accumulate(discovered.begin(), discovered.end(), Resources()));
-
-          Try<Resources> result = totalResources.apply(conversion);
-          CHECK_SOME(result);
-
-          if (result.get() != totalResources) {
-            totalResources = result.get();
-            checkpointResourceProviderState();
-          }
-
-          sendResourceProviderStateUpdate();
-          statusUpdateManager.resume();
-
-          state = READY;
-
-          return Nothing();
-        }));
-    }));
-}
-
-
 ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
     const Resources& checkpointed,
     const Resources& discovered)
@@ -1106,6 +1221,97 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
 }
 
 
+Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles()
+{
+  LOG(INFO)
+    << "Updating metadata for profiles: " << stringify(knownProfiles);
+
+  list<Future<Nothing>> futures;
+  foreach (const string& profile, knownProfiles) {
+    // Since profiles are immutable after creation and cannot be
+    // deleted, we do not need to update any profile that is already in
+    // the mapping.
+    // TODO(chhsiao): Handle profile deactivation.
+    if (profileInfos.contains(profile)) {
+      continue;
+    }
+
+    futures.push_back(volumeProfileAdaptor->translate(
+        profile, info.storage().plugin().type())
+      .then(defer(self(), [=](const VolumeProfileAdaptor::ProfileInfo& info) {
+        profileInfos.put(profile, info);
+        return Nothing();
+      })));
+  }
+
+  return collect(futures).then([] { return Nothing(); });
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileProfileUpdates()
+{
+  // Do nothing if the resource provider ID is not known yet, since it
+  // is used to construct the resource metadata of storage pools. The
+  // metadata will be constructed in `reconcileResourceProviderState`.
+  if (!info.has_id()) {
+    return Nothing();
+  }
+
+  CHECK(!reconciling);
+
+  LOG(INFO) << "Reconciling storage pools for resource provider " << info.id();
+
+  reconciling = true;
+
+  // We add a lambda into `offerOperationSequence` so that it will
+  // return after waiting for all pending operations in the sequence.
+  return offerOperationSequence.add(
+      std::function<Future<Nothing>()>([] { return Nothing(); }))
+    .then(defer(self(), &Self::getCapacities))
+    .then(defer(self(), [=](const Resources& discovered) {
+      auto isStoragePool = [](const Resource& r) {
+        return !r.disk().source().has_id();
+      };
+
+      ResourceConversion conversion = reconcileResources(
+          totalResources.filter(isStoragePool),
+          discovered);
+
+      Try<Resources> result = totalResources.apply(conversion);
+      CHECK_SOME(result);
+
+      if (result.get() != totalResources) {
+        LOG(INFO)
+          << "Removing '" << conversion.consumed << "' and adding '"
+          << conversion.converted << "' to the total resources";
+
+        totalResources = result.get();
+        checkpointResourceProviderState();
+
+        // NOTE: We ensure that the first `UPDATE_STATE` of the current
+        // subscription is sent by `reconcileResourceProviderState`, so
+        // that the total resources contain existing volumes.
+        if (state == READY) {
+          // NOTE: We always update the resource version before sending
+          // an `UPDATE_STATE`, so that any racing speculative operation
+          // will be rejected. Otherwise, the speculative resource
+          // conversion done on the master will be cancelled out.
+          resourceVersion = id::UUID::random();
+          sendResourceProviderStateUpdate();
+        }
+      }
+
+      LOG(INFO)
+        << "Finished reconciliation of storage pools for resource provider "
+        << info.id();
+
+      reconciling = false;
+
+      return Nothing();
+    }));
+}
+
+
 void StorageLocalResourceProviderProcess::subscribed(
     const Event::Subscribed& subscribed)
 {
@@ -1144,9 +1350,6 @@ void StorageLocalResourceProviderProcess::subscribed(
 void StorageLocalResourceProviderProcess::applyOfferOperation(
     const Event::ApplyOfferOperation& operation)
 {
-  // NOTE: If we receive an offer operation in SUBSCRIBED state, there
-  // must be a resource version mismatch since the current resource
-  // version is not reported yet.
   CHECK(state == SUBSCRIBED || state == READY);
 
   Try<id::UUID> uuid = id::UUID::fromBytes(operation.operation_uuid().value());
@@ -1177,7 +1380,13 @@ void StorageLocalResourceProviderProcess::applyOfferOperation(
 
   CHECK_SOME(operationVersion);
 
-  if (operationVersion.get() != resourceVersion) {
+  if (state == SUBSCRIBED) {
+    result = updateOfferOperationStatus(uuid.get(), Error(
+        "Cannot apply offer operation in SUBSCRIBED state"));
+  } else if (reconciling) {
+    result = updateOfferOperationStatus(uuid.get(), Error(
+        "Cannot apply offer operation when reconciling storage pools"));
+  } else if (operationVersion.get() != resourceVersion) {
     result = updateOfferOperationStatus(uuid.get(), Error(
         "Mismatched resource version " + stringify(operationVersion.get()) +
         " (expected: " + stringify(resourceVersion) + ")"));
@@ -2059,7 +2268,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::nodeUnpublish(
 Future<string> StorageLocalResourceProviderProcess::createVolume(
     const string& name,
     const Bytes& capacity,
-    const ProfileData& profile)
+    const VolumeProfileAdaptor::ProfileInfo& profileInfo)
 {
   // NOTE: This can only be called after `prepareControllerService`.
   CHECK_SOME(controllerCapabilities);
@@ -2077,8 +2286,8 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
         ->set_required_bytes(capacity.bytes());
       request.mutable_capacity_range()
         ->set_limit_bytes(capacity.bytes());
-      request.add_volume_capabilities()->CopyFrom(profile.capability);
-      *request.mutable_parameters() = profile.parameters;
+      request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
+      *request.mutable_parameters() = profileInfo.parameters;
 
       return client.CreateVolume(request)
         .then(defer(self(), [=](const csi::CreateVolumeResponse& response) {
@@ -2094,7 +2303,7 @@ Future<string> StorageLocalResourceProviderProcess::createVolume(
             csi::state::VolumeState volumeState;
             volumeState.set_state(csi::state::VolumeState::CREATED);
             volumeState.mutable_volume_capability()
-              ->CopyFrom(profile.capability);
+              ->CopyFrom(profileInfo.capability);
             *volumeState.mutable_volume_attributes() = volumeInfo.attributes();
 
             volumes.put(volumeInfo.id(), std::move(volumeState));
@@ -2308,11 +2517,18 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
     .then(defer(self(), [=](csi::Client client) {
       list<Future<Resources>> futures;
 
-      foreachpair (const string& profile, const ProfileData& data, profiles) {
+      foreach (const string& profile, knownProfiles) {
+        CHECK(profileInfos.contains(profile));
+
+        // TODO(chhsiao): Skip inactive profiles.
+
+        const VolumeProfileAdaptor::ProfileInfo& profileInfo =
+          profileInfos.at(profile);
+
         csi::GetCapacityRequest request;
         request.mutable_version()->CopyFrom(csiVersion);
-        request.add_volume_capabilities()->CopyFrom(data.capability);
-        *request.mutable_parameters() = data.parameters;
+        request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
+        *request.mutable_parameters() = profileInfo.parameters;
 
         futures.push_back(client.GetCapacity(request)
           .then(defer(self(), [=](
@@ -2348,6 +2564,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
   CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
 
   Future<vector<ResourceConversion>> conversions;
+  Option<Resource> source;
 
   switch (operation.info().type()) {
     case Offer::Operation::RESERVE:
@@ -2364,8 +2581,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::CREATE_VOLUME: {
       CHECK(operation.info().has_create_volume());
 
+      source = operation.info().create_volume().source();
       conversions = applyCreateVolumeOrBlock(
-          operation.info().create_volume().source(),
+          source.get(),
           operationUuid,
           operation.info().create_volume().target_type());
 
@@ -2374,16 +2592,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::DESTROY_VOLUME: {
       CHECK(operation.info().has_destroy_volume());
 
-      conversions = applyDestroyVolumeOrBlock(
-          operation.info().destroy_volume().volume());
+      source = operation.info().destroy_volume().volume();
+      conversions = applyDestroyVolumeOrBlock(source.get());
 
       break;
     }
     case Offer::Operation::CREATE_BLOCK: {
       CHECK(operation.info().has_create_block());
 
+      source = operation.info().create_block().source();
       conversions = applyCreateVolumeOrBlock(
-          operation.info().create_block().source(),
+          source.get(),
           operationUuid,
           Resource::DiskInfo::Source::BLOCK);
 
@@ -2392,8 +2611,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
     case Offer::Operation::DESTROY_BLOCK: {
       CHECK(operation.info().has_destroy_block());
 
-      conversions = applyDestroyVolumeOrBlock(
-          operation.info().destroy_block().block());
+      source = operation.info().destroy_block().block();
+      conversions = applyDestroyVolumeOrBlock(source.get());
 
       break;
     }
@@ -2428,7 +2647,17 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOfferOperation(
           updateOfferOperationStatus(operationUuid, conversions));
     }));
 
-  return promise->future();
+  Future<Nothing> future = promise->future();
+
+  CHECK_SOME(source);
+  if (source->disk().source().has_profile()) {
+    // We place the future in `offerOperationSequence` so it can be
+    // waited for during reconciliation upon profile updates.
+    offerOperationSequence.add(
+        std::function<Future<Nothing>()>([future] { return future; }));
+  }
+
+  return future;
 }
 
 
@@ -2468,8 +2697,11 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     case Resource::DiskInfo::Source::PATH:
     case Resource::DiskInfo::Source::MOUNT: {
       if (resource.disk().source().has_profile()) {
-        CHECK(profiles.contains(resource.disk().source().profile()));
-        if (!profiles.at(resource.disk().source().profile())
+        CHECK(profileInfos.contains(resource.disk().source().profile()));
+
+        // TODO(chhsiao): Reject if the source has an inactive profile.
+
+        if (!profileInfos.at(resource.disk().source().profile())
                .capability.has_mount()) {
           return Failure(
               "Profile '" + resource.disk().source().profile() +
@@ -2482,7 +2714,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
         created = createVolume(
             operationUuid.toString(),
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            profiles.at(resource.disk().source().profile()));
+            profileInfos.at(resource.disk().source().profile()));
       } else {
         const string& volumeId = resource.disk().source().id();
 
@@ -2508,8 +2740,11 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
     }
     case Resource::DiskInfo::Source::BLOCK: {
       if (resource.disk().source().has_profile()) {
-        CHECK(profiles.contains(resource.disk().source().profile()));
-        if (!profiles.at(resource.disk().source().profile())
+        CHECK(profileInfos.contains(resource.disk().source().profile()));
+
+        // TODO(chhsiao): Reject if the source has an inactive profile.
+
+        if (!profileInfos.at(resource.disk().source().profile())
                .capability.has_block()) {
           return Failure(
               "Profile '" + resource.disk().source().profile() +
@@ -2522,7 +2757,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
         created = createVolume(
             operationUuid.toString(),
             Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            profiles.at(resource.disk().source().profile()));
+            profileInfos.at(resource.disk().source().profile()));
       } else {
         const string& volumeId = resource.disk().source().id();
 
@@ -2633,6 +2868,8 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
           resource.disk().source().id(),
           !resource.disk().source().has_profile())))
     .then(defer(self(), [=]() {
+      // TODO(chhsiao): Convert to an empty resource and update all
+      // storage pools if the profile has been deactivated.
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
@@ -2698,20 +2935,6 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
 
   operation.add_statuses()->CopyFrom(operation.latest_status());
 
-  if (error.isSome()) {
-    // We only update the resource version for failed conventional
-    // operations, which are speculatively executed on the master.
-    if (operation.info().type() == Offer::Operation::RESERVE ||
-        operation.info().type() == Offer::Operation::UNRESERVE ||
-        operation.info().type() == Offer::Operation::CREATE ||
-        operation.info().type() == Offer::Operation::DESTROY) {
-      resourceVersion = id::UUID::random();
-
-      // Send an `UPDATE_STATE` after we finish the current operation.
-      dispatch(self(), &Self::sendResourceProviderStateUpdate);
-    }
-  }
-
   checkpointResourceProviderState();
 
   // Send out the status update for the offer operation.
@@ -2735,7 +2958,20 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
     .onFailed(defer(self(), std::bind(die, lambda::_1)))
     .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 
-  return error.isNone() ? Nothing() : Try<Nothing>::error(error.get());
+  if (error.isSome()) {
+    // We only send `UPDATE_STATE` for failed speculative operations.
+    if (operation.info().type() == Offer::Operation::RESERVE ||
+        operation.info().type() == Offer::Operation::UNRESERVE ||
+        operation.info().type() == Offer::Operation::CREATE ||
+        operation.info().type() == Offer::Operation::DESTROY) {
+      resourceVersion = id::UUID::random();
+      sendResourceProviderStateUpdate();
+    }
+
+    return error.get();
+  }
+
+  return Nothing();
 }
 
 


[11/12] mesos git commit: Fixed a few typos in SLRP.

Posted by ji...@apache.org.
Fixed a few typos in SLRP.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/edf8b707
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/edf8b707
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/edf8b707

Branch: refs/heads/master
Commit: edf8b7076d5fdbc6afd623e47f9f17c7aeda178b
Parents: 78cc4e9
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 19 11:37:58 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/edf8b707/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 772952e..6fd82ba 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -443,7 +443,7 @@ private:
   // The last set of profile names fetched from the VolumeProfileAdaptor.
   hashset<string> knownProfiles;
 
-  // True if a reconcilition of storage pools is happening.
+  // True if a reconciliation of storage pools is happening.
   bool reconciling;
 
   ContainerID controllerContainerId;
@@ -850,7 +850,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverVolumes()
         }
 
         // NOTE: We avoid using a default clause for the following
-        // values in proto3's open enum to enable the compiler to detcet
+        // values in proto3's open enum to enable the compiler to detect
         // missing enum cases for us. See:
         // https://github.com/google/protobuf/issues/3917
         case google::protobuf::kint32min:
@@ -927,7 +927,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles()
 {
   // Rebuild the set of required profiles from the checkpointed storage
   // pools (i.e., RAW resources that have no volume ID). We do not need
-  // to resolve profiles for resoures that have volume IDs, since their
+  // to resolve profiles for resources that have volume IDs, since their
   // volume capabilities are already checkpointed.
   hashset<string> requiredProfiles;
   foreach (const Resource& resource, totalResources) {
@@ -947,7 +947,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverProfiles()
     << "Waiting for VolumeProfileAdaptor to recover profiles: "
     << stringify(requiredProfiles);
 
-  // The VolumeProfileAdapter module must at lest have knowledge of
+  // The VolumeProfileAdapter module must at least have knowledge of
   // the required profiles. Because the module is initialized separately
   // from this resource provider, we must watch the module until all
   // required profiles have been recovered.
@@ -1503,7 +1503,7 @@ void StorageLocalResourceProviderProcess::publishResources(
             }
 
             // NOTE: We avoid using a default clause for the following
-            // values in proto3's open enum to enable the compiler to detcet
+            // values in proto3's open enum to enable the compiler to detect
             // missing enum cases for us. See:
             // https://github.com/google/protobuf/issues/3917
             case google::protobuf::kint32min:
@@ -2717,7 +2717,7 @@ StorageLocalResourceProviderProcess::applyCreateVolumeOrBlock(
   // the same volume will be returned when recovering from a failover).
   //
   // For 2, there are two scenarios:
-  //   a. If the volume has a checkpointed state (becasue it was created
+  //   a. If the volume has a checkpointed state (because it was created
   //      by a previous resource provider), we simply check if its
   //      checkpointed capability supports the conversion.
   //   b. If the volume is newly discovered, `ValidateVolumeCapabilities`
@@ -2938,7 +2938,7 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOfferOperationStatus(
   OfferOperation& operation = offerOperations.at(operationUuid);
 
   if (conversions.isSome()) {
-    // Strip away the allocation info when applying the convertion to
+    // Strip away the allocation info when applying the conversion to
     // the total resources.
     vector<ResourceConversion> _conversions;
     foreach (ResourceConversion conversion, conversions.get()) {


[06/12] mesos git commit: Refactored and fixed bugs for SLRP resource reconciliation.

Posted by ji...@apache.org.
Refactored and fixed bugs for SLRP resource reconciliation.

Review: https://reviews.apache.org/r/64644/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28bf0891
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28bf0891
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28bf0891

Branch: refs/heads/master
Commit: 28bf0891bf0b985153fa129e69fda0b7fd97d456
Parents: b36152d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:06 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 261 ++++++++++++------------
 1 file changed, 133 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28bf0891/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a103494..8510a31 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -342,6 +342,9 @@ private:
   Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
   Future<Nothing> reconcileResourceProviderState();
+  ResourceConversion reconcileResources(
+      const Resources& checkpointed,
+      const Resources& discovered);
 
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
@@ -358,7 +361,6 @@ private:
 
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
-  Future<Resources> discoverResources();
   Future<Nothing> controllerPublish(const string& volumeId);
   Future<Nothing> controllerUnpublish(const string& volumeId);
   Future<Nothing> nodePublish(const string& volumeId);
@@ -372,7 +374,8 @@ private:
       const string& volumeId,
       const Option<Labels>& metadata,
       const csi::VolumeCapability& capability);
-  Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles);
+  Future<Resources> listVolumes();
+  Future<Resources> getCapacities();
 
   Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
 
@@ -969,6 +972,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
       // We replay all pending operations here, so that if a volume is
       // created or deleted before the last failover, the result will be
       // reflected in the total resources before reconciliation.
+      list<Future<Nothing>> futures;
+
       foreachpair (const id::UUID& uuid,
                    const OfferOperation& operation,
                    offerOperations) {
@@ -982,12 +987,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
             << message;
         };
 
-        _applyOfferOperation(uuid)
+        futures.push_back(_applyOfferOperation(uuid)
           .onFailed(std::bind(err, uuid, lambda::_1))
-          .onDiscarded(std::bind(err, uuid, "future discarded"));
+          .onDiscarded(std::bind(err, uuid, "future discarded")));
       }
 
-      return Nothing();
+      // We await the futures instead of collecting them because it is OK
+      // for offer operations to fail.
+      return await(futures).then([] { return Nothing(); });
     }));
 }
 
@@ -1025,69 +1032,77 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return recoverStatusUpdates()
-    .then(defer(self(), &Self::discoverResources))
-    .then(defer(self(), [=](Resources discoveredResources) {
-      // NODE: If a resource in the checkpointed total resources is
-      // missing in the discovered resources, we will still keep it if
-      // it is converted by an offer operation before (i.e., has extra
-      // info other than the default reservations). The reason is that
-      // we want to maintain a consistent view with frameworks, and do
-      // not want to lose any data on persistent volumes due to some
-      // temporarily CSI plugin faults. Other missing resources that are
-      // "unconverted" by any framework will be removed from the total
-      // resources. Then, any newly discovered resource will be reported
-      // under the default reservations.
-
-      Resources result;
-      Resources unconvertedTotal;
-
-      foreach (const Resource& resource, totalResources) {
-        Resource unconverted = createRawDiskResource(
-            info,
-            Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            resource.disk().source().has_profile()
-              ? resource.disk().source().profile() : Option<string>::none(),
-            resource.disk().source().has_id()
-              ? resource.disk().source().id() : Option<string>::none(),
-            resource.disk().source().has_metadata()
-              ? resource.disk().source().metadata() : Option<Labels>::none());
-        if (discoveredResources.contains(unconverted)) {
-          // The checkponited resource appears in the discovered resources.
-          result += resource;
-          unconvertedTotal += unconverted;
-        } else if (!totalResources.contains(unconverted)) {
-          // The checkpointed resource is missing but converted by a
-          // framework or the operator before, so we keep it.
-          result += resource;
-
-          LOG(WARNING)
-            << "Missing converted resource '" << resource
-            << "'. This might cause further offer operations to fail.";
-        }
-      }
-
-      // NOTE: The states of newly discovered pre-existing volumes will
-      // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK`
-      // operations are applied.
-      const Resources newResources = discoveredResources - unconvertedTotal;
-      result += newResources;
+    .then(defer(self(), [=] {
+      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
+        .then(defer(self(), [=](const list<Resources>& discovered) {
+          ResourceConversion conversion = reconcileResources(
+              totalResources,
+              accumulate(discovered.begin(), discovered.end(), Resources()));
+
+          Try<Resources> result = totalResources.apply(conversion);
+          CHECK_SOME(result);
+
+          if (result.get() != totalResources) {
+            totalResources = result.get();
+            checkpointResourceProviderState();
+          }
 
-      LOG(INFO) << "Adding new resources '" << newResources << "'";
+          sendResourceProviderStateUpdate();
+          statusUpdateManager.resume();
 
-      // TODO(chhsiao): Check that all profiles exist.
+          state = READY;
 
-      if (result != totalResources) {
-        totalResources = result;
-        checkpointResourceProviderState();
-      }
+          return Nothing();
+        }));
+    }));
+}
 
-      sendResourceProviderStateUpdate();
-      statusUpdateManager.resume();
 
-      state = READY;
+ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
+    const Resources& checkpointed,
+    const Resources& discovered)
+{
+  // NOTE: If a resource in the checkpointed resources is missing in the
+  // discovered resources, we will still keep it if it is converted by
+  // an offer operation before (i.e., has extra info other than the
+  // default reservations). The reason is that we want to maintain a
+  // consistent view with frameworks, and do not want to lose any data on
+  // persistent volumes due to some temporary CSI plugin faults. Other
+  // missing resources that are "unconverted" by any framework will be
+  // removed. Then, any newly discovered resource will be added.
+  Resources toRemove;
+  Resources toAdd = discovered;
+
+  foreach (const Resource& resource, checkpointed) {
+    Resource unconverted = createRawDiskResource(
+        info,
+        Bytes(resource.scalar().value(), Bytes::MEGABYTES),
+        resource.disk().source().has_profile()
+          ? resource.disk().source().profile() : Option<string>::none(),
+        resource.disk().source().has_id()
+          ? resource.disk().source().id() : Option<string>::none(),
+        resource.disk().source().has_metadata()
+          ? resource.disk().source().metadata() : Option<Labels>::none());
+
+    if (toAdd.contains(unconverted)) {
+      // If the remainder of the discovered resources contains the
+      // "unconverted" version of a checkpointed resource, this is not a
+      // new resource.
+      toAdd -= unconverted;
+    } else if (checkpointed.contains(unconverted)) {
+      // If the remainder of the discovered resources does not contain
+      // the "unconverted" version of the checkpointed resource, the
+      // resource is missing. However, if it remains unconverted in the
+      // checkpoint, we can safely remove it from the total resources.
+      toRemove += unconverted;
+    } else {
+      LOG(WARNING)
+        << "Missing converted resource '" << resource
+        << "'. This might cause further offer operations to fail.";
+    }
+  }
 
-      return Nothing();
-    }));
+  return ResourceConversion(std::move(toRemove), std::move(toAdd));
 }
 
 
@@ -1786,67 +1801,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
-// Returns resources reported by the CSI plugin, which are unreserved
-// raw disk resources without any persistent volume.
-Future<Resources> StorageLocalResourceProviderProcess::discoverResources()
-{
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(info.has_id());
-
-  list<Future<Resources>> futures;
-  futures.push_back(getCapacities(profiles));
-
-  if (controllerCapabilities->listVolumes) {
-    futures.push_back(getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        // TODO(chhsiao): Set the max entries and use a loop to do
-        // mutliple `ListVolumes` calls.
-        csi::ListVolumesRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
-
-        return client.ListVolumes(request)
-          .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
-            Resources resources;
-
-            // Recover volume profiles from the checkpointed state.
-            hashmap<string, string> volumesToProfiles;
-            foreach (const Resource& resource, totalResources) {
-              if (resource.disk().source().has_id() &&
-                  resource.disk().source().has_profile()) {
-                volumesToProfiles.put(
-                    resource.disk().source().id(),
-                    resource.disk().source().profile());
-              }
-            }
-
-            foreach (const auto& entry, response.entries()) {
-              resources += createRawDiskResource(
-                  info,
-                  Bytes(entry.volume_info().capacity_bytes()),
-                  volumesToProfiles.contains(entry.volume_info().id())
-                    ? volumesToProfiles.at(entry.volume_info().id())
-                    : Option<string>::none(),
-                  entry.volume_info().id(),
-                  entry.volume_info().attributes().empty()
-                    ? Option<Labels>::none()
-                    : convertStringMapToLabels(
-                          entry.volume_info().attributes()));
-            }
-
-            return resources;
-          }));
-      })));
-  }
-
-  return collect(futures)
-    .then(defer(self(), [=](const list<Resources>& resources) {
-      return accumulate(resources.begin(), resources.end(), Resources());
-    }));
-}
-
-
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
@@ -2284,17 +2238,68 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
 }
 
 
-// Returns RAW disk resources for specified profiles.
-Future<Resources> StorageLocalResourceProviderProcess::getCapacities(
-    const hashmap<string, ProfileData>& profiles)
+Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
+{
+  // NOTE: This can only be called after `prepareControllerService` and
+  // the resource provider ID has been obtained.
+  CHECK_SOME(controllerCapabilities);
+  CHECK(info.has_id());
+
+  // This is only used for reconciliation so no failure is returned.
+  if (!controllerCapabilities->listVolumes) {
+    return Resources();
+  }
+
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      // TODO(chhsiao): Set the max entries and use a loop to do
+      // multiple `ListVolumes` calls.
+      csi::ListVolumesRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.ListVolumes(request)
+        .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+          Resources resources;
+
+          // Recover volume profiles from the checkpointed state.
+          hashmap<string, string> volumesToProfiles;
+          foreach (const Resource& resource, totalResources) {
+            if (resource.disk().source().has_id() &&
+                resource.disk().source().has_profile()) {
+              volumesToProfiles.put(
+                  resource.disk().source().id(),
+                  resource.disk().source().profile());
+            }
+          }
+
+          foreach (const auto& entry, response.entries()) {
+            resources += createRawDiskResource(
+                info,
+                Bytes(entry.volume_info().capacity_bytes()),
+                volumesToProfiles.contains(entry.volume_info().id())
+                  ? volumesToProfiles.at(entry.volume_info().id())
+                  : Option<string>::none(),
+                entry.volume_info().id(),
+                entry.volume_info().attributes().empty()
+                  ? Option<Labels>::none()
+                  : convertStringMapToLabels(
+                        entry.volume_info().attributes()));
+          }
+
+          return resources;
+        }));
+    }));
+}
+
+
+Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
   // NOTE: This can only be called after `prepareControllerService` and
   // the resource provider ID has been obtained.
   CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
-  // We do not return a failure because this is always called when a
-  // profile is added or a `CreateVolume` CSI call is made.
+  // This is only used for reconciliation so no failure is returned.
   if (!controllerCapabilities->getCapacity) {
     return Resources();
   }