You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/19 22:44:32 UTC

svn commit: r1470033 - in /incubator/mesos/trunk/src: common/type_utils.hpp slave/slave.cpp slave/status_update_manager.cpp slave/status_update_manager.hpp tests/status_update_manager_tests.cpp

Author: vinodkone
Date: Fri Apr 19 20:44:32 2013
New Revision: 1470033

URL: http://svn.apache.org/r1470033
Log:
Fixed status update manager to ignore unexpected acknowledgements.

Review: https://reviews.apache.org/r/10660

Modified:
    incubator/mesos/trunk/src/common/type_utils.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/status_update_manager.cpp
    incubator/mesos/trunk/src/slave/status_update_manager.hpp
    incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp

Modified: incubator/mesos/trunk/src/common/type_utils.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/type_utils.hpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/type_utils.hpp (original)
+++ incubator/mesos/trunk/src/common/type_utils.hpp Fri Apr 19 20:44:32 2013
@@ -19,11 +19,13 @@
 #ifndef __TYPE_UTILS_HPP__
 #define __TYPE_UTILS_HPP__
 
+#include <boost/functional/hash.hpp>
+
 #include <google/protobuf/descriptor.h>
 
 #include <mesos/mesos.hpp>
 
-#include <boost/functional/hash.hpp>
+#include <stout/uuid.hpp>
 
 #include "common/attributes.hpp"
 #include "common/resources.hpp"
@@ -340,7 +342,8 @@ inline std::ostream& operator << (
 {
   return stream
     << update.status().state()
-    << " from task " << update.status().task_id()
+    << " (UUID: " << UUID::fromBytes(update.uuid())
+    << ") for task " << update.status().task_id()
     << " of framework " << update.framework_id();
 }
 

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Fri Apr 19 20:44:32 2013
@@ -1289,7 +1289,7 @@ void Slave::statusUpdateAcknowledgement(
     const TaskID& taskId,
     const string& uuid)
 {
-  LOG(INFO) << "Got acknowledgement of status update"
+  LOG(INFO) << "Got status update acknowledgement " << UUID::fromBytes(uuid)
             << " for task " << taskId
             << " of framework " << frameworkId;
 
@@ -1311,7 +1311,7 @@ void Slave::_statusUpdateAcknowledgement
     const UUID& uuid)
 {
   if (!future.isReady()) {
-    LOG(FATAL) << "Failed to handle status update acknowledgement"
+    LOG(FATAL) << "Failed to handle status update acknowledgement " << uuid
                << " for task " << taskId
                << " of framework " << frameworkId << ": "
                << (future.isFailed() ? future.failure() : "future discarded");
@@ -1319,7 +1319,7 @@ void Slave::_statusUpdateAcknowledgement
   }
 
   if (future.get().isError()) {
-    LOG(ERROR) << "Failed to handle the status update acknowledgement"
+    LOG(ERROR) << "Failed to handle the status update acknowledgement " << uuid
                << " for task " << taskId
                << " of framework " << frameworkId
                << ": " << future.get().error();
@@ -1327,13 +1327,14 @@ void Slave::_statusUpdateAcknowledgement
   }
 
   if (!future.get().get()) {
-    LOG(WARNING) << "Ignoring status update acknowledgement"
+    LOG(WARNING) << "Ignoring status update acknowledgement " << uuid
                  << " for task " << taskId
                  << " of framework " << frameworkId;
   }
 
   LOG(INFO) << "Status update manager successfully handled status update"
-            << " acknowledgement for task " << taskId
+            << " acknowledgement " << uuid
+            << " for task " << taskId
             << " of framework " << frameworkId;
 
   CHECK(state == RECOVERING || state == DISCONNECTED ||
@@ -1342,7 +1343,8 @@ void Slave::_statusUpdateAcknowledgement
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-    LOG(ERROR) << "Status update acknowledgement for task " << taskId
+    LOG(ERROR) << "Status update acknowledgement " << uuid
+               << " for task " << taskId
                << " of unknown framework " << frameworkId;
     return;
   }
@@ -1354,7 +1356,8 @@ void Slave::_statusUpdateAcknowledgement
   // Find the executor that has this update.
   Executor* executor = framework->getExecutor(taskId);
   if (executor == NULL) {
-    LOG(ERROR) << "Status update acknowledgement for task " << taskId
+    LOG(ERROR) << "Status update acknowledgement " << uuid
+              << " for task " << taskId
                << " of unknown executor";
     return;
   }
@@ -1780,10 +1783,13 @@ void Slave::_statusUpdate(
     return;
   }
 
+  LOG(INFO) << "Status update manager successfully handled status update "
+            << update;
+
   // Status update manager successfully handled the status update.
   // Acknowledge the executor, if necessary.
   if (pid.isSome()) {
-    LOG(INFO) << "Sending ACK for status update " << update
+    LOG(INFO) << "Sending acknowledgement for status update " << update
               << " to executor " << pid.get();
     StatusUpdateAcknowledgementMessage message;
     message.mutable_framework_id()->MergeFrom(update.framework_id());

Modified: incubator/mesos/trunk/src/slave/status_update_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.cpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.cpp Fri Apr 19 20:44:32 2013
@@ -380,7 +380,7 @@ Try<bool> StatusUpdateManagerProcess::ac
   // This might happen if we retried a status update and got back
   // acknowledgments for both the original and the retried update.
   if (update.isNone()) {
-    LOG(WARNING) << "Ignoring duplicate status update acknowledgment " << uuid
+    LOG(WARNING) << "Ignoring unexpected status update acknowledgment " << uuid
                  << " for task " << taskId
                  << " of framework " << frameworkId;
     return false;

Modified: incubator/mesos/trunk/src/slave/status_update_manager.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/status_update_manager.hpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/status_update_manager.hpp (original)
+++ incubator/mesos/trunk/src/slave/status_update_manager.hpp Fri Apr 19 20:44:32 2013
@@ -237,15 +237,20 @@ struct StatusUpdateStream
 
     if (acknowledged.contains(uuid)) {
       LOG(WARNING) << "Ignoring duplicate status update acknowledgment " << uuid
-                   << " for task " << taskId
-                   << " of framework " << frameworkId;;
+                   << " for update " << update;
       return false;
     }
 
-    CHECK(uuid == UUID::fromBytes(update.uuid()))
-      << "Unexpected UUID mismatch! (received " << uuid
-      << ", expecting " << UUID::fromBytes(update.uuid()).toString()
-      << ") for update " << stringify(update);
+    // This might happen if we retried a status update and got back
+    // acknowledgments for both the original and the retried update.
+    if (uuid != UUID::fromBytes(update.uuid())) {
+      LOG(WARNING)
+        << "Ignoring unexpected status update acknowledgement "
+        << "(received " << uuid
+        << ", expecting " << UUID::fromBytes(update.uuid())
+        << ") for update " << update;
+      return false;
+    }
 
     // Handle the ACK, checkpointing if necessary.
     Try<Nothing> result = handle(update, StatusUpdateRecord::ACK);
@@ -347,8 +352,6 @@ private:
   {
     CHECK(error.isNone());
 
-    LOG(INFO) << "Handling " << type << " for status update " << update;
-
     if (type == StatusUpdateRecord::UPDATE) {
       // Record this update.
       received.insert(UUID::fromBytes(update.uuid()));

Modified: incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp?rev=1470033&r1=1470032&r2=1470033&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/status_update_manager_tests.cpp Fri Apr 19 20:44:32 2013
@@ -120,7 +120,7 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
 
   driver.start();
 
-  AWAIT_UNTIL(offers);
+  AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -138,10 +138,10 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
 
   driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
 
-  AWAIT_UNTIL(status);
+  AWAIT_READY(status);
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
-  AWAIT_UNTIL(_statusUpdateAcknowledgement);
+  AWAIT_READY(_statusUpdateAcknowledgement);
 
   // Ensure that both the status update and its acknowledgement are
   // correctly checkpointed.
@@ -185,7 +185,7 @@ TEST_F(StatusUpdateManagerTest, Checkpoi
   driver.stop();
   driver.join();
 
-  AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+  AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
 
   cluster.shutdown();
 }
@@ -223,7 +223,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
 
   driver.start();
 
-  AWAIT_UNTIL(offers);
+  AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
   EXPECT_CALL(exec, registered(_, _, _, _))
@@ -239,7 +239,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
 
   driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
 
-  AWAIT_UNTIL(statusUpdateMessage);
+  AWAIT_READY(statusUpdateMessage);
 
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(_, _))
@@ -247,7 +247,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
 
   Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
 
-  AWAIT_UNTIL(status);
+  AWAIT_READY(status);
 
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
@@ -260,7 +260,7 @@ TEST_F(StatusUpdateManagerTest, RetrySta
   driver.stop();
   driver.join();
 
-  AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+  AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
 
   cluster.shutdown();
 }
@@ -303,7 +303,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
 
   driver.start();
 
-  AWAIT_UNTIL(offers);
+  AWAIT_READY(offers);
   EXPECT_NE(0u, offers.get().size());
 
   ExecutorDriver* execDriver;
@@ -322,7 +322,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
 
   driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
 
-  AWAIT_UNTIL(statusUpdateMessage);
+  AWAIT_READY(statusUpdateMessage);
   StatusUpdate update = statusUpdateMessage.get().update();
 
   Future<TaskStatus> status;
@@ -335,7 +335,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
 
   Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
 
-  AWAIT_UNTIL(status);
+  AWAIT_READY(status);
 
   EXPECT_EQ(TASK_RUNNING, status.get().state());
 
@@ -370,8 +370,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
 
   // TODO(vinod): It would've been great to introspect the first
   // argument of '_statusUpdateAcknowledement()' and ensure that
-  // it is 'false'. All we do now is wait for the slave to not crash
-  // due to CHECK failure in status update manager.
+  // it is 'false'.
   AWAIT_READY(duplicateAck);
 
   Clock::resume();
@@ -383,7 +382,104 @@ TEST_F(StatusUpdateManagerTest, IgnoreDu
   driver.stop();
   driver.join();
 
-  AWAIT_UNTIL(shutdown); // Ensures MockExecutor can be deallocated.
+  AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
+
+  cluster.shutdown();
+}
+
+
+// This test verifies that status update manager ignores
+// unexpected ACK for an earlier update when it is waiting
+// for an ACK for another update. We do this by dropping ACKs
+// for the original update and sending a random ACK to the slave.
+TEST_F(StatusUpdateManagerTest, IgnoreUnexpectedStatusUpdateAck)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  MockExecutor exec;
+
+  slave::Flags flags = cluster.slaves.flags;
+  flags.checkpoint = true;
+  Try<PID<Slave> > slave = cluster.slaves.start(
+      flags, DEFAULT_EXECUTOR_ID, &exec);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo; // Bug in gcc 4.1.*, must assign on next line.
+  frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_checkpoint(true); // Enable checkpointing.
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, frameworkInfo, master.get());
+
+  FrameworkID frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(SaveArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  ExecutorDriver* execDriver;
+  EXPECT_CALL(exec, registered(_, _, _, _))
+      .WillOnce(SaveArg<0>(&execDriver));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<StatusUpdateMessage> statusUpdateMessage =
+    FUTURE_PROTOBUF(StatusUpdateMessage(), master.get(), _);
+
+  // Drop the ACKs, so that status update manager
+  // retries the update.
+  DROP_PROTOBUFS(StatusUpdateAcknowledgementMessage(), _, _);
+
+  driver.launchTasks(offers.get()[0].id(), createTasks(offers.get()[0]));
+
+  AWAIT_READY(statusUpdateMessage);
+  StatusUpdate update = statusUpdateMessage.get().update();
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  Future<Nothing> unexpectedAck =
+      FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
+
+  // Now send an ACK with a random UUID.
+  process::dispatch(
+      slave.get(),
+      &Slave::statusUpdateAcknowledgement,
+      update.slave_id(),
+      frameworkId,
+      update.status().task_id(),
+      UUID::random().toBytes());
+
+  // TODO(vinod): It would've been great to introspect the first
+  // argument of '_statusUpdateAcknowledement()' and ensure that
+  // it is 'false'.
+  AWAIT_READY(unexpectedAck);
+
+  Future<Nothing> shutdown;
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(FutureSatisfy(&shutdown));
+
+  driver.stop();
+  driver.join();
+
+  AWAIT_READY(shutdown); // Ensures MockExecutor can be deallocated.
 
   cluster.shutdown();
 }