You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2014/10/22 00:47:31 UTC

[2/6] git commit: Updated status update manager to forward updates via slave.

Updated status update manager to forward updates via slave.

Review: https://reviews.apache.org/r/26846


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e64dda41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e64dda41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e64dda41

Branch: refs/heads/master
Commit: e64dda411bc83963179c92ae71caefa8d21b54b4
Parents: 616d401
Author: Vinod Kone <vi...@gmail.com>
Authored: Wed Oct 15 18:22:46 2014 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Oct 21 15:47:08 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp               |   6 +-
 src/slave/slave.cpp                 | 122 ++++++++++++++++++-------------
 src/slave/slave.hpp                 |   4 +
 src/slave/status_update_manager.cpp |  47 ++----------
 src/slave/status_update_manager.hpp |   2 -
 src/tests/slave_recovery_tests.cpp  |  19 +++--
 6 files changed, 94 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index be910d9..f04c085 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3294,14 +3294,12 @@ void Master::unregisterSlave(const UPID& from, const SlaveID& slaveId)
 }
 
 
-// NOTE: We cannot use 'from' here to identify the slave as this is
-// now sent by the StatusUpdateManagerProcess and master itself when
-// it generates TASK_LOST messages. Only 'pid' can be used to identify
-// the slave.
 // TODO(bmahler): The master will not release resources until the
 // slave receives acknowlegements for all non-terminal updates. This
 // means if a framework is down, the resources will remain allocated
 // even though the tasks are terminal on the slaves!
+// TODO(vinod): Since 0.22.0, we can use 'from' instead of 'pid' because
+// the status updates will be sent by the slave.
 void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 {
   ++metrics.messages_status_update;

http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 7b5474a..afcb669 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -497,7 +497,7 @@ void Slave::finalize()
     }
   }
 
-  if (state == TERMINATING || flags.recover == "cleanup") {
+  if (state == TERMINATING) {
     // We remove the "latest" symlink in meta directory, so that the
     // slave doesn't recover the state when it restarts and registers
     // as a new slave with the master.
@@ -533,14 +533,8 @@ void Slave::shutdown(const UPID& from, const string& message)
   if (frameworks.empty()) { // Terminate slave if there are no frameworks.
     terminate(self());
   } else {
-    // NOTE: The slave will terminate after all
-    // executors have terminated.
-    // TODO(vinod): Wait until all updates have been acknowledged.
-    // This is tricky without persistent state at master because the
-    // slave might wait forever for status update acknowledgements,
-    // since it cannot reliably know when a framework has shut down.
-    // A short-term fix could be to wait for a certain time for ACKs
-    // and then shutdown.
+    // NOTE: The slave will terminate after all the executors have
+    // terminated.
     // NOTE: We use 'frameworks.keys()' here because 'shutdownFramework'
     // can potentially remove a framework from 'frameworks'.
     foreach (const FrameworkID& frameworkId, frameworks.keys()) {
@@ -600,22 +594,11 @@ void Slave::detected(const Future<Option<MasterInfo> >& _master)
     LOG(INFO) << "New master detected at " << master.get();
     link(master.get());
 
-    // Inform the status updates manager about the new master.
-    statusUpdateManager->newMasterDetected(master.get());
-
     if (state == TERMINATING) {
       LOG(INFO) << "Skipping registration because slave is terminating";
       return;
     }
 
-    // The slave does not (re-)register if it is in the cleanup mode
-    // because we do not want to accept new tasks.
-    if (flags.recover == "cleanup") {
-      LOG(INFO)
-        << "Skipping registration because slave was started in cleanup mode";
-      return;
-    }
-
     // Wait for a random amount of time before authentication or
     // registration.
     Duration duration =
@@ -829,6 +812,11 @@ void Slave::reregistered(
       CHECK_SOME(master);
       LOG(INFO) << "Re-registered with master " << master.get();
       state = RUNNING;
+
+      // Inform status update manager to immediately resend any
+      // pending updates.
+      statusUpdateManager->flush();
+
       break;
     case RUNNING:
       CHECK_SOME(master);
@@ -931,6 +919,8 @@ void Slave::doReliableRegistration(const Duration& duration)
 
   CHECK(state == DISCONNECTED) << state;
 
+  CHECK_NE("cleanup", flags.recover);
+
   if (!info.has_id()) {
     // Registering for the first time.
     RegisterSlaveMessage message;
@@ -2373,6 +2363,33 @@ void Slave::__statusUpdate(
 }
 
 
+// NOTE: An acknowledgement for this update might have already been
+// processed by the slave but not the status update manager.
+void Slave::forward(const StatusUpdate& update)
+{
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Dropping status update " << update
+                 << " sent by status update manager because the slave"
+                 << " is in " << state << " state";
+    return;
+  }
+
+  CHECK_SOME(master);
+  LOG(INFO) << "Forwarding the update " << update << " to " << master.get();
+
+  // Forward the update to master.
+  StatusUpdateMessage message;
+  message.mutable_update()->MergeFrom(update);
+  message.set_pid(self()); // The ACK will be first received by the slave.
+
+  send(master.get(), message);
+}
+
+
 void Slave::executorMessage(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
@@ -2866,9 +2883,10 @@ void Slave::executorTerminated(
         if (master.isSome()) { send(master.get(), message); }
       }
 
-      // Remove the executor if either the framework is terminating or
-      // there are no incomplete tasks.
-      if (framework->state == Framework::TERMINATING ||
+      // Remove the executor if either the slave or framework is
+      // terminating or there are no incomplete tasks.
+      if (state == TERMINATING ||
+          framework->state == Framework::TERMINATING ||
           !executor->incompleteTasks()) {
         removeExecutor(framework, executor);
       }
@@ -2900,13 +2918,15 @@ void Slave::removeExecutor(Framework* framework, Executor* executor)
         framework->state == Framework::TERMINATING)
     << framework->state;
 
-  // Check that this executor has terminated and either has no
-  // pending updates or the framework is terminating. We don't
-  // care for pending updates when a framework is terminating
-  // because the framework cannot ACK them.
+  // Check that this executor has terminated.
   CHECK(executor->state == Executor::TERMINATED) << executor->state;
-  CHECK(framework->state == Framework::TERMINATING ||
-        !executor->incompleteTasks());
+
+  // Check that either 1) the executor has no tasks with pending
+  // updates or 2) the slave/framework is terminating, in which case
+  // acknowledgements for pending updates might never be received.
+  CHECK(!executor->incompleteTasks() ||
+        state == TERMINATING ||
+        framework->state == Framework::TERMINATING);
 
   // Write a sentinel file to indicate that this executor
   // is completed.
@@ -3006,17 +3026,8 @@ void Slave::removeFramework(Framework* framework)
   // Pass ownership of the framework pointer.
   completedFrameworks.push_back(Owned<Framework>(framework));
 
-  if (frameworks.empty()) {
-    // Terminate the slave if
-    // 1) it's being shut down or
-    // 2) it's started in cleanup mode and recovery finished.
-    // TODO(vinod): Instead of doing it this way, shutdownFramework()
-    // and shutdownExecutor() could return Futures and a slave could
-    // shutdown when all the Futures are satisfied (e.g., collect()).
-    if (state == TERMINATING ||
-        (flags.recover == "cleanup" && !recovered.future().isPending())) {
-      terminate(self());
-    }
+  if (state == TERMINATING && frameworks.empty()) {
+    terminate(self());
   }
 }
 
@@ -3362,7 +3373,6 @@ void Slave::__recover(const Future<Nothing>& future)
   LOG(INFO) << "Finished recovery";
 
   CHECK_EQ(RECOVERING, state);
-  state = DISCONNECTED;
 
   // Checkpoint boot ID.
   Try<string> bootId = os::bootId();
@@ -3413,18 +3423,30 @@ void Slave::__recover(const Future<Nothing>& future)
     }
   }
 
-  recovered.set(Nothing()); // Signal recovery.
+  if (flags.recover == "reconnect") {
+    state = DISCONNECTED;
 
-  // Terminate slave, if it has no active frameworks and is started
-  // in 'cleanup' mode.
-  if (frameworks.empty() && flags.recover == "cleanup") {
-    terminate(self());
-    return;
+    // Start detecting masters.
+    detection = detector->detect()
+      .onAny(defer(self(), &Slave::detected, lambda::_1));
+  } else {
+    // Slave started in cleanup mode.
+    CHECK_EQ("cleanup", flags.recover);
+    state = TERMINATING;
+
+    if (frameworks.empty()) {
+      terminate(self());
+    }
+
+    // If there are active executors/frameworks, the slave will
+    // shutdown when all the executors are terminated. Note that
+    // the executors are guaranteed to terminate because they
+    // are sent a shutdown signal in '_recover()' which results in
+    // 'Containerizer::destroy()' being called if the termination
+    // doesn't happen within a timeout.
   }
 
-  // Start detecting masters.
-  detection = detector->detect()
-    .onAny(defer(self(), &Slave::detected, lambda::_1));
+  recovered.set(Nothing()); // Signal recovery.
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index ccc0e03..439052e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -197,6 +197,10 @@ public:
       const StatusUpdate& update,
       const process::UPID& pid);
 
+  // This is called by status update manager to forward a
+  // status update to the master.
+  void forward(const StatusUpdate& update);
+
   void statusUpdateAcknowledgement(
       const process::UPID& from,
       const SlaveID& slaveId,

http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index 5d5cf23..fb35ace 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -91,8 +91,6 @@ public:
       const string& rootDir,
       const Option<SlaveState>& state);
 
-  void newMasterDetected(const UPID& pid);
-
   void flush();
 
   void cleanup(const FrameworkID& frameworkId);
@@ -135,7 +133,6 @@ private:
       const TaskID& taskId,
       const FrameworkID& frameworkId);
 
-  UPID master;
   Flags flags;
   PID<Slave> slave;
   hashmap<FrameworkID, hashmap<TaskID, StatusUpdateStream*> > streams;
@@ -162,23 +159,13 @@ void StatusUpdateManagerProcess::initialize(
 }
 
 
-void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
-{
-  LOG(INFO) << "New master detected at " << pid;
-  master = pid;
-
-  // Retry any pending status updates.
-  flush();
-}
-
-
 void StatusUpdateManagerProcess::flush()
 {
   foreachkey (const FrameworkID& frameworkId, streams) {
     foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
       if (!stream->pending.empty()) {
         const StatusUpdate& update = stream->pending.front();
-        LOG(WARNING) << "Resending status update " << update;
+        LOG(WARNING) << "Flushing status update " << update;
         stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
       }
     }
@@ -256,18 +243,11 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
         }
 
         // At the end of the replay, the stream is either terminated or
-        // contains only unacknowledged, if any, pending updates.
+        // contains only unacknowledged, if any, pending updates. The
+        // pending updates will be flushed after the slave
+        // re-registers with the master.
         if (stream->terminated) {
           cleanupStatusUpdateStream(task.id, framework.id);
-        } else {
-          // If a stream has pending updates after the replay,
-          // send the first pending update.
-          const Result<StatusUpdate>& next = stream->next();
-          CHECK(!next.isError());
-          if (next.isSome()) {
-            stream->timeout =
-              forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
-          }
         }
       }
     }
@@ -369,18 +349,10 @@ Timeout StatusUpdateManagerProcess::forward(
     const StatusUpdate& update,
     const Duration& duration)
 {
-  if (master) {
-    LOG(INFO) << "Forwarding status update " << update << " to " << master;
+  VLOG(1) << "Forwarding update " << update << " to the slave";
 
-    StatusUpdateMessage message;
-    message.mutable_update()->MergeFrom(update);
-    message.set_pid(slave); // The ACK will be first received by the slave.
-
-    send(master, message);
-  } else {
-    LOG(WARNING) << "Not forwarding status update " << update
-                 << " because no master is elected yet";
-  }
+  // Forward the update to the slave.
+  dispatch(slave, &Slave::forward, update);
 
   // Send a message to self to resend after some delay if no ACK is received.
   return delay(duration,
@@ -622,11 +594,6 @@ Future<Nothing> StatusUpdateManager::recover(
 }
 
 
-void StatusUpdateManager::newMasterDetected(const UPID& pid)
-{
-  dispatch(process, &StatusUpdateManagerProcess::newMasterDetected, pid);
-}
-
 
 void StatusUpdateManager::flush()
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index c371e55..1c1a8a8 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -117,8 +117,6 @@ public:
       const std::string& rootDir,
       const Option<state::SlaveState>& state);
 
-  // TODO(vinod): Remove this hack once the new leader detector code is merged.
-  void newMasterDetected(const process::UPID& pid);
 
   // Resend all the pending updates right away.
   // This is useful when the updates were pending because there was

http://git-wip-us.apache.org/repos/asf/mesos/blob/e64dda41/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 4fb357b..813e2d6 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -912,7 +912,7 @@ TYPED_TEST(SlaveRecoveryTest, RecoverCompletedExecutor)
 
 // The slave is stopped after a non-terminal update is received.
 // Slave is restarted in recovery=cleanup mode. It kills the command
-// executor, and transitions the task to FAILED.
+// executor, and terminates. Master should then send TASK_LOST.
 TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 {
   Try<PID<Master> > master = this->StartMaster();
@@ -965,8 +965,8 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
   this->Stop(slave.get());
   delete containerizer1.get();
 
-  // Slave in cleanup mode shouldn't reregister with slave and hence
-  // no offers should be made by the master.
+  // Slave in cleanup mode shouldn't re-register with the master and
+  // hence no offers should be made by the master.
   EXPECT_CALL(sched, resourceOffers(_, _))
     .Times(0);
 
@@ -976,12 +976,14 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
 
   Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
 
+  EXPECT_CALL(sched, slaveLost(_, _))
+    .Times(AtMost(1));
+
   // Restart the slave in 'cleanup' recovery mode with a new isolator.
   Try<TypeParam*> containerizer2 = TypeParam::create(flags, true);
   ASSERT_SOME(containerizer2);
 
   flags.recover = "cleanup";
-
   slave = this->StartSlave(containerizer2.get(), flags);
   ASSERT_SOME(slave);
 
@@ -993,15 +995,12 @@ TYPED_TEST(SlaveRecoveryTest, CleanupExecutor)
     Clock::settle();
   }
 
-  // Scheduler should receive the TASK_FAILED update.
-  AWAIT_READY(status);
-  ASSERT_EQ(TASK_FAILED, status.get().state());
-
   // Wait for recovery to complete.
   AWAIT_READY(__recover);
-  Clock::settle();
 
-  Clock::resume();
+  // Scheduler should receive the TASK_LOST update.
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_LOST, status.get().state());
 
   driver.stop();
   driver.join();