You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/10/03 23:47:40 UTC

[2/3] mesos git commit: Fixed bug with unreachable tasks and disconnected frameworks.

Fixed bug with unreachable tasks and disconnected frameworks.

We previously assumed that when marking a task unreachable, we would
have access to the `FrameworkInfo` for that task's framework. However,
that is not the case if the master has failed over and the framework has
not yet reregistered with the new master.

Review: https://reviews.apache.org/r/52174/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58be12f1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58be12f1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58be12f1

Branch: refs/heads/master
Commit: 58be12f1167774c4c398bfa67c9f7518c8178189
Parents: c83882d
Author: Neil Conway <ne...@gmail.com>
Authored: Mon Oct 3 16:45:27 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Mon Oct 3 16:47:20 2016 -0700

----------------------------------------------------------------------
 src/master/master.cpp         |  23 +++++-
 src/tests/partition_tests.cpp | 158 +++++++++++++++++++++++++++++++++++++
 2 files changed, 178 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58be12f1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 032571e..02a2fb2 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -5949,11 +5949,23 @@ void Master::_markUnreachable(
   // PARTITION_AWARE capability.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
     Framework* framework = getFramework(frameworkId);
-    CHECK_NOTNULL(framework);
+
+    // If the framework has not yet re-registered after master failover,
+    // its FrameworkInfo will be in the `recovered` collection. Note that
+    // if the master knows about a task, its FrameworkInfo must appear in
+    // either the `registered` or `recovered` collections.
+    FrameworkInfo frameworkInfo;
+
+    if (framework == nullptr) {
+      CHECK(frameworks.recovered.contains(frameworkId));
+      frameworkInfo = frameworks.recovered[frameworkId];
+    } else {
+      frameworkInfo = framework->info;
+    }
 
     TaskState newTaskState = TASK_UNREACHABLE;
     if (!protobuf::frameworkHasCapability(
-            framework->info, FrameworkInfo::Capability::PARTITION_AWARE)) {
+            frameworkInfo, FrameworkInfo::Capability::PARTITION_AWARE)) {
       newTaskState = TASK_LOST;
     }
 
@@ -5977,7 +5989,12 @@ void Master::_markUnreachable(
       updateTask(task, update);
       removeTask(task);
 
-      forward(update, UPID(), framework);
+      if (framework == nullptr) {
+        LOG(WARNING) << "Dropping update " << update
+                     << " for unknown framework " << frameworkId;
+      } else {
+        forward(update, UPID(), framework);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/58be12f1/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 7c38f0e..c88017c 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -60,6 +60,7 @@ using process::Owned;
 using process::PID;
 using process::Promise;
 using process::Time;
+using process::UPID;
 
 using process::http::OK;
 using process::http::Response;
@@ -884,6 +885,163 @@ TEST_F(PartitionTest, PartitionedSlaveOrphanedTask)
 }
 
 
+// This test checks that the master handles a slave that becomes
+// partitioned while running a task that belongs to a disconnected
+// framework.
+TEST_F(PartitionTest, DisconnectedFramework)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector);
+  ASSERT_SOME(slave);
+
+  FrameworkInfo frameworkInfo1 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo1.set_failover_timeout(Weeks(2).secs());
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo1, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers1;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers1));
+
+  driver1.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers1);
+
+  ASSERT_FALSE(offers1.get().empty());
+  Offer offer = offers1.get()[0];
+
+  // Launch `task` using `sched1`.
+  TaskInfo task = createTask(offer, "sleep 60");
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  Future<Nothing> statusUpdateAck1 = FUTURE_DISPATCH(
+      slave.get()->pid, &Slave::_statusUpdateAcknowledgement);
+
+  driver1.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus.get().state());
+  EXPECT_EQ(task.task_id(), runningStatus.get().task_id());
+
+  const SlaveID slaveId = runningStatus.get().slave_id();
+
+  AWAIT_READY(statusUpdateAck1);
+
+  // Shut down the master.
+  master->reset();
+
+  // Stop the framework while the master is down.
+  driver1.stop();
+  driver1.join();
+
+  // Restart the master.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  // Notify the slave about the new master and wait for it to reregister.
+  Future<SlaveReregisteredMessage> slaveReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, slave.get()->pid);
+
+  detector.appoint(master.get()->pid);
+
+  AWAIT_READY(slaveReregistered);
+
+  Clock::pause();
+
+  // Cause the slave to be partitioned from the master.
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+  Clock::settle();
+
+  // Failover to a new scheduler instance using the same framework ID.
+  FrameworkInfo frameworkInfo2 = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo2.mutable_id()->MergeFrom(frameworkId.get());
+  frameworkInfo2.set_failover_timeout(frameworkInfo1.failover_timeout());
+
+  // TODO(neilc): We need to introduce an inner scope here to ensure
+  // the `MesosSchedulerDriver` gets destroyed before we fetch
+  // metrics, to work around MESOS-6231.
+  {
+    MockScheduler sched2;
+    MesosSchedulerDriver driver2(
+        &sched2, frameworkInfo2, master.get()->pid, DEFAULT_CREDENTIAL);
+
+    Future<Nothing> registered2;
+    EXPECT_CALL(sched2, registered(&driver2, _, _))
+      .WillOnce(FutureSatisfy(&registered2));
+
+    driver2.start();
+
+    AWAIT_READY(registered2);
+
+    // Perform explicit reconciliation.
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(task.task_id());
+    status.mutable_slave_id()->CopyFrom(slaveId);
+    status.set_state(TASK_STAGING); // Dummy value.
+
+    Future<TaskStatus> reconcileUpdate;
+    EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+      .WillOnce(FutureArg<1>(&reconcileUpdate));
+
+    driver2.reconcileTasks({status});
+
+    AWAIT_READY(reconcileUpdate);
+    EXPECT_EQ(TASK_LOST, reconcileUpdate.get().state());
+    EXPECT_EQ(TaskStatus::REASON_RECONCILIATION,
+              reconcileUpdate.get().reason());
+    EXPECT_TRUE(reconcileUpdate.get().has_unreachable_time());
+
+    Clock::resume();
+
+    driver2.stop();
+    driver2.join();
+  }
+
+  JSON::Object stats = Metrics();
+  EXPECT_EQ(0, stats.values["master/tasks_unreachable"]);
+  EXPECT_EQ(1, stats.values["master/tasks_lost"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_scheduled"]);
+  EXPECT_EQ(1, stats.values["master/slave_unreachable_completed"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals"]);
+  EXPECT_EQ(1, stats.values["master/slave_removals/reason_unhealthy"]);
+  EXPECT_EQ(0, stats.values["master/slave_removals/reason_unregistered"]);
+}
+
+
 // This test checks that when a registered slave reregisters with the
 // master (e.g., because of a spurious Zk leader flag at the slave),
 // the master does not kill any tasks on the slave, even if those