You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/10/05 15:30:39 UTC

[1/5] mesos git commit: Added a master-registry backed resource provider manager registry.

Repository: mesos
Updated Branches:
  refs/heads/master 38af943ad -> 46db7e4f2


Added a master-registry backed resource provider manager registry.

This patch adds an implementation of the resource provider registrar
backed by the master's registrar. This makes it possible to persist
resource provider manager state through the single master registrar
while still using the narrowly defined resource provider registry.

Review: https://reviews.apache.org/r/62353


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/46db7e4f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/46db7e4f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/46db7e4f

Branch: refs/heads/master
Commit: 46db7e4f27831d20244a57b22a70312f2a574395
Parents: 8db88af
Author: Benjamin Bannier <bb...@gmail.com>
Authored: Tue Aug 15 11:35:15 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/master/registry.proto                     |  7 ++
 src/resource_provider/registrar.cpp           | 97 +++++++++++++++++++++-
 src/resource_provider/registrar.hpp           | 27 ++++++
 src/tests/resource_provider_manager_tests.cpp | 37 +++++++++
 4 files changed, 167 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/46db7e4f/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 8916ad3..cac0c31 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -17,9 +17,13 @@
 syntax = "proto2";
 
 import "mesos/mesos.proto";
+
 import "mesos/maintenance/maintenance.proto";
+
 import "mesos/quota/quota.proto";
 
+import "resource_provider/registry.proto";
+
 package mesos.internal;
 
 /**
@@ -123,4 +127,7 @@ message Registry {
   // A list of recorded weights in the cluster, a newly elected master shall
   // reconstruct it from the registry.
   repeated Weight weights = 6;
+
+  // All known resource providers.
+  optional resource_provider.registry.Registry resource_provider_registry = 9;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/46db7e4f/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 5dfcb89..ef66c12 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -42,12 +42,13 @@
 #include <stout/path.hpp>
 #include <stout/unreachable.hpp>
 
+#include "master/registrar.hpp"
+
 #include "slave/paths.hpp"
 
 
 using std::deque;
 using std::string;
-using std::unique_ptr;
 
 using mesos::resource_provider::registry::Registry;
 using mesos::resource_provider::registry::ResourceProvider;
@@ -73,6 +74,7 @@ using process::terminate;
 using process::wait;
 
 
+namespace master = mesos::internal::master;
 namespace slave = mesos::internal::slave;
 
 namespace mesos {
@@ -102,6 +104,12 @@ Try<Owned<Registrar>> Registrar::create(
 }
 
 
+Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
+{
+  // The adapter only keeps a non-owning pointer to `registrar`, so the
+  // master registrar has to outlive the returned object.
+  Owned<Registrar> adapter(new MasterRegistrar(registrar));
+  return adapter;
+}
+
+
 AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
   : id(_id) {}
 
@@ -371,5 +379,92 @@ Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
       std::move(operation));
 }
 
+
+// Process backing `MasterRegistrar`: it forwards operations on the
+// resource provider registry to a master registrar.
+class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
+{
+  // A helper class for adapting operations on the resource provider
+  // registry to the master registry.
+  class AdaptedOperation : public master::Operation
+  {
+  public:
+    // Takes ownership of the wrapped resource provider operation.
+    explicit AdaptedOperation(Owned<Registrar::Operation> operation);
+
+  private:
+    // Applies the wrapped operation to the resource provider registry
+    // embedded in the master registry; the slave ID set is unused.
+    Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;
+
+    Owned<Registrar::Operation> operation;
+
+    AdaptedOperation(const AdaptedOperation&) = delete;
+    AdaptedOperation(AdaptedOperation&&) = default;
+    AdaptedOperation& operator=(const AdaptedOperation&) = delete;
+    AdaptedOperation& operator=(AdaptedOperation&&) = default;
+  };
+
+public:
+  // `registrar` is not owned and must outlive this process.
+  explicit MasterRegistrarProcess(master::Registrar* registrar);
+
+  // Wraps `operation` and applies it through the master registrar.
+  Future<bool> apply(Owned<Registrar::Operation> operation);
+
+private:
+  master::Registrar* registrar = nullptr;
+};
+
+
+// Takes ownership of `_operation`.
+MasterRegistrarProcess::AdaptedOperation::AdaptedOperation(
+    Owned<Registrar::Operation> _operation)
+  : operation(std::move(_operation)) {}
+
+
+// Runs the wrapped operation against the resource provider part of the
+// master registry. The slave ID set is left untouched.
+Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
+    internal::Registry* registry,
+    hashset<SlaveID>*)
+{
+  auto* resourceProviderRegistry =
+    registry->mutable_resource_provider_registry();
+
+  return (*operation)(resourceProviderRegistry);
+}
+
+
+// Uses a master-specific ID prefix; reusing the agent registrar's prefix
+// would mislabel this process as `AgentRegistrarProcess`.
+MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
+  : ProcessBase(process::ID::generate("resource-provider-master-registrar")),
+    registrar(_registrar) {}
+
+
+// Adapts `operation` to the master registrar's operation interface and
+// submits it; the returned future is the master registrar's result.
+Future<bool> MasterRegistrarProcess::apply(
+    Owned<Registrar::Operation> operation)
+{
+  Owned<master::Operation> adapted(
+      new AdaptedOperation(std::move(operation)));
+
+  return registrar->apply(std::move(adapted));
+}
+
+
+// Creates and spawns the backing process; `registrar` is not owned.
+MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
+  : process(new MasterRegistrarProcess(registrar))
+{
+  process::spawn(process.get(), false);
+}
+
+
+// Stops the backing process and blocks until it has exited.
+MasterRegistrar::~MasterRegistrar()
+{
+  process::terminate(*process);
+  process::wait(*process);
+}
+
+
+// Nothing to recover here; recovering is the responsibility of the
+// underlying master registrar.
+Future<Nothing> MasterRegistrar::recover()
+{
+  return Nothing();
+}
+
+
+// Forwards `operation` to the backing process.
+Future<bool> MasterRegistrar::apply(Owned<Operation> operation)
+{
+  return dispatch(
+      process.get(), &MasterRegistrarProcess::apply, std::move(operation));
+}
+
 } // namespace resource_provider {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/46db7e4f/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 40c08c7..39f45b0 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -25,6 +25,8 @@
 #include <stout/nothing.hpp>
 #include <stout/try.hpp>
 
+#include "master/registrar.hpp"
+
 #include "resource_provider/registry.hpp"
 
 #include "slave/flags.hpp"
@@ -62,6 +64,10 @@ public:
     bool success = false;
   };
 
+  // Create a registrar on top of a master's persistent state. The
+  // returned registrar does not take ownership of `registrar`, which
+  // must outlive it.
+  static Try<process::Owned<Registrar>> create(
+      mesos::internal::master::Registrar* registrar);
+
   // Create a registry on top of an agent's persistent state.
   static Try<process::Owned<Registrar>> create(
       const mesos::internal::slave::Flags& slaveFlags,
@@ -118,6 +124,27 @@ private:
   std::unique_ptr<AgentRegistrarProcess> process;
 };
 
+
+class MasterRegistrarProcess;
+
+
+// A resource provider registrar which keeps its state in the resource
+// provider registry embedded in the master registry.
+class MasterRegistrar : public Registrar
+{
+public:
+  // `registrar` is not owned and must outlive this object.
+  explicit MasterRegistrar(mesos::internal::master::Registrar* registrar);
+
+  ~MasterRegistrar() override;
+
+  // This registrar performs no recovery of its own; to recover its
+  // state, recover the underlying master registrar instead.
+  process::Future<Nothing> recover() override;
+
+  process::Future<bool> apply(process::Owned<Operation> operation) override;
+
+private:
+  std::unique_ptr<MasterRegistrarProcess> process;
+};
+
 } // namespace resource_provider {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/46db7e4f/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 562aff2..c29b892 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -21,6 +21,9 @@
 #include <mesos/http.hpp>
 #include <mesos/resources.hpp>
 
+#include <mesos/state/in_memory.hpp>
+#include <mesos/state/state.hpp>
+
 #include <mesos/v1/mesos.hpp>
 
 #include <mesos/v1/resource_provider/resource_provider.hpp>
@@ -59,6 +62,9 @@ using mesos::internal::slave::Slave;
 
 using mesos::master::detector::MasterDetector;
 
+using mesos::state::InMemoryStorage;
+using mesos::state::State;
+
 using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Registrar;
 using mesos::resource_provider::RemoveResourceProvider;
@@ -388,6 +394,37 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
       new RemoveResourceProvider(resourceProviderId))));
 }
 
+
+// Test that the master resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value("foo");
+
+  // Back the master registrar with in-memory state.
+  InMemoryStorage storage;
+  State state(&storage);
+  master::Registrar masterRegistrar(CreateMasterFlags(), &state);
+
+  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
+  ASSERT_SOME(registrar);
+  ASSERT_NE(nullptr, registrar->get());
+
+  // Operations are rejected until the master registrar has recovered.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  const MasterInfo masterInfo = protobuf::createMasterInfo({});
+  AWAIT_READY(masterRegistrar.recover(masterInfo));
+
+  // Once recovered, admitting and then removing the provider succeeds.
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new RemoveResourceProvider(resourceProviderId))));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[4/5] mesos git commit: Implemented a registrar for resource provider manager state.

Posted by bb...@apache.org.
Implemented a registrar for resource provider manager state.

This patch adds a registry and registrar interface for resource
provider managers. The registrar interface is modelled after the
master registrar and supports similar operations. Currently, a single
LevelDB-backed registrar is implemented, which we plan to use for
resource provider managers in agents.

Currently, the registry allows adding and removing resource provider IDs.

Review: https://reviews.apache.org/r/61528/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8db88af0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8db88af0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8db88af0

Branch: refs/heads/master
Commit: 8db88af08763f489d65f7b3fa2e372f195fa4736
Parents: 2af9a5b
Author: Benjamin Bannier <bb...@gmail.com>
Authored: Thu Aug 3 08:53:20 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/CMakeLists.txt                            |   2 +
 src/Makefile.am                               |   6 +
 src/resource_provider/registrar.cpp           | 375 +++++++++++++++++++++
 src/resource_provider/registrar.hpp           | 124 +++++++
 src/resource_provider/registry.hpp            |  24 ++
 src/resource_provider/registry.proto          |  38 +++
 src/slave/paths.cpp                           |  11 +
 src/slave/paths.hpp                           |   5 +
 src/tests/resource_provider_manager_tests.cpp |  49 +++
 9 files changed, 634 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5677933..1a0dff3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,7 @@ PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/network/cni/
 PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/docker/volume/state)
 PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/provisioner/docker/message)
 PROTOC_GENERATE(INTERNAL TARGET master/registry)
+PROTOC_GENERATE(INTERNAL TARGET resource_provider/registry)
 
 
 # BUILD PROTOBUFS.
@@ -466,6 +467,7 @@ set(RESOURCE_PROVIDER_SRC
   resource_provider/driver.cpp
   resource_provider/local.cpp
   resource_provider/manager.cpp
+  resource_provider/registrar.cpp
   resource_provider/validation.cpp
   resource_provider/storage/provider.cpp)
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b5b88d..da8af91 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -364,6 +364,8 @@ CXX_PROTOS +=								\
   messages/flags.pb.h							\
   messages/messages.pb.cc						\
   messages/messages.pb.h						\
+  resource_provider/registry.pb.cc					\
+  resource_provider/registry.pb.h					\
   slave/containerizer/mesos/provisioner/docker/message.pb.cc		\
   slave/containerizer/mesos/provisioner/docker/message.pb.h		\
   slave/containerizer/mesos/isolators/docker/volume/state.pb.cc		\
@@ -995,6 +997,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/driver.cpp						\
   resource_provider/local.cpp						\
   resource_provider/manager.cpp						\
+  resource_provider/registrar.cpp					\
   resource_provider/validation.cpp					\
   resource_provider/storage/provider.cpp				\
   sched/sched.cpp							\
@@ -1137,6 +1140,9 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/local.hpp						\
   resource_provider/manager.hpp						\
   resource_provider/message.hpp						\
+  resource_provider/registrar.hpp					\
+  resource_provider/registry.hpp					\
+  resource_provider/registry.proto					\
   resource_provider/validation.hpp					\
   resource_provider/storage/provider.hpp				\
   sched/constants.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
new file mode 100644
index 0000000..5dfcb89
--- /dev/null
+++ b/src/resource_provider/registrar.cpp
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/registrar.hpp"
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include <mesos/type_utils.hpp>
+
+#include <mesos/state/in_memory.hpp>
+
+#ifndef __WINDOWS__
+#include <mesos/state/leveldb.hpp>
+#endif // __WINDOWS__
+
+#include <mesos/state/protobuf.hpp>
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/unreachable.hpp>
+
+#include "slave/paths.hpp"
+
+
+using std::deque;
+using std::string;
+using std::unique_ptr;
+
+using mesos::resource_provider::registry::Registry;
+using mesos::resource_provider::registry::ResourceProvider;
+
+using mesos::state::InMemoryStorage;
+
+#ifndef __WINDOWS__
+using mesos::state::LevelDBStorage;
+#endif // __WINDOWS__
+
+using mesos::state::Storage;
+
+using mesos::state::protobuf::Variable;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+using process::Promise;
+using process::defer;
+using process::spawn;
+using process::terminate;
+using process::wait;
+
+
+namespace slave = mesos::internal::slave;
+
+namespace mesos {
+namespace resource_provider {
+
+// Runs `perform()` and remembers whether it succeeded so that `set()`
+// can later resolve the promise accordingly.
+Try<bool> Registrar::Operation::operator()(Registry* registry)
+{
+  Try<bool> result = perform(registry);
+  success = result.isSome();
+  return result;
+}
+
+
+// Resolves the promise with the outcome recorded by `operator()`.
+bool Registrar::Operation::set()
+{
+  return Promise<bool>::set(success);
+}
+
+
+// Creates a registrar persisting its state below the agent's work
+// directory for `slaveId`.
+Try<Owned<Registrar>> Registrar::create(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+{
+  Owned<Registrar> registrar(new AgentRegistrar(slaveFlags, slaveId));
+  return registrar;
+}
+
+
+AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
+  : id(_id) {}
+
+
+// Adds a resource provider with the configured ID to the registry; fails
+// if a provider with that ID has already been admitted.
+Try<bool> AdmitResourceProvider::perform(Registry* registry)
+{
+  for (const ResourceProvider& admitted : registry->resource_providers()) {
+    if (admitted.id() == id) {
+      return Error("Resource provider already admitted");
+    }
+  }
+
+  registry->add_resource_providers()->mutable_id()->CopyFrom(id);
+
+  return true; // Mutation.
+}
+
+
+RemoveResourceProvider::RemoveResourceProvider(const ResourceProviderID& _id)
+  : id(_id) {}
+
+
+// Removes the resource provider with the configured ID from the registry;
+// fails if no such provider is known.
+Try<bool> RemoveResourceProvider::perform(Registry* registry)
+{
+  auto* providers = registry->mutable_resource_providers();
+
+  auto match = std::find_if(
+      providers->begin(),
+      providers->end(),
+      [this](const ResourceProvider& candidate) {
+        return candidate.id() == id;
+      });
+
+  if (match == providers->end()) {
+    return Error("Attempted to remove an unknown resource provider");
+  }
+
+  providers->erase(match);
+
+  return true; // Mutation.
+}
+
+
+// Process backing `AgentRegistrar`. It keeps an in-memory copy of the
+// registry, batches queued operations, and persists results via `state`.
+class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
+{
+public:
+  AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
+
+  // Fetches the persisted registry; later calls return the same future.
+  Future<Nothing> recover();
+
+  // Queues `operation` once recovery has completed. Fails if `recover()`
+  // has not been called yet.
+  Future<bool> apply(Owned<Registrar::Operation> operation);
+
+  // Enqueues `operation` and starts an update unless one is already in
+  // flight. Fails if a previous store has failed.
+  Future<bool> _apply(Owned<Registrar::Operation> operation);
+
+  // Applies all queued operations to a copy of the registry and stores it.
+  void update();
+
+  // Completion handler for the store issued by `update()`.
+  void _update(
+      const Future<Option<Variable<Registry>>>& store,
+      const Registry& updatedRegistry,
+      deque<Owned<Registrar::Operation>> applied);
+
+private:
+  // NOTE: `storage` must be declared before `state`, since `state` is
+  // constructed from `storage.get()` in the member initializer list.
+  Owned<Storage> storage;
+
+  // Use the fully qualified type for `State` to disambiguate it from the
+  // `State` enumeration in `ProcessBase`.
+  mesos::state::protobuf::State state;
+
+  // Set once `recover()` has been called.
+  Option<Future<Nothing>> recovered;
+
+  // In-memory copy of the registry as last fetched or stored.
+  Option<Registry> registry;
+  Option<Variable<Registry>> variable;
+
+  // Set once a store has failed; further operations are then rejected.
+  Option<Error> error;
+
+  // Operations waiting to be applied by the next `update()`.
+  deque<Owned<Registrar::Operation>> operations;
+
+  // Whether a store issued by `update()` is still in flight.
+  bool updating = false;
+
+  static Owned<Storage> createStorage(const std::string& path);
+};
+
+
+Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
+{
+  // LevelDB backs the registrar everywhere except on Windows, where it is
+  // not supported yet (see MESOS-5932); there, state is kept in memory only.
+  //
+  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifdef __WINDOWS__
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+  return Owned<Storage>(new InMemoryStorage());
+#else
+  return Owned<Storage>(new LevelDBStorage(path));
+#endif // __WINDOWS__
+}
+
+
+// The registry is persisted in the agent's directory for `slaveId` as
+// computed by `slave::paths::getResourceProviderRegistryPath`.
+AgentRegistrarProcess::AgentRegistrarProcess(
+    const slave::Flags& flags, const SlaveID& slaveId)
+  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
+    storage(createStorage(slave::paths::getResourceProviderRegistryPath(
+        flags.work_dir, slaveId))),
+    // `storage` is initialized first, so `storage.get()` is valid here.
+    state(storage.get()) {}
+
+
+Future<Nothing> AgentRegistrarProcess::recover()
+{
+  constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
+
+  // Recovery is only started once; repeated calls share its result.
+  if (recovered.isSome()) {
+    return recovered.get();
+  }
+
+  recovered = state.fetch<Registry>(NAME).then(
+      defer(self(), [this](const Variable<Registry>& recovery) {
+        variable = recovery;
+        registry = recovery.get();
+
+        return Nothing();
+      }));
+
+  return recovered.get();
+}
+
+
+Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
+{
+  // Operations are only accepted after `recover()` was requested; they
+  // are then deferred until recovery has actually completed.
+  if (recovered.isSome()) {
+    return recovered->then(
+        defer(self(), &Self::_apply, std::move(operation)));
+  }
+
+  return Failure("Attempted to apply the operation before recovering");
+}
+
+
+Future<bool> AgentRegistrarProcess::_apply(
+    Owned<Registrar::Operation> operation)
+{
+  // A failed store leaves the registrar in an error state.
+  if (error.isSome()) {
+    return Failure(error.get());
+  }
+
+  // Grab the result future before handing ownership to the queue.
+  Future<bool> result = operation->future();
+  operations.push_back(std::move(operation));
+
+  // Only start an update when none is in flight; otherwise the operation
+  // stays queued and `_update()` schedules another update once the
+  // in-flight store has succeeded.
+  if (!updating) {
+    update();
+  }
+
+  return result;
+}
+
+
+// Applies every queued operation to a copy of the in-memory registry and
+// stores the result. The batch is handed to `_update()`, which completes
+// the operations once the store has finished.
+void AgentRegistrarProcess::update()
+{
+  CHECK(!updating);
+  CHECK_NONE(error);
+
+  if (operations.empty()) {
+    return; // No-op.
+  }
+
+  updating = true;
+
+  CHECK_SOME(registry);
+  Registry updatedRegistry = registry.get();
+
+  // Each operation records its own success; errors do not stop the batch.
+  foreach (Owned<Registrar::Operation>& operation, operations) {
+    (*operation)(&updatedRegistry);
+  }
+
+  // Serialize updated registry.
+  CHECK_SOME(variable);
+
+  Future<Option<Variable<Registry>>> store =
+    state.store(variable->mutate(updatedRegistry));
+
+  // Hand the applied batch to `_update()`. `operations` is cleared below
+  // so it only collects operations arriving while the store is running.
+  store.onAny(defer(
+      self(),
+      &Self::_update,
+      lambda::_1,
+      updatedRegistry,
+      std::move(operations)));
+
+  operations.clear();
+}
+
+
+// Completion handler for the store issued by `update()`. On success the
+// stored registry becomes current and the applied operations are resolved.
+// On failure the registrar enters an error state and every pending
+// operation is failed.
+void AgentRegistrarProcess::_update(
+    const Future<Option<Variable<Registry>>>& store,
+    const Registry& updatedRegistry,
+    deque<Owned<Registrar::Operation>> applied)
+{
+  updating = false;
+  // Abort if the storage operation did not succeed.
+  if (!store.isReady() || store.get().isNone()) {
+    string message = "Failed to update registry: ";
+
+    if (store.isFailed()) {
+      message += store.failure();
+    } else if (store.isDiscarded()) {
+      message += "discarded";
+    } else {
+      message += "version mismatch";
+    }
+
+    while (!applied.empty()) {
+      applied.front()->fail(message);
+      applied.pop_front();
+    }
+
+    error = Error(message);
+
+    // Also fail operations that were queued while the store was in
+    // flight. With `error` set, `update()` is never invoked again, so
+    // their futures would otherwise never complete.
+    while (!operations.empty()) {
+      operations.front()->fail(message);
+      operations.pop_front();
+    }
+
+    LOG(ERROR) << "Registrar aborting: " << message;
+
+    return;
+  }
+
+  variable = store->get();
+  registry = updatedRegistry;
+
+  // Resolve each applied operation with whether it succeeded.
+  while (!applied.empty()) {
+    Owned<Registrar::Operation> operation = applied.front();
+    applied.pop_front();
+
+    operation->set();
+  }
+
+  // Process operations queued while the store was in flight.
+  if (!operations.empty()) {
+    update();
+  }
+}
+
+
+// Creates and spawns the backing process.
+AgentRegistrar::AgentRegistrar(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
+{
+  spawn(process.get(), false);
+}
+
+
+// Stops the backing process and blocks until it has exited before the
+// `process` member releases it.
+AgentRegistrar::~AgentRegistrar()
+{
+  terminate(*process);
+  wait(*process);
+}
+
+
+Future<Nothing> AgentRegistrar::recover()
+{
+  return dispatch(process.get(), &AgentRegistrarProcess::recover);
+}
+
+
+Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
+{
+  return dispatch(
+      process.get(), &AgentRegistrarProcess::apply, std::move(operation));
+}
+
+} // namespace resource_provider {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
new file mode 100644
index 0000000..40c08c7
--- /dev/null
+++ b/src/resource_provider/registrar.hpp
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_REGISTRAR_HPP__
+#define __RESOURCE_PROVIDER_REGISTRAR_HPP__
+
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "resource_provider/registry.hpp"
+
+#include "slave/flags.hpp"
+
+
+namespace mesos {
+namespace resource_provider {
+
+// Interface for persisting resource provider manager state. Concrete
+// registrars are obtained through the `create()` factory.
+class Registrar
+{
+public:
+  // Defines an abstraction for operations that can be applied on the
+  // Registry.
+  // TODO(xujyan): Make Operation generic so that we can apply them
+  // against a generic "batch operation applier" abstraction, see TODO
+  // in master/registrar.hpp for more details.
+  class Operation : public process::Promise<bool>
+  {
+  public:
+    virtual ~Operation() = default;
+
+    // Attempts to invoke the operation on the registry object.
+    //
+    // Returns whether the operation mutates 'registry', or an error if
+    // the operation cannot be applied successfully.
+    Try<bool> operator()(registry::Registry* registry);
+
+    // Sets the promise based on whether the operation was successful.
+    bool set();
+
+  protected:
+    // Implemented by concrete operations; invoked by `operator()`.
+    virtual Try<bool> perform(registry::Registry* registry) = 0;
+
+  private:
+    // Whether the last `operator()` call did not return an error.
+    bool success = false;
+  };
+
+  // Create a registrar on top of an agent's persistent state.
+  static Try<process::Owned<Registrar>> create(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
+
+  virtual ~Registrar() = default;
+
+  // Recovers previously persisted state (implementation specific).
+  virtual process::Future<Nothing> recover() = 0;
+
+  // Applies `operation`; the returned future reports its outcome.
+  virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
+};
+
+
+// Operation admitting a resource provider; it fails if a provider with
+// the same ID is already present in the registry.
+class AdmitResourceProvider : public Registrar::Operation
+{
+public:
+  explicit AdmitResourceProvider(const ResourceProviderID& id);
+
+private:
+  Try<bool> perform(registry::Registry* registry) override;
+
+  ResourceProviderID id;
+};
+
+
+// Operation removing a resource provider; it fails if no provider with
+// the given ID is present in the registry.
+class RemoveResourceProvider : public Registrar::Operation
+{
+public:
+  explicit RemoveResourceProvider(const ResourceProviderID& id);
+
+private:
+  Try<bool> perform(registry::Registry* registry) override;
+
+  ResourceProviderID id;
+};
+
+
+class AgentRegistrarProcess;
+
+
+// A resource provider registrar persisting its state below the agent's
+// work directory (LevelDB-backed; in-memory only on Windows).
+class AgentRegistrar : public Registrar
+{
+public:
+  AgentRegistrar(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
+
+  ~AgentRegistrar() override;
+
+  process::Future<Nothing> recover() override;
+
+  process::Future<bool> apply(process::Owned<Operation> operation) override;
+
+private:
+  std::unique_ptr<AgentRegistrarProcess> process;
+};
+
+} // namespace resource_provider {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_REGISTRAR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.hpp b/src/resource_provider/registry.hpp
new file mode 100644
index 0000000..048cd6b
--- /dev/null
+++ b/src/resource_provider/registry.hpp
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef __RESOURCE_PROVIDER_REGISTRY_HPP__
+#define __RESOURCE_PROVIDER_REGISTRY_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <resource_provider/registry.pb.h>
+
+#endif // __RESOURCE_PROVIDER_REGISTRY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
new file mode 100644
index 0000000..14bd433
--- /dev/null
+++ b/src/resource_provider/registry.proto
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+import "mesos/mesos.proto";
+
+package mesos.resource_provider.registry;
+
+option java_package = "org.apache.mesos.resource_provider.registry";
+option java_outer_classname = "Protos";
+
+// NOTE: The messages below wrap existing objects so that the Registry
+// can store more information about the wrapped objects in the future.
+
+// A resource provider known to the registry.
+message ResourceProvider {
+  required ResourceProviderID id = 1;
+}
+
+
+// A top level object that is managed by the Registrar and persisted.
+message Registry {
+  // Resource providers which have been admitted and not yet removed.
+  repeated ResourceProvider resource_providers = 1;
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index e8724bf..fd54652 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -70,6 +70,7 @@ const char SLAVES_DIR[] = "slaves";
 const char FRAMEWORKS_DIR[] = "frameworks";
 const char EXECUTORS_DIR[] = "executors";
 const char EXECUTOR_RUNS_DIR[] = "runs";
+// Name of the resource provider registry inside an agent's meta directory.
+const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
 
 
 Try<ExecutorRunPath> parseExecutorRunPath(
@@ -447,6 +448,16 @@ string getTaskUpdatesPath(
 }
 
 
+// The registry lives in the agent's directory below the meta root of
+// `rootDir`.
+string getResourceProviderRegistryPath(
+    const string& rootDir,
+    const SlaveID& slaveId)
+{
+  const string slaveMetaDir = getSlavePath(getMetaRootDir(rootDir), slaveId);
+
+  return path::join(slaveMetaDir, RESOURCE_PROVIDER_REGISTRY);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d021e6b..f000508 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -284,6 +284,11 @@ std::string getTaskUpdatesPath(
     const TaskID& taskId);
 
 
+std::string getResourceProviderRegistryPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 3bc56b5..562aff2 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -47,6 +47,7 @@
 #include "internal/devolve.hpp"
 
 #include "resource_provider/manager.hpp"
+#include "resource_provider/registrar.hpp"
 
 #include "slave/slave.hpp"
 
@@ -58,6 +59,10 @@ using mesos::internal::slave::Slave;
 
 using mesos::master::detector::MasterDetector;
 
+using mesos::resource_provider::AdmitResourceProvider;
+using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
+
 using mesos::v1::resource_provider::Call;
 using mesos::v1::resource_provider::Event;
 
@@ -339,6 +344,50 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   EXPECT_FALSE(event->get().subscribed().provider_id().value().empty());
 }
 
+
+class ResourceProviderRegistrarTest : public tests::MesosTest {};
+
+
+// Test that the agent resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value("foo");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const slave::Flags flags = CreateSlaveFlags();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Try<Owned<Registrar>> registrar =
+    Registrar::create(flags, slaveRegisteredMessage->slave_id());
+
+  ASSERT_SOME(registrar);
+  ASSERT_NE(nullptr, registrar->get());
+
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  AWAIT_READY(registrar.get()->recover());
+
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new RemoveResourceProvider(resourceProviderId))));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[5/5] mesos git commit: Rescinded offers possibly affected by updates to agent total resources.

Posted by bb...@apache.org.
Rescinded offers possibly affected by updates to agent total resources.

When an agent changes its resources, the master should rescind any
offers affected by the change. We already performed the rescind for
updates to the agent's oversubscribed resources; this patch adds offer
rescinding when an update to an agent's total resources is processed.

While for updates to an agent's oversubscribed resources we currently
only rescind offers containing revocable resources (e.g., to reduce
offer churn), for updates to the total we currently rescind all
offers for resources on the agent.

As an optimization, this patch adds logic to ignore redundant updates
to agent resources.

Review: https://reviews.apache.org/r/62158


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d38fe9d5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d38fe9d5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d38fe9d5

Branch: refs/heads/master
Commit: d38fe9d5a4db0a37b876c55c99b547d4c8fbd8dd
Parents: a88469b
Author: Benjamin Bannier <bb...@gmail.com>
Authored: Thu Sep 7 16:09:10 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/master/master.cpp | 49 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 41 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d38fe9d5/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 169fee4..3603878 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6820,27 +6820,60 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newOversubscribed = oversubscribedResources;
   }
 
-  slave->totalResources =
+  const Resources newSlaveResources =
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
 
+  if (newSlaveResources == slave->totalResources) {
+    LOG(INFO) << "Ignoring update on agent " << *slave
+              << " as it reports no changes";
+    return;
+  }
+
+  slave->totalResources = newSlaveResources;
+
   // Now update the agent's resources in the allocator.
   allocator->updateSlave(slaveId, slave->totalResources);
 
-  // Then rescind any outstanding offers with revocable resources.
+  // Then rescind outstanding offers affected by the update.
   // NOTE: Need a copy of offers because the offers are removed inside the loop.
   foreach (Offer* offer, utils::copy(slave->offers)) {
+    bool rescind = false;
+
     const Resources& offered = offer->resources();
-    if (!offered.revocable().empty()) {
+    // Since updates of the agent's oversubscribed resources are sent at regular
+    // intervals, we only rescind offers containing revocable resources to
+    // reduce churn.
+    if (hasOversubscribed && !offered.revocable().empty()) {
       LOG(INFO) << "Removing offer " << offer->id()
-                << " with revocable resources " << offered
-                << " on agent " << *slave;
+                << " with revocable resources " << offered << " on agent "
+                << *slave;
 
-      allocator->recoverResources(
-          offer->framework_id(), offer->slave_id(), offered, None());
+      rescind = true;
+    }
+
+    // For updates to the agent's total resources all offers are rescinded.
+    //
+    // TODO(bbannier): Only rescind offers possibly containing removed
+    // resources.
+    if (hasTotal) {
+      LOG(INFO) << "Removing offer " << offer->id() << " with resources "
+                << offered << " on agent " << *slave;
 
-      removeOffer(offer, true); // Rescind.
+      rescind = true;
     }
+
+    if (!rescind) {
+      continue;
+    }
+
+    allocator->recoverResources(
+        offer->framework_id(),
+        offer->slave_id(),
+        offered,
+        None());
+
+    removeOffer(offer, true); // Rescind.
   }
 
   // NOTE: We don't need to rescind inverse offers here as they are unrelated to


[3/5] mesos git commit: Made 'UpdateSlaveMessage' a union of possible updates.

Posted by bb...@apache.org.
Made 'UpdateSlaveMessage' a union of possible updates.

The 'UpdateSlaveMessage' could either transport an update to an
agent's total, or to its oversubscribed resources. In certain
scenarios this required the agent to send two messages where before
one was sufficient. When talking to a master which did not understand
the 'type' field, this would have led the master to rescind all offers
for revocable resources from that agent.

This patch changes the message to be a union of possible updates,
i.e., it is now possible to send updates to both the total and the
oversubscribed resources simultaneously on agent registration and
reregistration without the master rescinding offers for revocable
resources on the agent.

Review: https://reviews.apache.org/r/62655


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a88469bf
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a88469bf
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a88469bf

Branch: refs/heads/master
Commit: a88469bfe8b61aa7e83e8ceb5e21fc5a4641cec8
Parents: 38af943
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Sep 27 14:26:03 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/master/master.cpp                | 70 +++++++++++++++++--------------
 src/messages/messages.proto          | 24 +++++------
 src/slave/slave.cpp                  |  6 +--
 src/tests/oversubscription_tests.cpp | 15 +++++--
 4 files changed, 64 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a88469bf/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 51d92ef..169fee4 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -6782,42 +6782,48 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
   // updating the agent in the allocator. This would lead us to
   // re-send out the stale oversubscribed resources!
 
-  // If the caller did not specify a type we assume we should set
-  // `oversubscribedResources` to be backwards-compatibility with
-  // older clients.
-  const UpdateSlaveMessage::Type type =
-    message.has_type() ? message.type() : UpdateSlaveMessage::OVERSUBSCRIBED;
-
-  switch (type) {
-    case UpdateSlaveMessage::OVERSUBSCRIBED: {
-      const Resources oversubscribedResources =
-        message.oversubscribed_resources();
-
-      LOG(INFO) << "Received update of agent " << *slave << " with total"
-                << " oversubscribed resources " << oversubscribedResources;
-
-      slave->totalResources =
-        slave->totalResources.nonRevocable() +
-        oversubscribedResources.revocable();
-      break;
-    }
-    case UpdateSlaveMessage::TOTAL: {
-      const Resources totalResources =
-        message.total_resources();
+  // If the caller did not specify a resource category we assume we should set
+  // `oversubscribedResources` for backwards compatibility with older clients.
+  const bool hasOversubscribed =
+    (message.has_resource_categories() &&
+     message.resource_categories().has_oversubscribed() &&
+     message.resource_categories().oversubscribed()) ||
+    !message.has_resource_categories();
 
-      LOG(INFO) << "Received update of agent " << *slave << " with total"
-                << " resources " << totalResources;
+  const bool hasTotal =
+    message.has_resource_categories() &&
+    message.resource_categories().has_total() &&
+    message.resource_categories().total();
 
-      slave->totalResources = totalResources;
-      break;
-    }
-    case UpdateSlaveMessage::UNKNOWN: {
-      LOG(WARNING) << "Ignoring update on agent " << slaveId
-                   << " since the update type is not understood";
-      return;
-    }
+  Option<Resources> newTotal;
+  Option<Resources> newOversubscribed;
+
+  if (hasTotal) {
+    const Resources& totalResources = message.total_resources();
+
+    LOG(INFO) << "Received update of agent " << *slave << " with total"
+              << " resources " << totalResources;
+
+    newTotal = totalResources;
   }
 
+  // Since `total` always overwrites an existing total, we apply
+  // `oversubscribed` after updating the total to be able to
+  // independently apply it regardless of whether `total` was sent.
+  if (hasOversubscribed) {
+    const Resources& oversubscribedResources =
+      message.oversubscribed_resources();
+
+    LOG(INFO) << "Received update of agent " << *slave << " with total"
+              << " oversubscribed resources " << oversubscribedResources;
+
+    newOversubscribed = oversubscribedResources;
+  }
+
+  slave->totalResources =
+    newTotal.getOrElse(slave->totalResources.nonRevocable()) +
+    newOversubscribed.getOrElse(slave->totalResources.revocable());
+
   // Now update the agent's resources in the allocator.
   allocator->updateSlave(slaveId, slave->totalResources);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a88469bf/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index dc4e19c..afca6d1 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -610,22 +610,20 @@ message CheckpointResourcesMessage {
 message UpdateSlaveMessage {
   required SlaveID slave_id = 1;
 
-  // This message can contain one of `oversubscribed_resources` or
-  // `total_resources`. Callers are expected to set the `type` field
-  // to denote which field should be examined. For backwards
-  // compatibility we interpret an unset `type` field as if it was
-  // `OVERSUBSCRIBED`.
+  // This message can contain `oversubscribed_resources` or
+  // `total_resources`. Callers are expected to set the respective
+  // categories in `resource_categories` to denote which fields should
+  // be examined. For backwards compatibility we interpret an unset
+  // `resource_categories` field as if only oversubscribed was set.
   //
-  // It is suggested that callers use the `total_resources` field
-  // exclusively as the `oversubscribed_resources` field might be
-  // removed in the future.
-  enum Type {
-    UNKNOWN = 0;
-    OVERSUBSCRIBED = 1;
-    TOTAL = 2;
+  // Oversubscribed resources must be revocable, while total resources
+  // must be non-revocable.
+  message ResourceCategories {
+    optional bool total = 1;
+    optional bool oversubscribed = 2;
   }
 
-  optional Type type = 3;
+  optional ResourceCategories resource_categories = 5;
 
   repeated Resource oversubscribed_resources = 2;
   repeated Resource total_resources = 4;

http://git-wip-us.apache.org/repos/asf/mesos/blob/a88469bf/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 1ac95f5..c35cf7d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1264,7 +1264,7 @@ void Slave::registered(
 
     UpdateSlaveMessage message;
     message.mutable_slave_id()->CopyFrom(info.id());
-    message.set_type(UpdateSlaveMessage::OVERSUBSCRIBED);
+    message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
@@ -1344,7 +1344,7 @@ void Slave::reregistered(
 
     UpdateSlaveMessage message;
     message.mutable_slave_id()->CopyFrom(info.id());
-    message.set_type(UpdateSlaveMessage::OVERSUBSCRIBED);
+    message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
 
@@ -6542,7 +6542,7 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 
       UpdateSlaveMessage message;
       message.mutable_slave_id()->CopyFrom(info.id());
-      message.set_type(UpdateSlaveMessage::OVERSUBSCRIBED);
+      message.mutable_resource_categories()->set_oversubscribed(true);
       message.mutable_oversubscribed_resources()->CopyFrom(oversubscribed);
 
       CHECK_SOME(master);

http://git-wip-us.apache.org/repos/asf/mesos/blob/a88469bf/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index 02b10d6..cd98b8f 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -323,7 +323,9 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
 
   AWAIT_READY(update);
 
-  EXPECT_EQ(update->type(), UpdateSlaveMessage::OVERSUBSCRIBED);
+  EXPECT_TRUE(update->has_resource_categories());
+  EXPECT_TRUE(update->resource_categories().has_oversubscribed());
+  EXPECT_TRUE(update->resource_categories().oversubscribed());
   EXPECT_EQ(update->oversubscribed_resources(), resources);
 
   // Ensure the metric is updated.
@@ -696,7 +698,9 @@ TEST_F(OversubscriptionTest, FixedResourceEstimator)
   Clock::settle();
 
   AWAIT_READY(update);
-  ASSERT_EQ(UpdateSlaveMessage::OVERSUBSCRIBED, update->type());
+  ASSERT_TRUE(update->has_resource_categories());
+  ASSERT_TRUE(update->resource_categories().has_oversubscribed());
+  ASSERT_TRUE(update->resource_categories().oversubscribed());
 
   Resources resources = update->oversubscribed_resources();
   EXPECT_SOME_EQ(2.0, resources.cpus());
@@ -898,7 +902,9 @@ TEST_F(OversubscriptionTest, Reregistration)
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveRegistered);
   AWAIT_READY(update);
-  ASSERT_EQ(UpdateSlaveMessage::OVERSUBSCRIBED, update->type());
+  ASSERT_TRUE(update->has_resource_categories());
+  ASSERT_TRUE(update->resource_categories().has_oversubscribed());
+  ASSERT_TRUE(update->resource_categories().oversubscribed());
 
   Resources resources = update->oversubscribed_resources();
   EXPECT_SOME_EQ(2.0, resources.cpus());
@@ -915,6 +921,9 @@ TEST_F(OversubscriptionTest, Reregistration)
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveReregistered);
   AWAIT_READY(update);
+  EXPECT_TRUE(update->has_resource_categories());
+  EXPECT_TRUE(update->resource_categories().has_oversubscribed());
+  EXPECT_TRUE(update->resource_categories().oversubscribed());
 }
 
 


[2/5] mesos git commit: Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

Posted by bb...@apache.org.
Triggered 'UpdateSlaveMessage' when 'ResourceProviderManager' updates.

The agent's resource provider manager sends a
'ResourceProviderMessage' when its managed resources change. This
commit adds handling in the agent so that an 'UpdateSlaveMessage' is
sent to the master to update the total resources available on the
agent.

In order to provide push-like handling of the resource provider
manager's message queue, we chain recursive calls to the handler for
continuous processing. Initially, processing is kicked off from
'Slave::__recover'. This simple implementation does not yet provide a
direct way to stop processing messages, but this can be achieved by,
e.g., replacing the manager with a new instance (this would also
require updating routes).

Since the agent can only send an 'UpdateSlaveMessage' when it is
registered with a master, a simple back-off of 5 s is implemented which
will defer processing of a ready message should the agent not yet have
registered.

To facilitate logging we add a stringification function for
'ResourceProviderMessage's.

This patch also adjusts a number of tests to now expect two
'UpdateSlaveMessage's.

Review: https://reviews.apache.org/r/61183


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2af9a5b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2af9a5b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2af9a5b0

Branch: refs/heads/master
Commit: 2af9a5b07dc80151154264e974d03f56a1c25838
Parents: d38fe9d
Author: Benjamin Bannier <bb...@apache.org>
Authored: Wed Sep 27 15:18:27 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/resource_provider/message.hpp    |  25 +++++++
 src/slave/slave.cpp                  | 114 ++++++++++++++++++++++++++--
 src/slave/slave.hpp                  |   7 +-
 src/tests/oversubscription_tests.cpp |  29 +++++---
 src/tests/slave_tests.cpp            | 119 ++++++++++++++++++++++++++++++
 5 files changed, 275 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2af9a5b0/src/resource_provider/message.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/message.hpp b/src/resource_provider/message.hpp
index 3c7c3f2..931aab6 100644
--- a/src/resource_provider/message.hpp
+++ b/src/resource_provider/message.hpp
@@ -17,10 +17,14 @@
 #ifndef __RESOURCE_PROVIDER_MESSAGE_HPP__
 #define __RESOURCE_PROVIDER_MESSAGE_HPP__
 
+#include <ostream>
+
 #include <mesos/mesos.hpp>
 #include <mesos/resources.hpp>
 
+#include <stout/check.hpp>
 #include <stout/option.hpp>
+#include <stout/unreachable.hpp>
 
 namespace mesos {
 namespace internal {
@@ -42,6 +46,27 @@ struct ResourceProviderMessage
   Option<UpdateTotalResources> updateTotalResources;
 };
 
+
+inline std::ostream& operator<<(
+    std::ostream& stream,
+    const ResourceProviderMessage& resourceProviderMessage)
+{
+  switch (resourceProviderMessage.type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES:
+      const Option<ResourceProviderMessage::UpdateTotalResources>&
+        updateTotalResources = resourceProviderMessage.updateTotalResources;
+
+      CHECK_SOME(updateTotalResources);
+
+      return stream
+          << "UPDATE_TOTAL_RESOURCES: "
+          << updateTotalResources->id << " "
+          << updateTotalResources->total;
+  }
+
+  UNREACHABLE();
+}
+
 } // namespace internal {
 } // namespace mesos {
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2af9a5b0/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index c35cf7d..2e05637 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1257,19 +1257,27 @@ void Slave::registered(
       break;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  LOG(INFO) << "Forwarding total resources " << totalResources;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_categories()->set_total(true);
+  message.mutable_total_resources()->CopyFrom(totalResources);
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
-
-    send(master.get(), message);
   }
+
+  send(master.get(), message);
 }
 
 
@@ -1337,20 +1345,28 @@ void Slave::reregistered(
       return;
   }
 
+  // Send the latest total, including resources from resource providers. We send
+  // this message here as a resource provider might have registered with the
+  // agent between recovery completion and agent registration.
+  LOG(INFO) << "Forwarding total resources " << totalResources;
+
+  UpdateSlaveMessage message;
+  message.mutable_slave_id()->CopyFrom(info.id());
+  message.mutable_resource_categories()->set_total(true);
+  message.mutable_total_resources()->CopyFrom(totalResources);
+
   // Send the latest estimate for oversubscribed resources.
   if (oversubscribedResources.isSome()) {
     LOG(INFO) << "Forwarding total oversubscribed resources "
               << oversubscribedResources.get();
 
-    UpdateSlaveMessage message;
-    message.mutable_slave_id()->CopyFrom(info.id());
     message.mutable_resource_categories()->set_oversubscribed(true);
     message.mutable_oversubscribed_resources()->CopyFrom(
         oversubscribedResources.get());
-
-    send(master.get(), message);
   }
 
+  send(master.get(), message);
+
   // Reconcile any tasks per the master's request.
   foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
     Framework* framework = getFramework(reconcile.framework_id());
@@ -6367,6 +6383,10 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
+    // Start listening for messages from the resource provider manager.
+    resourceProviderManager.messages().get().onAny(
+        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+
     // Forward oversubscribed resources.
     forwardOversubscribed();
 
@@ -6559,6 +6579,84 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
 }
 
 
+void Slave::handleResourceProviderMessage(
+    const Future<ResourceProviderMessage>& message)
+{
+  // Ignore terminal messages which are not ready. These
+  // can arise e.g., if the `Future` was discarded.
+  if (!message.isReady()) {
+    LOG(ERROR) << "Last resource provider message became terminal before "
+                  "becoming ready: "
+               << (message.isFailed() ? message.failure() : "future discarded");
+
+    // Wait for the next message.
+    resourceProviderManager.messages().get()
+      .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+
+    return;
+  }
+
+  LOG(INFO) << "Handling resource provider message '" << message.get() << "'";
+
+  switch(message->type) {
+    case ResourceProviderMessage::Type::UPDATE_TOTAL_RESOURCES: {
+      CHECK_SOME(message->updateTotalResources);
+
+      const Resources& newTotal = message->updateTotalResources->total;
+
+      const ResourceProviderID& resourceProviderId =
+        message->updateTotalResources->id;
+
+      const Resources oldTotal =
+        totalResources.filter([&resourceProviderId](const Resource& resource) {
+          return resource.provider_id() == resourceProviderId;
+        });
+
+      // Ignore the update if it contained no new information.
+      if (newTotal == oldTotal) {
+        break;
+      }
+
+      totalResources -= oldTotal;
+      totalResources += newTotal;
+
+      // Send the updated resources to the master if the agent is running. Note
+      // that since we have already updated our copy of the latest resource
+      // provider resources, it is safe to consume this message and wait for the
+      // next one; even if we do not send the update to the master right now, an
+      // update will be sent once the agent reregisters.
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case TERMINATING: {
+          break;
+        }
+        case RUNNING: {
+          LOG(INFO) << "Forwarding new total resources " << totalResources;
+
+          // Inform the master that the total capacity of this agent has
+          // changed.
+          UpdateSlaveMessage updateSlaveMessage;
+          updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
+          updateSlaveMessage.mutable_resource_categories()->set_total(true);
+          updateSlaveMessage.mutable_total_resources()->CopyFrom(
+              totalResources);
+
+          send(master.get(), updateSlaveMessage);
+
+          break;
+        }
+      }
+      break;
+    }
+  }
+
+  // Wait for the next message.
+  resourceProviderManager.messages().get()
+    .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+}
+
+
 void Slave::qosCorrections()
 {
   qosController->corrections()

http://git-wip-us.apache.org/repos/asf/mesos/blob/2af9a5b0/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index aea1e94..9cfb006 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -524,6 +524,9 @@ private:
   void _forwardOversubscribed(
       const process::Future<Resources>& oversubscribable);
 
+  void handleResourceProviderMessage(
+      const process::Future<ResourceProviderMessage>& message);
+
   // Gauge methods.
   double _frameworks_active()
   {
@@ -568,8 +571,8 @@ private:
   // Resources that are checkpointed by the slave.
   Resources checkpointedResources;
 
-  // The current total resources of the agent, i.e.,
-  // `info.resources()` with checkpointed resources applied.
+  // The current total resources of the agent, i.e., `info.resources()` with
+  // checkpointed resources applied, plus resources from resource providers.
   Resources totalResources;
 
   Option<process::UPID> master;

http://git-wip-us.apache.org/repos/asf/mesos/blob/2af9a5b0/src/tests/oversubscription_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/oversubscription_tests.cpp b/src/tests/oversubscription_tests.cpp
index cd98b8f..09b4a42 100644
--- a/src/tests/oversubscription_tests.cpp
+++ b/src/tests/oversubscription_tests.cpp
@@ -289,6 +289,13 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
   Future<SlaveRegisteredMessage> slaveRegistered =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
+  Future<UpdateSlaveMessage> updateTotal, updateOversubscribed;
+  {
+    ::testing::InSequence dummy;
+    updateTotal = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+    updateOversubscribed = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  }
+
   MockResourceEstimator resourceEstimator;
 
   EXPECT_CALL(resourceEstimator, initialize(_));
@@ -307,26 +314,28 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
 
   AWAIT_READY(slaveRegistered);
 
-  Future<UpdateSlaveMessage> update =
-    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  AWAIT_READY(updateTotal);
+  ASSERT_TRUE(updateTotal->has_resource_categories());
+  ASSERT_TRUE(updateTotal->resource_categories().has_total());
+  ASSERT_TRUE(updateTotal->resource_categories().total());
 
   Clock::pause();
   // No update should be sent until there is an estimate.
   Clock::advance(flags.oversubscribed_resources_interval);
   Clock::settle();
 
-  ASSERT_FALSE(update.isReady());
+  ASSERT_FALSE(updateOversubscribed.isReady());
 
   // Inject an estimation of oversubscribable resources.
   Resources resources = createRevocableResources("cpus", "1");
   estimations.put(resources);
 
-  AWAIT_READY(update);
+  AWAIT_READY(updateOversubscribed);
 
-  EXPECT_TRUE(update->has_resource_categories());
-  EXPECT_TRUE(update->resource_categories().has_oversubscribed());
-  EXPECT_TRUE(update->resource_categories().oversubscribed());
-  EXPECT_EQ(update->oversubscribed_resources(), resources);
+  EXPECT_TRUE(updateOversubscribed->has_resource_categories());
+  EXPECT_TRUE(updateOversubscribed->resource_categories().has_oversubscribed());
+  EXPECT_TRUE(updateOversubscribed->resource_categories().oversubscribed());
+  EXPECT_EQ(updateOversubscribed->oversubscribed_resources(), resources);
 
   // Ensure the metric is updated.
   JSON::Object metrics = Metrics();
@@ -334,7 +343,7 @@ TEST_F(OversubscriptionTest, ForwardUpdateSlaveMessage)
       1u,
       metrics.values.count("master/messages_update_slave"));
   ASSERT_EQ(
-      1u,
+      2u,
       metrics.values["master/messages_update_slave"]);
 
   ASSERT_EQ(
@@ -901,6 +910,7 @@ TEST_F(OversubscriptionTest, Reregistration)
 
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveRegistered);
+
   AWAIT_READY(update);
   ASSERT_TRUE(update->has_resource_categories());
   ASSERT_TRUE(update->resource_categories().has_oversubscribed());
@@ -920,6 +930,7 @@ TEST_F(OversubscriptionTest, Reregistration)
   // Clock::settle();
   Clock::advance(agentFlags.registration_backoff_factor);
   AWAIT_READY(slaveReregistered);
+
   AWAIT_READY(update);
   EXPECT_TRUE(update->has_resource_categories());
   EXPECT_TRUE(update->resource_categories().has_oversubscribed());

http://git-wip-us.apache.org/repos/asf/mesos/blob/2af9a5b0/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 2ff6dab..ff0bc42 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -31,6 +31,8 @@
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
+#include <mesos/v1/resource_provider/resource_provider.hpp>
+
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
@@ -41,6 +43,8 @@
 #include <process/reap.hpp>
 #include <process/subprocess.hpp>
 
+#include <process/ssl/flags.hpp>
+
 #include <stout/hashset.hpp>
 #include <stout/json.hpp>
 #include <stout/none.hpp>
@@ -8491,6 +8495,121 @@ TEST_P(DefaultContainerDNSFlagTest, ValidateFlag)
   }
 }
 
+
+// This test checks that when a resource provider subscribes with the
+// agent's resource provider manager, the agent sends an
+// `UpdateSlaveMessage` reflecting the updated capacity.
+//
+// TODO(bbannier): We should also add tests for the agent behavior
+// with resource providers where the agent ultimately resends the
+// previous total when the master fails over, or for the interaction
+// with the usual oversubscription protocol (oversubscribed resources
+// vs. updates of total).
+TEST_F(SlaveTest, ResourceProviderSubscribe)
+{
+  Clock::pause();
+
+  // Start a master and an agent (the master is started first).
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Specify the agent resources so we can check the reported total later.
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.authenticate_http_readwrite = false;
+  slaveFlags.resources = "cpus:2;mem:512;disk:512;ports:[]";
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a local resource provider with the agent.
+  v1::MockResourceProvider resourceProvider;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(resourceProvider, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(connected);
+
+  Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed));
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  mesos::v1::resource_provider::Call call;
+  call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE);
+
+  mesos::v1::resource_provider::Call::Subscribe* subscribe =
+    call.mutable_subscribe();
+
+  v1::Resources resourceProviderResources =
+    v1::Resources::parse("disk:8096").get();
+
+  subscribe->mutable_resources()->CopyFrom(resourceProviderResources);
+
+  mesos::v1::ResourceProviderInfo* info =
+    subscribe->mutable_resource_provider_info();
+
+  info->set_type("org.apache.mesos.resource_provider.test");
+  info->set_name("test");
+
+  resourceProvider.send(call);
+
+  // The subscription event contains the assigned resource provider id.
+  AWAIT_READY(subscribed);
+
+  const mesos::v1::ResourceProviderID& resourceProviderId =
+    subscribed->provider_id();
+
+  AWAIT_READY(updateSlaveMessage);
+
+  EXPECT_TRUE(updateSlaveMessage->has_resource_categories());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().has_total());
+  EXPECT_TRUE(updateSlaveMessage->resource_categories().total());
+
+  // We expect the updated agent total to contain both the resources of the
+  // agent and of the newly subscribed resource provider. The resources from the
+  // resource provider have a matching `ResourceProviderID` set.
+  Resources expectedResources =
+    Resources::parse(slaveFlags.resources.get()).get();
+
+  foreach (v1::Resource resource, resourceProviderResources) {
+    resource.mutable_provider_id()->CopyFrom(resourceProviderId);
+    expectedResources += devolve(resource);
+  }
+
+  EXPECT_EQ(expectedResources, updateSlaveMessage->total_resources());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {