You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/04/09 20:02:22 UTC

[mesos] 01/02: Initialized resource provider manager earlier when recovering.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit ef6912a27d164f74acab19b7da880b129e21837a
Author: Benjamin Bannier <be...@mesosphere.io>
AuthorDate: Wed Apr 3 11:51:41 2019 +0200

    Initialized resource provider manager earlier when recovering.
    
    When recovering and reusing the same agent ID the resource provider
    manager can be initialized before e.g., recovering executors. This patch
    moves the initialization to such an earlier point. This allows, e.g.,
    resources to be published successfully via the manager when HTTP-based
    executors resubscribe, which previously ran into an assertion failure.
    
    If the agent ID is not reused we still need to wait for the agent to
    register with the master which would assign an agent ID. In that case we
    do not expect any executors to resubscribe.
    
    Review: https://reviews.apache.org/r/70368/
---
 src/slave/slave.cpp       |   8 ++
 src/tests/slave_tests.cpp | 192 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 200 insertions(+)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 5373cee..794a9c9 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1519,6 +1519,9 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
+      // If we registered with this agent ID for the first time initialize
+      // the resource provider manager with it; if the manager was already
+      // initialized with a recovered agent ID this is a no-op.
       initializeResourceProviderManager(flags, info.id());
 
       // We start the local resource providers daemon once the agent is
@@ -7385,6 +7388,11 @@ Future<Nothing> Slave::recover(const Try<state::State>& state)
         requiredMasterCapabilities.agentUpdate = true;
       }
 
+      // If we restarted the agent process and will reuse the same agent ID
+      // we can immediately start the resource provider manager. This allows
+      // executors recovered later on to resubscribe immediately.
+      initializeResourceProviderManager(flags, info.id());
+
       // Recover the frameworks.
       foreachvalue (const FrameworkState& frameworkState,
                     slaveState->frameworks) {
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 528a25a..b1c3a01 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -11566,6 +11566,198 @@ TEST_F(SlaveTest, RetryOperationStatusUpdateAfterRecovery)
   Clock::settle();
 }
 
+
+// This test verifies that on agent failover HTTP-based executors using
+// resource provider resources can resubscribe without crashing the
+// agent. This is a regression test for MESOS-9667.
+TEST_F(SlaveTest, AgentFailoverHTTPExecutorUsingResourceProviderResources)
+{
+  // This test is run with a paused clock to avoid
+  // dealing with retried task status updates.
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Use the same process ID so the executor can resubscribe.
+  string processId = process::ID::generate("slave");
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(&detector, processId, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
+  // Register a resource provider with the agent.
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo,
+      resourceProviderResources);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(std::move(endpointDetector), ContentType::PROTOBUF);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a framework to exercise operations.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  EXPECT_CALL(
+      *scheduler,
+      offers(
+          _,
+          v1::scheduler::OffersHaveAnyResource(
+              &v1::Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, "foo");
+  frameworkInfo.set_checkpoint(true);
+  frameworkInfo.set_failover_timeout(Days(365).secs());
+
+  // Subscribe the framework.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId = subscribed->framework_id();
+
+  AWAIT_READY(offers);
+  ASSERT_EQ(1, offers->offers_size());
+
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  const v1::Resources resources = offer.resources();
+  ASSERT_FALSE(resources.filter(&v1::Resources::hasResourceProvider).empty())
+    << "Offer does not contain resource provider resources: " << resources;
+
+  v1::TaskID taskId;
+  taskId.set_value(id::UUID::random().toString());
+
+  Future<v1::scheduler::Event::Update> taskStarting;
+  Future<v1::scheduler::Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&taskStarting)))
+    .WillOnce(DoAll(
+        v1::scheduler::SendAcknowledge(frameworkId, agentId),
+        FutureArg<1>(&taskRunning)));
+
+  {
+    v1::Resources executorResources =
+      *v1::Resources::parse("cpus:0.1;mem:32;disk:32");
+    executorResources.allocate(frameworkInfo.roles(0));
+
+    v1::ExecutorInfo executorInfo;
+    executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+    executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+    executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+    executorInfo.mutable_resources()->CopyFrom(executorResources);
+
+    ASSERT_TRUE(v1::Resources(offer.resources()).contains(executorResources));
+    v1::Resources taskResources = offer.resources() - executorResources;
+
+    v1::TaskInfo taskInfo =
+      v1::createTask(agentId, taskResources, SLEEP_COMMAND(1000));
+
+    Call call = v1::createCallAccept(
+        frameworkId,
+        offer,
+        {v1::LAUNCH_GROUP(executorInfo, v1::createTaskGroupInfo({taskInfo}))});
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(taskStarting);
+  ASSERT_EQ(v1::TaskState::TASK_STARTING, taskStarting->status().state());
+
+  AWAIT_READY(taskRunning);
+  ASSERT_EQ(v1::TaskState::TASK_RUNNING, taskRunning->status().state());
+
+  // Fail over the agent. We expect the agent to destroy the running
+  // container since the resource provider has not resubscribed when
+  // the executor resubscribes which prevents publishing of used resources.
+  EXPECT_CALL(resourceProvider, disconnected());
+
+  slave.get()->terminate();
+
+  Future<Nothing> destroy =
+    FUTURE_DISPATCH(_, &MesosContainerizerProcess::destroy);
+
+  slave = StartSlave(&detector, processId, slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(destroy);
+
+  // Trigger an executor registration timeout so the agent finishes
+  // recovery and can shut down as part of being destructed.
+  Clock::advance(slaveFlags.executor_registration_timeout);
+  Clock::settle();
+
+  // NOTE: We do not check that the task is reported as `TASK_LOST`
+  // to the framework since we have not made sure that the task status
+  // update acknowledgement has been processed.
+
+  // TODO(bbannier): Once MESOS-9711 is fixed make sure that the executor
+  // can resubscribe even if it initially attempted to resubscribe before
+  // the resource provider has resubscribed.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {