You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/26 05:36:56 UTC

git commit: Fixed status update manager to back off on status update retries.

Updated Branches:
  refs/heads/master 5d08c6590 -> afc0855fe


Fixed status update manager to back off on status update retries.

Review: https://reviews.apache.org/r/15851


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/afc0855f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/afc0855f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/afc0855f

Branch: refs/heads/master
Commit: afc0855fe7a1587b34d2d6cfb2eaf69c907935f7
Parents: 5d08c65
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Nov 25 12:06:57 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Nov 25 20:28:51 2013 -0800

----------------------------------------------------------------------
 src/slave/constants.cpp                   |  3 +-
 src/slave/constants.hpp                   |  3 +-
 src/slave/slave.cpp                       |  5 +++
 src/slave/status_update_manager.cpp       | 51 ++++++++++++++++++--------
 src/slave/status_update_manager.hpp       |  5 +++
 src/tests/fault_tolerance_tests.cpp       |  7 ++--
 src/tests/status_update_manager_tests.cpp |  8 ++--
 7 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 5573d39..1226485 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -25,7 +25,8 @@ namespace slave {
 const Duration EXECUTOR_REGISTRATION_TIMEOUT = Minutes(1);
 const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5);
 const Duration EXECUTOR_REREGISTER_TIMEOUT = Seconds(2);
-const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10);
+const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN = Seconds(10);
+const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX = Minutes(10);
 const Duration GC_DELAY = Weeks(1);
 const double GC_DISK_HEADROOM = 0.1;
 const Duration DISK_WATCH_INTERVAL = Minutes(1);

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index bbbbfd3..d237383 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -38,7 +38,8 @@ extern const Duration EXECUTOR_REGISTRATION_TIMEOUT;
 extern const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD;
 extern const Duration EXECUTOR_REREGISTER_TIMEOUT;
 extern const Duration RECOVERY_TIMEOUT;
-extern const Duration STATUS_UPDATE_RETRY_INTERVAL;
+extern const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN;
+extern const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX;
 extern const Duration GC_DELAY;
 extern const Duration DISK_WATCH_INTERVAL;
 extern const Duration RESOURCE_MONITORING_INTERVAL;

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a9be378..6fc18c5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1301,6 +1301,11 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
                   << framework->pid << "' to '" << path << "'";
         CHECK_SOME(state::checkpoint(path, framework->pid));
       }
+
+      // Inform status update manager to immediately resend any pending
+      // updates.
+      statusUpdateManager->flush();
+
       break;
     }
     default:

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fafd83d..b79f9de 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -86,6 +86,8 @@ public:
 
   void newMasterDetected(const UPID& pid);
 
+  void flush();
+
   void cleanup(const FrameworkID& frameworkId);
 
 private:
@@ -98,13 +100,13 @@ private:
       const Option<UUID>& uuid);
 
   // Status update timeout.
-  void timeout();
+  void timeout(const Duration& duration);
 
-  // Forwards the status update to the master and starts a timer to check
-  // for ACK from the scheduler.
+  // Forwards the status update to the master and starts a timer based
+  // on the 'duration' to check for ACK from the scheduler.
   // NOTE: This should only be used for those messages that expect an
   // ACK (e.g updates from the executor).
-  Timeout forward(const StatusUpdate& update);
+  Timeout forward(const StatusUpdate& update, const Duration& duration);
 
   // Helper functions.
 
@@ -159,14 +161,18 @@ void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
   master = pid;
 
   // Retry any pending status updates.
-  // This is useful when the updates were pending because there was
-  // no master elected (e.g., during recovery).
+  flush();
+}
+
+
+void StatusUpdateManagerProcess::flush()
+{
   foreachkey (const FrameworkID& frameworkId, streams) {
     foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
       if (!stream->pending.empty()) {
         const StatusUpdate& update = stream->pending.front();
         LOG(WARNING) << "Resending status update " << update;
-        stream->timeout = forward(update);
+        stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
       }
     }
   }
@@ -251,7 +257,8 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
           const Result<StatusUpdate>& next = stream->next();
           CHECK(!next.isError());
           if (next.isSome()) {
-            stream->timeout = forward(next.get());
+            stream->timeout =
+              forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
           }
         }
       }
@@ -343,14 +350,16 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
     }
 
     CHECK_SOME(next);
-    stream->timeout = forward(next.get());
+    stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
   }
 
   return Nothing();
 }
 
 
-Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
+Timeout StatusUpdateManagerProcess::forward(
+    const StatusUpdate& update,
+    const Duration& duration)
 {
   if (master) {
     LOG(INFO) << "Forwarding status update " << update << " to " << master;
@@ -366,9 +375,10 @@ Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
   }
 
   // Send a message to self to resend after some delay if no ACK is received.
-  return delay(STATUS_UPDATE_RETRY_INTERVAL,
+  return delay(duration,
                self(),
-               &StatusUpdateManagerProcess::timeout).timeout();
+               &StatusUpdateManagerProcess::timeout,
+               duration).timeout();
 }
 
 
@@ -438,7 +448,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
     cleanupStatusUpdateStream(taskId, frameworkId);
   } else if (next.isSome()) {
     // Forward the next queued status update.
-    stream->timeout = forward(next.get());
+    stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
   }
 
   return !terminated;
@@ -446,7 +456,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
 
 
 // TODO(vinod): There should be a limit on the retries.
-void StatusUpdateManagerProcess::timeout()
+void StatusUpdateManagerProcess::timeout(const Duration& duration)
 {
   // Check and see if we should resend any status updates.
   foreachkey (const FrameworkID& frameworkId, streams) {
@@ -457,7 +467,12 @@ void StatusUpdateManagerProcess::timeout()
         if (stream->timeout.get().expired()) {
           const StatusUpdate& update = stream->pending.front();
           LOG(WARNING) << "Resending status update " << update;
-          stream->timeout = forward(update);
+
+          // Bounded exponential backoff.
+          Duration duration_ =
+            std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
+
+          stream->timeout = forward(update, duration_);
         }
       }
     }
@@ -605,6 +620,12 @@ void StatusUpdateManager::newMasterDetected(const UPID& pid)
 }
 
 
+void StatusUpdateManager::flush()
+{
+  dispatch(process, &StatusUpdateManagerProcess::flush);
+}
+
+
 void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
 {
   dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 5243ed4..06ea465 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -120,6 +120,11 @@ public:
   // TODO(vinod): Remove this hack once the new leader detector code is merged.
   void newMasterDetected(const UPID& pid);
 
+  // Resend all the pending updates right away.
+  // This is useful when the updates were pending because there was
+  // no master elected (e.g., during recovery) or framework failed over.
+  void flush();
+
   // Closes all the status update streams corresponding to this framework.
   // NOTE: This stops retrying any pending status updates for this framework.
   void cleanup(const FrameworkID& frameworkId);

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 40f474a..f376b88 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -55,7 +55,7 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::Isolator;
 using mesos::internal::slave::Slave;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL;
+using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_MIN;
 
 using process::Clock;
 using process::Future;
@@ -1072,7 +1072,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
   EXPECT_CALL(sched2, statusUpdate(&driver2, _))
     .WillOnce(FutureSatisfy(&statusUpdate));
 
-  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_MIN);
 
   AWAIT_READY(statusUpdate);
 
@@ -1853,7 +1853,8 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
 
   Future<TaskStatus> status;
   EXPECT_CALL(sched, statusUpdate(&driver, _))
-    .WillOnce(FutureArg<1>(&status));
+    .WillOnce(FutureArg<1>(&status))
+    .WillRepeatedly(Return()); // Ignore retried update due to update framework.
 
   Clock::pause();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 184cd0e..07aa2ce 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -241,7 +241,7 @@ TEST_F(StatusUpdateManagerTest, RetryStatusUpdate)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(FutureArg<1>(&status));
 
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
 
   AWAIT_READY(status);
 
@@ -325,7 +325,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDuplicateStatusUpdateAck)
   Future<Nothing> ack =
     FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
 
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
 
   AWAIT_READY(status);
 
@@ -557,7 +557,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(FutureArg<1>(&update));
 
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
   Clock::settle();
 
   // Ensure the scheduler receives TASK_FINISHED.
@@ -763,7 +763,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
   EXPECT_CALL(sched, statusUpdate(_, _))
     .WillOnce(FutureArg<1>(&update));
 
-  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
   Clock::settle();
 
   // Ensure the scheduler receives TASK_FINISHED.