You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:41 UTC
[18/18] git commit: Fixed status update manager to properly handle
duplicate updates.
Fixed status update manager to properly handle duplicate updates.
Review: https://reviews.apache.org/r/13520
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad2d6624
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad2d6624
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad2d6624
Branch: refs/heads/master
Commit: ad2d662497afbe91ad62adb2f0b270e32bcad655
Parents: 3bc167a
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 17:38:08 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 11:03:02 2013 -0700
----------------------------------------------------------------------
src/slave/status_update_manager.cpp | 20 ++++-
src/slave/status_update_manager.hpp | 41 ++++++----
src/tests/status_update_manager_tests.cpp | 106 +++++++++++++++++++++++++
3 files changed, 149 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 6d4598e..b6afeb1 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -242,7 +242,7 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
cleanupStatusUpdateStream(task.id, framework.id);
} else {
// If a stream has pending updates after the replay,
- // send the first pending update
+ // send the first pending update.
const Result<StatusUpdate>& next = stream->next();
CHECK(!next.isError());
if (next.isSome()) {
@@ -317,11 +317,17 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
}
// Handle the status update.
- Try<Nothing> result = stream->update(update);
+ Try<bool> result = stream->update(update);
if (result.isError()) {
return Future<Nothing>::failed(result.error());
}
+ // We don't return a failed future here so that the slave can re-ack
+ // the duplicate update.
+ if (!result.get()) {
+ return Nothing();
+ }
+
// Forward the status update to the master if this is the first in the stream.
// Subsequent status updates will get sent in 'acknowledgement()'.
if (stream->pending.size() == 1) {
@@ -403,6 +409,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
return Future<bool>::failed(result.error());
}
+ if (!result.get()) {
+ return Future<bool>::failed("Duplicate acknowledgement");
+ }
+
// Reset the timeout.
stream->timeout = None();
@@ -412,7 +422,9 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
return Future<bool>::failed(next.error());
}
- if (stream->terminated) {
+ bool terminated = stream->terminated;
+
+ if (terminated) {
if (next.isSome()) {
LOG(WARNING) << "Acknowledged a terminal"
<< " status update " << update.get()
@@ -424,7 +436,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
stream->timeout = forward(next.get());
}
- return result.get();
+ return !terminated;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index ffc79ae..a032646 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -103,9 +103,9 @@ public:
// Checkpoints the status update to disk if necessary.
// Also, sends the next pending status update, if any.
- // @return true if the ACK is handled successfully (e.g., checkpointed)
+ // @return True if the ACK is handled successfully (e.g., checkpointed)
// and the task's status update stream is not terminated.
- // false same as above except the status update stream is terminated.
+ // False: same as above, except the status update stream is terminated.
// Failed if there are any errors (e.g., duplicate, checkpointing).
process::Future<bool> acknowledgement(
const TaskID& taskId,
@@ -200,7 +200,11 @@ struct StatusUpdateStream
}
}
- Try<Nothing> update(const StatusUpdate& update)
+ // This function handles the update, checkpointing if necessary.
+ // @return True if the update is successfully handled.
+ // False if the update is a duplicate.
+ // Error Any errors (e.g., checkpointing).
+ Try<bool> update(const StatusUpdate& update)
{
if (error.isSome()) {
return Error(error.get());
@@ -212,7 +216,7 @@ struct StatusUpdateStream
if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring status update " << update
<< " that has already been acknowledged by the framework!";
- return Nothing();
+ return false;
}
// Check that this update hasn't already been received.
@@ -220,13 +224,22 @@ struct StatusUpdateStream
// then crashes after it writes it to disk but before it sends an ack.
if (received.contains(UUID::fromBytes(update.uuid()))) {
LOG(WARNING) << "Ignoring duplicate status update " << update;
- return Nothing();
+ return false;
}
// Handle the update, checkpointing if necessary.
- return handle(update, StatusUpdateRecord::UPDATE);
+ Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+ if (result.isError()) {
+ return Error(result.error());
+ }
+
+ return true;
}
+ // This function handles the ACK, checkpointing if necessary.
+ // @return True if the acknowledgement is successfully handled.
+ // False if the acknowledgement is a duplicate or unexpected.
+ // Error Any errors (e.g., checkpointing).
Try<bool> acknowledgement(
const TaskID& taskId,
const FrameworkID& frameworkId,
@@ -238,18 +251,18 @@ struct StatusUpdateStream
}
if (acknowledged.contains(uuid)) {
- return Error("Duplicate status update acknowledgment (UUID: "
- + uuid.toString() + ") for update " + stringify(update));
+ LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+ << uuid << ") for update " << update;
+ return false;
}
// This might happen if we retried a status update and got back
// acknowledgments for both the original and the retried update.
if (uuid != UUID::fromBytes(update.uuid())) {
- return Error(
- "Unexpected status update acknowledgement (received " +
- uuid.toString() +
- ", expecting " + UUID::fromBytes(update.uuid()).toString() +
- ") for update " + stringify(update));
+ LOG(WARNING) << "Unexpected status update acknowledgement (received "
+ << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+ << ") for update " << update;
+ return false;
}
// Handle the ACK, checkpointing if necessary.
@@ -258,7 +271,7 @@ struct StatusUpdateStream
return Error(result.error());
}
- return !terminated;
+ return true;
}
// Returns the next update (or none, if empty) in the queue.
http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 6473478..cf420e4 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -660,3 +660,109 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
Shutdown();
}
+
+
+// This test verifies that the slave and status update manager
+// properly handle a duplicate status update (same UUID) that is
+// received before the ACK for the first update has been processed
+// (the first ACK is dropped below). The expected behavior is for
+// the status update manager to drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.checkpoint = true;
+
+ Try<PID<Slave> > slave = StartSlave(&exec, flags);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+ frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+ FrameworkID frameworkId;
+ EXPECT_CALL(sched, registered(_, _, _))
+ .WillOnce(SaveArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0u, offers.get().size());
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Capture the first status update message.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Drop the first ACK from the scheduler to the slave.
+ Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+ DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+ Clock::pause();
+
+ driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+ AWAIT_READY(statusUpdateMessage);
+
+ AWAIT_READY(status);
+
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+ Future<Nothing> _statusUpdate =
+ FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+ // Now resend the captured TASK_RUNNING update (same UUID) to the slave.
+ process::post(slave.get(), statusUpdateMessage.get());
+
+ // Once Slave::_statusUpdate has been dispatched, the status update
+ // manager has handled (and dropped) the duplicate status update.
+ AWAIT_READY(_statusUpdate);
+
+ // After we advance the clock, the status update manager should
+ // retry the TASK_RUNNING update and the scheduler should receive
+ // and acknowledge it.
+ Future<TaskStatus> update;
+ EXPECT_CALL(sched, statusUpdate(_, _))
+ .WillOnce(FutureArg<1>(&update));
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::settle();
+
+ // Ensure the scheduler receives the retried TASK_RUNNING update.
+ AWAIT_READY(update);
+ EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}