You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/14 20:07:41 UTC

[18/18] git commit: Fixed status update manager to properly handle duplicate updates.

Fixed status update manager to properly handle duplicate updates.

Review: https://reviews.apache.org/r/13520


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ad2d6624
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ad2d6624
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ad2d6624

Branch: refs/heads/master
Commit: ad2d662497afbe91ad62adb2f0b270e32bcad655
Parents: 3bc167a
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Aug 12 17:38:08 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Wed Aug 14 11:03:02 2013 -0700

----------------------------------------------------------------------
 src/slave/status_update_manager.cpp       |  20 ++++-
 src/slave/status_update_manager.hpp       |  41 ++++++----
 src/tests/status_update_manager_tests.cpp | 106 +++++++++++++++++++++++++
 3 files changed, 149 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 6d4598e..b6afeb1 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -242,7 +242,7 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
           cleanupStatusUpdateStream(task.id, framework.id);
         } else {
           // If a stream has pending updates after the replay,
-          // send the first pending update
+          // send the first pending update.
           const Result<StatusUpdate>& next = stream->next();
           CHECK(!next.isError());
           if (next.isSome()) {
@@ -317,11 +317,17 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
   }
 
   // Handle the status update.
-  Try<Nothing> result = stream->update(update);
+  Try<bool> result = stream->update(update);
   if (result.isError()) {
     return Future<Nothing>::failed(result.error());
   }
 
+  // We don't return a failed future here so that the slave can re-ack
+  // the duplicate update.
+  if (!result.get()) {
+    return Nothing();
+  }
+
   // Forward the status update to the master if this is the first in the stream.
   // Subsequent status updates will get sent in 'acknowledgement()'.
   if (stream->pending.size() == 1) {
@@ -403,6 +409,10 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     return Future<bool>::failed(result.error());
   }
 
+  if (!result.get()) {
+    return Future<bool>::failed("Duplicate acknowledgement");
+  }
+
   // Reset the timeout.
   stream->timeout = None();
 
@@ -412,7 +422,9 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     return Future<bool>::failed(next.error());
   }
 
-  if (stream->terminated) {
+  bool terminated = stream->terminated;
+
+  if (terminated) {
     if (next.isSome()) {
       LOG(WARNING) << "Acknowledged a terminal"
                    << " status update " << update.get()
@@ -424,7 +436,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     stream->timeout = forward(next.get());
   }
 
-  return result.get();
+  return !terminated;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index ffc79ae..a032646 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -103,9 +103,9 @@ public:
 
   // Checkpoints the status update to disk if necessary.
   // Also, sends the next pending status update, if any.
-  // @return true if the ACK is handled successfully (e.g., checkpointed)
+  // @return True if the ACK is handled successfully (e.g., checkpointed)
   //              and the task's status update stream is not terminated.
-  //         false same as above except the status update stream is terminated.
+  //         False same as above except the status update stream is terminated.
   //         Failed if there are any errors (e.g., duplicate, checkpointing).
   process::Future<bool> acknowledgement(
       const TaskID& taskId,
@@ -200,7 +200,11 @@ struct StatusUpdateStream
     }
   }
 
-  Try<Nothing> update(const StatusUpdate& update)
+  // This function handles the update, checkpointing if necessary.
+  // @return   True if the update is successfully handled.
+  //           False if the update is a duplicate.
+  //           Error Any errors (e.g., checkpointing).
+  Try<bool> update(const StatusUpdate& update)
   {
     if (error.isSome()) {
       return Error(error.get());
@@ -212,7 +216,7 @@ struct StatusUpdateStream
     if (acknowledged.contains(UUID::fromBytes(update.uuid()))) {
       LOG(WARNING) << "Ignoring status update " << update
                    << " that has already been acknowledged by the framework!";
-      return Nothing();
+      return false;
     }
 
     // Check that this update hasn't already been received.
@@ -220,13 +224,22 @@ struct StatusUpdateStream
     // then crashes after it writes it to disk but before it sends an ack.
     if (received.contains(UUID::fromBytes(update.uuid()))) {
       LOG(WARNING) << "Ignoring duplicate status update " << update;
-      return Nothing();
+      return false;
     }
 
     // Handle the update, checkpointing if necessary.
-    return handle(update, StatusUpdateRecord::UPDATE);
+    Try<Nothing> result = handle(update, StatusUpdateRecord::UPDATE);
+    if (result.isError()) {
+      return Error(result.error());
+    }
+
+    return true;
   }
 
+  // This function handles the ACK, checkpointing if necessary.
+  // @return   True if the acknowledgement is successfully handled.
+  //           False if the acknowledgement is a duplicate or unexpected.
+  //           Error Any errors (e.g., checkpointing).
   Try<bool> acknowledgement(
       const TaskID& taskId,
       const FrameworkID& frameworkId,
@@ -238,18 +251,18 @@ struct StatusUpdateStream
     }
 
     if (acknowledged.contains(uuid)) {
-      return Error("Duplicate status update acknowledgment (UUID: "
-                   + uuid.toString() + ") for update " + stringify(update));
+      LOG(WARNING) << "Duplicate status update acknowledgment (UUID: "
+                    << uuid << ") for update " << update;
+      return false;
     }
 
     // This might happen if we retried a status update and got back
     // acknowledgments for both the original and the retried update.
     if (uuid != UUID::fromBytes(update.uuid())) {
-      return Error(
-        "Unexpected status update acknowledgement (received " +
-        uuid.toString() +
-        ", expecting " + UUID::fromBytes(update.uuid()).toString() +
-        ") for update " + stringify(update));
+      LOG(WARNING) << "Unexpected status update acknowledgement (received "
+                   << uuid << ", expecting " << UUID::fromBytes(update.uuid())
+                   << ") for update " << update;
+      return false;
     }
 
     // Handle the ACK, checkpointing if necessary.
@@ -258,7 +271,7 @@ struct StatusUpdateStream
       return Error(result.error());
     }
 
-    return !terminated;
+    return true;
   }
 
   // Returns the next update (or none, if empty) in the queue.

http://git-wip-us.apache.org/repos/asf/mesos/blob/ad2d6624/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 6473478..cf420e4 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -660,3 +660,109 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateAfterAck)
 
   Shutdown();
 }
+
+
+// This test verifies that the slave and status update manager
+// properly handle duplicate status updates when the second
+// update with the same UUID is received before the ACK for the
+// first update. The proper behavior here is for the status update
+// manager to drop the duplicate update.
+TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.checkpoint = true;
+
+  Try<PID<Slave> > slave = StartSlave(&exec, flags);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture the first status update message.
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), _, _);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  // Drop the first ACK from the scheduler to the slave.
+  Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage =
+    DROP_PROTOBUF(StatusUpdateAcknowledgementMessage(), _, slave.get());
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+  Future<Nothing> _statusUpdate =
+    FUTURE_DISPATCH(slave.get(), &Slave::_statusUpdate);
+
+  // Now resend the TASK_RUNNING update.
+  process::post(slave.get(), statusUpdateMessage.get());
+
+  // At this point the status update manager has handled
+  // the duplicate status update.
+  AWAIT_READY(_statusUpdate);
+
+  // After we advance the clock, the status update manager should
+  // retry the TASK_RUNNING update and the scheduler should receive
+  // and acknowledge it.
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::settle();
+
+  // Ensure the scheduler receives the retried TASK_RUNNING update.
+  AWAIT_READY(update);
+  EXPECT_EQ(TASK_RUNNING, update.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}