You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/05/07 18:02:46 UTC

[mesos] branch master updated: Fixed unguarded calls to `Option::get()` in the master.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 31ceed8  Fixed unguarded calls to `Option::get()` in the master.
31ceed8 is described below

commit 31ceed8dc636b3d63592d9c74f6cec03ed9745e8
Author: Greg Mann <gr...@gmail.com>
AuthorDate: Thu May 2 14:48:48 2019 -0700

    Fixed unguarded calls to `Option::get()` in the master.
    
    Review: https://reviews.apache.org/r/70587
---
 src/master/master.cpp                        |  32 +++++--
 src/tests/agent_operation_feedback_tests.cpp | 138 +++++++++++++++++++++++++++
 2 files changed, 162 insertions(+), 8 deletions(-)

diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6c0e30b..437cbca 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8759,8 +8759,9 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
 
   const SlaveID& slaveId = update.slave_id();
 
-  // The status update for the operation might be for an
-  // operator API call, thus the framework ID here is optional.
+  // While currently only frameworks can initiate operations which request
+  // feedback, in some cases such as master/agent reconciliation, the framework
+  // ID will not be set in the update message.
   Option<FrameworkID> frameworkId = update.has_framework_id()
     ? update.framework_id()
     : Option<FrameworkID>::none();
@@ -8770,6 +8771,8 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
   // agent. Since the operation UUID is not known, there is nothing for the
   // master to do but forward the update to the framework and return.
   if (!update.has_operation_uuid()) {
+    CHECK_SOME(frameworkId);
+
     // Forward the status update to the framework.
     Framework* framework = getFramework(frameworkId.get());
 
@@ -8815,11 +8818,20 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
 
   Operation* operation = slave->getOperation(update.operation_uuid());
   if (operation == nullptr) {
-    // If the operation cannot be found, then this must be an update sent as a
-    // result of a framework-inititated reconciliation which was forwarded to
-    // the agent. Since the operation is not known to the master, there is
-    // nothing for the master to do but forward the update to the framework and
-    // return.
+    // If the operation cannot be found and no framework ID was set, then this
+    // must be a duplicate update as a result of master/agent reconciliation. In
+    // this case, a terminal reconciliation update has already been received by
+    // the master, so we simply return.
+    if (frameworkId.isNone()) {
+      return;
+    }
+
+    // If the operation cannot be found and a framework ID was set, then this
+    // must be an update sent as a result of a framework-initiated
+    // reconciliation which was forwarded to the agent and raced with an
+    // `UpdateSlaveMessage` which removed the operation. Since the operation is
+    // not known to the master, there is nothing for the master to do but
+    // forward the update to the framework and return.
     Framework* framework = getFramework(frameworkId.get());
 
     if (framework == nullptr || !framework->connected()) {
@@ -8846,7 +8858,7 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
     // `ReconcileOperationsMessage`, but they can be deduced from the operation
     // info kept on the master.
 
-    // Only operations done via the scheduler API can have an ID.
+    // Currently, only operations done via the scheduler API can have an ID.
     CHECK(operation->has_framework_id());
 
     frameworkId = operation->framework_id();
@@ -8866,10 +8878,13 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
   // Orphaned operations have no framework to send updates to.
   bool frameworkWillAcknowledge =
     operation->info().has_id() &&
+    frameworkId.isSome() &&
     !isCompletedFramework(frameworkId.get()) &&
     !slave->orphanedOperations.contains(operation->uuid());
 
   if (frameworkWillAcknowledge) {
+    CHECK_SOME(frameworkId);
+
     // Forward the status update to the framework.
     Framework* framework = getFramework(frameworkId.get());
 
@@ -8910,6 +8925,7 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
       // buffer may also be affected by this wait.
       if (operation->info().has_id() &&
           slave->orphanedOperations.contains(operation->uuid()) &&
+          frameworkId.isSome() &&
           !isCompletedFramework(frameworkId.get())) {
         if (slave->reregisteredTime.isSome() &&
             (Clock::now() - slave->reregisteredTime.get()) <
diff --git a/src/tests/agent_operation_feedback_tests.cpp b/src/tests/agent_operation_feedback_tests.cpp
index e427441..a90038e 100644
--- a/src/tests/agent_operation_feedback_tests.cpp
+++ b/src/tests/agent_operation_feedback_tests.cpp
@@ -580,6 +580,144 @@ TEST_P(AgentOperationFeedbackTest, DroppedOperationStatusUpdate)
   Clock::resume();
 }
 
+
+// When a scheduler requests feedback for an operation and the operation is
+// dropped en route to the agent, it's possible that a `ReregisterSlaveMessage`
+// from the agent races with a `SlaveReregisteredMessage` from the master. In
+// this case, master/agent reconciliation of the operation may be performed more
+// than once, leading to duplicate operation status updates. This test verifies
+// that the master gracefully handles such a sequence of events. This is a
+// regression test for MESOS-9698.
+TEST_P(AgentOperationFeedbackTest, DroppedOperationDuplicateStatusUpdate)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Register a framework to exercise an operation.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+  Future<scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<scheduler::Event::Offers> offers;
+
+  // Set an expectation for the first offer.
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+  AWAIT_READY(subscribed);
+
+  const FrameworkID& frameworkId = subscribed->framework_id();
+
+  AWAIT_READY(offers);
+
+  ASSERT_FALSE(offers->offers().empty());
+
+  const Offer& offer = offers->offers(0);
+
+  // Reserve resources, attaching an operation ID to the RESERVE operation.
+  OperationID operationId;
+  operationId.set_value("operation");
+
+  ASSERT_FALSE(offer.resources().empty());
+
+  Resource reservedResources(*(offer.resources().begin()));
+  reservedResources.add_reservations()->CopyFrom(
+      createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  // Drop the operation on its way to the agent.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+  mesos.send(createCallAccept(
+      frameworkId,
+      offer,
+      {RESERVE(reservedResources, operationId)}));
+
+  AWAIT_READY(applyOperationMessage);
+
+  // Capture the update on its way from the agent to the master
+  // so that we can inject a duplicate of it later.
+  Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  Future<scheduler::Event::UpdateOperationStatus> update;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  // Resume the clock to avoid deadlocks related to agent registration.
+  // See MESOS-8828.
+  Clock::resume();
+
+  // Restart the agent to trigger operation reconciliation. This is reasonable
+  // because dropped messages from master to agent should only occur when there
+  // is an agent disconnection.
+  slave->reset();
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(reregisterSlaveMessage);
+
+  Clock::pause();
+
+  AWAIT_READY(updateOperationStatusMessage);
+  AWAIT_READY(update);
+
+  EXPECT_EQ(operationId, update->status().operation_id());
+  EXPECT_EQ(OPERATION_DROPPED, update->status().state());
+  EXPECT_FALSE(update->status().has_uuid());
+  EXPECT_FALSE(update->status().has_resource_provider_id());
+  EXPECT_TRUE(metricEquals("master/operations/dropped", 1));
+
+  const AgentID& agentId(offer.agent_id());
+  ASSERT_TRUE(update->status().has_agent_id());
+  EXPECT_EQ(agentId, update->status().agent_id());
+
+  // Inject a duplicate operation status update. Before resolution of
+  // MESOS-9698, this would crash the master.
+  process::post(
+      slave.get()->pid,
+      master.get()->pid,
+      updateOperationStatusMessage.get());
+
+  // The master already received a terminal update, so nothing should be
+  // forwarded to the scheduler now; a second call would violate the `WillOnce`
+  // expectation above. Settle the clock so that any such call is processed.
+  Clock::settle();
+
+  Clock::resume();
+}
+
 } // namespace v1 {
 } // namespace tests {
 } // namespace internal {