You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/10/05 15:30:42 UTC
[4/5] mesos git commit: Implemented a registrar for resource provider
manager state.
Implemented a registrar for resource provider manager state.
This patch adds a registry and registrar interface for resource
provider managers. The registrar interface is modelled after the
master registrar and supports similar operations. Currently a single,
LevelDB-backed registrar is implemented which we plan to use for
resource provider managers in agents.
Currently the registry allows adding and removing resource provider IDs.
Review: https://reviews.apache.org/r/61528/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8db88af0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8db88af0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8db88af0
Branch: refs/heads/master
Commit: 8db88af08763f489d65f7b3fa2e372f195fa4736
Parents: 2af9a5b
Author: Benjamin Bannier <bb...@gmail.com>
Authored: Thu Aug 3 08:53:20 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200
----------------------------------------------------------------------
src/CMakeLists.txt | 2 +
src/Makefile.am | 6 +
src/resource_provider/registrar.cpp | 375 +++++++++++++++++++++
src/resource_provider/registrar.hpp | 124 +++++++
src/resource_provider/registry.hpp | 24 ++
src/resource_provider/registry.proto | 38 +++
src/slave/paths.cpp | 11 +
src/slave/paths.hpp | 5 +
src/tests/resource_provider_manager_tests.cpp | 49 +++
9 files changed, 634 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5677933..1a0dff3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,7 @@ PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/network/cni/
PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/docker/volume/state)
PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/provisioner/docker/message)
PROTOC_GENERATE(INTERNAL TARGET master/registry)
+PROTOC_GENERATE(INTERNAL TARGET resource_provider/registry)
# BUILD PROTOBUFS.
@@ -466,6 +467,7 @@ set(RESOURCE_PROVIDER_SRC
resource_provider/driver.cpp
resource_provider/local.cpp
resource_provider/manager.cpp
+ resource_provider/registrar.cpp
resource_provider/validation.cpp
resource_provider/storage/provider.cpp)
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b5b88d..da8af91 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -364,6 +364,8 @@ CXX_PROTOS += \
messages/flags.pb.h \
messages/messages.pb.cc \
messages/messages.pb.h \
+ resource_provider/registry.pb.cc \
+ resource_provider/registry.pb.h \
slave/containerizer/mesos/provisioner/docker/message.pb.cc \
slave/containerizer/mesos/provisioner/docker/message.pb.h \
slave/containerizer/mesos/isolators/docker/volume/state.pb.cc \
@@ -995,6 +997,7 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/driver.cpp \
resource_provider/local.cpp \
resource_provider/manager.cpp \
+ resource_provider/registrar.cpp \
resource_provider/validation.cpp \
resource_provider/storage/provider.cpp \
sched/sched.cpp \
@@ -1137,6 +1140,9 @@ libmesos_no_3rdparty_la_SOURCES += \
resource_provider/local.hpp \
resource_provider/manager.hpp \
resource_provider/message.hpp \
+ resource_provider/registrar.hpp \
+ resource_provider/registry.hpp \
+ resource_provider/registry.proto \
resource_provider/validation.hpp \
resource_provider/storage/provider.hpp \
sched/constants.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
new file mode 100644
index 0000000..5dfcb89
--- /dev/null
+++ b/src/resource_provider/registrar.cpp
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/registrar.hpp"
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include <mesos/type_utils.hpp>
+
+#include <mesos/state/in_memory.hpp>
+
+#ifndef __WINDOWS__
+#include <mesos/state/leveldb.hpp>
+#endif // __WINDOWS__
+
+#include <mesos/state/protobuf.hpp>
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/unreachable.hpp>
+
+#include "slave/paths.hpp"
+
+
+using std::deque;
+using std::string;
+using std::unique_ptr;
+
+using mesos::resource_provider::registry::Registry;
+using mesos::resource_provider::registry::ResourceProvider;
+
+using mesos::state::InMemoryStorage;
+
+#ifndef __WINDOWS__
+using mesos::state::LevelDBStorage;
+#endif // __WINDOWS__
+
+using mesos::state::Storage;
+
+using mesos::state::protobuf::Variable;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+using process::Promise;
+using process::defer;
+using process::spawn;
+using process::terminate;
+using process::wait;
+
+
+namespace slave = mesos::internal::slave;
+
+namespace mesos {
+namespace resource_provider {
+
+// Runs the concrete operation (`perform`) against `registry` and
+// records whether it completed without error, so that `set()` can
+// later fulfil the promise with that outcome.
+//
+// Returns the result of `perform` unchanged.
+Try<bool> Registrar::Operation::operator()(Registry* registry)
+{
+  Try<bool> outcome = perform(registry);
+
+  // Any non-error result counts as success, regardless of whether the
+  // registry was actually mutated.
+  success = outcome.isSome();
+
+  return outcome;
+}
+
+
+// Fulfils the underlying promise with the outcome recorded by the
+// last invocation of `operator()`; forwards the return value of
+// `Promise<bool>::set`.
+bool Registrar::Operation::set()
+{
+  const bool outcome = success;
+
+  return Promise<bool>::set(outcome);
+}
+
+
+// Factory for registrars. Currently always hands back an
+// `AgentRegistrar` constructed from the given agent flags and ID.
+Try<Owned<Registrar>> Registrar::create(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+{
+  return Owned<Registrar>(new AgentRegistrar(slaveFlags, slaveId));
+}
+
+
+// Remembers the ID of the resource provider to admit.
+AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
+  : id(_id)
+{}
+
+
+// Adds the resource provider to `registry`.
+//
+// Returns an error if an entry with the same ID is already present;
+// otherwise appends a new entry and reports a mutation.
+Try<bool> AdmitResourceProvider::perform(Registry* registry)
+{
+  const bool alreadyAdmitted = std::any_of(
+      registry->resource_providers().begin(),
+      registry->resource_providers().end(),
+      [this](const ResourceProvider& candidate) {
+        return candidate.id() == id;
+      });
+
+  if (alreadyAdmitted) {
+    return Error("Resource provider already admitted");
+  }
+
+  // Fill in the freshly appended entry in place.
+  registry->add_resource_providers()->mutable_id()->CopyFrom(id);
+
+  return true; // Mutation.
+}
+
+
+// Remembers the ID of the resource provider to remove.
+RemoveResourceProvider::RemoveResourceProvider(const ResourceProviderID& _id)
+  : id(_id)
+{}
+
+
+// Removes the resource provider from `registry`.
+//
+// Returns an error if no entry with a matching ID exists; otherwise
+// erases the first matching entry and reports a mutation.
+Try<bool> RemoveResourceProvider::perform(Registry* registry)
+{
+  const auto hasMatchingId = [this](const ResourceProvider& candidate) {
+    return candidate.id() == id;
+  };
+
+  const auto& providers = registry->resource_providers();
+
+  auto pos = std::find_if(providers.begin(), providers.end(), hasMatchingId);
+
+  if (pos == providers.end()) {
+    return Error("Attempted to remove an unknown resource provider");
+  }
+
+  registry->mutable_resource_providers()->erase(pos);
+
+  return true; // Mutation.
+}
+
+
+// Process backing `AgentRegistrar`. It keeps an in-memory copy of the
+// registry, queues operations while a write is in flight, and
+// persists the result of applying queued operations through `state`.
+class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
+{
+public:
+ AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
+
+ // Fetches the persisted registry. The fetch is started only on the
+ // first call; later calls return the same future.
+ Future<Nothing> recover();
+
+ // Queues `operation` once recovery has completed. Fails immediately
+ // if `recover()` has not been called yet.
+ Future<bool> apply(Owned<Registrar::Operation> operation);
+
+ // Continuation of `apply`: enqueues the operation and starts an
+ // update if none is currently running.
+ Future<bool> _apply(Owned<Registrar::Operation> operation);
+
+ // Applies all queued operations to a copy of the registry and
+ // stores the result.
+ void update();
+
+ // Completion handler for the store started by `update`.
+ void _update(
+ const Future<Option<Variable<Registry>>>& store,
+ const Registry& updatedRegistry,
+ deque<Owned<Registrar::Operation>> applied);
+
+private:
+ // NOTE: `storage` must be declared before `state`, since `state` is
+ // constructed from `storage.get()`.
+ Owned<Storage> storage;
+
+ // Use fully qualified type for `State` to disambiguate with `State`
+ // enumeration in `ProcessBase`.
+ mesos::state::protobuf::State state;
+
+ // Set by the first call to `recover()`.
+ Option<Future<Nothing>> recovered;
+
+ // Last registry contents fetched or successfully stored.
+ Option<Registry> registry;
+
+ // State variable corresponding to `registry`; mutated and stored on
+ // every update.
+ Option<Variable<Registry>> variable;
+
+ // Set once a store has failed; later `_apply` calls fail with it.
+ Option<Error> error;
+
+ // Operations waiting to be applied by the next `update()`.
+ deque<Owned<Registrar::Operation>> operations;
+
+ // True while a store started by `update()` has not yet completed.
+ bool updating = false;
+
+ static Owned<Storage> createStorage(const std::string& path);
+};
+
+
+// Creates the storage backend for the registrar.
+//
+// The registrar persists its state in LevelDB at `path`. LevelDB is
+// currently not supported on Windows (see MESOS-5932), so there we
+// warn and fall back to a non-persistent in-memory storage instead.
+//
+// TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
+{
+#ifdef __WINDOWS__
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+
+  return Owned<Storage>(new InMemoryStorage());
+#else
+  return Owned<Storage>(new LevelDBStorage(path));
+#endif // __WINDOWS__
+}
+
+
+// Sets up the storage under the agent's resource provider registry
+// path (derived from `flags.work_dir` and `slaveId`) and wraps it in
+// the protobuf `State` abstraction.
+//
+// NOTE: `state` is built from `storage.get()`, so this relies on
+// `storage` being declared (and hence initialized) before `state`.
+AgentRegistrarProcess::AgentRegistrarProcess(
+ const slave::Flags& flags, const SlaveID& slaveId)
+ : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
+ storage(createStorage(slave::paths::getResourceProviderRegistryPath(
+ flags.work_dir, slaveId))),
+ state(storage.get()) {}
+
+
+// Fetches the persisted registry and caches both its contents and the
+// state variable it came from. The fetch is only started once;
+// repeated calls return the future of the first call.
+Future<Nothing> AgentRegistrarProcess::recover()
+{
+  constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
+
+  if (recovered.isSome()) {
+    return recovered.get();
+  }
+
+  recovered = state.fetch<Registry>(NAME).then(
+      defer(self(), [this](const Variable<Registry>& fetched) {
+        registry = fetched.get();
+        variable = fetched;
+
+        return Nothing();
+      }));
+
+  return recovered.get();
+}
+
+
+// Schedules `operation` to be applied once recovery has finished.
+// Calling this before `recover()` was ever invoked is an error.
+Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
+{
+  if (recovered.isSome()) {
+    return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
+  }
+
+  return Failure("Attempted to apply the operation before recovering");
+}
+
+
+// Enqueues `operation` and kicks off an update unless one is already
+// in flight. Fails right away if a previous store has failed.
+//
+// Returns the future of the operation's promise.
+Future<bool> AgentRegistrarProcess::_apply(
+    Owned<Registrar::Operation> operation)
+{
+  if (error.isSome()) {
+    return Failure(error.get());
+  }
+
+  // Grab the future before handing the operation over to the queue.
+  Future<bool> result = operation->future();
+
+  operations.push_back(std::move(operation));
+
+  if (!updating) {
+    update();
+  }
+
+  return result;
+}
+
+
+// Applies all currently queued operations to a copy of the registry
+// and stores the result as a single write. `_update` is invoked with
+// the outcome of the store and the batch of operations applied here.
+//
+// Must not be called while another update is in flight or after a
+// store has failed (both are CHECKed).
+void AgentRegistrarProcess::update()
+{
+ CHECK(!updating);
+ CHECK_NONE(error);
+
+ if (operations.empty()) {
+ return; // No-op.
+ }
+
+ updating = true;
+
+ CHECK_SOME(registry);
+ Registry updatedRegistry = registry.get();
+
+ // The per-operation result is not inspected here; each operation
+ // records its own outcome when invoked.
+ foreach (Owned<Registrar::Operation>& operation, operations) {
+ (*operation)(&updatedRegistry);
+ }
+
+ // Serialize updated registry.
+ CHECK_SOME(variable);
+
+ Future<Option<Variable<Registry>>> store =
+ state.store(variable->mutate(updatedRegistry));
+
+ // Hand the applied batch over to the completion handler.
+ store.onAny(defer(
+ self(),
+ &Self::_update,
+ lambda::_1,
+ updatedRegistry,
+ std::move(operations)));
+
+ // `operations` was moved from above; reset it to a well-defined
+ // empty queue for operations arriving while the store is in flight.
+ operations.clear();
+}
+
+
+// Completion handler for the store started by `update()`.
+//
+// `store` is the outcome of the write, `updatedRegistry` the registry
+// contents that were written, and `applied` the batch of operations
+// that went into that write.
+//
+// On failure (failed or discarded future, or a version mismatch
+// signalled by a `None` result) every operation of the batch is
+// failed and the registrar is put into a permanent error state. On
+// success the cached registry and variable are advanced, the
+// promises of the batch are fulfilled, and any operations queued in
+// the meantime are processed by another `update()`.
+void AgentRegistrarProcess::_update(
+    const Future<Option<Variable<Registry>>>& store,
+    const Registry& updatedRegistry,
+    deque<Owned<Registrar::Operation>> applied)
+{
+  updating = false;
+
+  // Abort if the storage operation did not succeed.
+  if (!store.isReady() || store.get().isNone()) {
+    string message = "Failed to update registry: ";
+
+    if (store.isFailed()) {
+      message += store.failure();
+    } else if (store.isDiscarded()) {
+      message += "discarded";
+    } else {
+      message += "version mismatch";
+    }
+
+    while (!applied.empty()) {
+      applied.front()->fail(message);
+      applied.pop_front();
+    }
+
+    // Operations queued while the failed write was in flight would
+    // otherwise never be resolved: once `error` is set no further
+    // `update()` is triggered. Fail them as well so callers do not
+    // wait forever.
+    while (!operations.empty()) {
+      operations.front()->fail(message);
+      operations.pop_front();
+    }
+
+    error = Error(message);
+
+    LOG(ERROR) << "Registrar aborting: " << message;
+
+    return;
+  }
+
+  variable = store->get();
+  registry = updatedRegistry;
+
+  // Remove the operations.
+  while (!applied.empty()) {
+    Owned<Registrar::Operation> operation = applied.front();
+    applied.pop_front();
+
+    operation->set();
+  }
+
+  // Process operations that arrived while the write was in flight.
+  if (!operations.empty()) {
+    update();
+  }
+}
+
+
+// Creates the backing process and spawns it. The process stays owned
+// by this object (see the destructor), hence `false` is passed as the
+// second argument to `spawn`.
+AgentRegistrar::AgentRegistrar(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
+{
+  AgentRegistrarProcess* spawned = process.get();
+
+  process::spawn(spawned, false);
+}
+
+
+// Terminates the backing process and waits for it to finish before
+// the owning pointer releases it.
+AgentRegistrar::~AgentRegistrar()
+{
+  process::terminate(process.get());
+  process::wait(process.get());
+}
+
+
+// Forwards to `AgentRegistrarProcess::recover` on the backing process.
+Future<Nothing> AgentRegistrar::recover()
+{
+  return process::dispatch(
+      process.get(),
+      &AgentRegistrarProcess::recover);
+}
+
+
+// Forwards `operation` to `AgentRegistrarProcess::apply` on the
+// backing process.
+Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
+{
+  return process::dispatch(
+      process.get(), &AgentRegistrarProcess::apply, std::move(operation));
+}
+
+} // namespace resource_provider {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
new file mode 100644
index 0000000..40c08c7
--- /dev/null
+++ b/src/resource_provider/registrar.hpp
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_REGISTRAR_HPP__
+#define __RESOURCE_PROVIDER_REGISTRAR_HPP__
+
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "resource_provider/registry.hpp"
+
+#include "slave/flags.hpp"
+
+
+namespace mesos {
+namespace resource_provider {
+
+// Interface of a registrar for resource provider manager state:
+// operations are applied against a `registry::Registry` after the
+// registrar has been recovered.
+class Registrar
+{
+public:
+ // Defines an abstraction for operations that can be applied on the
+ // Registry.
+ // TODO(xujyan): Make Operation generic so that we can apply them
+ // against a generic "batch operation applier" abstraction, see TODO
+ // in master/registrar.hpp for more details.
+ class Operation : public process::Promise<bool>
+ {
+ public:
+ virtual ~Operation() = default;
+
+ // Attempts to invoke the operation on the registry object.
+ //
+ // Returns whether the operation mutates 'registry', or an error if
+ // the operation cannot be applied successfully.
+ Try<bool> operator()(registry::Registry* registry);
+
+ // Sets the promise based on whether the operation was successful.
+ bool set();
+
+ protected:
+ // Implemented by concrete operations; invoked by `operator()`.
+ virtual Try<bool> perform(registry::Registry* registry) = 0;
+
+ private:
+ // Whether the last invocation of `operator()` completed without
+ // error.
+ bool success = false;
+ };
+
+ // Create a registrar on top of an agent's persistent state.
+ static Try<process::Owned<Registrar>> create(
+ const mesos::internal::slave::Flags& slaveFlags,
+ const SlaveID& slaveId);
+
+ virtual ~Registrar() = default;
+
+ // Recovers the registry. In the `AgentRegistrar` implementation
+ // this must be called before `apply`, otherwise `apply` fails.
+ virtual process::Future<Nothing> recover() = 0;
+
+ // Applies `operation` to the registry. The returned future carries
+ // the value the operation's promise is set to.
+ virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
+};
+
+
+// Operation adding a resource provider with the given ID to the
+// registry. Applying it errors if the ID is already present.
+class AdmitResourceProvider : public Registrar::Operation
+{
+public:
+ explicit AdmitResourceProvider(const ResourceProviderID& id);
+
+private:
+ Try<bool> perform(registry::Registry* registry) override;
+
+ // ID of the resource provider to admit.
+ ResourceProviderID id;
+};
+
+
+// Operation removing the resource provider with the given ID from the
+// registry. Applying it errors if the ID is not present.
+class RemoveResourceProvider : public Registrar::Operation
+{
+public:
+ explicit RemoveResourceProvider(const ResourceProviderID& id);
+
+private:
+ Try<bool> perform(registry::Registry* registry) override;
+
+ // ID of the resource provider to remove.
+ ResourceProviderID id;
+};
+
+
+class AgentRegistrarProcess;
+
+
+// Registrar implementation for agents. All calls are dispatched to an
+// `AgentRegistrarProcess` owned by this object.
+class AgentRegistrar : public Registrar
+{
+public:
+ AgentRegistrar(
+ const mesos::internal::slave::Flags& slaveFlags,
+ const SlaveID& slaveId);
+
+ // Terminates the backing process and waits for it to exit.
+ ~AgentRegistrar() override;
+
+ process::Future<Nothing> recover() override;
+
+ process::Future<bool> apply(process::Owned<Operation> operation) override;
+
+private:
+ // Backing process; spawned in the constructor.
+ std::unique_ptr<AgentRegistrarProcess> process;
+};
+
+} // namespace resource_provider {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_REGISTRAR_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.hpp b/src/resource_provider/registry.hpp
new file mode 100644
index 0000000..048cd6b
--- /dev/null
+++ b/src/resource_provider/registry.hpp
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef __RESOURCE_PROVIDER_REGISTRY_HPP__
+#define __RESOURCE_PROVIDER_REGISTRY_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <resource_provider/registry.pb.h>
+
+#endif // __RESOURCE_PROVIDER_REGISTRY_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
new file mode 100644
index 0000000..14bd433
--- /dev/null
+++ b/src/resource_provider/registry.proto
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+import "mesos/mesos.proto";
+
+package mesos.resource_provider.registry;
+
+option java_package = "org.apache.mesos.resource_provider.registry";
+option java_outer_classname = "Protos";
+
+// NOTE: This object defines wrappers around existing objects in case
+// the Registry wishes to store more information about the wrapped objects
+// in the future.
+
+// Registry entry for a single resource provider. Currently only the
+// provider's ID is stored.
+message ResourceProvider {
+ required ResourceProviderID id = 1;
+}
+
+
+// A top level object that is managed by the Registrar and persisted.
+message Registry {
+ // One entry per resource provider held in the registry.
+ repeated ResourceProvider resource_providers = 1;
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index e8724bf..fd54652 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -70,6 +70,7 @@ const char SLAVES_DIR[] = "slaves";
const char FRAMEWORKS_DIR[] = "frameworks";
const char EXECUTORS_DIR[] = "executors";
const char EXECUTOR_RUNS_DIR[] = "runs";
+const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
Try<ExecutorRunPath> parseExecutorRunPath(
@@ -447,6 +448,16 @@ string getTaskUpdatesPath(
}
+// Returns the location of the resource provider registry: the
+// `RESOURCE_PROVIDER_REGISTRY` entry inside the given agent's
+// directory below the meta root of `rootDir`.
+string getResourceProviderRegistryPath(
+    const string& rootDir,
+    const SlaveID& slaveId)
+{
+  const string slavePath = getSlavePath(getMetaRootDir(rootDir), slaveId);
+
+  return path::join(slavePath, RESOURCE_PROVIDER_REGISTRY);
+}
+
+
string getResourcesInfoPath(
const string& rootDir)
{
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d021e6b..f000508 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -284,6 +284,11 @@ std::string getTaskUpdatesPath(
const TaskID& taskId);
+// Returns the path of the resource provider registry for the agent
+// `slaveId` below `rootDir`.
+std::string getResourceProviderRegistryPath(
+ const std::string& rootDir,
+ const SlaveID& slaveId);
+
+
std::string getResourcesInfoPath(
const std::string& rootDir);
http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 3bc56b5..562aff2 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -47,6 +47,7 @@
#include "internal/devolve.hpp"
#include "resource_provider/manager.hpp"
+#include "resource_provider/registrar.hpp"
#include "slave/slave.hpp"
@@ -58,6 +59,10 @@ using mesos::internal::slave::Slave;
using mesos::master::detector::MasterDetector;
+using mesos::resource_provider::AdmitResourceProvider;
+using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
+
using mesos::v1::resource_provider::Call;
using mesos::v1::resource_provider::Event;
@@ -339,6 +344,50 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
EXPECT_FALSE(event->get().subscribed().provider_id().value().empty());
}
+
+class ResourceProviderRegistrarTest : public tests::MesosTest {};
+
+
+// Test that the agent resource provider registrar works as expected:
+// operations fail before recovery, and after recovery a resource
+// provider can be admitted and removed again.
+TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value("foo");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const slave::Flags flags = CreateSlaveFlags();
+
+  // Wait for the agent to register so that we have a valid agent ID
+  // to create the registrar with.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  Try<Owned<Registrar>> registrar =
+    Registrar::create(flags, slaveRegisteredMessage->slave_id());
+
+  ASSERT_SOME(registrar);
+  ASSERT_NE(nullptr, registrar->get());
+
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  AWAIT_READY(registrar.get()->recover());
+
+  // A ready future is not enough: the operation reports its outcome
+  // through the future's value, so check that as well.
+  Future<bool> admitResourceProvider =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+
+  AWAIT_READY(admitResourceProvider);
+  EXPECT_TRUE(admitResourceProvider.get());
+
+  Future<bool> removeResourceProvider =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new RemoveResourceProvider(resourceProviderId)));
+
+  AWAIT_READY(removeResourceProvider);
+  EXPECT_TRUE(removeResourceProvider.get());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {