You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/10/05 15:30:42 UTC

[4/5] mesos git commit: Implemented a registrar for resource provider manager state.

Implemented a registrar for resource provider manager state.

This patch adds a registry and registrar interface for resource
provider managers. The registrar interface is modelled after the
master registrar and supports similar operations. Currently a single,
LevelDB-backed registrar is implemented which we plan to use for
resource provider managers in agents.

Currently the registry supports adding and removing resource provider IDs.

Review: https://reviews.apache.org/r/61528/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8db88af0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8db88af0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8db88af0

Branch: refs/heads/master
Commit: 8db88af08763f489d65f7b3fa2e372f195fa4736
Parents: 2af9a5b
Author: Benjamin Bannier <bb...@gmail.com>
Authored: Thu Aug 3 08:53:20 2017 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Oct 5 16:50:43 2017 +0200

----------------------------------------------------------------------
 src/CMakeLists.txt                            |   2 +
 src/Makefile.am                               |   6 +
 src/resource_provider/registrar.cpp           | 375 +++++++++++++++++++++
 src/resource_provider/registrar.hpp           | 124 +++++++
 src/resource_provider/registry.hpp            |  24 ++
 src/resource_provider/registry.proto          |  38 +++
 src/slave/paths.cpp                           |  11 +
 src/slave/paths.hpp                           |   5 +
 src/tests/resource_provider_manager_tests.cpp |  49 +++
 9 files changed, 634 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5677933..1a0dff3 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -70,6 +70,7 @@ PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/network/cni/
 PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/isolators/docker/volume/state)
 PROTOC_GENERATE(INTERNAL TARGET slave/containerizer/mesos/provisioner/docker/message)
 PROTOC_GENERATE(INTERNAL TARGET master/registry)
+PROTOC_GENERATE(INTERNAL TARGET resource_provider/registry)
 
 
 # BUILD PROTOBUFS.
@@ -466,6 +467,7 @@ set(RESOURCE_PROVIDER_SRC
   resource_provider/driver.cpp
   resource_provider/local.cpp
   resource_provider/manager.cpp
+  resource_provider/registrar.cpp
   resource_provider/validation.cpp
   resource_provider/storage/provider.cpp)
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 2b5b88d..da8af91 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -364,6 +364,8 @@ CXX_PROTOS +=								\
   messages/flags.pb.h							\
   messages/messages.pb.cc						\
   messages/messages.pb.h						\
+  resource_provider/registry.pb.cc					\
+  resource_provider/registry.pb.h					\
   slave/containerizer/mesos/provisioner/docker/message.pb.cc		\
   slave/containerizer/mesos/provisioner/docker/message.pb.h		\
   slave/containerizer/mesos/isolators/docker/volume/state.pb.cc		\
@@ -995,6 +997,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/driver.cpp						\
   resource_provider/local.cpp						\
   resource_provider/manager.cpp						\
+  resource_provider/registrar.cpp					\
   resource_provider/validation.cpp					\
   resource_provider/storage/provider.cpp				\
   sched/sched.cpp							\
@@ -1137,6 +1140,9 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   resource_provider/local.hpp						\
   resource_provider/manager.hpp						\
   resource_provider/message.hpp						\
+  resource_provider/registrar.hpp					\
+  resource_provider/registry.hpp					\
+  resource_provider/registry.proto					\
   resource_provider/validation.hpp					\
   resource_provider/storage/provider.hpp				\
   sched/constants.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
new file mode 100644
index 0000000..5dfcb89
--- /dev/null
+++ b/src/resource_provider/registrar.cpp
@@ -0,0 +1,375 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "resource_provider/registrar.hpp"
+
+#include <algorithm>
+#include <deque>
+#include <string>
+#include <utility>
+
+#include <glog/logging.h>
+
+#include <mesos/type_utils.hpp>
+
+#include <mesos/state/in_memory.hpp>
+
+#ifndef __WINDOWS__
+#include <mesos/state/leveldb.hpp>
+#endif // __WINDOWS__
+
+#include <mesos/state/protobuf.hpp>
+
+#include <process/defer.hpp>
+#include <process/process.hpp>
+
+#include <stout/none.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/path.hpp>
+#include <stout/unreachable.hpp>
+
+#include "slave/paths.hpp"
+
+
+using std::deque;
+using std::string;
+using std::unique_ptr;
+
+using mesos::resource_provider::registry::Registry;
+using mesos::resource_provider::registry::ResourceProvider;
+
+using mesos::state::InMemoryStorage;
+
+#ifndef __WINDOWS__
+using mesos::state::LevelDBStorage;
+#endif // __WINDOWS__
+
+using mesos::state::Storage;
+
+using mesos::state::protobuf::Variable;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Process;
+using process::Promise;
+using process::defer;
+using process::spawn;
+using process::terminate;
+using process::wait;
+
+
+namespace slave = mesos::internal::slave;
+
+namespace mesos {
+namespace resource_provider {
+
+Try<bool> Registrar::Operation::operator()(Registry* registry)
+{
+  Try<bool> result = perform(registry); // Delegate to the subclass.
+
+  success = !result.isError(); // Remembered so `set()` can publish it.
+
+  return result;
+}
+
+
+bool Registrar::Operation::set()
+{
+  return Promise<bool>::set(success); // Publish the recorded outcome.
+}
+
+
+Try<Owned<Registrar>> Registrar::create(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+{
+  return new AgentRegistrar(slaveFlags, slaveId); // Only kind so far.
+}
+
+
+AdmitResourceProvider::AdmitResourceProvider(const ResourceProviderID& _id)
+  : id(_id) {} // Keep a copy of the ID to admit.
+
+
+Try<bool> AdmitResourceProvider::perform(Registry* registry)
+{
+  if (std::find_if(
+          registry->resource_providers().begin(),
+          registry->resource_providers().end(),
+          [this](const ResourceProvider& resourceProvider) {
+            return resourceProvider.id() == this->id;
+          }) != registry->resource_providers().end()) {
+    return Error("Resource provider already admitted"); // Duplicate ID.
+  }
+
+  ResourceProvider resourceProvider; // New registry entry for `id`.
+  resourceProvider.mutable_id()->CopyFrom(id);
+
+  registry->add_resource_providers()->CopyFrom(resourceProvider);
+
+  return true; // The registry was mutated.
+}
+
+
+RemoveResourceProvider::RemoveResourceProvider(const ResourceProviderID& _id)
+  : id(_id) {} // Keep a copy of the ID to remove.
+
+
+Try<bool> RemoveResourceProvider::perform(Registry* registry)
+{
+  auto pos = std::find_if( // First entry whose ID equals `id`.
+      registry->resource_providers().begin(),
+      registry->resource_providers().end(),
+      [this](const ResourceProvider& resourceProvider) {
+        return resourceProvider.id() == this->id;
+      });
+
+  if (pos == registry->resource_providers().end()) { // No such ID.
+    return Error("Attempted to remove an unknown resource provider");
+  }
+
+  registry->mutable_resource_providers()->erase(pos); // Drop the entry.
+
+  return true; // The registry was mutated.
+}
+
+
+class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
+{
+public:
+  AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
+
+  Future<Nothing> recover(); // Fetches the stored registry once.
+
+  Future<bool> apply(Owned<Registrar::Operation> operation);
+
+  Future<bool> _apply(Owned<Registrar::Operation> operation);
+
+  void update(); // Applies all queued operations and stores the result.
+
+  void _update(
+      const Future<Option<Variable<Registry>>>& store,
+      const Registry& updatedRegistry,
+      deque<Owned<Registrar::Operation>> applied);
+
+private:
+  Owned<Storage> storage;
+
+  // Use fully qualified type for `State` to disambiguate with `State`
+  // enumeration in `ProcessBase`.
+  mesos::state::protobuf::State state;
+
+  Option<Future<Nothing>> recovered; // Set by the first `recover()`.
+  Option<Registry> registry; // Set by recovery and each successful store.
+  Option<Variable<Registry>> variable; // Mutated for the next store.
+
+  Option<Error> error; // Set when a store fails; `_apply` then rejects.
+
+  deque<Owned<Registrar::Operation>> operations; // Queued, not yet stored.
+
+  bool updating = false; // True while a store is in flight.
+
+  static Owned<Storage> createStorage(const std::string& path);
+};
+
+
+Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
+{
+  // The registrar uses LevelDB as underlying storage. Since LevelDB
+  // is currently not supported on Windows (see MESOS-5932), we fall
+  // back to in-memory storage there; `path` is then unused.
+  //
+  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifndef __WINDOWS__
+  return Owned<Storage>(new LevelDBStorage(path));
+#else
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+  return Owned<Storage>(new InMemoryStorage());
+#endif // __WINDOWS__
+}
+
+
+AgentRegistrarProcess::AgentRegistrarProcess(
+    const slave::Flags& flags, const SlaveID& slaveId)
+  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
+    storage(createStorage(slave::paths::getResourceProviderRegistryPath(
+        flags.work_dir, slaveId))),
+    state(storage.get()) {} // Built on the `storage` created above.
+
+
+Future<Nothing> AgentRegistrarProcess::recover()
+{
+  constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR"; // Variable name.
+
+  if (recovered.isNone()) { // Fetch only on the first call.
+    recovered = state.fetch<Registry>(NAME).then(
+        defer(self(), [this](const Variable<Registry>& recovery) {
+          registry = recovery.get();
+          variable = recovery; // Kept for the `mutate` in `update()`.
+
+          return Nothing();
+        }));
+  }
+
+  return recovered.get(); // Every caller gets the same future.
+}
+
+
+Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
+{
+  if (recovered.isNone()) { // `recover()` has not been called yet.
+    return Failure("Attempted to apply the operation before recovering");
+  }
+
+  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
+}
+
+
+Future<bool> AgentRegistrarProcess::_apply(
+    Owned<Registrar::Operation> operation)
+{
+  if (error.isSome()) { // A previous store failed.
+    return Failure(error.get());
+  }
+
+  operations.push_back(std::move(operation)); // Queue for the next batch.
+
+  Future<bool> future = operations.back()->future();
+  if (!updating) { // Otherwise `_update` starts the next batch.
+    update();
+  }
+
+  return future;
+}
+
+
+void AgentRegistrarProcess::update()
+{
+  CHECK(!updating);
+  CHECK_NONE(error);
+
+  if (operations.empty()) {
+    return; // No-op.
+  }
+
+  updating = true; // Reset in `_update`.
+
+  CHECK_SOME(registry);
+  Registry updatedRegistry = registry.get(); // Operations work on a copy.
+
+  foreach (Owned<Registrar::Operation>& operation, operations) {
+    (*operation)(&updatedRegistry); // Result is kept inside the operation.
+  }
+
+  // Serialize updated registry.
+  CHECK_SOME(variable);
+
+  Future<Option<Variable<Registry>>> store =
+    state.store(variable->mutate(updatedRegistry));
+
+  store.onAny(defer(
+      self(),
+      &Self::_update,
+      lambda::_1,
+      updatedRegistry,
+      std::move(operations)));
+
+  operations.clear(); // Leave the moved-from queue empty.
+}
+
+
+void AgentRegistrarProcess::_update(
+    const Future<Option<Variable<Registry>>>& store,
+    const Registry& updatedRegistry,
+    deque<Owned<Registrar::Operation>> applied)
+{
+  // Continuation of `update()`; `store` is the outcome of the write.
+  updating = false;
+  // Abort if the storage operation did not succeed.
+  if (!store.isReady() || store.get().isNone()) {
+    string message = "Failed to update registry: ";
+
+    if (store.isFailed()) {
+      message += store.failure();
+    } else if (store.isDiscarded()) {
+      message += "discarded";
+    } else {
+      message += "version mismatch";
+    }
+
+    // Operations queued during the write can never complete once `error`
+    // is set (`_apply` rejects new work), so fail them with the batch.
+    applied.insert(applied.end(), operations.begin(), operations.end());
+    operations.clear();
+
+    foreach (Owned<Registrar::Operation>& operation, applied) {
+      operation->fail(message);
+    }
+
+    error = Error(message);
+    LOG(ERROR) << "Registrar aborting: " << message;
+    return;
+  }
+
+  variable = store->get();
+  registry = updatedRegistry;
+
+  // Publish the outcome of every operation in the written batch.
+  foreach (Owned<Registrar::Operation>& operation, applied) {
+    operation->set();
+  }
+
+  if (!operations.empty()) {
+    update();
+  }
+}
+
+
+AgentRegistrar::AgentRegistrar(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
+{
+  process::spawn(process.get(), false); // Cleaned up in the destructor.
+}
+
+
+AgentRegistrar::~AgentRegistrar()
+{
+  process::terminate(*process);
+  process::wait(*process); // Wait before `process` is destroyed.
+}
+
+
+Future<Nothing> AgentRegistrar::recover()
+{
+  return dispatch(process.get(), &AgentRegistrarProcess::recover); // Forward.
+}
+
+
+Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
+{
+  return dispatch( // Forward to the backing process.
+      process.get(),
+      &AgentRegistrarProcess::apply,
+      std::move(operation));
+}
+
+} // namespace resource_provider {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
new file mode 100644
index 0000000..40c08c7
--- /dev/null
+++ b/src/resource_provider/registrar.hpp
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __RESOURCE_PROVIDER_REGISTRAR_HPP__
+#define __RESOURCE_PROVIDER_REGISTRAR_HPP__
+
+#include <memory>
+
+#include <process/future.hpp>
+#include <process/owned.hpp>
+
+#include <stout/nothing.hpp>
+#include <stout/try.hpp>
+
+#include "resource_provider/registry.hpp"
+
+#include "slave/flags.hpp"
+
+
+namespace mesos {
+namespace resource_provider {
+
+class Registrar
+{
+public:
+  // Defines an abstraction for operations that can be applied on the
+  // Registry.
+  // TODO(xujyan): Make Operation generic so that we can apply them
+  // against a generic "batch operation applier" abstraction, see TODO
+  // in master/registrar.hpp for more details.
+  class Operation : public process::Promise<bool>
+  {
+  public:
+    virtual ~Operation() = default;
+
+    // Attempts to invoke the operation on the registry object.
+    //
+    // Returns whether the operation mutates 'registry', or an error if
+    // the operation cannot be applied successfully.
+    Try<bool> operator()(registry::Registry* registry);
+
+    // Sets the promise based on whether the operation was successful.
+    bool set();
+
+  protected:
+    virtual Try<bool> perform(registry::Registry* registry) = 0;
+
+  private:
+    bool success = false; // Recorded by `operator()`.
+  };
+
+  // Create a registrar on top of an agent's persistent state.
+  static Try<process::Owned<Registrar>> create(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
+
+  virtual ~Registrar() = default;
+
+  virtual process::Future<Nothing> recover() = 0;
+  virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
+};
+
+
+class AdmitResourceProvider : public Registrar::Operation
+{
+public:
+  explicit AdmitResourceProvider(const ResourceProviderID& id);
+
+private:
+  Try<bool> perform(registry::Registry* registry) override;
+
+  ResourceProviderID id; // ID to add to the registry.
+};
+
+
+class RemoveResourceProvider : public Registrar::Operation
+{
+public:
+  explicit RemoveResourceProvider(const ResourceProviderID& id);
+
+private:
+  Try<bool> perform(registry::Registry* registry) override;
+
+  ResourceProviderID id; // ID to remove from the registry.
+};
+
+
+class AgentRegistrarProcess;
+
+
+class AgentRegistrar : public Registrar
+{
+public:
+  AgentRegistrar(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
+
+  ~AgentRegistrar() override;
+
+  process::Future<Nothing> recover() override;
+
+  process::Future<bool> apply(process::Owned<Operation> operation) override;
+
+private:
+  std::unique_ptr<AgentRegistrarProcess> process; // Does the actual work.
+};
+
+} // namespace resource_provider {
+} // namespace mesos {
+
+#endif // __RESOURCE_PROVIDER_REGISTRAR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.hpp b/src/resource_provider/registry.hpp
new file mode 100644
index 0000000..048cd6b
--- /dev/null
+++ b/src/resource_provider/registry.hpp
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+
+#ifndef __RESOURCE_PROVIDER_REGISTRY_HPP__
+#define __RESOURCE_PROVIDER_REGISTRY_HPP__
+
+// ONLY USEFUL AFTER RUNNING PROTOC.
+#include <resource_provider/registry.pb.h>
+
+#endif // __RESOURCE_PROVIDER_REGISTRY_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
new file mode 100644
index 0000000..14bd433
--- /dev/null
+++ b/src/resource_provider/registry.proto
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+import "mesos/mesos.proto";
+
+package mesos.resource_provider.registry;
+
+option java_package = "org.apache.mesos.resource_provider.registry";
+option java_outer_classname = "Protos";
+
+// NOTE: This object defines wrappers around existing objects in case
+// the Registry wishes to store more information about the wrapped objects
+// in the future.
+
+message ResourceProvider {
+  required ResourceProviderID id = 1; // Currently the only stored field.
+}
+
+
+// A top level object that is managed by the Registrar and persisted.
+message Registry {
+  repeated ResourceProvider resource_providers = 1; // Admitted providers.
+}

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.cpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.cpp b/src/slave/paths.cpp
index e8724bf..fd54652 100644
--- a/src/slave/paths.cpp
+++ b/src/slave/paths.cpp
@@ -70,6 +70,7 @@ const char SLAVES_DIR[] = "slaves";
 const char FRAMEWORKS_DIR[] = "frameworks";
 const char EXECUTORS_DIR[] = "executors";
 const char EXECUTOR_RUNS_DIR[] = "runs";
+const char RESOURCE_PROVIDER_REGISTRY[] = "resource_provider_registry";
 
 
 Try<ExecutorRunPath> parseExecutorRunPath(
@@ -447,6 +448,16 @@ string getTaskUpdatesPath(
 }
 
 
+string getResourceProviderRegistryPath(
+    const string& rootDir,
+    const SlaveID& slaveId)
+{
+  return path::join( // Registry location below the agent's meta directory.
+      getSlavePath(getMetaRootDir(rootDir), slaveId),
+      RESOURCE_PROVIDER_REGISTRY);
+}
+
+
 string getResourcesInfoPath(
     const string& rootDir)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/slave/paths.hpp
----------------------------------------------------------------------
diff --git a/src/slave/paths.hpp b/src/slave/paths.hpp
index d021e6b..f000508 100644
--- a/src/slave/paths.hpp
+++ b/src/slave/paths.hpp
@@ -284,6 +284,11 @@ std::string getTaskUpdatesPath(
     const TaskID& taskId);
 
 
+std::string getResourceProviderRegistryPath(
+    const std::string& rootDir,
+    const SlaveID& slaveId);
+
+
 std::string getResourcesInfoPath(
     const std::string& rootDir);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/8db88af0/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 3bc56b5..562aff2 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -47,6 +47,7 @@
 #include "internal/devolve.hpp"
 
 #include "resource_provider/manager.hpp"
+#include "resource_provider/registrar.hpp"
 
 #include "slave/slave.hpp"
 
@@ -58,6 +59,10 @@ using mesos::internal::slave::Slave;
 
 using mesos::master::detector::MasterDetector;
 
+using mesos::resource_provider::AdmitResourceProvider;
+using mesos::resource_provider::Registrar;
+using mesos::resource_provider::RemoveResourceProvider;
+
 using mesos::v1::resource_provider::Call;
 using mesos::v1::resource_provider::Event;
 
@@ -339,6 +344,50 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   EXPECT_FALSE(event->get().subscribed().provider_id().value().empty());
 }
 
+
+class ResourceProviderRegistrarTest : public tests::MesosTest {};
+
+
+// Test that operations on the agent registrar only succeed after recovery.
+TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
+{
+  ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value("foo");
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const slave::Flags flags = CreateSlaveFlags();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage); // Carries the agent ID used below.
+
+  Try<Owned<Registrar>> registrar =
+    Registrar::create(flags, slaveRegisteredMessage->slave_id());
+
+  ASSERT_SOME(registrar);
+  ASSERT_NE(nullptr, registrar->get());
+
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
+  AWAIT_READY(registrar.get()->recover());
+
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId)))); // Admit "foo".
+
+  AWAIT_READY(registrar.get()->apply(Owned<Registrar::Operation>(
+      new RemoveResourceProvider(resourceProviderId)))); // Remove it again.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {