You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2016/08/05 02:25:06 UTC
[2/3] mesos git commit: Fixed the master to recover resources/update
state for orphan tasks.
Fixed the master to recover resources/update state for orphan tasks.
The master's status update handler used to ignore status updates
from agents for frameworks that had not yet re-connected with the
master after a failover. This change modifies that logic so that the
master still updates its local state instead of bailing out early.
Review: https://reviews.apache.org/r/50723/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d18068b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d18068b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d18068b
Branch: refs/heads/1.0.x
Commit: 6d18068b3d3576ae06d0058dd03b83a8c667db06
Parents: 3defabe
Author: Anand Mazumdar <an...@apache.org>
Authored: Thu Aug 4 19:00:47 2016 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Thu Aug 4 19:04:20 2016 -0700
----------------------------------------------------------------------
src/master/master.cpp | 50 +++++-----
src/tests/master_tests.cpp | 203 ++++++++++++++++++++++++++++++++++++++++
2 files changed, 232 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d18068b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cf2cb58..8def6b3 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5134,16 +5134,6 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
return;
}
- Framework* framework = getFramework(update.framework_id());
-
- if (framework == nullptr) {
- LOG(WARNING) << "Ignoring status update " << update
- << " from agent " << *slave
- << " because the framework is unknown";
- metrics->invalid_status_updates++;
- return;
- }
-
LOG(INFO) << "Status update " << update << " from agent " << *slave;
// We ensure that the uuid of task status matches the update's uuid, in case
@@ -5155,8 +5145,21 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
update.mutable_status()->set_uuid(update.uuid());
}
- // Forward the update to the framework.
- forward(update, pid, framework);
+ bool validStatusUpdate = true;
+
+ Framework* framework = getFramework(update.framework_id());
+
+ // A framework might not have re-registered after a master failover,
+ // or it might have been disconnected.
+ if (framework != nullptr && framework->connected) {
+ forward(update, pid, framework);
+ } else {
+ validStatusUpdate = false;
+ LOG(WARNING) << "Received status update " << update << " from agent "
+ << *slave << " for "
+ << (framework == nullptr ? "an unknown " : "a disconnected ")
+ << "framework";
+ }
// Lookup the task and see if we need to update anything locally.
Task* task = slave->getTask(update.framework_id(), update.status().task_id());
@@ -5175,7 +5178,8 @@ void Master::statusUpdate(StatusUpdate update, const UPID& pid)
removeTask(task);
}
- metrics->valid_status_updates++;
+ validStatusUpdate
+ ? metrics->valid_status_updates++ : metrics->invalid_status_updates++;
}
@@ -5195,6 +5199,18 @@ void Master::forward(
LOG(INFO) << "Forwarding status update " << update;
}
+ // The task might not exist in master's memory (e.g., failed task validation).
+ Task* task = framework->getTask(update.status().task_id());
+ if (task != nullptr) {
+ // Set the status update state and uuid for the task. Note that
+ // master-generated updates are terminal and do not have a uuid
+ // (in which case the master also calls `removeTask()`).
+ if (update.has_uuid()) {
+ task->set_status_update_state(update.status().state());
+ task->set_status_update_uuid(update.status().uuid());
+ }
+ }
+
StatusUpdateMessage message;
message.mutable_update()->MergeFrom(update);
message.set_pid(acknowledgee);
@@ -6807,14 +6823,6 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
}
}
- // Set the status update state and uuid for the task. Note that
- // master-generated updates are terminal and do not have a uuid
- // (in which case the master also calls `removeTask()`).
- if (update.has_uuid()) {
- task->set_status_update_state(status.state());
- task->set_status_update_uuid(update.uuid());
- }
-
// TODO(brenden): Consider wiping the `message` field?
if (task->statuses_size() > 0 &&
task->statuses(task->statuses_size() - 1).state() == status.state()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d18068b/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 6709818..b1d7545 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -86,6 +86,9 @@ using mesos::master::contender::MASTER_CONTENDER_ZK_SESSION_TIMEOUT;
using mesos::master::detector::MasterDetector;
using mesos::master::detector::StandaloneMasterDetector;
+using mesos::v1::scheduler::Call;
+using mesos::v1::scheduler::Event;
+
using process::Clock;
using process::Future;
using process::Owned;
@@ -4658,6 +4661,206 @@ TEST_F(MasterTest, RejectFrameworkWithInvalidFailoverTimeout)
AWAIT_READY(error);
}
+
+// This test verifies that we recover resources when an orphaned task reaches
+// a terminal state.
+TEST_F(MasterTest, RecoverResourcesOrphanedTask)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ auto scheduler = std::make_shared<MockV1HTTPScheduler>();
+ auto executor = std::make_shared<MockV1HTTPExecutor>();
+
+ ExecutorID executorId = DEFAULT_EXECUTOR_ID;
+ TestContainerizer containerizer(executorId, executor);
+
+ StandaloneMasterDetector detector(master.get()->pid);
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector, &containerizer);
+ ASSERT_SOME(slave);
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillOnce(Return());
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ scheduler::TestV1Mesos mesos(master.get()->pid, contentType, scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ {
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+ subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId = subscribed.get().framework_id();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+
+ v1::executor::Mesos* execMesos = nullptr;
+
+ EXPECT_CALL(*executor, connected(_))
+ .WillOnce(executor::SendSubscribe(frameworkId, evolve(executorId)));
+
+ EXPECT_CALL(*executor, subscribed(_, _))
+ .WillOnce(SaveArg<0>(&execMesos));
+
+ EXPECT_CALL(*executor, launch(_, _))
+ .WillOnce(executor::SendUpdateFromTask(
+ frameworkId, evolve(executorId), v1::TASK_RUNNING));
+
+ Future<Nothing> acknowledged;
+ EXPECT_CALL(*executor, acknowledged(_, _))
+ .WillOnce(FutureSatisfy(&acknowledged));
+
+ Future<Event::Update> update;
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ const v1::Offer& offer = offers->offers(0);
+
+ v1::TaskInfo taskInfo =
+ evolve(createTask(devolve(offer), "", executorId));
+
+ {
+ Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.set_type(Call::ACCEPT);
+
+ Call::Accept* accept = call.mutable_accept();
+ accept->add_offer_ids()->CopyFrom(offers->offers(0).id());
+
+ v1::Offer::Operation* operation = accept->add_operations();
+ operation->set_type(v1::Offer::Operation::LAUNCH);
+ operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+ mesos.send(call);
+ }
+
+ AWAIT_READY(acknowledged);
+ AWAIT_READY(update);
+
+ EXPECT_EQ(v1::TASK_RUNNING, update->status().state());
+ EXPECT_TRUE(update->status().has_executor_id());
+ EXPECT_EQ(executorId, devolve(update->status().executor_id()));
+
+ Future<Nothing> disconnected;
+ EXPECT_CALL(*scheduler, disconnected(_))
+ .WillOnce(FutureSatisfy(&disconnected));
+
+ // Failover the master.
+ master->reset();
+ master = StartMaster();
+ ASSERT_SOME(master);
+
+ AWAIT_READY(disconnected);
+
+ // Have the agent re-register with the master.
+ detector.appoint(master.get()->pid);
+
+ // Ensure re-registration is complete.
+ Clock::pause();
+ Clock::settle();
+
+ EXPECT_CALL(*executor, acknowledged(_, _));
+
+ Future<v1::executor::Call> updateCall =
+ FUTURE_HTTP_CALL(v1::executor::Call(),
+ v1::executor::Call::UPDATE,
+ _,
+ contentType);
+
+ // Send a terminal status update while the task is an orphan, i.e., the
+ // framework has not reconnected.
+ {
+ v1::TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskInfo.task_id());
+ status.mutable_executor_id()->CopyFrom(evolve(executorId));
+ status.set_state(v1::TASK_FINISHED);
+ status.set_source(v1::TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(UUID::random().toBytes());
+
+ v1::executor::Call call;
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(evolve(executorId));
+
+ call.set_type(v1::executor::Call::UPDATE);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ execMesos->send(call);
+ }
+
+ AWAIT_READY(updateCall);
+
+ // Ensure that the update is processed by the agent.
+ Clock::settle();
+
+ Future<Nothing> recoverResources =
+ FUTURE_DISPATCH(_, &MesosAllocatorProcess::recoverResources);
+
+ // Advance the clock for the status update manager to retry with the
+ // latest state of the task.
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Ensure that the resources are successfully recovered.
+ AWAIT_READY(recoverResources);
+
+ // Ensure that the state of the task is updated to `TASK_FINISHED` on the
+ // master.
+ {
+ v1::master::Call call;
+ call.set_type(v1::master::Call::GET_TASKS);
+
+ process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+ headers["Accept"] = stringify(contentType);
+
+ Future<Response> response = process::http::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+ v1::master::Response::GetTasks tasks = deserialize<v1::master::Response>(
+ contentType, response->body).get().get_tasks();
+
+ ASSERT_TRUE(tasks.IsInitialized());
+ ASSERT_EQ(1, tasks.orphan_tasks().size());
+ ASSERT_EQ(TASK_FINISHED, tasks.orphan_tasks(0).state());
+ }
+
+ EXPECT_CALL(*executor, shutdown(_))
+ .Times(AtMost(1));
+
+ EXPECT_CALL(*executor, disconnected(_))
+ .Times(AtMost(1));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {