You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2016/10/04 01:57:17 UTC
[1/3] mesos git commit: Added 'at' to LinkedHashMap.
Repository: mesos
Updated Branches:
refs/heads/1.0.x 6e503351a -> 837bb4ecc
Added 'at' to LinkedHashMap.
Review: https://reviews.apache.org/r/51403
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a333a2ed
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a333a2ed
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a333a2ed
Branch: refs/heads/1.0.x
Commit: a333a2edcf633c2075aad32660cb6222691b1931
Parents: 6e50335
Author: Benjamin Mahler <bm...@apache.org>
Authored: Wed Aug 24 18:03:54 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:40:16 2016 -0700
----------------------------------------------------------------------
3rdparty/stout/include/stout/linkedhashmap.hpp | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a333a2ed/3rdparty/stout/include/stout/linkedhashmap.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/stout/include/stout/linkedhashmap.hpp b/3rdparty/stout/include/stout/linkedhashmap.hpp
index 6e06260..ef1578e 100644
--- a/3rdparty/stout/include/stout/linkedhashmap.hpp
+++ b/3rdparty/stout/include/stout/linkedhashmap.hpp
@@ -49,6 +49,16 @@ public:
return None();
}
+ Value& at(const Key& key)
+ {
+ return values_.at(key).first;
+ }
+
+ const Value& at(const Key& key) const
+ {
+ return values_.at(key).first;
+ }
+
bool contains(const Key& key) const
{
return values_.contains(key);
[3/3] mesos git commit: Backported MESOS-6026 to 1.0.2.
Posted by jo...@apache.org.
Backported MESOS-6026 to 1.0.2.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/837bb4ec
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/837bb4ec
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/837bb4ec
Branch: refs/heads/1.0.x
Commit: 837bb4ecc0424b4cd6c7996bd6752ec00b437322
Parents: d1b4b72
Author: Joseph Wu <jo...@apache.org>
Authored: Mon Oct 3 18:45:36 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:45:36 2016 -0700
----------------------------------------------------------------------
CHANGELOG | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/837bb4ec/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index c254d2c..ca9192d 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -16,7 +16,8 @@ All Issues:
* [MESOS-6233] - Master CHECK fails during recovery while relinking to other masters.
* [MESOS-6234] - Potential socket leak during Zookeeper network changes.
* [MESOS-6246] - Libprocess links will not generate an ExitedEvent if the socket creation fails.
- * [MESOS-6299] - Master doesn't remove task from pending when it is invalid
+ * [MESOS-6299] - Master doesn't remove task from pending when it is invalid.
+ * [MESOS-6026] - Tasks mistakenly marked as FAILED due to race b/w sendExecutorTerminatedStatusUpdate() and _statusUpdate().
Release Notes - Mesos - Version 1.0.1
[2/3] mesos git commit: Updated the agent to reject multiple terminal
status updates.
Posted by jo...@apache.org.
Updated the agent to reject multiple terminal status updates.
Currently the agent will allow multiple terminal status updates when
updating the state of a task. This allows the status update invariants
to be violated! This patch disallows multiple terminal updates.
Review: https://reviews.apache.org/r/51404
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d1b4b72d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d1b4b72d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d1b4b72d
Branch: refs/heads/1.0.x
Commit: d1b4b72dd14ab23ed731b8cf46edd8cd5d5b0058
Parents: a333a2e
Author: Benjamin Mahler <bm...@apache.org>
Authored: Fri Aug 12 19:44:43 2016 -0700
Committer: Joseph Wu <jo...@apache.org>
Committed: Mon Oct 3 18:41:26 2016 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 168 ++++++++++++++-----------
src/slave/slave.hpp | 3 +-
src/tests/slave_tests.cpp | 111 ++++++++++++++++
src/tests/status_update_manager_tests.cpp | 108 ----------------
4 files changed, 205 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 47364b7..09be083 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3427,18 +3427,26 @@ void Slave::_statusUpdate(
// task is sent only after the acknowledgement for the previous one
// is received, which could take a long time if the framework is
// backed up or is down.
- executor->updateTaskState(status);
-
- // Handle the task appropriately if it is terminated.
- // TODO(vinod): Revisit these semantics when we disallow duplicate
- // terminal updates (e.g., when slave recovery is always enabled).
- if (protobuf::isTerminalState(status.state()) &&
- (executor->queuedTasks.contains(status.task_id()) ||
- executor->launchedTasks.contains(status.task_id()))) {
- executor->terminateTask(status.task_id(), status);
-
- // Wait until the container's resources have been updated before
- // sending the status update.
+ Try<Nothing> updated = executor->updateTaskState(status);
+
+ // If we fail to update the task state, drop the update. Note that
+ // we have to acknowledge the executor so that it does not retry.
+ if (updated.isError()) {
+ LOG(ERROR) << "Failed to update state of task '" << status.task_id() << "'"
+ << " to " << status.state() << ": " << updated.error();
+
+ // NOTE: This may lead to out-of-order acknowledgements since
+ // other update acknowledgements may be waiting for the
+ // containerizer or status update manager.
+ ___statusUpdate(Nothing(), update, pid);
+ return;
+ }
+
+ if (protobuf::isTerminalState(status.state())) {
+ // If the task terminated, wait until the container's resources
+ // have been updated before sending the status update. Note that
+ // duplicate terminal updates are not possible here because they
+ // lead to an error from `Executor::updateTaskState`.
containerizer->update(executor->containerId, executor->resources)
.onAny(defer(self(),
&Slave::__statusUpdate,
@@ -6023,49 +6031,6 @@ Task* Executor::addTask(const TaskInfo& task)
}
-void Executor::terminateTask(
- const TaskID& taskId,
- const mesos::TaskStatus& status)
-{
- VLOG(1) << "Terminating task " << taskId;
-
- Task* task = nullptr;
- // Remove the task if it's queued.
- if (queuedTasks.contains(taskId)) {
- task = new Task(
- protobuf::createTask(queuedTasks[taskId], status.state(), frameworkId));
- queuedTasks.erase(taskId);
- } else if (launchedTasks.contains(taskId)) {
- // Update the resources if it's been launched.
- task = launchedTasks[taskId];
- resources -= task->resources();
- launchedTasks.erase(taskId);
- }
-
- switch (status.state()) {
- case TASK_FINISHED:
- ++slave->metrics.tasks_finished;
- break;
- case TASK_FAILED:
- ++slave->metrics.tasks_failed;
- break;
- case TASK_KILLED:
- ++slave->metrics.tasks_killed;
- break;
- case TASK_LOST:
- ++slave->metrics.tasks_lost;
- break;
- default:
- LOG(WARNING) << "Unhandled task state " << status.state()
- << " on completion.";
- break;
- }
-
- // TODO(dhamon): Update source/reason metrics.
- terminatedTasks[taskId] = CHECK_NOTNULL(task);
-}
-
-
void Executor::completeTask(const TaskID& taskId)
{
VLOG(1) << "Completing task " << taskId;
@@ -6135,21 +6100,28 @@ void Executor::recoverTask(const TaskState& state)
// Read updates to get the latest state of the task.
foreach (const StatusUpdate& update, state.updates) {
- updateTaskState(update.status());
-
- // Terminate the task if it received a terminal update.
- // We ignore duplicate terminal updates by checking if
- // the task is present in launchedTasks.
- // TODO(vinod): Revisit these semantics when we disallow duplicate
- // terminal updates (e.g., when slave recovery is always enabled).
- if (protobuf::isTerminalState(update.status().state()) &&
- launchedTasks.contains(state.id)) {
- terminateTask(state.id, update.status());
+ Try<Nothing> updated = updateTaskState(update.status());
+
+ // TODO(bmahler): We only log this error because we used to
+ // allow multiple terminal updates and so we may encounter
+ // this when recovering an old executor. We can hard-CHECK
+ // this 6 months from 1.1.0.
+ if (updated.isError()) {
+ LOG(ERROR) << "Failed to update state of recovered task"
+ << " '" << state.id << "' to " << update.status().state()
+ << ": " << updated.error();
+
+ // The only case that should be possible here is when the
+ // task had multiple terminal updates persisted.
+ continue;
+ }
+ // Complete the task if it is terminal and
+ // the update has been acknowledged.
+ if (protobuf::isTerminalState(update.status().state())) {
CHECK(update.has_uuid())
<< "Expecting updates without 'uuid' to have been rejected";
- // If the terminal update has been acknowledged, remove it.
if (state.acks.contains(UUID::fromBytes(update.uuid()).get())) {
completeTask(state.id);
}
@@ -6159,18 +6131,64 @@ void Executor::recoverTask(const TaskState& state)
}
-void Executor::updateTaskState(const TaskStatus& status)
+Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
{
- if (launchedTasks.contains(status.task_id())) {
- Task* task = launchedTasks[status.task_id()];
- // TODO(brenden): Consider wiping the `data` and `message` fields?
- if (task->statuses_size() > 0 &&
- task->statuses(task->statuses_size() - 1).state() == status.state()) {
- task->mutable_statuses()->RemoveLast();
+ bool terminal = protobuf::isTerminalState(status.state());
+
+ const TaskID& taskId = status.task_id();
+ Task* task = nullptr;
+
+ if (queuedTasks.contains(taskId)) {
+ if (terminal) {
+ task = new Task(protobuf::createTask(
+ queuedTasks.at(taskId),
+ status.state(),
+ frameworkId));
+ queuedTasks.erase(taskId);
+ terminatedTasks[taskId] = task;
+ } else {
+ return Error("Cannot send non-terminal update for queued task");
+ }
+ } else if (launchedTasks.contains(taskId)) {
+ task = launchedTasks.at(status.task_id());
+
+ if (terminal) {
+ resources -= task->resources(); // Release the resources.
+ launchedTasks.erase(taskId);
+ terminatedTasks[taskId] = task;
+ }
+ } else if (terminatedTasks.contains(taskId)) {
+ return Error("Task is already terminated with state"
+ " " + stringify(terminatedTasks.at(taskId)->state()));
+ } else {
+ return Error("Task is unknown");
+ }
+
+ CHECK_NOTNULL(task);
+
+ // TODO(brenden): Consider wiping the `data` and `message` fields?
+ if (task->statuses_size() > 0 &&
+ task->statuses(task->statuses_size() - 1).state() == status.state()) {
+ task->mutable_statuses()->RemoveLast();
+ }
+ task->add_statuses()->CopyFrom(status);
+ task->set_state(status.state());
+
+ // TODO(bmahler): This only increments the state when the update
+ // can be handled. Should we always increment the state?
+ if (terminal) {
+ switch (status.state()) {
+ case TASK_FINISHED: ++slave->metrics.tasks_finished; break;
+ case TASK_FAILED: ++slave->metrics.tasks_failed; break;
+ case TASK_KILLED: ++slave->metrics.tasks_killed; break;
+ case TASK_LOST: ++slave->metrics.tasks_lost; break;
+ default:
+ LOG(ERROR) << "Unexpected terminal task state " << status.state();
+ break;
}
- task->add_statuses()->CopyFrom(status);
- task->set_state(status.state());
}
+
+ return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a8952f0..95223ec 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -738,12 +738,11 @@ struct Executor
~Executor();
Task* addTask(const TaskInfo& task);
- void terminateTask(const TaskID& taskId, const mesos::TaskStatus& status);
void completeTask(const TaskID& taskId);
void checkpointExecutor();
void checkpointTask(const TaskInfo& task);
void recoverTask(const state::TaskState& state);
- void updateTaskState(const TaskStatus& status);
+ Try<Nothing> updateTaskState(const TaskStatus& status);
// Returns true if there are any queued/launched/terminated tasks.
bool incompleteTasks();
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index b9fa85d..ea8f403 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -166,6 +166,117 @@ TEST_F(SlaveTest, Shutdown)
}
+// This test verifies that the slave rejects duplicate terminal
+// status updates for tasks before the first terminal update is
+// acknowledged.
+TEST_F(SlaveTest, DuplicateTerminalUpdateBeforeAck)
+{
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers->size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ // Send a terminal update right away.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Drop the first ACK from the scheduler to the slave.
+ Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
+ DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
+
+ Future<Nothing> ___statusUpdate =
+ FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
+
+ TaskInfo task;
+ task.set_name("test-task");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers->at(0).slave_id());
+ task.mutable_resources()->MergeFrom(offers->at(0).resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ driver.launchTasks(offers->at(0).id(), {task});
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_FINISHED, status->state());
+
+ AWAIT_READY(statusUpdateAckMessage);
+
+ // At this point the status update manager has enqueued
+ // TASK_FINISHED update.
+ AWAIT_READY(___statusUpdate);
+
+ Future<Nothing> _statusUpdate2 =
+ FUTURE_DISPATCH(slave.get()->pid, &Slave::_statusUpdate);
+
+ // Now send a TASK_KILLED update for the same task.
+ TaskStatus status2 = status.get();
+ status2.set_state(TASK_KILLED);
+ execDriver->sendStatusUpdate(status2);
+
+ // At this point the slave has handled the TASK_KILLED update.
+ AWAIT_READY(_statusUpdate2);
+
+ // After we advance the clock, the scheduler should receive
+ // the retried TASK_FINISHED update and acknowledge it.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ // Ensure the scheduler receives TASK_FINISHED.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_FINISHED, update->state());
+
+ // Settle the clock to ensure that TASK_KILLED is not sent.
+ Clock::settle();
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+}
+
+
TEST_F(SlaveTest, ShutdownUnregisteredExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
http://git-wip-us.apache.org/repos/asf/mesos/blob/d1b4b72d/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index f5313a1..4b67700 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -458,114 +458,6 @@ TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
// This test verifies that the slave and status update manager
// properly handle duplicate terminal status updates, when the
-// second update is received before the ACK for the first update.
-// The proper behavior here is for the status update manager to
-// drop the duplicate update.
-TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
-{
- Try<Owned<cluster::Master>> master = StartMaster();
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), &containerizer);
- ASSERT_SOME(slave);
-
- FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.set_checkpoint(true); // Enable checkpointing.
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
-
- FrameworkID frameworkId;
- EXPECT_CALL(sched, registered(_, _, _))
- .WillOnce(SaveArg<1>(&frameworkId));
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(_, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- EXPECT_NE(0u, offers.get().size());
-
- ExecutorDriver* execDriver;
- EXPECT_CALL(exec, registered(_, _, _, _))
- .WillOnce(SaveArg<0>(&execDriver));
-
- // Send a terminal update right away.
- EXPECT_CALL(exec, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&status));
-
- // Drop the first ACK from the scheduler to the slave.
- Future<StatusUpdateAcknowledgementMessage> statusUpdateAckMessage =
- DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get()->pid);
-
- Future<Nothing> ___statusUpdate =
- FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
- Clock::pause();
-
- driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
-
- AWAIT_READY(status);
-
- EXPECT_EQ(TASK_FINISHED, status.get().state());
-
- AWAIT_READY(statusUpdateAckMessage);
-
- // At this point the status update manager has enqueued
- // TASK_FINISHED update.
- AWAIT_READY(___statusUpdate);
-
- Future<Nothing> ___statusUpdate2 =
- FUTURE_DISPATCH(slave.get()->pid, &Slave::___statusUpdate);
-
- // Now send a TASK_KILLED update for the same task.
- TaskStatus status2 = status.get();
- status2.set_state(TASK_KILLED);
- execDriver->sendStatusUpdate(status2);
-
- // At this point the status update manager has enqueued
- // TASK_FINISHED and TASK_KILLED updates.
- AWAIT_READY(___statusUpdate2);
-
- // After we advance the clock, the scheduler should receive
- // the retried TASK_FINISHED update and acknowledge it. The
- // TASK_KILLED update should be dropped by the status update
- // manager, as the stream is already terminated.
- Future<TaskStatus> update;
- EXPECT_CALL(sched, statusUpdate(_, _))
- .WillOnce(FutureArg<1>(&update));
-
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
- Clock::settle();
-
- // Ensure the scheduler receives TASK_FINISHED.
- AWAIT_READY(update);
- EXPECT_EQ(TASK_FINISHED, update.get().state());
-
- EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
-
- Clock::resume();
-
- driver.stop();
- driver.join();
-}
-
-
-// This test verifies that the slave and status update manager
-// properly handle duplicate terminal status updates, when the
// second update is received after the ACK for the first update.
// The proper behavior here is for the status update manager to
// forward the duplicate update to the scheduler.