You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/09 03:30:10 UTC
[1/3] git commit: Introduced Master <-> Slave reconciliation.
Repository: mesos
Updated Branches:
refs/heads/master b810250fa -> c96ba8f60
Introduced Master <-> Slave reconciliation.
Review: https://reviews.apache.org/r/26206
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cc4444eb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cc4444eb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cc4444eb
Branch: refs/heads/master
Commit: cc4444eb47705649a4e93a4045b4d130dd4b3354
Parents: b810250
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 11:31:53 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:04 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 95 +++++++++++++++++++-------------
src/master/master.hpp | 5 +-
src/messages/messages.proto | 2 +
src/slave/slave.cpp | 79 +++++++++++++++++++++-----
src/slave/slave.hpp | 7 ++-
src/tests/fault_tolerance_tests.cpp | 12 +++-
6 files changed, 142 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index cc48b96..cb46cec 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3092,21 +3092,7 @@ void Master::reregisterSlave(
// For now, we assume this slave is not nefarious (eventually
// this will be handled by orthogonal security measures like key
// based authentication).
- LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
- << ") is being allowed to re-register with an already"
- << " in use id (" << slave->id << ")";
-
- // TODO(bmahler): There's an implicit assumption here that when
- // the master already knows about this slave, the slave cannot
- // have tasks unknown to the master. This _should_ be the case
- // since the causal relationship is:
- // slave removes task -> master removes task
- // We should enforce this via a CHECK (dangerous), or by shutting
- // down slaves that are found to violate this assumption.
-
- SlaveReregisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(from, message);
+ LOG(INFO) << "Re-registering slave " << *slave;
// Update the slave pid and relink to it.
// NOTE: Re-linking the slave here always rather than only when
@@ -3119,8 +3105,8 @@ void Master::reregisterSlave(
link(slave->pid);
// Reconcile tasks between master and the slave.
- // NOTE: This needs to be done after the registration message is
- // sent to the slave and the new pid is linked.
+ // NOTE: This sends the re-registered message, including tasks
+ // that need to be reconciled by the slave.
reconcile(slave, executorInfos, tasks);
// If this is a disconnected slave, add it back to the allocator.
@@ -3871,44 +3857,79 @@ void Master::reconcile(
{
CHECK_NOTNULL(slave);
+ // TODO(bmahler): There's an implicit assumption here the slave
+ // cannot have tasks unknown to the master. This _should_ be the
+ // case since the causal relationship is:
+ // slave removes task -> master removes task
+ // Add error logging for any violations of this assumption!
+
// We convert the 'tasks' into a map for easier lookup below.
- // TODO(vinod): Check if the tasks are known to the master.
multihashmap<FrameworkID, TaskID> slaveTasks;
foreach (const Task& task, tasks) {
slaveTasks.put(task.framework_id(), task.task_id());
}
- // Send TASK_LOST updates for tasks present in the master but
- // missing from the slave. This could happen if the task was
- // dropped by the slave (e.g., slave exited before getting the
- // task or the task was launched while slave was in recovery).
- // NOTE: copies are needed because removeTask modifies slave->tasks.
+ // Look for tasks missing in the slave's re-registration message.
+ // This can occur when:
+ // (1) a launch message was dropped (e.g. slave failed over), or
+ // (2) the slave re-registration raced with a launch message, in
+ // which case the slave actually received the task.
+ // To resolve both cases correctly, we must reconcile through the
+ // slave. For slaves that do not support reconciliation, we keep
+ // the old semantics and cover only case (1) via TASK_LOST.
+ SlaveReregisteredMessage reregistered;
+ reregistered.mutable_slave_id()->MergeFrom(slave->id);
+
+ // NOTE: copies are needed because removeTask modified slave->tasks.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ ReconcileTasksMessage reconcile;
+ reconcile.mutable_framework_id()->CopyFrom(frameworkId);
+
foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
LOG(WARNING) << "Task " << task->task_id()
<< " of framework " << task->framework_id()
<< " unknown to the slave " << *slave
- << " during re-registration";
-
- const StatusUpdate& update = protobuf::createStatusUpdate(
- task->framework_id(),
- slave->id,
- task->task_id(),
- TASK_LOST,
- "Task is unknown to the slave");
+ << " during re-registration"
+ << (slave->version.isSome()
+ ? ": reconciling with the slave"
+ : ": sending TASK_LOST");
+
+ if (slave->version.isSome()) {
+ TaskStatus* status = reconcile.add_statuses();
+ status->mutable_task_id()->CopyFrom(task->task_id());
+ status->mutable_slave_id()->CopyFrom(slave->id);
+ status->set_state(task->state());
+ status->set_message("Reconciliation request");
+ status->set_timestamp(Clock::now().secs());
+ } else {
+ // TODO(bmahler): Remove this case in 0.22.0.
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ task->framework_id(),
+ slave->id,
+ task->task_id(),
+ TASK_LOST,
+ "Task is unknown to the slave");
+
+ updateTask(task, update.status());
+ removeTask(task);
- updateTask(task, update.status());
- removeTask(task);
-
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- forward(update, UPID(), framework);
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) {
+ forward(update, UPID(), framework);
+ }
}
}
}
+
+ if (slave->version.isSome() && reconcile.statuses_size() > 0) {
+ reregistered.add_reconciliations()->CopyFrom(reconcile);
+ }
}
+ // Re-register the slave.
+ send(slave->pid, reregistered);
+
// Likewise, any executors that are present in the master but
// not present in the slave must be removed to correctly account
// for resources. First we index the executors for fast lookup below.
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 49589f4..14f1d0f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -292,9 +292,8 @@ protected:
// Invoked when the contender has entered the contest.
void contended(const process::Future<process::Future<Nothing> >& candidacy);
- // Reconciles a re-registering slave's tasks / executors and sends
- // TASK_LOST updates for tasks known to the master but unknown to
- // the slave.
+ // Handles a known re-registering slave by reconciling the master's
+ // view of the slave's tasks and executors.
void reconcile(
Slave* slave,
const std::vector<ExecutorInfo>& executors,
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index edf1e4e..77515d9 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -259,6 +259,8 @@ message SlaveRegisteredMessage {
message SlaveReregisteredMessage {
required SlaveID slave_id = 1;
+
+ repeated ReconcileTasksMessage reconciliations = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 809b008..cb37599 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -347,7 +347,8 @@ void Slave::initialize()
install<SlaveReregisteredMessage>(
&Slave::reregistered,
- &SlaveReregisteredMessage::slave_id);
+ &SlaveReregisteredMessage::slave_id,
+ &SlaveReregisteredMessage::reconciliations);
install<RunTaskMessage>(
&Slave::runTask,
@@ -810,7 +811,10 @@ void Slave::registered(const UPID& from, const SlaveID& slaveId)
}
-void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
+void Slave::reregistered(
+ const UPID& from,
+ const SlaveID& slaveId,
+ const vector<ReconcileTasksMessage>& reconciliations)
{
if (master != from) {
LOG(WARNING) << "Ignoring re-registration message from " << from
@@ -823,25 +827,15 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
case DISCONNECTED:
CHECK_SOME(master);
LOG(INFO) << "Re-registered with master " << master.get();
-
state = RUNNING;
- if (!(info.id() == slaveId)) {
- EXIT(1) << "Re-registered but got wrong id: " << slaveId
- << "(expected: " << info.id() << "). Committing suicide";
- }
break;
case RUNNING:
- // Already re-registered!
- if (!(info.id() == slaveId)) {
- EXIT(1) << "Re-registered but got wrong id: " << slaveId
- << "(expected: " << info.id() << "). Committing suicide";
- }
CHECK_SOME(master);
LOG(WARNING) << "Already re-registered with master " << master.get();
break;
case TERMINATING:
LOG(WARNING) << "Ignoring re-registration because slave is terminating";
- break;
+ return;
case RECOVERING:
// It's possible to receive a message intended for the previous
// run of the slave here. Short term we can leave this as is and
@@ -851,7 +845,64 @@ void Slave::reregistered(const UPID& from, const SlaveID& slaveId)
// https://issues.apache.org/jira/browse/MESOS-677
default:
LOG(FATAL) << "Unexpected slave state " << state;
- break;
+ return;;
+ }
+
+ if (!(info.id() == slaveId)) {
+ EXIT(1) << "Re-registered but got wrong id: " << slaveId
+ << "(expected: " << info.id() << "). Committing suicide";
+ }
+
+ // Reconcile any tasks per the master's request.
+ foreach (const ReconcileTasksMessage& reconcile, reconciliations) {
+ Framework* framework = getFramework(reconcile.framework_id());
+
+ foreach (const TaskStatus& status, reconcile.statuses()) {
+ const TaskID& taskId = status.task_id();
+
+ bool known = false;
+
+ // Try to locate the task.
+ if (framework != NULL) {
+ foreachkey (const ExecutorID& executorId, framework->pending) {
+ if (framework->pending[executorId].contains(taskId)) {
+ known = true;
+ }
+ }
+ foreachvalue (Executor* executor, framework->executors) {
+ if (executor->queuedTasks.contains(taskId) ||
+ executor->launchedTasks.contains(taskId) ||
+ executor->terminatedTasks.contains(taskId)) {
+ known = true;
+ }
+ }
+ }
+
+ // We only need to send a TASK_LOST update when the task is
+ // unknown (so that the master removes it). Otherwise, the
+ // master correctly holds the task and will receive updates.
+ if (!known) {
+ LOG(WARNING) << "Slave reconciling task " << taskId
+ << " of framework " << reconcile.framework_id()
+ << " in state TASK_LOST: task unknown to the slave";
+
+ const StatusUpdate& update = protobuf::createStatusUpdate(
+ reconcile.framework_id(),
+ info.id(),
+ taskId,
+ TASK_LOST,
+ "Reconciliation: task unknown to the slave");
+
+ // NOTE: We can't use statusUpdate() here because it drops
+ // updates for unknown frameworks.
+ statusUpdateManager->update(update, info.id())
+ .onAny(defer(self(),
+ &Slave::__statusUpdate,
+ lambda::_1,
+ update,
+ UPID()));
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 2869710..76d505c 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -97,7 +97,12 @@ public:
void shutdown(const process::UPID& from, const std::string& message);
void registered(const process::UPID& from, const SlaveID& slaveId);
- void reregistered(const process::UPID& from, const SlaveID& slaveId);
+
+ void reregistered(
+ const process::UPID& from,
+ const SlaveID& slaveId,
+ const std::vector<ReconcileTasksMessage>& reconciliations);
+
void doReliableRegistration(const Duration& duration);
void runTask(
http://git-wip-us.apache.org/repos/asf/mesos/blob/cc4444eb/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index e8f5322..c34a9d6 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -2233,9 +2233,9 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
}
-// This test verifies that the master sends TASK_LOST updates
-// for tasks in the master absent from the re-registered slave.
-// We do this by dropping RunTaskMessage from master to the slave.
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we drop the
+// RunTaskMessage so the slave should send TASK_LOST.
TEST_F(FaultToleranceTest, ReconcileLostTasks)
{
Try<PID<Master> > master = StartMaster();
@@ -2285,6 +2285,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
Future<SlaveReregisteredMessage> slaveReregisteredMessage =
FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
Future<TaskStatus> status;
EXPECT_CALL(sched, statusUpdate(&driver, _))
.WillOnce(FutureArg<1>(&status));
@@ -2295,6 +2298,9 @@ TEST_F(FaultToleranceTest, ReconcileLostTasks)
AWAIT_READY(slaveReregisteredMessage);
+ // Make sure the slave generated the TASK_LOST.
+ AWAIT_READY(statusUpdateMessage);
+
AWAIT_READY(status);
ASSERT_EQ(task.task_id(), status.get().task_id());
[3/3] git commit: Added a test for the Master <-> Slave
reconciliation race.
Posted by bm...@apache.org.
Added a test for the Master <-> Slave reconciliation race.
Review: https://reviews.apache.org/r/26208
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c96ba8f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c96ba8f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c96ba8f6
Branch: refs/heads/master
Commit: c96ba8f6035329acebb25ca0f52215284bbf8f8f
Parents: cd03dfa
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 15:52:00 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:06 2014 -0700
----------------------------------------------------------------------
src/tests/master_slave_reconciliation_tests.cpp | 119 +++++++++++++++++++
1 file changed, 119 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/c96ba8f6/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 0e703ab..04806ed 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -237,6 +237,125 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileLostTask)
}
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we trigger
+// a race between the slave re-registration message and the launch
+// message. There should be no TASK_LOST.
+// This was motivated by MESOS-1696.
+TEST_F(MasterSlaveReconciliationTest, ReconcileRace)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ StandaloneMasterDetector detector(master.get());
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ // Trigger a re-registration of the slave and capture the message
+ // so that we can spoof a race with a launch task message.
+ DROP_PROTOBUFS(ReregisterSlaveMessage(), slave.get(), master.get());
+
+ Future<ReregisterSlaveMessage> reregisterSlaveMessage =
+ DROP_PROTOBUF(ReregisterSlaveMessage(), slave.get(), master.get());
+
+ detector.appoint(master.get());
+
+ AWAIT_READY(reregisterSlaveMessage);
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size()); // offers.get()[0] is indexed below.
+
+ TaskInfo task;
+ task.set_name("test task");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ ExecutorDriver* executorDriver = NULL; // Saved by exec.registered().
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&executorDriver));
+
+ // Leave the task in TASK_STAGING.
+ Future<Nothing> launchTask;
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(FutureSatisfy(&launchTask));
+
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(0);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(launchTask);
+
+ // Send the stale re-registration message, which does not contain
+ // the task we just launched. This will trigger a reconciliation
+ // by the master.
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Match the re-posted message first so DROP_PROTOBUFS does not drop it.
+ FUTURE_PROTOBUF(ReregisterSlaveMessage(), slave.get(), master.get());
+
+ process::post(slave.get(), master.get(), reregisterSlaveMessage.get());
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // Neither the master nor the slave should send a TASK_LOST
+ // as part of the reconciliation. We check this by calling
+ // Clock::settle() to flush all pending events.
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ // Now send TASK_FINISHED and make sure it's the only message
+ // received by the scheduler.
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ TaskStatus taskStatus;
+ taskStatus.mutable_task_id()->CopyFrom(task.task_id());
+ taskStatus.set_state(TASK_FINISHED);
+ executorDriver->sendStatusUpdate(taskStatus);
+
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
// This test verifies that the slave reports pending tasks when
// re-registering, otherwise the master will report them as being
// lost.
[2/3] git commit: Split out partition and reconciliation tests from
FaultTolerance tests.
Posted by bm...@apache.org.
Split out partition and reconciliation tests from FaultTolerance tests.
Review: https://reviews.apache.org/r/26207
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/cd03dfa6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/cd03dfa6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/cd03dfa6
Branch: refs/heads/master
Commit: cd03dfa6e4022bc17fa9dd0599a17f391ce7978d
Parents: cc4444e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Tue Sep 30 12:18:28 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 18:09:06 2014 -0700
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/fault_tolerance_tests.cpp | 769 +------------------
src/tests/master_slave_reconciliation_tests.cpp | 401 ++++++++++
src/tests/partition_tests.cpp | 363 ++++++++-
4 files changed, 800 insertions(+), 734 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index fb12b3e..d503c8d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1166,6 +1166,7 @@ mesos_tests_SOURCES = \
tests/main.cpp \
tests/master_authorization_tests.cpp \
tests/master_contender_detector_tests.cpp \
+ tests/master_slave_reconciliation_tests.cpp \
tests/master_tests.cpp \
tests/mesos.cpp \
tests/module_tests.cpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index c34a9d6..a75910d 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -16,13 +16,8 @@
* limitations under the License.
*/
-#include <stdint.h>
-#include <unistd.h>
-
#include <gmock/gmock.h>
-#include <map>
-#include <string>
#include <vector>
#include <mesos/executor.hpp>
@@ -45,6 +40,7 @@
#include "master/allocator.hpp"
#include "master/master.hpp"
+#include "slave/constants.hpp"
#include "slave/slave.hpp"
#include "tests/containerizer.hpp"
@@ -58,7 +54,6 @@ using namespace mesos::internal::tests;
using mesos::internal::master::Master;
using mesos::internal::slave::Slave;
-using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL_MIN;
using process::Clock;
using process::Future;
@@ -70,15 +65,12 @@ using process::UPID;
using process::http::OK;
using process::http::Response;
-using std::string;
-using std::map;
using std::vector;
using testing::_;
using testing::AnyOf;
using testing::AtMost;
using testing::DoAll;
-using testing::ElementsAre;
using testing::Eq;
using testing::Not;
using testing::Return;
@@ -134,224 +126,7 @@ TEST_F(FaultToleranceTest, SlaveLost)
}
-// This test checks that a scheduler gets a slave lost
-// message for a partioned slave.
-TEST_F(FaultToleranceTest, PartitionedSlave)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- // Set these expectations up before we spawn the slave so that we
- // don't miss the first PING.
- Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
-
- // Drop all the PONGs to simulate slave partition.
- DROP_MESSAGES(Eq("PONG"), _, _);
-
- Try<PID<Slave> > slave = StartSlave();
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<Nothing> resourceOffers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureSatisfy(&resourceOffers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- // Need to make sure the framework AND slave have registered with
- // master. Waiting for resource offers should accomplish both.
- AWAIT_READY(resourceOffers);
-
- Clock::pause();
-
- EXPECT_CALL(sched, offerRescinded(&driver, _))
- .Times(AtMost(1));
-
- Future<Nothing> slaveLost;
- EXPECT_CALL(sched, slaveLost(&driver, _))
- .WillOnce(FutureSatisfy(&slaveLost));
-
- // Now advance through the PINGs.
- uint32_t pings = 0;
- while (true) {
- AWAIT_READY(ping);
- pings++;
- if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
- break;
- }
- ping = FUTURE_MESSAGE(Eq("PING"), _, _);
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- }
-
- Clock::advance(master::SLAVE_PING_TIMEOUT);
-
- AWAIT_READY(slaveLost);
-
- driver.stop();
- driver.join();
-
- Shutdown();
-
- Clock::resume();
-}
-
-
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to re-register, we deny the
-// re-registration by sending a ShutdownMessage to the slave.
-// Why? Because during a network partition, the master will remove a
-// partitioned slave, thus sending its tasks to LOST. At this point,
-// when the partition is removed, the slave will attempt to
-// re-register with its running tasks. We've already notified
-// frameworks that these tasks were LOST, so we have to have the slave
-// slave shut down.
-TEST_F(FaultToleranceTest, PartitionedSlaveReregistration)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- // Allow the master to PING the slave, but drop all PONG messages
- // from the slave. Note that we don't match on the master / slave
- // PIDs because it's actually the SlaveObserver Process that sends
- // the pings.
- Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
- DROP_MESSAGES(Eq("PONG"), _, _);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
- StandaloneMasterDetector detector(master.get());
-
- Try<PID<Slave> > slave = StartSlave(&exec, &detector);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return());
-
- driver.start();
-
- AWAIT_READY(offers);
- ASSERT_NE(0u, offers.get().size());
-
- // Launch a task. This is to ensure the task is killed by the slave,
- // during shutdown.
- TaskID taskId;
- taskId.set_value("1");
-
- TaskInfo task;
- task.set_name("");
- task.mutable_task_id()->MergeFrom(taskId);
- task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task.mutable_resources()->MergeFrom(offers.get()[0].resources());
- task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
- task.mutable_executor()->mutable_command()->set_value("sleep 60");
-
- vector<TaskInfo> tasks;
- tasks.push_back(task);
-
- // Set up the expectations for launching the task.
- EXPECT_CALL(exec, registered(_, _, _, _));
- EXPECT_CALL(exec, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
- Future<TaskStatus> runningStatus;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&runningStatus));
-
- Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
- slave.get(), &Slave::_statusUpdateAcknowledgement);
-
- driver.launchTasks(offers.get()[0].id(), tasks);
-
- AWAIT_READY(runningStatus);
- EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
-
- // Wait for the slave to have handled the acknowledgment prior
- // to pausing the clock.
- AWAIT_READY(statusUpdateAck);
-
- // Drop the first shutdown message from the master (simulated
- // partition), allow the second shutdown message to pass when
- // the slave re-registers.
- Future<ShutdownMessage> shutdownMessage =
- DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
-
- Future<TaskStatus> lostStatus;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&lostStatus));
-
- Future<Nothing> slaveLost;
- EXPECT_CALL(sched, slaveLost(&driver, _))
- .WillOnce(FutureSatisfy(&slaveLost));
-
- Clock::pause();
-
- // Now, induce a partition of the slave by having the master
- // timeout the slave.
- uint32_t pings = 0;
- while (true) {
- AWAIT_READY(ping);
- pings++;
- if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
- break;
- }
- ping = FUTURE_MESSAGE(Eq("PING"), _, _);
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- Clock::settle();
- }
-
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- Clock::settle();
-
- // The master will have notified the framework of the lost task.
- AWAIT_READY(lostStatus);
- EXPECT_EQ(TASK_LOST, lostStatus.get().state());
-
- // Wait for the master to attempt to shut down the slave.
- AWAIT_READY(shutdownMessage);
-
- // The master will notify the framework that the slave was lost.
- AWAIT_READY(slaveLost);
-
- Clock::resume();
-
- // We now complete the partition on the slave side as well. This
- // is done by simulating a master loss event which would normally
- // occur during a network partition.
- detector.appoint(None());
-
- Future<Nothing> shutdown;
- EXPECT_CALL(exec, shutdown(_))
- .WillOnce(FutureSatisfy(&shutdown));
-
- shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
-
- // Have the slave re-register with the master.
- detector.appoint(master.get());
-
- // Upon re-registration, the master will shutdown the slave.
- // The slave will then shut down the executor.
- AWAIT_READY(shutdownMessage);
- AWAIT_READY(shutdown);
-
- driver.stop();
- driver.join();
- Shutdown();
-}
// The purpose of this test is to ensure that when slaves are removed
@@ -465,196 +240,60 @@ TEST_F(FaultToleranceTest, PartitionedSlaveStatusUpdates)
}
-// The purpose of this test is to ensure that when slaves are removed
-// from the master, and then attempt to send exited executor messages,
-// we send a ShutdownMessage to the slave. Why? Because during a
-// network partition, the master will remove a partitioned slave, thus
-// sending its tasks to LOST. At this point, when the partition is
-// removed, the slave may attempt to send exited executor messages if
-// it was unaware that the master removed it. We've already
-// notified frameworks that the tasks under the executors were LOST,
-// so we have to have the slave shut down.
-TEST_F(FaultToleranceTest, PartitionedSlaveExitedExecutor)
+// This test ensures that a framework connecting with a
+// failed over master gets a registered callback.
+// Note that this behavior might change in the future and
+// the scheduler might receive a re-registered callback.
+TEST_F(FaultToleranceTest, MasterFailover)
{
Try<PID<Master> > master = StartMaster();
ASSERT_SOME(master);
- // Allow the master to PING the slave, but drop all PONG messages
- // from the slave. Note that we don't match on the master / slave
- // PIDs because it's actually the SlaveObserver Process that sends
- // the pings.
- Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
- DROP_MESSAGES(Eq("PONG"), _, _);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- Try<PID<Slave> > slave = StartSlave(&containerizer);
- ASSERT_SOME(slave);
-
MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+ StandaloneMasterDetector detector(master.get());
+ TestingMesosSchedulerDriver driver(&sched, &detector);
- Future<FrameworkID> frameworkId;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureArg<1>(&frameworkId));\
+ Future<process::Message> frameworkRegisteredMessage =
+ FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return());
+ Future<Nothing> registered1;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered1));
driver.start();
- AWAIT_READY(frameworkId);
- AWAIT_READY(offers);
- ASSERT_NE(0u, offers.get().size());
+ AWAIT_READY(frameworkRegisteredMessage);
- // Launch a task. This allows us to have the slave send an
- // ExitedExecutorMessage.
- TaskID taskId;
- taskId.set_value("1");
+ AWAIT_READY(registered1);
- TaskInfo task;
- task.set_name("");
- task.mutable_task_id()->MergeFrom(taskId);
- task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task.mutable_resources()->MergeFrom(offers.get()[0].resources());
- task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
- task.mutable_executor()->mutable_command()->set_value("sleep 60");
+ // Simulate failed over master by restarting the master.
+ Stop(master.get());
+ master = StartMaster();
+ ASSERT_SOME(master);
- vector<TaskInfo> tasks;
- tasks.push_back(task);
+ EXPECT_CALL(sched, disconnected(&driver));
- // Set up the expectations for launching the task.
- EXPECT_CALL(exec, registered(_, _, _, _));
+ Future<AuthenticateMessage> authenticateMessage =
+ FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
- EXPECT_CALL(exec, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+ Future<Nothing> registered2;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureSatisfy(&registered2));
- // Drop all the status updates from the slave, so that we can
- // ensure the ExitedExecutorMessage is what triggers the slave
- // shutdown.
- DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+ // Simulate a new master detected message to the scheduler.
+ detector.appoint(master.get());
- driver.launchTasks(offers.get()[0].id(), tasks);
+ // Scheduler should retry authentication.
+ AWAIT_READY(authenticateMessage);
- // Drop the first shutdown message from the master (simulated
- // partition) and allow the second shutdown message to pass when
- // triggered by the ExitedExecutorMessage.
- Future<ShutdownMessage> shutdownMessage =
- DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+ // Framework should get a registered callback.
+ AWAIT_READY(registered2);
- Future<TaskStatus> lostStatus;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&lostStatus));
+ driver.stop();
+ driver.join();
- Future<Nothing> slaveLost;
- EXPECT_CALL(sched, slaveLost(&driver, _))
- .WillOnce(FutureSatisfy(&slaveLost));
-
- Clock::pause();
-
- // Now, induce a partition of the slave by having the master
- // timeout the slave.
- uint32_t pings = 0;
- while (true) {
- AWAIT_READY(ping);
- pings++;
- if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
- break;
- }
- ping = FUTURE_MESSAGE(Eq("PING"), _, _);
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- Clock::settle();
- }
-
- Clock::advance(master::SLAVE_PING_TIMEOUT);
- Clock::settle();
-
- // The master will have notified the framework of the lost task.
- AWAIT_READY(lostStatus);
- EXPECT_EQ(TASK_LOST, lostStatus.get().state());
-
- // Wait for the master to attempt to shut down the slave.
- AWAIT_READY(shutdownMessage);
-
- // The master will notify the framework that the slave was lost.
- AWAIT_READY(slaveLost);
-
- shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
-
- // Induce an ExitedExecutorMessage from the slave.
- containerizer.destroy(
- frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
-
- // Upon receiving the message, the master will shutdown the slave.
- AWAIT_READY(shutdownMessage);
-
- Clock::resume();
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
-
-
-// This test ensures that a framework connecting with a
-// failed over master gets a registered callback.
-// Note that this behavior might change in the future and
-// the scheduler might receive a re-registered callback.
-TEST_F(FaultToleranceTest, MasterFailover)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- MockScheduler sched;
- StandaloneMasterDetector detector(master.get());
- TestingMesosSchedulerDriver driver(&sched, &detector);
-
- Future<process::Message> frameworkRegisteredMessage =
- FUTURE_MESSAGE(Eq(FrameworkRegisteredMessage().GetTypeName()), _, _);
-
- Future<Nothing> registered1;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureSatisfy(&registered1));
-
- driver.start();
-
- AWAIT_READY(frameworkRegisteredMessage);
-
- AWAIT_READY(registered1);
-
- // Simulate failed over master by restarting the master.
- Stop(master.get());
- master = StartMaster();
- ASSERT_SOME(master);
-
- EXPECT_CALL(sched, disconnected(&driver));
-
- Future<AuthenticateMessage> authenticateMessage =
- FUTURE_PROTOBUF(AuthenticateMessage(), _, _);
-
- Future<Nothing> registered2;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureSatisfy(&registered2));
-
- // Simulate a new master detected message to the scheduler.
- detector.appoint(master.get());
-
- // Scheduler should retry authentication.
- AWAIT_READY(authenticateMessage);
-
- // Framework should get a registered callback.
- AWAIT_READY(registered2);
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
+ Shutdown();
+}
// This test ensures that a failed over master recovers completed tasks
@@ -1454,7 +1093,7 @@ TEST_F(FaultToleranceTest, SchedulerFailoverStatusUpdate)
// Now advance time enough for the reliable timeout
// to kick in and another status update is sent.
- Clock::advance(STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
AWAIT_READY(statusUpdate);
@@ -2045,98 +1684,6 @@ TEST_F(FaultToleranceTest, SlaveReregisterOnZKExpiration)
}
-// This test verifies that a re-registering slave sends the terminal
-// unacknowledged tasks for a terminal executor. This is required
-// for the master to correctly reconcile it's view with the slave's
-// view of tasks. This test drops a terminal update to the master
-// and then forces the slave to re-register.
-TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
- TestContainerizer containerizer(&exec);
-
- StandaloneMasterDetector detector(master.get());
-
- Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- Future<FrameworkID> frameworkId;
- EXPECT_CALL(sched, registered(&driver, _, _))
- .WillOnce(FutureArg<1>(&frameworkId));
-
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- ExecutorDriver* execDriver;
- EXPECT_CALL(exec, registered(_, _, _, _))
- .WillOnce(SaveArg<0>(&execDriver));
-
- EXPECT_CALL(exec, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
-
- Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
- = FUTURE_PROTOBUF(
- StatusUpdateAcknowledgementMessage(), master.get(), slave.get());
-
- driver.start();
-
- AWAIT_READY(status);
- EXPECT_EQ(TASK_RUNNING, status.get().state());
-
- // Make sure the acknowledgement reaches the slave.
- AWAIT_READY(statusUpdateAcknowledgementMessage);
-
- // Drop the TASK_FINISHED status update sent to the master.
- Future<StatusUpdateMessage> statusUpdateMessage =
- DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
- Future<ExitedExecutorMessage> executorExitedMessage =
- FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
-
- TaskStatus finishedStatus;
- finishedStatus = status.get();
- finishedStatus.set_state(TASK_FINISHED);
- execDriver->sendStatusUpdate(finishedStatus);
-
- // Ensure the update was sent.
- AWAIT_READY(statusUpdateMessage);
-
- // Now kill the executor.
- containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
-
- Future<TaskStatus> status2;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status2));
-
- // We drop the 'UpdateFrameworkMessage' from the master to slave to
- // stop the status update manager from retrying the update that was
- // already sent due to the new master detection.
- DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);
-
- detector.appoint(master.get());
-
- AWAIT_READY(status2);
- EXPECT_EQ(TASK_FINISHED, status2.get().state());
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
-
-
// This test ensures that a master properly handles the
// re-registration of a framework when an empty executor is present
// on a slave. This was added to prevent regressions on MESOS-1821.
@@ -2233,250 +1780,6 @@ TEST_F(FaultToleranceTest, FrameworkReregisterEmptyExecutor)
}
-// This test verifies that the master reconciles tasks that are
-// missing from a re-registering slave. In this case, we drop the
-// RunTaskMessage so the slave should send TASK_LOST.
-TEST_F(FaultToleranceTest, ReconcileLostTasks)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- StandaloneMasterDetector detector(master.get());
-
- Try<PID<Slave> > slave = StartSlave(&detector);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
-
- EXPECT_NE(0u, offers.get().size());
-
- TaskInfo task;
- task.set_name("test task");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task.mutable_resources()->MergeFrom(offers.get()[0].resources());
- task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
- vector<TaskInfo> tasks;
- tasks.push_back(task);
-
- // We now launch a task and drop the corresponding RunTaskMessage on
- // the slave, to ensure that only the master knows about this task.
- Future<RunTaskMessage> runTaskMessage =
- DROP_PROTOBUF(RunTaskMessage(), _, _);
-
- driver.launchTasks(offers.get()[0].id(), tasks);
-
- AWAIT_READY(runTaskMessage);
-
- Future<SlaveReregisteredMessage> slaveReregisteredMessage =
- FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
- Future<StatusUpdateMessage> statusUpdateMessage =
- FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status));
-
- // Simulate a spurious master change event (e.g., due to ZooKeeper
- // expiration) at the slave to force re-registration.
- detector.appoint(master.get());
-
- AWAIT_READY(slaveReregisteredMessage);
-
- // Make sure the slave generated the TASK_LOST.
- AWAIT_READY(statusUpdateMessage);
-
- AWAIT_READY(status);
-
- ASSERT_EQ(task.task_id(), status.get().task_id());
- ASSERT_EQ(TASK_LOST, status.get().state());
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
-
-
-// This test verifies that the slave reports pending tasks when
-// re-registering, otherwise the master will report them as being
-// lost.
-TEST_F(FaultToleranceTest, ReconcilePendingTasks)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- StandaloneMasterDetector detector(master.get());
-
- Try<PID<Slave> > slave = StartSlave(&detector);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- EXPECT_NE(0u, offers.get().size());
-
- // No TASK_LOST updates should occur!
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .Times(0);
-
- // We drop the _runTask dispatch to ensure the task remains
- // pending in the slave.
- Future<Nothing> _runTask = DROP_DISPATCH(slave.get(), &Slave::_runTask);
-
- TaskInfo task1;
- task1.set_name("test task");
- task1.mutable_task_id()->set_value("1");
- task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task1.mutable_resources()->MergeFrom(offers.get()[0].resources());
- task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
- vector<TaskInfo> tasks1;
- tasks1.push_back(task1);
-
- driver.launchTasks(offers.get()[0].id(), tasks1);
-
- AWAIT_READY(_runTask);
-
- Future<SlaveReregisteredMessage> slaveReregisteredMessage =
- FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
- // Simulate a spurious master change event (e.g., due to ZooKeeper
- // expiration) at the slave to force re-registration.
- detector.appoint(master.get());
-
- AWAIT_READY(slaveReregisteredMessage);
-
- Clock::pause();
- Clock::settle();
- Clock::resume();
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
-
-
-// This test verifies that when the slave re-registers, the master
-// does not send TASK_LOST update for a task that has reached terminal
-// state but is waiting for an acknowledgement.
-TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
-{
- Try<PID<Master> > master = StartMaster();
- ASSERT_SOME(master);
-
- MockExecutor exec(DEFAULT_EXECUTOR_ID);
-
- StandaloneMasterDetector detector(master.get());
-
- Try<PID<Slave> > slave = StartSlave(&exec, &detector);
- ASSERT_SOME(slave);
-
- MockScheduler sched;
- MesosSchedulerDriver driver(
- &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
-
- EXPECT_CALL(sched, registered(&driver, _, _));
-
- Future<vector<Offer> > offers;
- EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers))
- .WillRepeatedly(Return()); // Ignore subsequent offers.
-
- driver.start();
-
- AWAIT_READY(offers);
- EXPECT_NE(0u, offers.get().size());
-
- TaskInfo task;
- task.set_name("test task");
- task.mutable_task_id()->set_value("1");
- task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
- task.mutable_resources()->MergeFrom(offers.get()[0].resources());
- task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
-
- vector<TaskInfo> tasks;
- tasks.push_back(task);
-
- EXPECT_CALL(exec, registered(_, _, _, _));
-
- // Send a terminal update right away.
- EXPECT_CALL(exec, launchTask(_, _))
- .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
-
- // Drop the status update from slave to the master, so that
- // the slave has a pending terminal update when it re-registers.
- DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
-
- Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
-
- Future<TaskStatus> status;
- EXPECT_CALL(sched, statusUpdate(&driver, _))
- .WillOnce(FutureArg<1>(&status))
- .WillRepeatedly(Return()); // Ignore retried update due to update framework.
-
- driver.launchTasks(offers.get()[0].id(), tasks);
-
- AWAIT_READY(_statusUpdate);
-
- Future<SlaveReregisteredMessage> slaveReregisteredMessage =
- FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
-
- // Simulate a spurious master change event (e.g., due to ZooKeeper
- // expiration) at the slave to force re-registration.
- detector.appoint(master.get());
-
- AWAIT_READY(slaveReregisteredMessage);
-
- // The master should not send a TASK_LOST after the slave
- // re-registers. We check this by calling Clock::settle() so that
- // the only update the scheduler receives is the retried
- // TASK_FINISHED update.
- // NOTE: The status update manager resends the status update when
- // it detects a new master.
- Clock::pause();
- Clock::settle();
-
- AWAIT_READY(status);
- ASSERT_EQ(TASK_FINISHED, status.get().state());
-
- EXPECT_CALL(exec, shutdown(_))
- .Times(AtMost(1));
-
- driver.stop();
- driver.join();
-
- Shutdown();
-}
-
-
// This test ensures that if a master incorrectly thinks that it is
// leading, the scheduler driver will drop messages from this master.
// Unfortunately, it is not currently possible to start more than one
http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
new file mode 100644
index 0000000..0e703ab
--- /dev/null
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock.h>
+
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/scheduler.hpp>
+
+#include <process/future.hpp>
+#include <process/gmock.hpp>
+#include <process/pid.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include "common/protobuf_utils.hpp"
+
+#include "master/allocator.hpp"
+#include "master/master.hpp"
+
+#include "slave/slave.hpp"
+
+#include "tests/containerizer.hpp"
+#include "tests/mesos.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::protobuf;
+using namespace mesos::internal::tests;
+
+using mesos::internal::master::Master;
+
+using mesos::internal::slave::Slave;
+
+using process::Clock;
+using process::Future;
+using process::Message;
+using process::PID;
+
+using std::vector;
+
+using testing::_;
+using testing::AtMost;
+using testing::Return;
+using testing::SaveArg;
+
+
+class MasterSlaveReconciliationTest : public MesosTest {};
+
+
+// This test verifies that a re-registering slave sends the terminal
+// unacknowledged tasks for a terminal executor. This is required
+// for the master to correctly reconcile its view with the slave's
+// view of tasks. This test drops a terminal update to the master
+// and then forces the slave to re-register.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminatedExecutor)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer, &detector);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(LaunchTasks(DEFAULT_EXECUTOR_INFO, 1, 1, 512, "*"))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ ExecutorDriver* execDriver;
+ EXPECT_CALL(exec, registered(_, _, _, _))
+ .WillOnce(SaveArg<0>(&execDriver));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ Future<StatusUpdateAcknowledgementMessage> statusUpdateAcknowledgementMessage
+ = FUTURE_PROTOBUF(
+ StatusUpdateAcknowledgementMessage(), master.get(), slave.get());
+
+ driver.start();
+
+ AWAIT_READY(status);
+ EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+ // Make sure the acknowledgement reaches the slave.
+ AWAIT_READY(statusUpdateAcknowledgementMessage);
+
+ // Drop the TASK_FINISHED status update sent to the master.
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<ExitedExecutorMessage> executorExitedMessage =
+ FUTURE_PROTOBUF(ExitedExecutorMessage(), _, _);
+
+ TaskStatus finishedStatus = status.get();
+ finishedStatus.set_state(TASK_FINISHED);
+ execDriver->sendStatusUpdate(finishedStatus);
+
+ // Ensure the update was sent.
+ AWAIT_READY(statusUpdateMessage);
+
+ // Now kill the executor and wait until its exit has been reported.
+ containerizer.destroy(frameworkId.get(), DEFAULT_EXECUTOR_ID);
+ AWAIT_READY(executorExitedMessage);
+
+ Future<TaskStatus> status2;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status2));
+
+ // We drop the 'UpdateFrameworkMessage' from the master to slave to
+ // stop the status update manager from retrying the update that was
+ // already sent due to the new master detection.
+ DROP_PROTOBUFS(UpdateFrameworkMessage(), _, _);
+
+ detector.appoint(master.get());
+
+ AWAIT_READY(status2);
+ EXPECT_EQ(TASK_FINISHED, status2.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that the master reconciles tasks that are
+// missing from a re-registering slave. In this case, we drop the
+// RunTaskMessage so the slave should send TASK_LOST.
+TEST_F(MasterSlaveReconciliationTest, ReconcileLostTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&detector);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+
+ ASSERT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("test task");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // We now launch a task and drop the corresponding RunTaskMessage on
+ // the slave, to ensure that only the master knows about this task.
+ Future<RunTaskMessage> runTaskMessage =
+ DROP_PROTOBUF(RunTaskMessage(), _, _);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(runTaskMessage);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ Future<StatusUpdateMessage> statusUpdateMessage =
+ FUTURE_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status));
+
+ // Simulate a spurious master change event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ detector.appoint(master.get());
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // Make sure the slave generated the TASK_LOST.
+ AWAIT_READY(statusUpdateMessage);
+
+ AWAIT_READY(status);
+
+ ASSERT_EQ(task.task_id(), status.get().task_id());
+ ASSERT_EQ(TASK_LOST, status.get().state());
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that the slave reports pending tasks when
+// re-registering, otherwise the master will report them as being
+// lost.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterPendingTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&detector);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ // No TASK_LOST updates should occur!
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .Times(0);
+
+ // We drop the _runTask dispatch to ensure the task remains
+ // pending in the slave.
+ Future<Nothing> _runTask = DROP_DISPATCH(slave.get(), &Slave::_runTask);
+
+ TaskInfo task1;
+ task1.set_name("test task");
+ task1.mutable_task_id()->set_value("1");
+ task1.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task1.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task1.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks1;
+ tasks1.push_back(task1);
+
+ driver.launchTasks(offers.get()[0].id(), tasks1);
+
+ AWAIT_READY(_runTask);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Simulate a spurious master change event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ detector.appoint(master.get());
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ Clock::pause();
+ Clock::settle();
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that when the slave re-registers, the master
+// does not send a TASK_LOST update for a task that has reached a terminal
+// state but is waiting for an acknowledgement.
+TEST_F(MasterSlaveReconciliationTest, SlaveReregisterTerminalTask)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ TaskInfo task;
+ task.set_name("test task");
+ task.mutable_task_id()->set_value("1");
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ // Send a terminal update right away.
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+ // Drop the status update from slave to the master, so that
+ // the slave has a pending terminal update when it re-registers.
+ DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+ Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+ Future<TaskStatus> status;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&status))
+ .WillRepeatedly(Return()); // Ignore retried update due to update framework.
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(_statusUpdate);
+
+ Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // Simulate a spurious master change event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ detector.appoint(master.get());
+
+ AWAIT_READY(slaveReregisteredMessage);
+
+ // The master should not send a TASK_LOST after the slave
+ // re-registers. We check this by calling Clock::settle() so that
+ // the only update the scheduler receives is the retried
+ // TASK_FINISHED update.
+ // NOTE: The status update manager resends the status update when
+ // it detects a new master.
+ Clock::pause();
+ Clock::settle();
+
+ AWAIT_READY(status);
+ ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+ EXPECT_CALL(exec, shutdown(_))
+ .Times(AtMost(1));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
http://git-wip-us.apache.org/repos/asf/mesos/blob/cd03dfa6/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 8136a95..0dc1a92 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -18,7 +18,7 @@
#include <gmock/gmock.h>
-#include <string>
+#include <vector>
#include <process/clock.hpp>
#include <process/future.hpp>
@@ -33,6 +33,7 @@
#include "slave/flags.hpp"
#include "slave/slave.hpp"
+#include "tests/containerizer.hpp"
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
@@ -51,13 +52,373 @@ using process::Future;
using process::Message;
using process::PID;
+using std::vector;
+
using testing::_;
+using testing::AtMost;
using testing::Eq;
+using testing::Return;
class PartitionTest : public MesosTest {};
+// This test checks that a scheduler gets a slave lost
+// message for a partitioned slave. The partition is simulated by
+// dropping every PONG, so none of the master's PINGs are answered;
+// after the ping timeouts elapse the test expects the scheduler's
+// slaveLost() callback to fire.
+TEST_F(PartitionTest, PartitionedSlave)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Set these expectations up before we spawn the slave so that we
+ // don't miss the first PING.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+
+ // Drop all the PONGs to simulate slave partition.
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ Try<PID<Slave> > slave = StartSlave();
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<Nothing> resourceOffers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureSatisfy(&resourceOffers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ driver.start();
+
+ // Need to make sure the framework AND slave have registered with
+ // master. Waiting for resource offers should accomplish both.
+ AWAIT_READY(resourceOffers);
+
+ // Pause the clock so that the test, not wall time, decides when
+ // each SLAVE_PING_TIMEOUT elapses (see Clock::advance below).
+ Clock::pause();
+
+ // The outstanding offer may be rescinded when the slave goes away;
+ // tolerate (but do not require) one rescind.
+ EXPECT_CALL(sched, offerRescinded(&driver, _))
+ .Times(AtMost(1));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ // Now advance through the PINGs. Each PING goes unanswered because
+ // every PONG is dropped. We wait for MAX_SLAVE_PING_TIMEOUTS PINGs
+ // in total; the future for the next PING is armed *before* the
+ // clock is advanced by one SLAVE_PING_TIMEOUT so it cannot be missed.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ }
+
+ // Let the last unanswered PING time out too; the test expects the
+ // master to declare the slave lost at this point.
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+
+ AWAIT_READY(slaveLost);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+
+ Clock::resume();
+}
+
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to re-register, we deny the
+// re-registration by sending a ShutdownMessage to the slave.
+// Why? Because during a network partition, the master will remove a
+// partitioned slave, thus sending its tasks to LOST. At this point,
+// when the partition is removed, the slave will attempt to
+// re-register with its running tasks. We've already notified
+// frameworks that these tasks were LOST, so we have to shut the
+// slave down.
+TEST_F(PartitionTest, PartitionedSlaveReregistration)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the SlaveObserver Process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ // A standalone detector lets the test control which master (if
+ // any) the slave believes is leading, via detector.appoint().
+ StandaloneMasterDetector detector(master.get());
+
+ Try<PID<Slave> > slave = StartSlave(&exec, &detector);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ // Launch a task. This is to ensure the task is killed by the slave,
+ // during shutdown.
+ TaskID taskId;
+ taskId.set_value("1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->MergeFrom(taskId);
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+ task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Set up the expectations for launching the task.
+ EXPECT_CALL(exec, registered(_, _, _, _));
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ Future<TaskStatus> runningStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&runningStatus));
+
+ Future<Nothing> statusUpdateAck = FUTURE_DISPATCH(
+ slave.get(), &Slave::_statusUpdateAcknowledgement);
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ AWAIT_READY(runningStatus);
+ EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+
+ // Wait for the slave to have handled the acknowledgment prior
+ // to pausing the clock.
+ AWAIT_READY(statusUpdateAck);
+
+ // Drop the first shutdown message from the master (simulated
+ // partition), allow the second shutdown message to pass when
+ // the slave re-registers.
+ Future<ShutdownMessage> shutdownMessage =
+ DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ Future<TaskStatus> lostStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&lostStatus));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ // Pause the clock so the test controls when each ping timeout
+ // elapses.
+ Clock::pause();
+
+ // Now, induce a partition of the slave by having the master
+ // timeout the slave. Each iteration waits for an (unanswered)
+ // PING, arms the future for the next one, then advances the paused
+ // clock by one SLAVE_PING_TIMEOUT and settles pending work.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+ }
+
+ // Let the final unanswered PING time out as well.
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // The master will have notified the framework of the lost task.
+ AWAIT_READY(lostStatus);
+ EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+ // Wait for the master to attempt to shut down the slave.
+ AWAIT_READY(shutdownMessage);
+
+ // The master will notify the framework that the slave was lost.
+ AWAIT_READY(slaveLost);
+
+ Clock::resume();
+
+ // We now complete the partition on the slave side as well. This
+ // is done by simulating a master loss event which would normally
+ // occur during a network partition.
+ detector.appoint(None());
+
+ Future<Nothing> shutdown;
+ EXPECT_CALL(exec, shutdown(_))
+ .WillOnce(FutureSatisfy(&shutdown));
+
+ // This time the ShutdownMessage is observed but not dropped.
+ shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ // Have the slave re-register with the master.
+ detector.appoint(master.get());
+
+ // Upon re-registration, the master will shutdown the slave.
+ // The slave will then shut down the executor.
+ AWAIT_READY(shutdownMessage);
+ AWAIT_READY(shutdown);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// The purpose of this test is to ensure that when slaves are removed
+// from the master, and then attempt to send exited executor messages,
+// we send a ShutdownMessage to the slave. Why? Because during a
+// network partition, the master will remove a partitioned slave, thus
+// sending its tasks to LOST. At this point, when the partition is
+// removed, the slave may attempt to send exited executor messages if
+// it was unaware that the master removed it. We've already
+// notified frameworks that the tasks under the executors were LOST,
+// so we have to shut the slave down.
+TEST_F(PartitionTest, PartitionedSlaveExitedExecutor)
+{
+ Try<PID<Master> > master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the SlaveObserver Process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ DROP_MESSAGES(Eq("PONG"), _, _);
+
+ // The TestContainerizer lets the test destroy the executor's
+ // container on demand (see containerizer.destroy below).
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+ TestContainerizer containerizer(&exec);
+
+ Try<PID<Slave> > slave = StartSlave(&containerizer);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, DEFAULT_FRAMEWORK_INFO, master.get(), DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched, registered(&driver, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer> > offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ driver.start();
+
+ AWAIT_READY(frameworkId);
+ AWAIT_READY(offers);
+ ASSERT_NE(0u, offers.get().size());
+
+ // Launch a task. This allows us to have the slave send an
+ // ExitedExecutorMessage.
+ TaskID taskId;
+ taskId.set_value("1");
+
+ TaskInfo task;
+ task.set_name("");
+ task.mutable_task_id()->MergeFrom(taskId);
+ task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+ task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+ task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+ task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+ vector<TaskInfo> tasks;
+ tasks.push_back(task);
+
+ // Set up the expectations for launching the task.
+ EXPECT_CALL(exec, registered(_, _, _, _));
+
+ EXPECT_CALL(exec, launchTask(_, _))
+ .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+ // Drop all the status updates from the slave, so that we can
+ // ensure the ExitedExecutorMessage is what triggers the slave
+ // shutdown.
+ DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+ driver.launchTasks(offers.get()[0].id(), tasks);
+
+ // Drop the first shutdown message from the master (simulated
+ // partition) and allow the second shutdown message to pass when
+ // triggered by the ExitedExecutorMessage.
+ Future<ShutdownMessage> shutdownMessage =
+ DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ Future<TaskStatus> lostStatus;
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&lostStatus));
+
+ Future<Nothing> slaveLost;
+ EXPECT_CALL(sched, slaveLost(&driver, _))
+ .WillOnce(FutureSatisfy(&slaveLost));
+
+ // Pause the clock so the test controls when each ping timeout
+ // elapses.
+ Clock::pause();
+
+ // Now, induce a partition of the slave by having the master
+ // timeout the slave. Each iteration waits for an (unanswered)
+ // PING, arms the future for the next one, then advances the paused
+ // clock by one SLAVE_PING_TIMEOUT and settles pending work.
+ uint32_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+ }
+
+ // Let the final unanswered PING time out as well.
+ Clock::advance(master::SLAVE_PING_TIMEOUT);
+ Clock::settle();
+
+ // The master will have notified the framework of the lost task.
+ AWAIT_READY(lostStatus);
+ EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+ // Wait for the master to attempt to shut down the slave.
+ AWAIT_READY(shutdownMessage);
+
+ // The master will notify the framework that the slave was lost.
+ AWAIT_READY(slaveLost);
+
+ // This time the ShutdownMessage is observed but not dropped.
+ shutdownMessage = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+ // Induce an ExitedExecutorMessage from the slave.
+ containerizer.destroy(
+ frameworkId.get(), DEFAULT_EXECUTOR_INFO.executor_id());
+
+ // Upon receiving the message, the master will shutdown the slave.
+ AWAIT_READY(shutdownMessage);
+
+ Clock::resume();
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
// This test verifies that if master --> slave socket closes and the
// slave is not aware of it (i.e., one way network partition), slave
// will re-register with the master.