You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by mp...@apache.org on 2017/04/28 03:16:13 UTC

[3/3] mesos git commit: MESOS-7323: Made `addSlave` not activate any frameworks.

MESOS-7323: Made `addSlave` not activate any frameworks.

Review: https://reviews.apache.org/r/58689/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/72752fc6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/72752fc6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/72752fc6

Branch: refs/heads/master
Commit: 72752fc6deb8ebcbfbd5448dc599ef3774339d31
Parents: 6ace0a7
Author: Michael Park <mp...@apache.org>
Authored: Thu Apr 27 20:15:07 2017 -0700
Committer: Michael Park <mp...@apache.org>
Committed: Thu Apr 27 20:15:07 2017 -0700

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp |   3 -
 src/tests/master_tests.cpp                  | 233 +++++++++++++++++++++++
 src/tests/upgrade_tests.cpp                 |   1 +
 3 files changed, 234 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/72752fc6/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 968fec0..84dc31d 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -473,9 +473,6 @@ void HierarchicalAllocatorProcess::addSlave(
       // track the framework under the role.
       if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
         trackFrameworkUnderRole(frameworkId, role);
-
-        CHECK(frameworkSorters.contains(role));
-        frameworkSorters.at(role)->activate(frameworkId.value());
       }
 
       // TODO(bmahler): Validate that the reserved resources have the

http://git-wip-us.apache.org/repos/asf/mesos/blob/72752fc6/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index d1828eb..7cb4774 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -6779,6 +6779,239 @@ TEST_F(MasterTest, TaskWithTinyResources)
   driver.join();
 }
 
+
+// This test ensures that when a partitioned agent comes back with tasks that
+// are allocated to a role that a framework is no longer subscribed to,
+// the framework is re-tracked under the role, but still does not receive
+// any offers with resources allocated to that role.
+TEST_F(MasterTest, MultiRoleSchedulerUnsubscribeFromRole)
+{
+  // Manipulate the clock manually.
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the agent, but drop all PONG messages
+  // from the agent. Note that we don't match on the master / agent
+  // PIDs because it's actually the `SlaveObserver` process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(
+      Eq(PingSlaveMessage().GetTypeName()), _, _);
+
+  DROP_PROTOBUFS(PongSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  slave::Flags agentFlags = CreateSlaveFlags();
+  agentFlags.resources = "cpus:2;mem:2048";
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(&detector, agentFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID agentId = slaveRegisteredMessage->slave_id();
+
+  // Start a scheduler. The scheduler has the PARTITION_AWARE
+  // capability, so we expect its tasks to continue running when the
+  // partitioned agent reregisters.
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.clear_role();
+  frameworkInfo.add_roles("foo");
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::MULTI_ROLE);
+
+  MockScheduler sched1;
+  MesosSchedulerDriver driver1(
+      &sched1, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched1, registered(&driver1, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched1, resourceOffers(&driver1, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver1.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Resources resources = Resources::parse("cpus:1;mem:512").get();
+
+  Offer offer = offers.get()[0];
+
+  TaskInfo task = createTask(offer.slave_id(), resources, "sleep 60");
+
+  Future<TaskStatus> runningStatus;
+  EXPECT_CALL(sched1, statusUpdate(&driver1, _))
+    .WillOnce(FutureArg<1>(&runningStatus));
+
+  driver1.launchTasks(offer.id(), {task});
+
+  AWAIT_READY(runningStatus);
+  EXPECT_EQ(TASK_RUNNING, runningStatus->state());
+  EXPECT_EQ(task.task_id(), runningStatus->task_id());
+
+  // Remove the role from the framework.
+
+  frameworkInfo.mutable_id()->CopyFrom(frameworkId.get());
+  frameworkInfo.clear_roles();
+
+  MockScheduler sched2;
+  MesosSchedulerDriver driver2(
+      &sched2, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered2;
+  EXPECT_CALL(sched2, registered(&driver2, frameworkId.get(), _))
+    .WillOnce(FutureSatisfy(&registered2));
+
+  Future<UpdateFrameworkMessage> updateFrameworkMessage =
+    FUTURE_PROTOBUF(UpdateFrameworkMessage(), _, _);
+
+  // Scheduler1 should get an error due to failover.
+  EXPECT_CALL(sched1, error(&driver1, "Framework failed over"));
+
+  // Expect that there will be no resource offers made to the scheduler.
+  EXPECT_CALL(sched2, resourceOffers(&driver2, _)).Times(0);
+
+  driver2.start();
+
+  Clock::advance(masterFlags.allocation_interval);
+
+  AWAIT_READY(registered2);
+
+  // Wait for the agent to get the updated framework info.
+  AWAIT_READY(updateFrameworkMessage);
+
+  driver1.stop();
+  driver1.join();
+
+  // Now, induce a partition of the slave by having the master
+  // timeout the slave.
+  Future<TaskStatus> unreachableStatus;
+  EXPECT_CALL(sched2, statusUpdate(&driver2, _))
+    .WillOnce(FutureArg<1>(&unreachableStatus));
+
+  // We expect to get a `slaveLost` callback, even though this
+  // scheduler is partition-aware.
+  Future<Nothing> agentLost;
+  EXPECT_CALL(sched2, slaveLost(&driver2, _))
+    .WillOnce(FutureSatisfy(&agentLost));
+
+  size_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == masterFlags.max_agent_ping_timeouts) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq(PingSlaveMessage().GetTypeName()), _, _);
+    Clock::advance(masterFlags.agent_ping_timeout);
+  }
+
+  Clock::advance(masterFlags.agent_ping_timeout);
+
+  AWAIT_READY(unreachableStatus);
+  EXPECT_EQ(TASK_UNREACHABLE, unreachableStatus->state());
+  EXPECT_EQ(TaskStatus::REASON_SLAVE_REMOVED, unreachableStatus->reason());
+  EXPECT_EQ(task.task_id(), unreachableStatus->task_id());
+  EXPECT_EQ(agentId, unreachableStatus->slave_id());
+
+  AWAIT_READY(agentLost);
+
+  // Check that the framework is not tracked under the role.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "roles",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    const JSON::Object& result = parse.get();
+
+    JSON::Object expected = {
+      {"roles", JSON::Array{}}
+    };
+
+    EXPECT_EQ(expected, result);
+  }
+
+  // We now complete the partition on the agent side as well. We simulate
+  // a master loss event, which would normally happen during a network
+  // partition. The slave should then reregister with the master.
+  // The master will then re-track the framework under the role, but the
+  // framework should not receive any resources allocated to the role.
+  detector.appoint(None());
+
+  Future<SlaveReregisteredMessage> agentReregistered = FUTURE_PROTOBUF(
+      SlaveReregisteredMessage(), master.get()->pid, agent.get()->pid);
+
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(agentFlags.registration_backoff_factor);
+
+  AWAIT_READY(agentReregistered);
+
+  // Check that the framework is re-tracked under the role by the master.
+  {
+    Future<Response> response = process::http::get(
+        master.get()->pid,
+        "roles",
+        None(),
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+    AWAIT_EXPECT_RESPONSE_HEADER_EQ(APPLICATION_JSON, "Content-Type", response);
+
+    Try<JSON::Object> parse = JSON::parse<JSON::Object>(response->body);
+    ASSERT_SOME(parse);
+
+    JSON::Value result = parse.get();
+
+    JSON::Object expected = {
+      {
+        "roles",
+        JSON::Array {
+          JSON::Object {
+            { "name", "foo" },
+            { "frameworks", JSON::Array { frameworkId.get().value() } }
+          }
+        }
+      }
+    };
+
+    EXPECT_TRUE(result.contains(expected));
+  }
+
+  // Advance the clock so that a batch allocation is performed.
+  Clock::advance(masterFlags.allocation_interval);
+
+  Clock::settle();
+  Clock::resume();
+
+  driver2.stop();
+  driver2.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/72752fc6/src/tests/upgrade_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/upgrade_tests.cpp b/src/tests/upgrade_tests.cpp
index b5a28b5..03ca360 100644
--- a/src/tests/upgrade_tests.cpp
+++ b/src/tests/upgrade_tests.cpp
@@ -69,6 +69,7 @@ using std::initializer_list;
 using std::vector;
 
 using testing::_;
+using testing::Eq;
 using testing::AtMost;
 
 namespace mesos {