You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/01 10:13:24 UTC

[mesos] branch master updated: Backed `MockResourceProvider` by a process.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new d8155f8  Backed `MockResourceProvider` by a process.
d8155f8 is described below

commit d8155f8125e38d145d280331146b934c2bb7c842
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Thu Aug 1 11:20:59 2019 +0200

    Backed `MockResourceProvider` by a process.
    
    We were passing callbacks into `MockResourceProvider` to the HTTP
    driver. Since the lifecycles of the callbacks and the mock provider were
    decoupled, and these callbacks were binding the mock provider instance,
    the code was not safe as written: the driver could invoke a callback
    after the provider had been destructed.
    
    This patch makes sure that the callbacks are deferred to a process. We
    also dispatch a number of other functions which are strongly coupled to
    the lifecycle of the provider. We still do not hide the provider away
    completely so the provider can be mocked in tests.
    
    Review: https://reviews.apache.org/r/70728/
---
 src/tests/api_tests.cpp                         |  37 ++---
 src/tests/master_slave_reconciliation_tests.cpp |  23 +--
 src/tests/master_tests.cpp                      |  38 +++--
 src/tests/mesos.hpp                             | 177 ++++++++++++++++--------
 src/tests/operation_reconciliation_tests.cpp    |  31 +++--
 src/tests/resource_provider_manager_tests.cpp   |  86 +++++++-----
 src/tests/slave_tests.cpp                       |  80 ++++++-----
 7 files changed, 286 insertions(+), 186 deletions(-)

diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 0ce2d25..c209967 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -232,7 +232,7 @@ TEST_P(MasterAPITest, GetAgents)
   v1::Resource resource = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(info, resource);
+  v1::TestResourceProvider resourceProvider(info, resource);
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -310,7 +310,8 @@ TEST_P(MasterAPITest, GetAgentsDisconnectedResourceProvider)
   info.set_type("org.apache.mesos.rp.test");
   info.set_name("test");
 
-  v1::MockResourceProvider resourceProvider(info, v1::Resources());
+  Owned<v1::TestResourceProvider> resourceProvider(
+    new v1::TestResourceProvider(info, v1::Resources()));
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -318,7 +319,7 @@ TEST_P(MasterAPITest, GetAgentsDisconnectedResourceProvider)
 
   updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
-  resourceProvider.start(std::move(endpointDetector), contentType);
+  resourceProvider->start(std::move(endpointDetector), contentType);
 
   // Wait until the agent's resources have been updated to include the
   // resource provider.
@@ -352,7 +353,7 @@ TEST_P(MasterAPITest, GetAgentsDisconnectedResourceProvider)
   updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
   // Disconnect the resource provider.
-  resourceProvider.stop();
+  resourceProvider.reset();
 
   // Wait until the agent's resources have been updated to exclude the
   // resource provider.
@@ -1194,7 +1195,7 @@ TEST_P(MasterAPITest, GetOperations)
   info.set_type("org.apache.mesos.rp.test");
   info.set_name("test");
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       info,
       v1::createDiskResource(
           "200",
@@ -1269,7 +1270,7 @@ TEST_P(MasterAPITest, GetOperations)
 
   // The operation is still pending when we receive this event.
   Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&operation));
 
   // Start an operation.
@@ -5189,8 +5190,8 @@ TEST_P(MasterAPITest, OperationUpdatesUponAgentGone)
   v1::Resource disk = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk)));
 
   Owned<EndpointDetector> endpointDetector(
       mesos::internal::tests::resource_provider::createEndpointDetector(
@@ -5360,8 +5361,8 @@ TEST_P(MasterAPITest, OperationUpdatesUponUnreachable)
   v1::Resource disk = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk)));
 
   Owned<EndpointDetector> endpointDetector(
       mesos::internal::tests::resource_provider::createEndpointDetector(
@@ -9095,7 +9096,7 @@ TEST_P(AgentAPITest, GetResourceProviders)
   v1::Resource resource = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(info, resource);
+  v1::TestResourceProvider resourceProvider(info, resource);
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -9183,7 +9184,7 @@ TEST_P(AgentAPITest, MarkResourceProviderGone)
 
   // Start a resource provider without resources since resource
   // providers with resources cannot be marked gone.
-  v1::MockResourceProvider resourceProvider(info, v1::Resource());
+  v1::TestResourceProvider resourceProvider(info, v1::Resource());
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -9191,9 +9192,11 @@ TEST_P(AgentAPITest, MarkResourceProviderGone)
 
   Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
 
-  EXPECT_CALL(resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider.process, subscribed(_))
     .WillOnce(DoAll(
-        Invoke(&resourceProvider, &v1::MockResourceProvider::subscribedDefault),
+        Invoke(
+            resourceProvider.process.get(),
+            &v1::TestResourceProviderProcess::subscribedDefault),
         FutureArg<0>(&subscribed)))
     .WillRepeatedly(Return());
 
@@ -9222,7 +9225,7 @@ TEST_P(AgentAPITest, MarkResourceProviderGone)
     // connection is broken will does not succeed, we might observe other
     // `disconnected` events.
     Future<Nothing> disconnected;
-    EXPECT_CALL(resourceProvider, disconnected())
+    EXPECT_CALL(*resourceProvider.process, disconnected())
       .WillOnce(FutureSatisfy(&disconnected))
       .WillRepeatedly(Return());
 
@@ -9312,7 +9315,7 @@ TEST_P(AgentAPITest, GetOperations)
   info.set_type("org.apache.mesos.rp.test");
   info.set_name("test");
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       info,
       v1::createDiskResource(
           "200",
@@ -9387,7 +9390,7 @@ TEST_P(AgentAPITest, GetOperations)
 
   // The operation is still pending when we receive this event.
   Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&operation));
 
   // Start an operation.
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 7b6ac50..9881b9c 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -474,20 +474,26 @@ TEST_F(
   v1::Resource disk = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+
+  Future<v1::ResourceProviderID> resourceProviderId =
+    resourceProvider->process->id();
 
   // Make the mock resource provider answer to reconciliation events with
   // OPERATION_DROPPED operation status updates.
   auto reconcileOperations =
-    [&resourceProvider](
+    [&resourceProvider, &resourceProviderId](
         const v1::resource_provider::Event::ReconcileOperations& reconcile) {
+      // NOTE: We do not use `AWAIT_READY` here since it
+      // would deadlock with below `Invoke` invocation.
+      ASSERT_TRUE(resourceProviderId.isReady());
+
       foreach (const v1::UUID& operationUuid, reconcile.operation_uuids()) {
         v1::resource_provider::Call call;
 
         call.set_type(v1::resource_provider::Call::UPDATE_OPERATION_STATUS);
-        call.mutable_resource_provider_id()->CopyFrom(
-            resourceProvider->info.id());
+        call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
 
         v1::resource_provider::Call::UpdateOperationStatus*
           updateOperationStatus = call.mutable_update_operation_status();
@@ -498,18 +504,15 @@ TEST_F(
         updateOperationStatus->mutable_operation_uuid()->CopyFrom(
             operationUuid);
 
-        ASSERT_TRUE(resourceProvider->info.has_id())
-          << "Asked to reconcile before subscription was finished";
-
         updateOperationStatus->mutable_status()
           ->mutable_resource_provider_id()
-          ->CopyFrom(resourceProvider->info.id());
+          ->CopyFrom(resourceProviderId.get());
 
         resourceProvider->send(call);
       }
     };
 
-  EXPECT_CALL(*resourceProvider, reconcileOperations(_))
+  EXPECT_CALL(*resourceProvider->process, reconcileOperations(_))
     .WillOnce(Invoke(reconcileOperations));
 
   Owned<EndpointDetector> endpointDetector(
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 0a1ba41..429521d 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -9008,8 +9008,8 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
   v1::Resource disk1 = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk1)));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk1)));
 
   // Start and register a resource provider with a single disk resources.
   Owned<EndpointDetector> endpointDetector(
@@ -9018,9 +9018,13 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
   resourceProvider->start(std::move(endpointDetector), ContentType::PROTOBUF);
 
   AWAIT_READY(updateSlaveMessage);
-  ASSERT_TRUE(resourceProvider->info.has_id());
 
-  disk1.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+  Future<v1::ResourceProviderID> resourceProviderId =
+    resourceProvider->process->id();
+
+  AWAIT_READY(resourceProviderId);
+
+  disk1.mutable_provider_id()->CopyFrom(resourceProviderId.get());
 
   // Start and register a framework.
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
@@ -9055,10 +9059,10 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
   // of the new resource.
   v1::Resource disk2 = v1::createDiskResource(
       "100", "*", None(), None(), v1::createDiskSourceBlock());
-  disk2.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+  disk2.mutable_provider_id()->CopyFrom(resourceProviderId.get());
 
   v1::resource_provider::Call call;
-  call.mutable_resource_provider_id()->CopyFrom(resourceProvider->info.id());
+  call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
   call.set_type(v1::resource_provider::Call::UPDATE_STATE);
 
   v1::resource_provider::Call::UpdateState* updateState =
@@ -9128,7 +9132,7 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
   v1::Resources resourceProviderResources = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       resourceProviderInfo,
       resourceProviderResources);
 
@@ -9184,7 +9188,7 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
           frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
 
   Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&operation));
 
   // Drop the operation updates for the finished operations.
@@ -9247,7 +9251,10 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
   EXPECT_CALL(sched, disconnected(&driver));
 
   // Finish the pending operation.
-  resourceProvider.operationDefault(operation.get());
+  dispatch(
+      *resourceProvider.process,
+      &v1::TestResourceProviderProcess::operationDefault,
+      operation.get());
 
   AWAIT_READY(updateOperationStatusMessage1);
   AWAIT_READY(updateOperationStatusMessage2);
@@ -9364,7 +9371,7 @@ TEST_F(MasterTest, OperationUpdateCompletedFramework)
   v1::Resources resourceProviderResources = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw(None(), "profile"));
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       resourceProviderInfo,
       resourceProviderResources);
 
@@ -9377,8 +9384,6 @@ TEST_F(MasterTest, OperationUpdateCompletedFramework)
 
   AWAIT_READY(updateSlaveMessage);
 
-  ASSERT_TRUE(resourceProvider.info.has_id());
-
   // Start a framework to operate on offers.
   auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
 
@@ -9448,7 +9453,7 @@ TEST_F(MasterTest, OperationUpdateCompletedFramework)
   operationId.set_value("operation");
 
   Future<mesos::v1::resource_provider::Event::ApplyOperation> applyOperation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&applyOperation));
 
   mesos->send(v1::createCallAccept(
@@ -9470,10 +9475,13 @@ TEST_F(MasterTest, OperationUpdateCompletedFramework)
     FUTURE_PROTOBUF(AcknowledgeOperationStatusMessage(), _, slave.get()->pid);
 
   Future<Nothing> acknowledgeOperationStatus;
-  EXPECT_CALL(resourceProvider, acknowledgeOperationStatus(_))
+  EXPECT_CALL(*resourceProvider.process, acknowledgeOperationStatus(_))
     .WillOnce(FutureSatisfy(&acknowledgeOperationStatus));
 
-  resourceProvider.operationDefault(applyOperation.get());
+  dispatch(
+      *resourceProvider.process,
+      &v1::TestResourceProviderProcess::operationDefault,
+      applyOperation.get());
 
   // We expect the master to acknowledge the operation status update.
   AWAIT_READY(acknowledgeOperationStatusMessage);
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 4612e2e..cd50673 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -3018,51 +3018,53 @@ template <
     typename Call,
     typename Driver,
     typename ResourceProviderInfo,
+    typename ResourceProviderID,
     typename Resource,
     typename Resources,
-    typename ResourceProviderID,
     typename OperationState,
-    typename Operation,
-    typename Source>
-class MockResourceProvider
-{
-  using MockResourceProviderT = MockResourceProvider<
+    typename Operation>
+class TestResourceProviderProcess :
+  public process::Process<TestResourceProviderProcess<
       Event,
       Call,
       Driver,
       ResourceProviderInfo,
+      ResourceProviderID,
       Resource,
       Resources,
-      ResourceProviderID,
       OperationState,
-      Operation,
-      Source>;
-
+      Operation>>
+{
 public:
-  MockResourceProvider(
+  TestResourceProviderProcess(
       const ResourceProviderInfo& _info,
       const Option<Resources>& _resources = None())
     : info(_info),
       resources(_resources)
   {
     ON_CALL(*this, connected())
-      .WillByDefault(Invoke(this, &MockResourceProviderT::connectedDefault));
+      .WillByDefault(
+          Invoke(this, &TestResourceProviderProcess::connectedDefault));
     EXPECT_CALL(*this, connected()).WillRepeatedly(DoDefault());
 
     ON_CALL(*this, subscribed(_))
-      .WillByDefault(Invoke(this, &MockResourceProviderT::subscribedDefault));
+      .WillByDefault(
+          Invoke(this, &TestResourceProviderProcess::subscribedDefault));
     EXPECT_CALL(*this, subscribed(_)).WillRepeatedly(DoDefault());
 
     ON_CALL(*this, applyOperation(_))
-      .WillByDefault(Invoke(this, &MockResourceProviderT::operationDefault));
+      .WillByDefault(
+          Invoke(this, &TestResourceProviderProcess::operationDefault));
     EXPECT_CALL(*this, applyOperation(_)).WillRepeatedly(DoDefault());
 
     ON_CALL(*this, publishResources(_))
-      .WillByDefault(Invoke(this, &MockResourceProviderT::publishDefault));
+      .WillByDefault(
+          Invoke(this, &TestResourceProviderProcess::publishDefault));
     EXPECT_CALL(*this, publishResources(_)).WillRepeatedly(DoDefault());
 
     ON_CALL(*this, teardown())
-      .WillByDefault(Invoke(this, &MockResourceProviderT::teardownDefault));
+      .WillByDefault(
+          Invoke(this, &TestResourceProviderProcess::teardownDefault));
     EXPECT_CALL(*this, teardown()).WillRepeatedly(DoDefault());
   }
 
@@ -3122,7 +3124,7 @@ public:
       process::Owned<mesos::internal::EndpointDetector> detector,
       ContentType contentType)
   {
-    Option<std::string> token;
+    process::Future<Option<std::string>> token = None();
 
 #ifdef USE_SSL_SOCKET
     mesos::authentication::executor::JWTSecretGenerator secretGenerator(
@@ -3139,20 +3141,31 @@ public:
 
     process::Future<Secret> secret = secretGenerator.generate(principal);
 
-    AWAIT_READY(secret);
-
-    token = secret->value().data();
+    token = secretGenerator.generate(principal).then(
+        [](const Secret& secret) -> Option<std::string> {
+          return secret.value().data();
+        });
 #endif // USE_SSL_SOCKET
 
-    driver.reset(new Driver(
-      std::move(detector),
-      contentType,
-      lambda::bind(&MockResourceProviderT::connected, this),
-      lambda::bind(&MockResourceProviderT::disconnected, this),
-      lambda::bind(&MockResourceProviderT::events, this, lambda::_1),
-      token));
+    // TODO(bbannier): Remove the `shared_ptr` once we get C++14.
+    auto detector_ =
+      std::make_shared<process::Owned<EndpointDetector>>(std::move(detector));
 
-    driver->start();
+    token.then(defer(this->self(), [=](const Option<std::string>& token) {
+      driver.reset(new Driver(
+          std::move(*detector_),
+          contentType,
+          process::defer(this->self(), &TestResourceProviderProcess::connected),
+          process::defer(
+              this->self(), &TestResourceProviderProcess::disconnected),
+          process::defer(
+              this->self(), &TestResourceProviderProcess::events, lambda::_1),
+          token));
+
+      driver->start();
+
+      return Nothing();
+    }));
   }
 
   void stop()
@@ -3162,11 +3175,6 @@ public:
 
   void connectedDefault()
   {
-    // Do nothing if this is asynchronously called after `stop` is invoked.
-    if (driver == nullptr) {
-      return;
-    }
-
     Call call;
     call.set_type(Call::SUBSCRIBE);
     call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(info);
@@ -3176,13 +3184,10 @@ public:
 
   void subscribedDefault(const typename Event::Subscribed& subscribed)
   {
-    // Do nothing if this is asynchronously called after `stop` is invoked.
-    if (driver == nullptr) {
-      return;
-    }
-
     info.mutable_id()->CopyFrom(subscribed.provider_id());
 
+    providerId.set(subscribed.provider_id());
+
     if (resources.isSome()) {
       Resources injected;
 
@@ -3206,11 +3211,6 @@ public:
 
   void operationDefault(const typename Event::ApplyOperation& operation)
   {
-    // Do nothing if this is asynchronously called after `stop` is invoked.
-    if (driver == nullptr) {
-      return;
-    }
-
     CHECK(info.has_id());
 
     Call call;
@@ -3266,7 +3266,7 @@ public:
           ->mutable_converted_resources(0)
           ->mutable_disk()
           ->mutable_source()
-          ->set_type(Source::RAW);
+          ->set_type(Resource::DiskInfo::Source::RAW);
         break;
       case Operation::UNKNOWN:
         break;
@@ -3285,11 +3285,6 @@ public:
 
   void publishDefault(const typename Event::PublishResources& publish)
   {
-    // Do nothing if this is asynchronously called after `stop` is invoked.
-    if (driver == nullptr) {
-      return;
-    }
-
     CHECK(info.has_id());
 
     Call call;
@@ -3306,11 +3301,74 @@ public:
 
   void teardownDefault() {}
 
-  ResourceProviderInfo info;
+  process::Future<ResourceProviderID> id() const { return providerId.future(); }
 
 private:
+  ResourceProviderInfo info;
+
   Option<Resources> resources;
   std::unique_ptr<Driver> driver;
+
+  process::Promise<ResourceProviderID> providerId;
+};
+
+template <
+    typename Event,
+    typename Call,
+    typename Driver,
+    typename ResourceProviderInfo,
+    typename ResourceProviderID,
+    typename Resource,
+    typename Resources,
+    typename OperationState,
+    typename Operation>
+class TestResourceProvider
+{
+public:
+  TestResourceProvider(
+      const ResourceProviderInfo& _info,
+      const Option<Resources>& _resources = None())
+    : process(new TestResourceProviderProcessT(_info, _resources))
+  {
+    process::spawn(*process);
+  }
+
+  ~TestResourceProvider()
+  {
+    process::terminate(*process);
+    process::wait(*process);
+  }
+
+  void start(
+      process::Owned<mesos::internal::EndpointDetector> detector,
+      ContentType contentType)
+  {
+    process::dispatch(
+        *process,
+        &TestResourceProviderProcessT::start,
+        std::move(detector),
+        contentType);
+  }
+
+  process::Future<Nothing> send(const Call& call)
+  {
+    return process::dispatch(
+        *process, &TestResourceProviderProcessT::send, call);
+  }
+
+  // Made public for mocking.
+  using TestResourceProviderProcessT = TestResourceProviderProcess<
+      mesos::v1::resource_provider::Event,
+      mesos::v1::resource_provider::Call,
+      mesos::v1::resource_provider::Driver,
+      mesos::v1::ResourceProviderInfo,
+      mesos::v1::ResourceProviderID,
+      mesos::v1::Resource,
+      mesos::v1::Resources,
+      mesos::v1::OperationState,
+      mesos::v1::Offer::Operation>;
+
+  std::unique_ptr<TestResourceProviderProcessT> process;
 };
 
 inline process::Owned<EndpointDetector> createEndpointDetector(
@@ -3347,17 +3405,28 @@ using Event = mesos::v1::resource_provider::Event;
 
 } // namespace resource_provider {
 
-using MockResourceProvider = tests::resource_provider::MockResourceProvider<
+using TestResourceProviderProcess =
+  tests::resource_provider::TestResourceProviderProcess<
+      mesos::v1::resource_provider::Event,
+      mesos::v1::resource_provider::Call,
+      mesos::v1::resource_provider::Driver,
+      mesos::v1::ResourceProviderInfo,
+      mesos::v1::ResourceProviderID,
+      mesos::v1::Resource,
+      mesos::v1::Resources,
+      mesos::v1::OperationState,
+      mesos::v1::Offer::Operation>;
+
+using TestResourceProvider = tests::resource_provider::TestResourceProvider<
     mesos::v1::resource_provider::Event,
     mesos::v1::resource_provider::Call,
     mesos::v1::resource_provider::Driver,
     mesos::v1::ResourceProviderInfo,
+    mesos::v1::ResourceProviderID,
     mesos::v1::Resource,
     mesos::v1::Resources,
-    mesos::v1::ResourceProviderID,
     mesos::v1::OperationState,
-    mesos::v1::Offer::Operation,
-    mesos::v1::Resource::DiskInfo::Source>;
+    mesos::v1::Offer::Operation>;
 
 } // namespace v1 {
 
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index eae318d..9d084c0 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -101,8 +101,8 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   Resource disk =
     createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
 
-  Owned<MockResourceProvider> resourceProvider(
-      new MockResourceProvider(
+  Owned<TestResourceProvider> resourceProvider(
+      new TestResourceProvider(
           resourceProviderInfo,
           Resources(disk)));
 
@@ -813,15 +813,15 @@ TEST_P(OperationReconciliationTest, AgentPendingOperationAfterMasterFailover)
   Resource disk = createDiskResource(
       "200", "*", None(), None(), createDiskSourceRaw(None(), "profile"));
 
-  Owned<MockResourceProvider> resourceProvider(
-      new MockResourceProvider(
+  Owned<TestResourceProvider> resourceProvider(
+      new TestResourceProvider(
           resourceProviderInfo,
           Resources(disk)));
 
   // We override the mock resource provider's default action, so the operation
   // will stay in `OPERATION_PENDING`.
   Future<resource_provider::Event::ApplyOperation> applyOperation;
-  EXPECT_CALL(*resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider->process, applyOperation(_))
     .WillOnce(FutureArg<0>(&applyOperation));
 
   Owned<EndpointDetector> endpointDetector(
@@ -1572,8 +1572,8 @@ TEST_P(OperationReconciliationTest, OperationOnUnsubscribedProvider)
   Resource disk =
     createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
 
-  Owned<MockResourceProvider> resourceProvider(
-      new MockResourceProvider(
+  Owned<TestResourceProvider> resourceProvider(
+      new TestResourceProvider(
           resourceProviderInfo,
           Resources(disk)));
 
@@ -1659,7 +1659,7 @@ TEST_P(OperationReconciliationTest, OperationOnUnsubscribedProvider)
 
   AWAIT_READY(applyOperationMessage);
 
-  // Terminate the resource provider.
+  // Tear the resource provider down.
   resourceProvider.reset();
 
   // Make sure the resource provider manager processes the disconnection.
@@ -1731,8 +1731,8 @@ TEST_P(
   Resource disk =
     createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
 
-  Owned<MockResourceProvider> resourceProvider(
-      new MockResourceProvider(
+  Owned<TestResourceProvider> resourceProvider(
+      new TestResourceProvider(
           resourceProviderInfo,
           Resources(disk)));
 
@@ -1757,9 +1757,12 @@ TEST_P(
   ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
   ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
 
-  ASSERT_TRUE(resourceProvider->info.has_id());
+  Future<v1::ResourceProviderID> resourceProviderId =
+    resourceProvider->process->id();
 
-  resourceProviderInfo = resourceProvider->info;
+  AWAIT_READY(resourceProviderId);
+
+  resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId.get());
 
   Clock::pause();
 
@@ -1811,8 +1814,6 @@ TEST_P(
       createDynamicReservationInfo(
           frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
 
-  ResourceProviderID resourceProviderId(reservedResources.provider_id());
-
   OperationID operationId;
   operationId.set_value("operation");
 
@@ -1872,7 +1873,7 @@ TEST_P(
   scheduler::Call::ReconcileOperations::Operation operation;
   operation.mutable_operation_id()->CopyFrom(operationId);
   operation.mutable_agent_id()->CopyFrom(agentId);
-  operation.mutable_resource_provider_id()->CopyFrom(resourceProviderId);
+  operation.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
 
   Future<v1::scheduler::Event::UpdateOperationStatus> reconciliationUpdate;
   EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 7d48f18..2792000 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1015,7 +1015,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
   resourceProviderInfo.set_type("org.apache.mesos.rp.test");
   resourceProviderInfo.set_name("test");
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       resourceProviderInfo, Some(v1::Resources(disk)));
 
   // Start and register a resource provider.
@@ -1148,8 +1148,8 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   v1::Resource disk = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk)));
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -1164,17 +1164,20 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   // will have an ID assigned by the agent.
   AWAIT_READY(updateSlaveMessage);
 
-  ASSERT_TRUE(resourceProvider->info.has_id());
+  Future<v1::ResourceProviderID> resourceProviderId =
+    resourceProvider->process->id();
 
-  resourceProviderInfo = resourceProvider->info;
+  AWAIT_READY(resourceProviderId);
+
+  resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId.get());
 
   // Resource provider failover by opening a new connection.
   // The assigned resource provider ID will be used to resubscribe.
   resourceProvider.reset(
-      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+      new v1::TestResourceProvider(resourceProviderInfo, v1::Resources(disk)));
 
   Future<Event::Subscribed> subscribed1;
-  EXPECT_CALL(*resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider->process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed1));
 
   endpointDetector =
@@ -1190,11 +1193,13 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   // got disconnected. This avoids it to in turn resubscribe racing
   // with the newly created resource provider.
   Future<Nothing> disconnected;
-  EXPECT_CALL(*resourceProvider, disconnected())
+  EXPECT_CALL(*resourceProvider->process, disconnected())
     .WillOnce(DoAll(
-        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
-        FutureSatisfy(&disconnected)))
-    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.
+      Invoke(
+        resourceProvider->process.get(),
+        &v1::TestResourceProviderProcess::stop),
+      FutureSatisfy(&disconnected)))
+    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `stop`.
 
   // The agent failover.
   agent->reset();
@@ -1211,12 +1216,12 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(
   endpointDetector =
     resource_provider::createEndpointDetector(agent.get()->pid);
 
-  resourceProvider.reset(new v1::MockResourceProvider(
+  resourceProvider.reset(new v1::TestResourceProvider(
       resourceProviderInfo,
       Some(v1::Resources(disk))));
 
   Future<Event::Subscribed> subscribed2;
-  EXPECT_CALL(*resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider->process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed2));
 
   resourceProvider->start(std::move(endpointDetector), contentType);
@@ -1261,16 +1266,16 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
   resourceProviderInfo.set_type("org.apache.mesos.rp.test");
   resourceProviderInfo.set_name("test");
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo));
+  v1::TestResourceProvider resourceProvider(resourceProviderInfo);
 
-  // We explicitly reset the resource provider after the expected
+  // We explicitly terminate the resource provider after the expected
   // disconnect to prevent it from resubscribing indefinitely.
   Future<Nothing> disconnected;
-  EXPECT_CALL(*resourceProvider, disconnected())
+  EXPECT_CALL(*resourceProvider.process, disconnected())
     .WillOnce(DoAll(
-        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
-        FutureSatisfy(&disconnected)));
+      Invoke(
+        resourceProvider.process.get(), &v1::TestResourceProviderProcess::stop),
+      FutureSatisfy(&disconnected)));
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
@@ -1278,7 +1283,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
 
   const ContentType contentType = GetParam();
 
-  resourceProvider->start(std::move(endpointDetector), contentType);
+  resourceProvider.start(std::move(endpointDetector), contentType);
 
   AWAIT_READY(disconnected);
 }
@@ -1320,8 +1325,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
   v1::Resource disk = v1::createDiskResource(
       "200", "*", None(), None(), v1::createDiskSourceRaw());
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(
           resourceProviderInfo,
           v1::Resources(disk)));
 
@@ -1339,8 +1344,12 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResourceProviderDisconnect)
     // provider will have an ID assigned by the agent.
     AWAIT_READY(updateSlaveMessage);
 
-    ASSERT_TRUE(resourceProvider->info.has_id());
-    disk.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+    Future<v1::ResourceProviderID> resourceProviderId =
+      resourceProvider->process->id();
+
+    AWAIT_READY(resourceProviderId);
+
+    disk.mutable_provider_id()->CopyFrom(resourceProviderId.get());
 
     const Resources& totalResources =
       updateSlaveMessage->resource_providers().providers(0).total_resources();
@@ -1389,18 +1398,17 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
   resourceProviderInfo.set_type("org.apache.mesos.rp.test");
   resourceProviderInfo.set_name("test");
 
-  Owned<v1::MockResourceProvider> resourceProvider1(
-      new v1::MockResourceProvider(resourceProviderInfo));
+  v1::TestResourceProvider resourceProvider1(resourceProviderInfo);
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
       resource_provider::createEndpointDetector(agent.get()->pid));
 
   Future<Event::Subscribed> subscribed1;
-  EXPECT_CALL(*resourceProvider1, subscribed(_))
+  EXPECT_CALL(*resourceProvider1.process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed1));
 
-  resourceProvider1->start(std::move(endpointDetector), ContentType::PROTOBUF);
+  resourceProvider1.start(std::move(endpointDetector), ContentType::PROTOBUF);
 
   AWAIT_READY(subscribed1);
 
@@ -1409,21 +1417,23 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
   // Subscribing a second resource provider with the same ID will
   // disconnect the first instance and handle the subscription by the
   // second resource provider as a resubscription.
-  Owned<v1::MockResourceProvider> resourceProvider2(
-      new v1::MockResourceProvider(resourceProviderInfo));
+  Owned<v1::TestResourceProvider> resourceProvider2(
+      new v1::TestResourceProvider(resourceProviderInfo));
 
   // We terminate the first resource provider once we have confirmed
   // that it got disconnected. This prevents it from resubscribing in
   // turn and racing with the other resource provider.
   Future<Nothing> disconnected1;
-  EXPECT_CALL(*resourceProvider1, disconnected())
+  EXPECT_CALL(*resourceProvider1.process, disconnected())
     .WillOnce(DoAll(
-        Invoke([&resourceProvider1]() { resourceProvider1.reset(); }),
-        FutureSatisfy(&disconnected1)))
-    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `reset`.
+      Invoke(
+        resourceProvider1.process.get(),
+        &v1::TestResourceProviderProcess::stop),
+      FutureSatisfy(&disconnected1)))
+    .WillRepeatedly(Return()); // Ignore spurious calls concurrent with `stop`.
 
   Future<Event::Subscribed> subscribed2;
-  EXPECT_CALL(*resourceProvider2, subscribed(_))
+  EXPECT_CALL(*resourceProvider2->process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed2));
 
   endpointDetector =
@@ -1477,15 +1487,15 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   resourceProviderInfo.set_type("org.apache.mesos.rp.test");
   resourceProviderInfo.set_name("test");
 
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo));
+  Owned<v1::TestResourceProvider> resourceProvider(
+      new v1::TestResourceProvider(resourceProviderInfo));
 
   // Start and register a resource provider.
   Owned<EndpointDetector> endpointDetector(
       resource_provider::createEndpointDetector(agent.get()->pid));
 
   Future<Event::Subscribed> subscribed;
-  EXPECT_CALL(*resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider->process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed));
 
   resourceProvider->start(std::move(endpointDetector), ContentType::PROTOBUF);
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index fc95f19..7c6e1d9 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -10520,10 +10520,10 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
   resourceProviderInfo.set_name("test");
 
   // Register a local resource provider with the agent.
-  v1::MockResourceProvider resourceProvider(resourceProviderInfo);
+  v1::TestResourceProvider resourceProvider(resourceProviderInfo);
 
   Future<Nothing> connected;
-  EXPECT_CALL(resourceProvider, connected())
+  EXPECT_CALL(*resourceProvider.process, connected())
     .WillOnce(FutureSatisfy(&connected));
 
   Owned<EndpointDetector> endpointDetector(
@@ -10534,7 +10534,7 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
   AWAIT_READY(connected);
 
   Future<mesos::v1::resource_provider::Event::Subscribed> subscribed;
-  EXPECT_CALL(resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider.process, subscribed(_))
     .WillOnce(FutureArg<0>(&subscribed));
 
   Future<UpdateSlaveMessage> updateSlaveMessage =
@@ -10627,7 +10627,7 @@ TEST_F(SlaveTest, ResourceProviderPublishAll)
       v1::Resources::parse("disk", "4096", "role2").get()
   };
 
-  v1::MockResourceProvider resourceProvider(resourceProviderInfo, resources);
+  v1::TestResourceProvider resourceProvider(resourceProviderInfo, resources);
 
   Owned<EndpointDetector> endpointDetector(
       resource_provider::createEndpointDetector(slave.get()->pid));
@@ -10682,14 +10682,15 @@ TEST_F(SlaveTest, ResourceProviderPublishAll)
 
     // Two PUBLISH_RESOURCES events will be received: one for launching the
     // executor, and the other for launching the task.
-    EXPECT_CALL(resourceProvider, publishResources(_))
-      .WillOnce(
-          Invoke(&resourceProvider,
-                 &v1::MockResourceProvider::publishDefault))
+    EXPECT_CALL(*resourceProvider.process, publishResources(_))
+      .WillOnce(Invoke(
+          resourceProvider.process.get(),
+          &v1::TestResourceProviderProcess::publishDefault))
       .WillOnce(DoAll(
           FutureArg<0>(&publish),
-          Invoke(&resourceProvider,
-                 &v1::MockResourceProvider::publishDefault)));
+          Invoke(
+              resourceProvider.process.get(),
+              &v1::TestResourceProviderProcess::publishDefault)));
 
     Future<TaskStatus> taskStarting;
     Future<TaskStatus> taskRunning;
@@ -10763,7 +10764,7 @@ TEST_F(SlaveTest, RemoveResourceProvider)
       None(),
       v1::createDiskSourceRaw(None(), "profile"));
 
-  v1::MockResourceProvider resourceProvider(resourceProviderInfo, disk);
+  v1::TestResourceProvider resourceProvider(resourceProviderInfo, disk);
 
   Owned<EndpointDetector> endpointDetector(
       resource_provider::createEndpointDetector(slave.get()->pid));
@@ -10825,7 +10826,7 @@ TEST_F(SlaveTest, RemoveResourceProvider)
 
   // Create a pending operation.
   Future<v1::resource_provider::Event::ApplyOperation> applyOperation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&applyOperation));
 
   v1::OperationID operationId;
@@ -10844,16 +10845,16 @@ TEST_F(SlaveTest, RemoveResourceProvider)
   AWAIT_READY(applyOperation);
 
   // A resource provider cannot be removed while it still has resources.
-  ASSERT_TRUE(resourceProvider.info.has_id());
+  Future<v1::ResourceProviderID> resourceProviderId =
+    resourceProvider.process->id();
 
-  const mesos::v1::ResourceProviderID& resourceProviderId =
-    resourceProvider.info.id();
+  AWAIT_READY(resourceProviderId);
 
   v1::agent::Call v1Call;
   v1Call.set_type(v1::agent::Call::MARK_RESOURCE_PROVIDER_GONE);
   v1Call.mutable_mark_resource_provider_gone()
     ->mutable_resource_provider_id()
-    ->CopyFrom(resourceProviderId);
+    ->CopyFrom(resourceProviderId.get());
 
   constexpr ContentType contentType = ContentType::PROTOBUF;
   process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
@@ -10877,7 +10878,7 @@ TEST_F(SlaveTest, RemoveResourceProvider)
 
     Call call;
     call.set_type(Call::UPDATE_STATE);
-    call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
 
     Call::UpdateState* update = call.mutable_update_state();
     update->mutable_resources()->Clear();
@@ -10915,26 +10916,26 @@ TEST_F(SlaveTest, RemoveResourceProvider)
 
   // The resource provider will receive a TEARDOWN event on being marked gone.
   Future<Nothing> teardown;
-  EXPECT_CALL(resourceProvider, teardown())
+  EXPECT_CALL(*resourceProvider.process, teardown())
     .WillOnce(FutureSatisfy(&teardown));
 
   // We expect at least two disconnection events, one initially when the
   // connected resource provider gets removed, and when the automatic attempt
   // to resubscribe fails and leads the remote to close the connection.
   Future<Nothing> disconnected;
-  EXPECT_CALL(resourceProvider, disconnected())
+  EXPECT_CALL(*resourceProvider.process, disconnected())
     .WillOnce(DoDefault())
     .WillOnce(FutureSatisfy(&disconnected))
     .WillRepeatedly(Return()); // Ignore additional disconnection events.
 
   // The resource provider will automatically attempt to reconnect.
   Future<Nothing> connected;
-  EXPECT_CALL(resourceProvider, connected())
+  EXPECT_CALL(*resourceProvider.process, connected())
     .WillOnce(DoDefault())
     .WillRepeatedly(Return());
 
   // The resource provider should never successfully resubscribe.
-  EXPECT_CALL(resourceProvider, subscribed(_))
+  EXPECT_CALL(*resourceProvider.process, subscribed(_))
     .Times(Exactly(0));
 
   response = process::http::post(
@@ -11166,7 +11167,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
       None(),
       v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       resourceProviderInfo,
       resourceProviderResources);
 
@@ -11228,7 +11229,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
   // We now perform a `RESERVE` operation on the offered resources,
   // but let the operation fail in the resource provider.
   Future<v1::resource_provider::Event::ApplyOperation> operation;
-  EXPECT_CALL(resourceProvider, applyOperation(_))
+  EXPECT_CALL(*resourceProvider.process, applyOperation(_))
     .WillOnce(FutureArg<0>(&operation));
 
   {
@@ -11265,11 +11266,14 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
   // Fail the operation in the resource provider. This should trigger
   // an `UpdateSlaveMessage` to the master.
   {
-    ASSERT_TRUE(resourceProvider.info.has_id());
+    Future<v1::ResourceProviderID> resourceProviderId =
+      resourceProvider.process->id();
+
+    AWAIT_READY(resourceProviderId);
 
     v1::Resources resourceProviderResources_;
     foreach (v1::Resource resource, resourceProviderResources) {
-      resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id());
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
 
       resourceProviderResources_ += resource;
     }
@@ -11280,7 +11284,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
     v1::resource_provider::Call call;
 
     call.set_type(v1::resource_provider::Call::UPDATE_STATE);
-    call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
 
     v1::resource_provider::Call::UpdateState* updateState =
       call.mutable_update_state();
@@ -11373,7 +11377,7 @@ TEST_F(SlaveTest, RunTaskResourceVersions)
       None(),
       v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(
+  v1::TestResourceProvider resourceProvider(
       resourceProviderInfo,
       resourceProviderResources);
 
@@ -11411,18 +11415,21 @@ TEST_F(SlaveTest, RunTaskResourceVersions)
 
   // Update resource version of the resource provider.
   {
-    CHECK(resourceProvider.info.has_id());
+    Future<v1::ResourceProviderID> resourceProviderId =
+      resourceProvider.process->id();
+
+    AWAIT_READY(resourceProviderId);
 
     v1::Resources resourceProviderResources_;
     foreach (v1::Resource resource, resourceProviderResources) {
-      resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id());
+      resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
 
       resourceProviderResources_ += resource;
     }
 
     v1::resource_provider::Call call;
     call.set_type(v1::resource_provider::Call::UPDATE_STATE);
-    call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
+    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
 
     v1::resource_provider::Call::UpdateState* updateState =
       call.mutable_update_state();
@@ -11635,16 +11642,15 @@ TEST_F(
       None(),
       v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(
-      resourceProviderInfo,
-      resourceProviderResources);
+  Owned<v1::TestResourceProvider> resourceProvider(new v1::TestResourceProvider(
+      resourceProviderInfo, resourceProviderResources));
 
   Owned<EndpointDetector> endpointDetector(
       resource_provider::createEndpointDetector(slave.get()->pid));
 
   updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
-  resourceProvider.start(std::move(endpointDetector), ContentType::PROTOBUF);
+  resourceProvider->start(std::move(endpointDetector), ContentType::PROTOBUF);
 
   AWAIT_READY(updateSlaveMessage);
 
@@ -11771,15 +11777,15 @@ TEST_F(
 
   // Fail over the agent. We expect the executor to resubscribe successfully
   // even if the resource provider does not resubscribe.
-  EXPECT_CALL(resourceProvider, disconnected())
+  EXPECT_CALL(*resourceProvider->process, disconnected())
     .Times(AtMost(1));
 
   EXPECT_NO_FUTURE_DISPATCHES(_, &Slave::executorTerminated);
 
   slave.get()->terminate();
 
-  // Stop the mock resource provider so it won't resubscribe.
-  resourceProvider.stop();
+  // Terminate the mock resource provider so it won't resubscribe.
+  resourceProvider.reset();
 
   // The following future will be satisfied when an HTTP executor subscribes.
   Future<Nothing> executorSubscribed = FUTURE_DISPATCH(_, &Slave::___run);