You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/24 03:07:19 UTC

[1/2] mesos git commit: Fixed a master API bug for agent re-registration after master failover.

Repository: mesos
Updated Branches:
  refs/heads/master 28ab19b11 -> b4e210678


Fixed a master API bug for agent re-registration after master failover.

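When the master fails over and a client subscribes to the master before
agent re-registration, the master will crash when sending `TASK_ADDED`
because the framework info might not have been added to the master yet.
This patch fixes the bug by passing the framework info (and, for
`TASK_UPDATED`, the task) to `Subscribers::send()` explicitly instead of
having `send()` look them up in the master, and by sending `TASK_ADDED`
from `Framework` rather than from `Slave::addTask()`.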

Review: https://reviews.apache.org/r/65774/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f2ec2b28
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f2ec2b28
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f2ec2b28

Branch: refs/heads/master
Commit: f2ec2b288e823424b2efe71d62ef90101b7a863f
Parents: 28ab19b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Feb 23 18:37:12 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Feb 23 18:37:12 2018 -0800

----------------------------------------------------------------------
 src/master/master.cpp | 98 ++++++++++++++++------------------------------
 src/master/master.hpp | 15 +++++--
 2 files changed, 46 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e7d5ac6..f2c2dbe 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10241,8 +10241,18 @@ void Master::updateTask(Task* task, const StatusUpdate& update)
   task->mutable_statuses(task->statuses_size() - 1)->clear_data();
 
   if (sendSubscribersUpdate && !subscribers.subscribed.empty()) {
-    subscribers.send(protobuf::master::event::createTaskUpdated(
-        *task, task->state(), status));
+    // If the framework has been removed, the task would have already
+    // transitioned to `TASK_KILLED` by `removeFramework()`, thus
+    // `sendSubscribersUpdate` shouldn't have been set to true.
+    // TODO(chhsiao): This may be changed after MESOS-6608 is resolved.
+    Framework* framework = getFramework(task->framework_id());
+    CHECK_NOTNULL(framework);
+
+    subscribers.send(
+        protobuf::master::event::createTaskUpdated(
+            *task, task->state(), status),
+        framework->info,
+        *task);
   }
 
   LOG(INFO) << "Updating the state of task " << task->task_id()
@@ -11185,57 +11195,25 @@ static bool isValidFailoverTimeout(const FrameworkInfo& frameworkInfo)
 }
 
 
-void Master::Subscribers::send(mesos::master::Event&& event)
+void Master::Subscribers::send(
+    mesos::master::Event&& event,
+    const Option<FrameworkInfo>& frameworkInfo,
+    const Option<Task>& task)
 {
   VLOG(1) << "Notifying all active subscribers about " << event.type()
           << " event";
 
-  Option<Shared<FrameworkInfo>> frameworkInfo;
-  Option<Shared<Task>> task;
-
-  // Copy metadata associated with the event if necessary, so that we capture
-  // the current state before we make asynchronous calls below.
-  switch (event.type()) {
-    case mesos::master::Event::TASK_ADDED: {
-      Framework* framework =
-        master->getFramework(event.task_added().task().framework_id());
-
-      CHECK_NOTNULL(framework);
-
-      frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
-      break;
-    }
-    case mesos::master::Event::TASK_UPDATED: {
-      Framework* framework =
-        master->getFramework(event.task_updated().framework_id());
-
-      CHECK_NOTNULL(framework);
-
-      frameworkInfo = Shared<FrameworkInfo>(new FrameworkInfo(framework->info));
-
-      Task* storedTask =
-        framework->getTask(event.task_updated().status().task_id());
-
-      CHECK_NOTNULL(storedTask);
-
-      task = Shared<Task>(new Task(*storedTask));
-      break;
-    }
-    case mesos::master::Event::FRAMEWORK_ADDED:
-    case mesos::master::Event::FRAMEWORK_UPDATED:
-    case mesos::master::Event::FRAMEWORK_REMOVED:
-    case mesos::master::Event::AGENT_ADDED:
-    case mesos::master::Event::AGENT_REMOVED:
-    case mesos::master::Event::SUBSCRIBED:
-    case mesos::master::Event::HEARTBEAT:
-    case mesos::master::Event::UNKNOWN:
-      break;
-  }
-
   // Create a single copy of the event for all subscribers to share.
   Shared<mesos::master::Event> sharedEvent(
       new mesos::master::Event(std::move(event)));
 
+  // Create a single copy of `FrameworkInfo` and `Task` for all
+  // subscribers to share.
+  Shared<FrameworkInfo> sharedFrameworkInfo(
+      frameworkInfo.isSome()
+        ? new FrameworkInfo(frameworkInfo.get()) : nullptr);
+  Shared<Task> sharedTask(task.isSome() ? new Task(task.get()) : nullptr);
+
   foreachvalue (const Owned<Subscriber>& subscriber, subscribed) {
     Future<Owned<AuthorizationAcceptor>> authorizeRole =
       AuthorizationAcceptor::create(
@@ -11283,8 +11261,8 @@ void Master::Subscribers::send(mesos::master::Event&& event)
             authorizeFramework,
             authorizeTask,
             authorizeExecutor,
-            frameworkInfo,
-            task);
+            sharedFrameworkInfo,
+            sharedTask);
 
         return Nothing();
       }));
@@ -11298,30 +11276,26 @@ void Master::Subscribers::Subscriber::send(
     const Owned<AuthorizationAcceptor>& authorizeFramework,
     const Owned<AuthorizationAcceptor>& authorizeTask,
     const Owned<AuthorizationAcceptor>& authorizeExecutor,
-    const Option<Shared<FrameworkInfo>>& frameworkInfo,
-    const Option<Shared<Task>>& task)
+    const Shared<FrameworkInfo>& frameworkInfo,
+    const Shared<Task>& task)
 {
   switch (event->type()) {
     case mesos::master::Event::TASK_ADDED: {
-      CHECK_SOME(frameworkInfo);
-      CHECK_NOTNULL(&frameworkInfo.get());
+      CHECK_NOTNULL(frameworkInfo.get());
 
       if (authorizeTask->accept(
-              event->task_added().task(), *frameworkInfo.get()) &&
-          authorizeFramework->accept(*frameworkInfo.get())) {
+              event->task_added().task(), *frameworkInfo) &&
+          authorizeFramework->accept(*frameworkInfo)) {
         http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
     }
     case mesos::master::Event::TASK_UPDATED: {
-      CHECK_SOME(frameworkInfo);
-      CHECK_NOTNULL(&frameworkInfo.get());
+      CHECK_NOTNULL(frameworkInfo.get());
+      CHECK_NOTNULL(task.get());
 
-      CHECK_SOME(task);
-      CHECK_NOTNULL(&task.get());
-
-      if (authorizeTask->accept(*task.get(), *frameworkInfo.get()) &&
-          authorizeFramework->accept(*frameworkInfo.get())) {
+      if (authorizeTask->accept(*task, *frameworkInfo) &&
+          authorizeFramework->accept(*frameworkInfo)) {
         http.send<mesos::master::Event, v1::master::Event>(*event);
       }
       break;
@@ -11553,10 +11527,6 @@ void Slave::addTask(Task* task)
     usedResources[frameworkId] += resources;
   }
 
-  if (!master->subscribers.subscribed.empty()) {
-    master->subscribers.send(protobuf::master::event::createTaskAdded(*task));
-  }
-
   // Note that we use `Resources` for output as it's faster than
   // logging raw protobuf data.
   LOG(INFO) << "Adding task " << taskId

http://git-wip-us.apache.org/repos/asf/mesos/blob/f2ec2b28/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 6569c9e..1fadbe6 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2004,8 +2004,8 @@ private:
           const process::Owned<AuthorizationAcceptor>& authorizeFramework,
           const process::Owned<AuthorizationAcceptor>& authorizeTask,
           const process::Owned<AuthorizationAcceptor>& authorizeExecutor,
-          const Option<process::Shared<FrameworkInfo>>& frameworkInfo,
-          const Option<process::Shared<Task>>& task);
+          const process::Shared<FrameworkInfo>& frameworkInfo,
+          const process::Shared<Task>& task);
 
       ~Subscriber()
       {
@@ -2026,7 +2026,10 @@ private:
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
-    void send(mesos::master::Event&& event);
+    void send(
+        mesos::master::Event&& event,
+        const Option<FrameworkInfo>& frameworkInfo = None(),
+        const Option<Task>& task = None());
 
     Master* master;
 
@@ -2274,6 +2277,12 @@ struct Framework
         trackUnderRole(role);
       }
     }
+
+    if (!master->subscribers.subscribed.empty()) {
+      master->subscribers.send(
+          protobuf::master::event::createTaskAdded(*task),
+          info);
+    }
   }
 
   // Update framework to recover the resources that were previously


[2/2] mesos git commit: Added a master API test for agent re-registration after master failover.

Posted by gr...@apache.org.
Added a master API test for agent re-registration after master failover.

This test verifies that subscribing to the 'api/v1' endpoint between a
master failover and an agent re-registration won't cause the master to
crash.

Review: https://reviews.apache.org/r/65775/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4e21067
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4e21067
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4e21067

Branch: refs/heads/master
Commit: b4e210678c04e57c2fa9f277b44f6d011da1846a
Parents: f2ec2b2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Feb 23 18:37:17 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Feb 23 18:37:17 2018 -0800

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 170 +++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b4e21067/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 31906eb..9c172f7 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -94,6 +94,7 @@ using process::Promise;
 
 using recordio::Decoder;
 
+using std::set;
 using std::string;
 using std::tuple;
 using std::vector;
@@ -2389,6 +2390,175 @@ TEST_P(MasterAPITest, Subscribe)
 }
 
 
+// This test verifies that subscribing to the 'api/v1' endpoint between
+// a master failover and an agent re-registration won't cause the master
+// to crash. See MESOS-8601.
+TEST_P(MasterAPITest, MasterFailover)
+{
+  ContentType contentType = GetParam();
+
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  Owned<v1::scheduler::TestMesos> mesos(new v1::scheduler::TestMesos(
+      master.get()->pid,
+      contentType,
+      scheduler));
+
+  AWAIT_READY(subscribed);
+  const v1::FrameworkID& frameworkId = subscribed->framework_id();
+
+  // Settle the clock to get the first offer.
+  Clock::settle();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+  const v1::AgentID& agentId = offers->offers(0).agent_id();
+
+  // Launch a task using the scheduler. This should result in a
+  // `TASK_STARTING` followed by a `TASK_RUNNING` status update.
+  Future<v1::scheduler::Event::Update> taskStarting;
+  Future<v1::scheduler::Event::Update> taskRunning;
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(DoAll(
+        FutureArg<1>(&taskStarting),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)))
+    .WillOnce(DoAll(
+        FutureArg<1>(&taskRunning),
+        v1::scheduler::SendAcknowledge(frameworkId, agentId)));
+
+  v1::TaskInfo task = v1::createTask(
+      agentId,
+      offers->offers(0).resources(),
+      "sleep 1000");
+
+  mesos->send(v1::createCallAccept(
+      frameworkId,
+      offers->offers(0),
+      {v1::LAUNCH({task})}));
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(v1::TASK_STARTING, taskStarting->status().state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(v1::TASK_RUNNING, taskRunning->status().state());
+
+  // Stop the scheduler and shutdown the master.
+  mesos.reset();
+  master->reset();
+
+  // Restart the master.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  detector.appoint(master.get()->pid);
+
+  // Create event stream after the master failover but before the agent
+  // re-registration. We should see no framework, agent, task and
+  // executor at all.
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  const v1::master::Response::GetState& getState =
+      event->get().subscribed().get_state();
+
+  EXPECT_EQ(0, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(0, getState.get_agents().agents_size());
+  EXPECT_EQ(0, getState.get_tasks().tasks_size());
+  EXPECT_EQ(0, getState.get_executors().executors_size());
+
+  event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Advance the clock to trigger agent re-registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The agent re-registration should result in an `AGENT_ADDED` event
+  // and a `TASK_ADDED` event.
+  set<v1::master::Event::Type> expectedEvents =
+    {v1::master::Event::AGENT_ADDED, v1::master::Event::TASK_ADDED};
+  set<v1::master::Event::Type> observedEvents;
+
+  event = decoder.read();
+  AWAIT_READY(event);
+  observedEvents.insert(event->get().type());
+
+  event = decoder.read();
+  AWAIT_READY(event);
+  observedEvents.insert(event->get().type());
+
+  EXPECT_EQ(expectedEvents, observedEvents);
+}
+
+
 // Verifies that operators subscribed to the master's operator API event
 // stream only receive events that they are authorized to see.
 TEST_P(MasterAPITest, EventAuthorizationFiltering)