You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2019/05/07 18:02:46 UTC
[mesos] branch master updated: Fixed unguarded calls to
`Option::get()` in the master.
This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 31ceed8 Fixed unguarded calls to `Option::get()` in the master.
31ceed8 is described below
commit 31ceed8dc636b3d63592d9c74f6cec03ed9745e8
Author: Greg Mann <gr...@gmail.com>
AuthorDate: Thu May 2 14:48:48 2019 -0700
Fixed unguarded calls to `Option::get()` in the master.
Review: https://reviews.apache.org/r/70587
---
src/master/master.cpp | 32 +++++--
src/tests/agent_operation_feedback_tests.cpp | 138 +++++++++++++++++++++++++++
2 files changed, 162 insertions(+), 8 deletions(-)
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6c0e30b..437cbca 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8759,8 +8759,9 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
const SlaveID& slaveId = update.slave_id();
- // The status update for the operation might be for an
- // operator API call, thus the framework ID here is optional.
+ // While currently only frameworks can initiate operations which request
+ // feedback, in some cases such as master/agent reconciliation, the framework
+ // ID will not be set in the update message.
Option<FrameworkID> frameworkId = update.has_framework_id()
? update.framework_id()
: Option<FrameworkID>::none();
@@ -8770,6 +8771,8 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
// agent. Since the operation UUID is not known, there is nothing for the
// master to do but forward the update to the framework and return.
if (!update.has_operation_uuid()) {
+ CHECK_SOME(frameworkId);
+
// Forward the status update to the framework.
Framework* framework = getFramework(frameworkId.get());
@@ -8815,11 +8818,20 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
Operation* operation = slave->getOperation(update.operation_uuid());
if (operation == nullptr) {
- // If the operation cannot be found, then this must be an update sent as a
- // result of a framework-inititated reconciliation which was forwarded to
- // the agent. Since the operation is not known to the master, there is
- // nothing for the master to do but forward the update to the framework and
- // return.
+ // If the operation cannot be found and no framework ID was set, then this
+ // must be a duplicate update as a result of master/agent reconciliation. In
+ // this case, a terminal reconciliation update has already been received by
+ // the master, so we simply return.
+ if (frameworkId.isNone()) {
+ return;
+ }
+
+ // If the operation cannot be found and a framework ID was set, then this
+ // must be an update sent as a result of a framework-initiated
+ // reconciliation which was forwarded to the agent and raced with an
+ // `UpdateSlaveMessage` which removed the operation. Since the operation is
+ // not known to the master, there is nothing for the master to do but
+ // forward the update to the framework and return.
Framework* framework = getFramework(frameworkId.get());
if (framework == nullptr || !framework->connected()) {
@@ -8846,7 +8858,7 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
// `ReconcileOperationsMessage`, but they can be deduced from the operation
// info kept on the master.
- // Only operations done via the scheduler API can have an ID.
+ // Currently, only operations done via the scheduler API can have an ID.
CHECK(operation->has_framework_id());
frameworkId = operation->framework_id();
@@ -8866,10 +8878,13 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
// Orphaned operations have no framework to send updates to.
bool frameworkWillAcknowledge =
operation->info().has_id() &&
+ frameworkId.isSome() &&
!isCompletedFramework(frameworkId.get()) &&
!slave->orphanedOperations.contains(operation->uuid());
if (frameworkWillAcknowledge) {
+ CHECK_SOME(frameworkId);
+
// Forward the status update to the framework.
Framework* framework = getFramework(frameworkId.get());
@@ -8910,6 +8925,7 @@ void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
// buffer may also be affected by this wait.
if (operation->info().has_id() &&
slave->orphanedOperations.contains(operation->uuid()) &&
+ frameworkId.isSome() &&
!isCompletedFramework(frameworkId.get())) {
if (slave->reregisteredTime.isSome() &&
(Clock::now() - slave->reregisteredTime.get()) <
diff --git a/src/tests/agent_operation_feedback_tests.cpp b/src/tests/agent_operation_feedback_tests.cpp
index e427441..a90038e 100644
--- a/src/tests/agent_operation_feedback_tests.cpp
+++ b/src/tests/agent_operation_feedback_tests.cpp
@@ -580,6 +580,144 @@ TEST_P(AgentOperationFeedbackTest, DroppedOperationStatusUpdate)
Clock::resume();
}
+
+// When a scheduler requests feedback for an operation and the operation is
+// dropped en route to the agent, it's possible that a `ReregisterSlaveMessage`
+// from the agent races with a `SlaveReregisteredMessage` from the master. In
+// this case, master/agent reconciliation of the operation may be performed more
+// than once, leading to duplicate operation status updates. This test verifies
+// that the master gracefully handles such a sequence of events. This is a
+// regression test for MESOS-9698.
+TEST_P(AgentOperationFeedbackTest, DroppedOperationDuplicateStatusUpdate)
+{
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ // Register a framework to exercise an operation.
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+ auto scheduler = std::make_shared<MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(scheduler::SendSubscribe(frameworkInfo));
+
+ Future<scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<scheduler::Event::Offers> offers;
+
+ // Set an expectation for the first offer.
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ scheduler::TestMesos mesos(master.get()->pid, GetParam(), scheduler);
+
+ AWAIT_READY(subscribed);
+
+ const FrameworkID& frameworkId = subscribed->framework_id();
+
+ AWAIT_READY(offers);
+
+ ASSERT_FALSE(offers->offers().empty());
+
+ const Offer& offer = offers->offers(0);
+
+ // Reserve resources.
+ OperationID operationId;
+ operationId.set_value("operation");
+
+ ASSERT_FALSE(offer.resources().empty());
+
+ Resource reservedResources(*(offer.resources().begin()));
+ reservedResources.add_reservations()->CopyFrom(
+ createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ // Drop the operation on its way to the agent.
+ Future<ApplyOperationMessage> applyOperationMessage =
+ DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+ mesos.send(createCallAccept(
+ frameworkId,
+ offer,
+ {RESERVE(reservedResources, operationId)}));
+
+ AWAIT_READY(applyOperationMessage);
+
+ // Capture the update on the way from agent to master
+ // so that we can inject a duplicate.
+ Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+ FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+ Future<scheduler::Event::UpdateOperationStatus> update;
+ EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+ FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+ // Resume the clock to avoid deadlocks related to agent registration.
+ // See MESOS-8828.
+ Clock::resume();
+
+ // Restart the agent to trigger operation reconciliation. This is reasonable
+ // because dropped messages from master to agent should only occur when there
+ // is an agent disconnection.
+ slave->reset();
+
+ slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(reregisterSlaveMessage);
+
+ Clock::pause();
+
+ AWAIT_READY(updateOperationStatusMessage);
+ AWAIT_READY(update);
+
+ EXPECT_EQ(operationId, update->status().operation_id());
+ EXPECT_EQ(OPERATION_DROPPED, update->status().state());
+ EXPECT_FALSE(update->status().has_uuid());
+ EXPECT_FALSE(update->status().has_resource_provider_id());
+ EXPECT_TRUE(metricEquals("master/operations/dropped", 1));
+
+ const AgentID& agentId(offer.agent_id());
+ ASSERT_TRUE(update->status().has_agent_id());
+ EXPECT_EQ(agentId, update->status().agent_id());
+
+ // Inject a duplicate operation status update. Before resolution of
+ // MESOS-9698, this would crash the master.
+ process::post(
+ slave.get()->pid,
+ master.get()->pid,
+ updateOperationStatusMessage.get());
+
+ // Since a terminal update was already received by the master, nothing should
+ // be forwarded to the scheduler now. Settle the clock to ensure that we would
+ // notice if this happens.
+ Clock::settle();
+
+ Clock::resume();
+}
+
} // namespace v1 {
} // namespace tests {
} // namespace internal {