Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/03 23:47:40 UTC
[2/3] mesos git commit: Fixed bug with unreachable tasks and disconnected frameworks.
Fixed bug with unreachable tasks and disconnected frameworks.
We previously assumed that when marking a task unreachable, we would
have access to the `FrameworkInfo` for that task's framework. However,
that is not the case if the master has failed over and the framework has
not yet reregistered with the new master.
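The fix is a two-step lookup: consult the registered frameworks first, and
fall back to the `FrameworkInfo` kept in the `recovered` collection when the
framework has not yet reregistered. The following is a minimal, self-contained
C++ sketch of that fallback pattern; the `registered`/`recovered` maps and the
`lookupFrameworkInfo` helper are illustrative stand-ins, not Mesos APIs (the
actual change to `Master::_markUnreachable` is in the diff below).

    // Minimal sketch only: plain std::map stand-ins for the master's
    // `frameworks.registered` and `frameworks.recovered` collections.
    #include <cassert>
    #include <iostream>
    #include <map>
    #include <string>

    struct FrameworkInfo
    {
      std::string name;
      bool partitionAware;
    };

    using FrameworkID = std::string;

    // Frameworks currently (re)registered with this master.
    std::map<FrameworkID, FrameworkInfo> registered;

    // FrameworkInfos recovered from the registry after master failover,
    // for frameworks that have not yet reregistered.
    std::map<FrameworkID, FrameworkInfo> recovered;

    // Mirrors the fixed logic: if the master knows about a task, its
    // framework must appear in one of the two collections, so fall back
    // to `recovered` when the framework is not registered.
    FrameworkInfo lookupFrameworkInfo(const FrameworkID& frameworkId)
    {
      auto it = registered.find(frameworkId);
      if (it != registered.end()) {
        return it->second;
      }

      auto recoveredIt = recovered.find(frameworkId);
      assert(recoveredIt != recovered.end());  // Analogue of the CHECK.
      return recoveredIt->second;
    }

    int main()
    {
      // A framework whose master failed over before it could reregister.
      recovered["framework-1"] = FrameworkInfo{"disconnected", false};

      FrameworkInfo info = lookupFrameworkInfo("framework-1");
      std::cout << info.name << " partition-aware: " << std::boolalpha
                << info.partitionAware << std::endl;
      return 0;
    }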
Review: https://reviews.apache.org/r/52174/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58be12f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58be12f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58be12f1
Branch: refs/heads/master
Commit: 58be12f1167774c4c398bfa67c9f7518c8178189
Parents: c83882d
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Oct 3 16:45:27 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Oct 3 16:47:20 2016 -0700
----------------------------------------------------------------------
src/master/master.cpp | 23 +++++-
src/tests/partition_tests.cpp | 158 +++++++++++++++++++++++++++++++++++++
2 files changed, 178 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/58be12f1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 032571e..02a2fb2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5949,11 +5949,23 @@ void Master::_markUnreachable(
// PARTITION_AWARE capability.
foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
Framework* framework = getFramework(frameworkId);
- CHECK_NOTNULL(framework);
+
+ // If the framework has not yet re-registered after master failover,
+ // its FrameworkInfo will be in the `recovered` collection. Note that
+ // if the master knows about a task, its FrameworkInfo must appear in
+ // either the `registered` or `recovered` collections.
+ FrameworkInfo frameworkInfo;
+
+ if (framework == nullptr) {
+ CHECK(frameworks.recovered.contains(frameworkId));
+ frameworkInfo = frameworks.recovered[frameworkId];
+ } else {
+ frameworkInfo = framework->info;
+ }
TaskState newTaskState = TASK_UNREACHABLE;
if (!protobuf::frameworkHasCapability(
- framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+ frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
newTaskState = TASK_LOST;
}
@@ -5977,7 +5989,12 @@ void Master::_markUnreachable(
updateTask(task, update);
removeTask(task);
- forward(update, UPID(), framework);
+ if (framework == nullptr) {
+ LOG(WARNING) << "Dropping update " << update
+ << " for unknown framework " << frameworkId;
+ } else {
+ forward(update, UPID(), framework);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/58be12f1/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 7c38f0e..c88017c 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -60,6 +60,7 @@ using process::Owned;
using process::PID;
using process::Promise;
using process::Time;
+using process::UPID;
using process::http::OK;
using process::http::Response;
@@ -884,6 +885,163 @@ TEST_F(PartitionTest, PartitionedSlaveOrphanedTask)
}
+// This test checks that the master handles a slave that becomes
+// partitioned while running a task that belongs to a disconnected
+// framework.
+TEST_F(PartitionTest, DisconnectedFramework)
+{
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ StandaloneMasterDetector detector(master.get()->pid);
+ Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+ ASSERT_SOME(slave);
+
+ FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo1.set_failover_timeout(Weeks(2).secs());
+
+ MockScheduler sched1;
+ MesosSchedulerDriver driver1(
+ &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ Future<FrameworkID> frameworkId;
+ EXPECT_CALL(sched1, registered(&driver1, _, _))
+ .WillOnce(FutureArg<1>(&frameworkId));
+
+ Future<vector<Offer>> offers1;
+ EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+ .WillOnce(FutureArg<1>(&offers1));
+
+ driver1.start();
+
+ AWAIT_READY(frameworkId);
+ AWAIT_READY(offers1);
+
+ ASSERT_FALSE(offers1.get().empty());
+ Offer offer = offers1.get()[0];
+
+ // Launch `task` using `sched1`.
+ TaskInfo task = createTask(offer, "sleep 60");
+
+ Future<TaskStatus> runningStatus;
+ EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+ .WillOnce(FutureArg<1>(&runningStatus));
+
+ Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+ slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+ driver1.launchTasks(offer.id(), {task});
+
+ AWAIT_READY(runningStatus);
+ EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+ EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
+
+ const SlaveID slaveId = runningStatus.get().slave_id();
+
+ AWAIT_READY(statusUpdateAck1);
+
+ // Shut down the master.
+ master->reset();
+
+ // Stop the framework while the master is down.
+ driver1.stop();
+ driver1.join();
+
+ // Restart the master.
+ master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ // Allow the master to PING the slave, but drop all PONG messages
+ // from the slave. Note that we don't match on the master / slave
+ // PIDs because it's actually the `SlaveObserver` process that sends
+ // the pings.
+ Future<Message> ping = FUTURE_MESSAGE(
+ Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+ DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+ // Notify the slave about the new master and wait for it to reregister.
+ Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+ SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+
+ detector.appoint(master.get()->pid);
+
+ AWAIT_READY(slaveReregistered);
+
+ Clock::pause();
+
+ // Cause the slave to be partitioned from the master.
+ size_t pings = 0;
+ while (true) {
+ AWAIT_READY(ping);
+ pings++;
+ if (pings == masterFlags.max_agent_ping_timeouts) {
+ break;
+ }
+ ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+ Clock::advance(masterFlags.agent_ping_timeout);
+ }
+
+ Clock::advance(masterFlags.agent_ping_timeout);
+ Clock::settle();
+
+ // Failover to a new scheduler instance using the same framework ID.
+ FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo2.mutable_id()->MergeFrom(frameworkId.get());
+ frameworkInfo2.set_failover_timeout(frameworkInfo1.failover_timeout());
+
+ // TODO(neilc): We need to introduce an inner scope here to ensure
+ // the `MesosSchedulerDriver` gets destroyed before we fetch
+ // metrics, to work around MESOS-6231.
+ {
+ MockScheduler sched2;
+ MesosSchedulerDriver driver2(
+ &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ Future<Nothing> registered2;
+ EXPECT_CALL(sched2, registered(&driver2, _, _))
+ .WillOnce(FutureSatisfy(&registered2));
+
+ driver2.start();
+
+ AWAIT_READY(registered2);
+
+ // Perform explicit reconciliation.
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(task.task_id());
+ status.mutable_slave_id()->CopyFrom(slaveId);
+ status.set_state(TASK_STAGING); // Dummy value.
+
+ Future<TaskStatus> reconcileUpdate;
+ EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+ .WillOnce(FutureArg<1>(&reconcileUpdate));
+
+ driver2.reconcileTasks({status});
+
+ AWAIT_READY(reconcileUpdate);
+ EXPECT_EQ(TASK_LOST, reconcileUpdate.get().state());
+ EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+ reconcileUpdate.get().reason());
+ EXPECT_TRUE(reconcileUpdate.get().has_unreachable_time());
+
+ Clock::resume();
+
+ driver2.stop();
+ driver2.join();
+ }
+
+ JSON::Object stats = Metrics();
+ EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+ EXPECT_EQ(1, stats.values["master/tasks_lost"]);
+ EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+ EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+ EXPECT_EQ(1, stats.values["master/slave_removals"]);
+ EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+ EXPECT_EQ(0, stats.values["master/slave_removals/reason_unregistered"]);
+}
+
+
// This test checks that when a registered slave reregisters with the
// master (e.g., because of a spurious Zk leader flag at the slave),
// the master does not kill any tasks on the slave, even if those