You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/11/26 05:36:56 UTC
git commit: Fixed status update manager to back off on status update retries.
Updated Branches:
refs/heads/master 5d08c6590 -> afc0855fe
Fixed status update manager to back off on status update retries.
Review: https://reviews.apache.org/r/15851
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/afc0855f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/afc0855f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/afc0855f
Branch: refs/heads/master
Commit: afc0855fe7a1587b34d2d6cfb2eaf69c907935f7
Parents: 5d08c65
Author: Vinod Kone <vi...@twitter.com>
Authored: Mon Nov 25 12:06:57 2013 -0800
Committer: Vinod Kone <vi...@twitter.com>
Committed: Mon Nov 25 20:28:51 2013 -0800
----------------------------------------------------------------------
src/slave/constants.cpp | 3 +-
src/slave/constants.hpp | 3 +-
src/slave/slave.cpp | 5 +++
src/slave/status_update_manager.cpp | 51 ++++++++++++++++++--------
src/slave/status_update_manager.hpp | 5 +++
src/tests/fault_tolerance_tests.cpp | 7 ++--
src/tests/status_update_manager_tests.cpp | 8 ++--
7 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/constants.cpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.cpp b/src/slave/constants.cpp
index 5573d39..1226485 100644
--- a/src/slave/constants.cpp
+++ b/src/slave/constants.cpp
@@ -25,7 +25,8 @@ namespace slave {
const Duration EXECUTOR_REGISTRATION_TIMEOUT = Minutes(1);
const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5);
const Duration EXECUTOR_REREGISTER_TIMEOUT = Seconds(2);
-const Duration STATUS_UPDATE_RETRY_INTERVAL = Seconds(10);
+const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN = Seconds(10);
+const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX = Minutes(10);
const Duration GC_DELAY = Weeks(1);
const double GC_DISK_HEADROOM = 0.1;
const Duration DISK_WATCH_INTERVAL = Minutes(1);
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/constants.hpp
----------------------------------------------------------------------
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index bbbbfd3..d237383 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -38,7 +38,8 @@ extern const Duration EXECUTOR_REGISTRATION_TIMEOUT;
extern const Duration EXECUTOR_SHUTDOWN_GRACE_PERIOD;
extern const Duration EXECUTOR_REREGISTER_TIMEOUT;
extern const Duration RECOVERY_TIMEOUT;
-extern const Duration STATUS_UPDATE_RETRY_INTERVAL;
+extern const Duration STATUS_UPDATE_RETRY_INTERVAL_MIN;
+extern const Duration STATUS_UPDATE_RETRY_INTERVAL_MAX;
extern const Duration GC_DELAY;
extern const Duration DISK_WATCH_INTERVAL;
extern const Duration RESOURCE_MONITORING_INTERVAL;
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a9be378..6fc18c5 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1301,6 +1301,11 @@ void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
<< framework->pid << "' to '" << path << "'";
CHECK_SOME(state::checkpoint(path, framework->pid));
}
+
+ // Inform status update manager to immediately resend any pending
+ // updates.
+ statusUpdateManager->flush();
+
break;
}
default:
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/status_update_manager.cpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.cpp b/src/slave/status_update_manager.cpp
index fafd83d..b79f9de 100644
--- a/src/slave/status_update_manager.cpp
+++ b/src/slave/status_update_manager.cpp
@@ -86,6 +86,8 @@ public:
void newMasterDetected(const UPID& pid);
+ void flush();
+
void cleanup(const FrameworkID& frameworkId);
private:
@@ -98,13 +100,13 @@ private:
const Option<UUID>& uuid);
// Status update timeout.
- void timeout();
+ void timeout(const Duration& duration);
- // Forwards the status update to the master and starts a timer to check
- // for ACK from the scheduler.
+ // Forwards the status update to the master and starts a timer based
+ // on the 'duration' to check for ACK from the scheduler.
// NOTE: This should only be used for those messages that expect an
// ACK (e.g updates from the executor).
- Timeout forward(const StatusUpdate& update);
+ Timeout forward(const StatusUpdate& update, const Duration& duration);
// Helper functions.
@@ -159,14 +161,18 @@ void StatusUpdateManagerProcess::newMasterDetected(const UPID& pid)
master = pid;
// Retry any pending status updates.
- // This is useful when the updates were pending because there was
- // no master elected (e.g., during recovery).
+ flush();
+}
+
+
+void StatusUpdateManagerProcess::flush()
+{
foreachkey (const FrameworkID& frameworkId, streams) {
foreachvalue (StatusUpdateStream* stream, streams[frameworkId]) {
if (!stream->pending.empty()) {
const StatusUpdate& update = stream->pending.front();
LOG(WARNING) << "Resending status update " << update;
- stream->timeout = forward(update);
+ stream->timeout = forward(update, STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
}
@@ -251,7 +257,8 @@ Future<Nothing> StatusUpdateManagerProcess::recover(
const Result<StatusUpdate>& next = stream->next();
CHECK(!next.isError());
if (next.isSome()) {
- stream->timeout = forward(next.get());
+ stream->timeout =
+ forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
}
}
@@ -343,14 +350,16 @@ Future<Nothing> StatusUpdateManagerProcess::_update(
}
CHECK_SOME(next);
- stream->timeout = forward(next.get());
+ stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return Nothing();
}
-Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
+Timeout StatusUpdateManagerProcess::forward(
+ const StatusUpdate& update,
+ const Duration& duration)
{
if (master) {
LOG(INFO) << "Forwarding status update " << update << " to " << master;
@@ -366,9 +375,10 @@ Timeout StatusUpdateManagerProcess::forward(const StatusUpdate& update)
}
// Send a message to self to resend after some delay if no ACK is received.
- return delay(STATUS_UPDATE_RETRY_INTERVAL,
+ return delay(duration,
self(),
- &StatusUpdateManagerProcess::timeout).timeout();
+ &StatusUpdateManagerProcess::timeout,
+ duration).timeout();
}
@@ -438,7 +448,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
cleanupStatusUpdateStream(taskId, frameworkId);
} else if (next.isSome()) {
// Forward the next queued status update.
- stream->timeout = forward(next.get());
+ stream->timeout = forward(next.get(), STATUS_UPDATE_RETRY_INTERVAL_MIN);
}
return !terminated;
@@ -446,7 +456,7 @@ Future<bool> StatusUpdateManagerProcess::acknowledgement(
// TODO(vinod): There should be a limit on the retries.
-void StatusUpdateManagerProcess::timeout()
+void StatusUpdateManagerProcess::timeout(const Duration& duration)
{
// Check and see if we should resend any status updates.
foreachkey (const FrameworkID& frameworkId, streams) {
@@ -457,7 +467,12 @@ void StatusUpdateManagerProcess::timeout()
if (stream->timeout.get().expired()) {
const StatusUpdate& update = stream->pending.front();
LOG(WARNING) << "Resending status update " << update;
- stream->timeout = forward(update);
+
+ // Bounded exponential backoff.
+ Duration duration_ =
+ std::min(duration * 2, STATUS_UPDATE_RETRY_INTERVAL_MAX);
+
+ stream->timeout = forward(update, duration_);
}
}
}
@@ -605,6 +620,12 @@ void StatusUpdateManager::newMasterDetected(const UPID& pid)
}
+void StatusUpdateManager::flush()
+{
+ dispatch(process, &StatusUpdateManagerProcess::flush);
+}
+
+
void StatusUpdateManager::cleanup(const FrameworkID& frameworkId)
{
dispatch(process, &StatusUpdateManagerProcess::cleanup, frameworkId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/slave/status_update_manager.hpp
----------------------------------------------------------------------
diff --git a/src/slave/status_update_manager.hpp b/src/slave/status_update_manager.hpp
index 5243ed4..06ea465 100644
--- a/src/slave/status_update_manager.hpp
+++ b/src/slave/status_update_manager.hpp
@@ -120,6 +120,11 @@ public:
// TODO(vinod): Remove this hack once the new leader detector code is merged.
void newMasterDetected(const UPID& pid);
+ // Resend all the pending updates right away.
+ // This is useful when the updates were pending because there was
+ // no master elected (e.g., during recovery) or the framework failed over.
+ void flush();
+
// Closes all the status update streams corresponding to this framework.
// NOTE: This stops retrying any pending status updates for this framework.
void cleanup(const FrameworkID& frameworkId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index 40f474a..f376b88 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -55,7 +55,7 @@ using mesos::internal::master::Master;
using mesos::internal::slave::Isolator;
using mesos::internal::slave::Slave;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL;
+using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_MIN;
using process::Clock;
using process::Future;
@@ -1072,7 +1072,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
EXPECT_CALL(sched2, statusUpdate(&driver2, _))
.WillOnce(FutureSatisfy(&statusUpdate));
- Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(statusUpdate);
@@ -1853,7 +1853,8 @@ TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore retried update due to update framework.
Clock::pause();
http://git-wip-us.apache.org/repos/asf/mesos/blob/afc0855f/src/tests/status_update_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/status_update_manager_tests.cpp b/src/tests/status_update_manager_tests.cpp
index 184cd0e..07aa2ce 100644
--- a/src/tests/status_update_manager_tests.cpp
+++ b/src/tests/status_update_manager_tests.cpp
@@ -241,7 +241,7 @@ TEST_F(StatusUpdateManagerTest, RetryStatusUpdate)
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&status));
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(status);
@@ -325,7 +325,7 @@ TEST_F(StatusUpdateManagerTest, IgnoreDuplicateStatusUpdateAck)
Future<Nothing> ack =
FUTURE_DISPATCH(_, &Slave::_statusUpdateAcknowledgement);
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(status);
@@ -557,7 +557,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateTerminalUpdateBeforeAck)
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&update));
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Ensure the scheduler receives TASK_FINISHED.
@@ -763,7 +763,7 @@ TEST_F(StatusUpdateManagerTest, DuplicateUpdateBeforeAck)
EXPECT_CALL(sched, statusUpdate(_, _))
.WillOnce(FutureArg<1>(&update));
- Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL);
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Ensure the scheduler receives TASK_FINISHED.