You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/09/29 20:31:34 UTC

[01/11] mesos git commit: Added the ability to prune the gone agent list from the registry.

Repository: mesos
Updated Branches:
  refs/heads/master 6eefc685c -> 3515cd0a9


Added the ability to prune the gone agent list from the registry.

This change uses the existing master options `registry_max_agent_age`
and `registry_max_agent_count` for GC'ing the list of gone agents
in the registry.

Review: https://reviews.apache.org/r/62554/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3515cd0a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3515cd0a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3515cd0a

Branch: refs/heads/master
Commit: 3515cd0a94f257d8d2257075ccb4be8e26e2bbc4
Parents: 11dbb52
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:58:53 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/master/flags.cpp          |  12 ++--
 src/master/master.cpp         |  95 ++++++++++++++++----------
 src/master/master.hpp         |  61 ++++++++++++-----
 src/tests/master_tests.cpp    | 133 +++++++++++++++++++++++++++++++++++++
 src/tests/partition_tests.cpp |  18 ++---
 src/tests/registrar_tests.cpp |  40 +++++++++--
 6 files changed, 286 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/master/flags.cpp
----------------------------------------------------------------------
diff --git a/src/master/flags.cpp b/src/master/flags.cpp
index fa6d274..18f405b 100644
--- a/src/master/flags.cpp
+++ b/src/master/flags.cpp
@@ -591,18 +591,18 @@ mesos::internal::master::Flags::Flags()
       "Maximum length of time to store information in the registry about\n"
       "agents that are not currently connected to the cluster. This\n"
       "information allows frameworks to determine the status of unreachable\n"
-      "and removed agents. Note that the registry always stores information\n"
-      "on all connected agents. If there are more than\n"
-      "`registry_max_agent_count` partitioned or removed agents, agent\n"
+      "and gone agents. Note that the registry always stores\n"
+      "information on all connected agents. If there are more than\n"
+      "`registry_max_agent_count` partitioned/gone agents, agent\n"
       "information may be discarded from the registry sooner than indicated\n"
       "by this parameter.",
       DEFAULT_REGISTRY_MAX_AGENT_AGE);
 
   add(&Flags::registry_max_agent_count,
       "registry_max_agent_count",
-      "Maximum number of disconnected agents to store in the registry.\n"
-      "This information allows frameworks to determine the status of\n"
-      "disconnected agents. Note that the registry always stores\n"
+      "Maximum number of partitioned/gone agents to store in the\n"
+      "registry. This information allows frameworks to determine the status\n"
+      "of disconnected agents. Note that the registry always stores\n"
       "information about all connected agents. See also the\n"
       "`registry_max_agent_age` flag.",
       DEFAULT_REGISTRY_MAX_AGENT_COUNT);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0e0fb1d..b40621e 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1848,69 +1848,81 @@ void Master::doRegistryGc()
   // concurrently). In this situation, we skip removing any elements
   // we don't find.
 
-  size_t unreachableCount = slaves.unreachable.size();
-  TimeInfo currentTime = protobuf::getCurrentTime();
-  hashset<SlaveID> toRemove;
-
-  foreachpair (const SlaveID& slave,
-               const TimeInfo& unreachableTime,
-               slaves.unreachable) {
-    // Count-based GC.
-    CHECK(toRemove.size() <= unreachableCount);
-
-    size_t liveCount = unreachableCount - toRemove.size();
-    if (liveCount > flags.registry_max_agent_count) {
-      toRemove.insert(slave);
-      continue;
-    }
+  auto prune = [this](const LinkedHashMap<SlaveID, TimeInfo>& slaves) {
+    size_t count = slaves.size();
+    TimeInfo currentTime = protobuf::getCurrentTime();
+    hashset<SlaveID> toRemove;
+
+    foreachpair (const SlaveID& slave,
+                 const TimeInfo& removalTime,
+                 slaves) {
+      // Count-based GC.
+      CHECK(toRemove.size() <= count);
+
+      size_t liveCount = count - toRemove.size();
+      if (liveCount > flags.registry_max_agent_count) {
+        toRemove.insert(slave);
+        continue;
+      }
 
-    // Age-based GC.
-    Duration age = Nanoseconds(
-        currentTime.nanoseconds() - unreachableTime.nanoseconds());
+      // Age-based GC.
+      Duration age = Nanoseconds(
+          currentTime.nanoseconds() - removalTime.nanoseconds());
 
-    if (age > flags.registry_max_agent_age) {
-      toRemove.insert(slave);
+      if (age > flags.registry_max_agent_age) {
+        toRemove.insert(slave);
+      }
     }
-  }
 
-  if (toRemove.empty()) {
+    return toRemove;
+  };
+
+  hashset<SlaveID> toRemoveUnreachable = prune(slaves.unreachable);
+  hashset<SlaveID> toRemoveGone = prune(slaves.gone);
+
+  if (toRemoveUnreachable.empty() && toRemoveGone.empty()) {
     VLOG(1) << "Skipping periodic registry garbage collection: "
             << "no agents qualify for removal";
     return;
   }
 
-  VLOG(1) << "Attempting to remove " << toRemove.size()
-          << " unreachable agents from the registry";
+  VLOG(1) << "Attempting to remove " << toRemoveUnreachable.size()
+          << " unreachable and " << toRemoveGone.size()
+          << " gone agents from the registry";
 
-  registrar->apply(Owned<Operation>(new PruneUnreachable(toRemove)))
+  registrar->apply(Owned<Operation>(
+      new Prune(toRemoveUnreachable, toRemoveGone)))
     .onAny(defer(self(),
                  &Self::_doRegistryGc,
-                 toRemove,
+                 toRemoveUnreachable,
+                 toRemoveGone,
                  lambda::_1));
 }
 
 
 void Master::_doRegistryGc(
-    const hashset<SlaveID>& toRemove,
+    const hashset<SlaveID>& toRemoveUnreachable,
+    const hashset<SlaveID>& toRemoveGone,
     const Future<bool>& registrarResult)
 {
   CHECK(!registrarResult.isDiscarded());
   CHECK(!registrarResult.isFailed());
 
-  // `PruneUnreachable` registry operation should never fail.
+  // `Prune` registry operation should never fail.
   CHECK(registrarResult.get());
 
   // Update in-memory state to be consistent with registry changes. If
   // there was a concurrent registry operation that also modified the
-  // unreachable list (e.g., an agent in `toRemove` concurrently
+  // unreachable/gone list (e.g., an agent in `toRemoveXXX` concurrently
   // reregistered), entries in `toRemove` might not appear in
-  // `slaves.unreachable`.
+  // `slaves.unreachable` or `slaves.gone`.
   //
   // TODO(neilc): It would be nice to verify that the effect of these
   // in-memory updates is equivalent to the changes made by the registry
   // operation, but there isn't an easy way to do that.
-  size_t numRemoved = 0;
-  foreach (const SlaveID& slave, toRemove) {
+
+  size_t numRemovedUnreachable = 0;
+  foreach (const SlaveID& slave, toRemoveUnreachable) {
     if (!slaves.unreachable.contains(slave)) {
       LOG(WARNING) << "Failed to garbage collect " << slave
                    << " from the unreachable list";
@@ -1918,12 +1930,25 @@ void Master::_doRegistryGc(
     }
 
     slaves.unreachable.erase(slave);
-    numRemoved++;
+    numRemovedUnreachable++;
+  }
+
+  size_t numRemovedGone = 0;
+  foreach (const SlaveID& slave, toRemoveGone) {
+    if (!slaves.gone.contains(slave)) {
+      LOG(WARNING) << "Failed to garbage collect " << slave
+                   << " from the gone list";
+      continue;
+    }
+
+    slaves.gone.erase(slave);
+    numRemovedGone++;
   }
 
   // TODO(neilc): Add a metric for # of agents discarded from the registry?
-  LOG(INFO) << "Garbage collected " << numRemoved
-            << " unreachable agents from the registry";
+  LOG(INFO) << "Garbage collected " << numRemovedUnreachable
+            << " unreachable and " << numRemovedGone
+            << " gone agents from the registry";
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 23b864d..0c0f28e 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1028,7 +1028,8 @@ private:
   void doRegistryGc();
 
   void _doRegistryGc(
-      const hashset<SlaveID>& toRemove,
+      const hashset<SlaveID>& toRemoveUnreachable,
+      const hashset<SlaveID>& toRemoveGone,
       const process::Future<bool>& registrarResult);
 
   process::Future<bool> authorizeLogAccess(
@@ -2239,45 +2240,69 @@ private:
 };
 
 
-class PruneUnreachable : public Operation
+class Prune : public Operation
 {
 public:
-  explicit PruneUnreachable(const hashset<SlaveID>& _toRemove)
-    : toRemove(_toRemove) {}
+  explicit Prune(
+      const hashset<SlaveID>& _toRemoveUnreachable,
+      const hashset<SlaveID>& _toRemoveGone)
+    : toRemoveUnreachable(_toRemoveUnreachable),
+      toRemoveGone(_toRemoveGone) {}
 
 protected:
   virtual Try<bool> perform(Registry* registry, hashset<SlaveID>* /*slaveIDs*/)
   {
-    // Attempt to remove the SlaveIDs in `toRemove` from the
-    // unreachable list. Some SlaveIDs in `toRemove` might not appear
+    // Attempt to remove the SlaveIDs in the `toRemoveXXX` from the
+    // unreachable/gone list. Some SlaveIDs in `toRemoveXXX` might not appear
     // in the registry; this is possible if there was a concurrent
     // registry operation.
     //
     // TODO(neilc): This has quadratic worst-case behavior, because
     // `DeleteSubrange` for a `repeated` object takes linear time.
     bool mutate = false;
-    int i = 0;
-    while (i < registry->unreachable().slaves().size()) {
-      const Registry::UnreachableSlave& slave =
-        registry->unreachable().slaves(i);
 
-      if (toRemove.contains(slave.id())) {
-        Registry::UnreachableSlaves* unreachable =
-          registry->mutable_unreachable();
+    {
+      int i = 0;
+      while (i < registry->unreachable().slaves().size()) {
+        const Registry::UnreachableSlave& slave =
+          registry->unreachable().slaves(i);
+
+        if (toRemoveUnreachable.contains(slave.id())) {
+          Registry::UnreachableSlaves* unreachable =
+            registry->mutable_unreachable();
+
+          unreachable->mutable_slaves()->DeleteSubrange(i, i+1);
+          mutate = true;
+          continue;
+        }
 
-        unreachable->mutable_slaves()->DeleteSubrange(i, i+1);
-        mutate = true;
-        continue;
+        i++;
       }
+    }
+
+    {
+      int i = 0;
+      while (i < registry->gone().slaves().size()) {
+        const Registry::GoneSlave& slave = registry->gone().slaves(i);
+
+        if (toRemoveGone.contains(slave.id())) {
+          Registry::GoneSlaves* gone = registry->mutable_gone();
 
-      i++;
+          gone->mutable_slaves()->DeleteSubrange(i, i+1);
+          mutate = true;
+          continue;
+        }
+
+        i++;
+      }
     }
 
     return mutate;
   }
 
 private:
-  const hashset<SlaveID> toRemove;
+  const hashset<SlaveID> toRemoveUnreachable;
+  const hashset<SlaveID> toRemoveGone;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 98908c0..4c77601 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8759,6 +8759,139 @@ TEST_P(MasterTestPrePostReservationRefinement, CreateAndDestroyVolumesV1)
   AWAIT_EXPECT_RESPONSE_STATUS_EQ(Accepted().status, v1DestroyVolumesResponse);
 }
 
+
+// This test checks that the master correctly garbage collects
+// information about gone agents from the registry using the
+// count-based GC criterion.
+//
+// TODO(andschwa): Enable this when MESOS-7604 is fixed.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(MasterTest, RegistryGcByCount)
+{
+  // Configure GC to only keep the most recent gone agent in the gone list.
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.registry_max_agent_count = 1;
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the agent is registered successfully with the master
+  // before marking it as gone.
+  AWAIT_READY(slaveRegisteredMessage);
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  const SlaveID& slaveId = slaveRegisteredMessage->slave_id();
+
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+    v1::master::Call::MarkAgentGone* markAgentGone =
+      v1Call.mutable_mark_agent_gone();
+
+    markAgentGone->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+    Future<process::http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, v1Call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+  }
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage2 =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  slave::Flags slaveFlags2 = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave2 = StartSlave(detector.get(), slaveFlags2);
+  ASSERT_SOME(slave2);
+
+  AWAIT_READY(slaveRegisteredMessage2);
+
+  const SlaveID& slaveId2 = slaveRegisteredMessage2->slave_id();
+
+  {
+    v1::master::Call v1Call;
+    v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+    v1::master::Call::MarkAgentGone* markAgentGone =
+      v1Call.mutable_mark_agent_gone();
+
+    markAgentGone->mutable_agent_id()->CopyFrom(evolve(slaveId2));
+
+    Future<process::http::Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+        serialize(contentType, v1Call),
+        stringify(contentType));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(process::http::OK().status, response);
+  }
+
+  // Advance the clock to cause GC to be performed.
+  Clock::pause();
+  Clock::advance(masterFlags.registry_gc_interval);
+  Clock::settle();
+
+  // Start a framework and explicitly reconcile a random task ID on each
+  // agent. The first agent was GC'ed from the gone list (TASK_UNKNOWN);
+  // the second is still known as gone (TASK_GONE_BY_OPERATOR).
+
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  TaskStatus status1;
+  status1.mutable_task_id()->set_value(UUID::random().toString());
+  status1.mutable_slave_id()->CopyFrom(slaveId);
+  status1.set_state(TASK_STAGING); // Dummy value.
+
+  TaskStatus status2;
+  status2.mutable_task_id()->set_value(UUID::random().toString());
+  status2.mutable_slave_id()->CopyFrom(slaveId2);
+  status2.set_state(TASK_STAGING); // Dummy value.
+
+  Future<TaskStatus> reconcileUpdate1;
+  Future<TaskStatus> reconcileUpdate2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&reconcileUpdate1))
+    .WillOnce(FutureArg<1>(&reconcileUpdate2));
+
+  driver.reconcileTasks({status1, status2});
+
+  AWAIT_READY(reconcileUpdate1);
+  AWAIT_READY(reconcileUpdate2);
+
+  ASSERT_EQ(TASK_UNKNOWN, reconcileUpdate1->state());
+  ASSERT_EQ(TASK_GONE_BY_OPERATOR, reconcileUpdate2->state());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/tests/partition_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/partition_tests.cpp b/src/tests/partition_tests.cpp
index 0886f48..0597bd2 100644
--- a/src/tests/partition_tests.cpp
+++ b/src/tests/partition_tests.cpp
@@ -3191,19 +3191,19 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, RegistryGcRace)
   // this should result in attempting to prune `slave1` and `slave2`
   // from the unreachable list. We intercept the registry operation to
   // force the race condition with the reregistration of `slave2`.
-  Future<Owned<master::Operation>> pruneUnreachable;
-  Promise<bool> pruneUnreachableContinue;
+  Future<Owned<master::Operation>> prune;
+  Promise<bool> pruneContinue;
   EXPECT_CALL(*master.get()->registrar.get(), apply(_))
-    .WillOnce(DoAll(FutureArg<0>(&pruneUnreachable),
-                    Return(pruneUnreachableContinue.future())));
+    .WillOnce(DoAll(FutureArg<0>(&prune),
+                    Return(pruneContinue.future())));
 
   Clock::advance(masterFlags.registry_gc_interval);
 
-  AWAIT_READY(pruneUnreachable);
+  AWAIT_READY(prune);
   EXPECT_NE(
       nullptr,
-      dynamic_cast<master::PruneUnreachable*>(
-          pruneUnreachable->get()));
+      dynamic_cast<master::Prune*>(
+          prune->get()));
 
   // Apply the registry operation to mark the slave reachable, then
   // pass the result back to the master to allow it to continue. We
@@ -3224,10 +3224,10 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(PartitionTest, RegistryGcRace)
   // Apply the registry operation to prune the unreachable list, then
   // pass the result back to the master to allow it to continue.
   Future<bool> applyPrune =
-    master.get()->registrar->unmocked_apply(pruneUnreachable.get());
+    master.get()->registrar->unmocked_apply(prune.get());
 
   AWAIT_READY(applyPrune);
-  pruneUnreachableContinue.set(applyPrune.get());
+  pruneContinue.set(applyPrune.get());
 
   // We expect that `slave1` has been removed from the unreachable
   // list, `slave2` is registered, and `slave3` is still in the

http://git-wip-us.apache.org/repos/asf/mesos/blob/3515cd0a/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index 286aa1c..9caff04 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -401,7 +401,7 @@ TEST_F(RegistrarTest, MarkUnreachableGone)
 }
 
 
-TEST_F(RegistrarTest, PruneUnreachable)
+TEST_F(RegistrarTest, Prune)
 {
   Registrar registrar(flags, state);
   AWAIT_READY(registrar.recover(master));
@@ -420,8 +420,16 @@ TEST_F(RegistrarTest, PruneUnreachable)
   info2.set_hostname("localhost");
   info2.mutable_id()->CopyFrom(id2);
 
+  SlaveID id3;
+  id3.set_value("3");
+
+  SlaveInfo info3;
+  info3.set_hostname("localhost");
+  info3.mutable_id()->CopyFrom(id3);
+
   AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
   AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info2))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info3))));
 
   AWAIT_TRUE(
       registrar.apply(
@@ -433,17 +441,37 @@ TEST_F(RegistrarTest, PruneUnreachable)
           Owned<Operation>(
               new MarkSlaveUnreachable(info2, protobuf::getCurrentTime()))));
 
-  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id1}))));
-  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id2}))));
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveGone(info3.id(), protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({id1}, {}))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({id2}, {}))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({}, {id3}))));
 
   AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info2))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info3))));
 
   AWAIT_TRUE(
       registrar.apply(
           Owned<Operation>(
               new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
 
-  AWAIT_TRUE(registrar.apply(Owned<Operation>(new PruneUnreachable({id1}))));
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info2, protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveGone(info3.id(), protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({id1}, {}))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({id2}, {}))));
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new Prune({}, {id3}))));
 }
 
 
@@ -1495,7 +1523,9 @@ TEST_P(Registrar_BENCHMARK_Test, GcManyAgents)
 
   // Do GC.
   watch.start();
-  result = registrar.apply(Owned<Operation>(new PruneUnreachable(toRemove)));
+  result = registrar.apply(Owned<Operation>(
+      new Prune(toRemove, hashset<SlaveID>())));
+
   AWAIT_READY_FOR(result, Minutes(5));
   cout << "Garbage collected " << toRemove.size() << " agents in "
        << watch.elapsed() << endl;


[08/11] mesos git commit: Added `__removeSlave` function and made `_markUnreachable` use it.

Posted by an...@apache.org.
Added `__removeSlave` function and made `_markUnreachable` use it.

This would also be used by the `markGone()` function in the next
review to remove agents.

Review: https://reviews.apache.org/r/62581/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ff07dd82
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ff07dd82
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ff07dd82

Branch: refs/heads/master
Commit: ff07dd82a0b99e5b7777c72ae844dd1ab409b2ed
Parents: 22f72a4
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:56:50 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 212 ++++++++++++++++++++++++---------------------
 src/master/master.hpp |   5 ++
 2 files changed, 118 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ff07dd82/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 6d84a26..ee71c1d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -7175,107 +7175,9 @@ void Master::_markUnreachable(
   ++metrics->slave_removals;
   ++metrics->slave_removals_reason_unhealthy;
 
-  // We want to remove the slave first, to avoid the allocator
-  // re-allocating the recovered resources.
-  //
-  // NOTE: Removing the slave is not sufficient for recovering the
-  // resources in the allocator, because the "Sorters" are updated
-  // only within recoverResources() (see MESOS-621). The calls to
-  // recoverResources() below are therefore required, even though
-  // the slave is already removed.
-  allocator->removeSlave(slave->id);
-
-  // Transition tasks to TASK_UNREACHABLE / TASK_LOST and remove them.
-  // We only use TASK_UNREACHABLE if the framework has opted in to the
-  // PARTITION_AWARE capability.
-  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
-    Framework* framework = getFramework(frameworkId);
-    CHECK_NOTNULL(framework);
-
-    TaskState newTaskState = TASK_UNREACHABLE;
-    if (!framework->capabilities.partitionAware) {
-      newTaskState = TASK_LOST;
-    }
-
-    foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
-      const StatusUpdate& update = protobuf::createStatusUpdate(
-          task->framework_id(),
-          task->slave_id(),
-          task->task_id(),
-          newTaskState,
-          TaskStatus::SOURCE_MASTER,
-          None(),
-          "Agent " + slave->info.hostname() + " is unreachable: " + message,
-          TaskStatus::REASON_SLAVE_REMOVED,
-          (task->has_executor_id() ?
-              Option<ExecutorID>(task->executor_id()) : None()),
-          None(),
-          None(),
-          None(),
-          None(),
-          unreachableTime);
-
-      updateTask(task, update);
-      removeTask(task);
-
-      if (!framework->connected()) {
-        LOG(WARNING) << "Dropping update " << update
-                     << " for disconnected "
-                     << " framework " << frameworkId;
-      } else {
-        forward(update, UPID(), framework);
-      }
-    }
-  }
-
-  // Remove executors from the slave for proper resource accounting.
-  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
-    foreachkey (const ExecutorID& executorId,
-                utils::copy(slave->executors[frameworkId])) {
-      removeExecutor(slave, frameworkId, executorId);
-    }
-  }
-
-  foreach (Offer* offer, utils::copy(slave->offers)) {
-    // TODO(vinod): We don't need to call 'Allocator::recoverResources'
-    // once MESOS-621 is fixed.
-    allocator->recoverResources(
-        offer->framework_id(), slave->id, offer->resources(), None());
-
-    // Remove and rescind offers.
-    removeOffer(offer, true); // Rescind!
-  }
-
-  // Remove inverse offers because sending them for a slave that is
-  // unreachable doesn't make sense.
-  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
-    // We don't need to update the allocator because we've already called
-    // `RemoveSlave()`.
-    // Remove and rescind inverse offers.
-    removeInverseOffer(inverseOffer, true); // Rescind!
-  }
-
-  // Mark the slave as being unreachable.
-  slaves.registered.remove(slave);
-  slaves.removed.put(slave->id, Nothing());
   slaves.unreachable[slave->id] = unreachableTime;
-  authenticated.erase(slave->pid);
-
-  // Remove the slave from the `machines` mapping.
-  CHECK(machines.contains(slave->machineId));
-  CHECK(machines[slave->machineId].slaves.contains(slave->id));
-  machines[slave->machineId].slaves.erase(slave->id);
-
-  // Kill the slave observer.
-  terminate(slave->observer);
-  wait(slave->observer);
-  delete slave->observer;
-
-  // TODO(benh): unlink(slave->pid);
-
-  sendSlaveLost(slave->info);
 
-  delete slave;
+  __removeSlave(slave, message, unreachableTime);
 }
 
 
@@ -8920,6 +8822,118 @@ void Master::_removeSlave(
 }
 
 
+void Master::__removeSlave(
+    Slave* slave,
+    const string& message,
+    const Option<TimeInfo>& unreachableTime)
+{
+  // We want to remove the slave first, to avoid the allocator
+  // re-allocating the recovered resources.
+  //
+  // NOTE: Removing the slave is not sufficient for recovering the
+  // resources in the allocator, because the "Sorters" are updated
+  // only within recoverResources() (see MESOS-621). The calls to
+  // recoverResources() below are therefore required, even though
+  // the slave is already removed.
+  allocator->removeSlave(slave->id);
+
+  // Transition tasks to TASK_UNREACHABLE/TASK_LOST and remove them.
+  // NOTE: `message` is used verbatim as each update's status message;
+  // no agent hostname or removal cause is prepended here.
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    Framework* framework = getFramework(frameworkId);
+    CHECK_NOTNULL(framework);
+
+    TaskState newTaskState = TASK_UNREACHABLE;
+    TaskStatus::Reason newTaskReason = TaskStatus::REASON_SLAVE_REMOVED;
+
+    // We only use TASK_UNREACHABLE if the framework has opted in to
+    // the PARTITION_AWARE capability; other frameworks get TASK_LOST.
+    if (!framework->capabilities.partitionAware) {
+      newTaskState = TASK_LOST;
+    }
+
+    foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
+      const StatusUpdate& update = protobuf::createStatusUpdate(
+          task->framework_id(),
+          task->slave_id(),
+          task->task_id(),
+          newTaskState,
+          TaskStatus::SOURCE_MASTER,
+          None(),
+          message,
+          newTaskReason,
+          (task->has_executor_id() ?
+              Option<ExecutorID>(task->executor_id()) : None()),
+          None(),
+          None(),
+          None(),
+          None(),
+          unreachableTime);
+
+      updateTask(task, update);
+      removeTask(task);
+
+      if (!framework->connected()) {
+        LOG(WARNING) << "Dropping update " << update
+                     << " for disconnected"
+                     << " framework " << frameworkId;
+      } else {
+        forward(update, UPID(), framework);
+      }
+    }
+  }
+
+  // Remove executors from the slave for proper resource accounting.
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+    foreachkey (const ExecutorID& executorId,
+                utils::copy(slave->executors[frameworkId])) {
+      removeExecutor(slave, frameworkId, executorId);
+    }
+  }
+
+  foreach (Offer* offer, utils::copy(slave->offers)) {
+    // TODO(vinod): We don't need to call 'Allocator::recoverResources'
+    // once MESOS-621 is fixed.
+    allocator->recoverResources(
+        offer->framework_id(), slave->id, offer->resources(), None());
+
+    // Remove and rescind offers.
+    removeOffer(offer, true); // Rescind!
+  }
+
+  // Remove inverse offers because sending them for a slave that has
+  // been removed (unreachable or otherwise) doesn't make sense.
+  foreach (InverseOffer* inverseOffer, utils::copy(slave->inverseOffers)) {
+    // We don't need to update the allocator because we've already called
+    // `allocator->removeSlave()` above.
+    // Remove and rescind inverse offers.
+    removeInverseOffer(inverseOffer, true); // Rescind!
+  }
+
+  // Mark the slave as being removed.
+  slaves.registered.remove(slave);
+  slaves.removed.put(slave->id, Nothing());
+  authenticated.erase(slave->pid);
+
+  // Remove the slave from the `machines` mapping.
+  CHECK(machines.contains(slave->machineId));
+  CHECK(machines[slave->machineId].slaves.contains(slave->id));
+  machines[slave->machineId].slaves.erase(slave->id);
+
+  // Kill the slave observer.
+  terminate(slave->observer);
+  wait(slave->observer);
+  delete slave->observer;
+
+  // TODO(benh): unlink(slave->pid);
+
+  sendSlaveLost(slave->info);
+
+  delete slave;
+}
+
+
 void Master::updateTask(Task* task, const StatusUpdate& update)
 {
   CHECK_NOTNULL(task);

http://git-wip-us.apache.org/repos/asf/mesos/blob/ff07dd82/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 731619b..8cbf940 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -732,6 +732,11 @@ protected:
       const std::string& removalCause,
       Option<process::metrics::Counter> reason = None());
 
+  void __removeSlave(
+      Slave* slave,
+      const std::string& message,
+      const Option<TimeInfo>& unreachableTime);
+
   // Validates that the framework is authenticated, if required.
   Option<Error> validateFrameworkAuthentication(
       const FrameworkInfo& frameworkInfo,


[05/11] mesos git commit: Added the `REASON_AGENT_REMOVED_BY_OPERATOR` to the mesos protos.

Posted by an...@apache.org.
Added `REASON_SLAVE_REMOVED_BY_OPERATOR` (v1: `REASON_AGENT_REMOVED_BY_OPERATOR`) to the Mesos protos.

Review: https://reviews.apache.org/r/62476/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec1e98df
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec1e98df
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec1e98df

Branch: refs/heads/master
Commit: ec1e98df48dc53cc31e93265a996b4cccefba32e
Parents: 50f3549
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:56:22 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 include/mesos/mesos.proto    | 1 +
 include/mesos/v1/mesos.proto | 1 +
 2 files changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ec1e98df/include/mesos/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/mesos.proto b/include/mesos/mesos.proto
index 3b2d6bb..1bfcc5b 100644
--- a/include/mesos/mesos.proto
+++ b/include/mesos/mesos.proto
@@ -2201,6 +2201,7 @@ message TaskStatus {
     REASON_RESOURCES_UNKNOWN = 18;
     REASON_SLAVE_DISCONNECTED = 10;
     REASON_SLAVE_REMOVED = 11;
+    REASON_SLAVE_REMOVED_BY_OPERATOR = 31;
     REASON_SLAVE_RESTARTED = 12;
     REASON_SLAVE_UNKNOWN = 13;
     REASON_TASK_KILLED_DURING_LAUNCH = 30;

http://git-wip-us.apache.org/repos/asf/mesos/blob/ec1e98df/include/mesos/v1/mesos.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.proto b/include/mesos/v1/mesos.proto
index 4ca4886..d742adb 100644
--- a/include/mesos/v1/mesos.proto
+++ b/include/mesos/v1/mesos.proto
@@ -2184,6 +2184,7 @@ message TaskStatus {
     REASON_RESOURCES_UNKNOWN = 18;
     REASON_AGENT_DISCONNECTED = 10;
     REASON_AGENT_REMOVED = 11;
+    REASON_AGENT_REMOVED_BY_OPERATOR = 31;
     REASON_AGENT_RESTARTED = 12;
     REASON_AGENT_UNKNOWN = 13;
     REASON_TASK_KILLED_DURING_LAUNCH = 30;


[11/11] mesos git commit: Fixed tests impacted by no longer removing the agent symlink.

Posted by an...@apache.org.
Fixed tests impacted by no longer removing the agent symlink.

This change fixes tests that relied on the agent work directory
symlink being removed when the agent receives the shutdown message
from the master.

Review: https://reviews.apache.org/r/62481/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c0468b24
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c0468b24
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c0468b24

Branch: refs/heads/master
Commit: c0468b240842d4aaf04249cb0a58c59c43d1850d
Parents: b491239
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:58:26 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/tests/gc_tests.cpp             | 19 ++++++++++---------
 src/tests/slave_recovery_tests.cpp |  6 +++---
 src/tests/slave_tests.cpp          |  4 +++-
 3 files changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c0468b24/src/tests/gc_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/gc_tests.cpp b/src/tests/gc_tests.cpp
index 5581861..37d3eac 100644
--- a/src/tests/gc_tests.cpp
+++ b/src/tests/gc_tests.cpp
@@ -281,9 +281,11 @@ TEST_F(GarbageCollectorTest, Prune)
 class GarbageCollectorIntegrationTest : public MesosTest {};
 
 
-// This test ensures that garbage collection removes
+// This test ensures that garbage collection does not remove
 // the slave working directory after a slave restart.
-TEST_F(GarbageCollectorIntegrationTest, Restart)
+//
+// TODO(andschwa): Enable this when MESOS-7604 is fixed.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(GarbageCollectorIntegrationTest, Restart)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
@@ -371,22 +373,21 @@ TEST_F(GarbageCollectorIntegrationTest, Restart)
 
   Clock::pause();
 
-  Future<Nothing> schedule =
-    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
 
   slave = StartSlave(detector.get(), flags);
   ASSERT_SOME(slave);
 
-  AWAIT_READY(schedule);
-
-  Clock::settle(); // Wait for GarbageCollectorProcess::schedule to complete.
+  // Wait for the agent to finish recovery.
+  AWAIT_READY(__recover);
+  Clock::settle();
 
   Clock::advance(flags.gc_delay);
 
   Clock::settle();
 
-  // By this time the old slave directory should be cleaned up.
-  ASSERT_FALSE(os::exists(slaveDir));
+  // By this time the old slave directory should not be cleaned up.
+  ASSERT_TRUE(os::exists(slaveDir));
 
   Clock::resume();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0468b24/src/tests/slave_recovery_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_recovery_tests.cpp b/src/tests/slave_recovery_tests.cpp
index 0cd2b5d..30d8c23 100644
--- a/src/tests/slave_recovery_tests.cpp
+++ b/src/tests/slave_recovery_tests.cpp
@@ -3024,7 +3024,7 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor)
 
 
 // The slave is asked to shutdown. When it comes back up, it should
-// register as a new slave.
+// re-register as the same agent.
 TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
 {
   Try<Owned<cluster::Master>> master = this->StartMaster();
@@ -3134,8 +3134,8 @@ TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
   EXPECT_EQ(Resources(offers1.get()[0].resources()),
             Resources(offers3.get()[0].resources()));
 
-  // Ensure the slave id is different.
-  EXPECT_NE(
+  // Ensure the slave id is same.
+  EXPECT_EQ(
       offers1.get()[0].slave_id().value(), offers3.get()[0].slave_id().value());
 
   driver.stop();

http://git-wip-us.apache.org/repos/asf/mesos/blob/c0468b24/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index e9bcfef..2ff6dab 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8254,7 +8254,9 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(SlaveTest, ExecutorReregistrationTimeoutFlag)
 // successfully. Note that shutting down the agent gracefully (killing
 // all tasks) is necessary, because changing the agent's domain is an
 // incompatible change to its SlaveInfo.
-TEST_F(SlaveTest, ChangeDomain)
+//
+// TODO(anand): Re-enable this test when fault domain upgrade is supported.
+TEST_F(SlaveTest, DISABLED_ChangeDomain)
 {
   Clock::pause();
 


[04/11] mesos git commit: Added `MARK_AGENT_GONE` call to the v1 Master API.

Posted by an...@apache.org.
Added `MARK_AGENT_GONE` call to the v1 Master API.

This change introduces the `MARK_AGENT_GONE` call that can be
used by operators to assert that a given agent has failed. It
is especially useful for stateful frameworks to ascertain whether
it's safe to move the workload to a new agent.

Review: https://reviews.apache.org/r/62475/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/50f35499
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/50f35499
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/50f35499

Branch: refs/heads/master
Commit: 50f35499ce41f0024053fb2557b69b5e03d5973b
Parents: 6eefc68
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:56:04 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    | 18 ++++++++++++++++++
 include/mesos/v1/master/master.proto | 18 ++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/50f35499/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index b94e902..79be497 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -89,6 +89,8 @@ message Call {
     REMOVE_QUOTA = 30;       // See 'RemoveQuota' below.
 
     TEARDOWN = 31;       // See 'Teardown' below.
+
+    MARK_AGENT_GONE = 32; // See 'MarkAgentGone' below.
   }
 
   // Provides a snapshot of the current metrics tracked by the master.
@@ -196,6 +198,21 @@ message Call {
     required FrameworkID framework_id = 1;
   }
 
+  // Mark an agent as gone. This can be used by an operator to assert
+  // that the agent instance has failed and is never coming back (e.g.,
+  // ephemeral instance from cloud provider). The master would shutdown
+  // the agent and send 'TASK_GONE_BY_OPERATOR' updates for all the running
+  // tasks. The persistent volumes/reservations on the agent won't be
+  // accessible anymore.
+  //
+  // NOTE: It is possible that the tasks might still be running
+  // if the operator's assertion was wrong and the agent was partitioned
+  // away from the master. The agent would be shutdown when it tries to
+  // re-register with the master when the partition heals.
+  message MarkAgentGone {
+    required SlaveID slave_id = 1;
+  }
+
   optional Type type = 1;
 
   optional GetMetrics get_metrics = 2;
@@ -213,6 +230,7 @@ message Call {
   optional SetQuota set_quota = 14;
   optional RemoveQuota remove_quota = 15;
   optional Teardown teardown = 16;
+  optional MarkAgentGone mark_agent_gone = 17;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/50f35499/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 7499fa4..ea04b8f 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -89,6 +89,8 @@ message Call {
     REMOVE_QUOTA = 30;       // See 'RemoveQuota' below.
 
     TEARDOWN = 31;       // See 'Teardown' below.
+
+    MARK_AGENT_GONE = 32; // See 'MarkAgentGone' below.
   }
 
   // Provides a snapshot of the current metrics tracked by the master.
@@ -196,6 +198,21 @@ message Call {
     required FrameworkID framework_id = 1;
   }
 
+  // Mark an agent as gone. This can be used by an operator to assert
+  // that the agent instance has failed and is never coming back (e.g.,
+  // ephemeral instance from cloud provider). The master would shutdown
+  // the agent and send 'TASK_GONE_BY_OPERATOR' updates for all the running
+  // tasks. The persistent volumes/reservations on the agent won't be
+  // accessible anymore.
+  //
+  // NOTE: It is possible that the tasks might still be running
+  // if the operator's assertion was wrong and the agent was partitioned
+  // away from the master. The agent would be shutdown when it tries to
+  // re-register with the master when the partition heals.
+  message MarkAgentGone {
+    required AgentID agent_id = 1;
+  }
+
   optional Type type = 1;
 
   optional GetMetrics get_metrics = 2;
@@ -213,6 +230,7 @@ message Call {
   optional SetQuota set_quota = 14;
   optional RemoveQuota remove_quota = 15;
   optional Teardown teardown = 16;
+  optional MarkAgentGone mark_agent_gone = 17;
 }
 
 


[06/11] mesos git commit: Added the gone agents to the master registry.

Posted by an...@apache.org.
Added the gone agents to the master registry.

This change adds the list of gone agents to the registry and also
introduces the `MarkSlaveGone` registry operation on the Mesos master.
This will be used by the Master Operator API handler to insert an agent
into the list of gone agents.

Review: https://reviews.apache.org/r/62477/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22f72a46
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22f72a46
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22f72a46

Branch: refs/heads/master
Commit: 22f72a46f84ba518b39d714381b37a512b1d93cc
Parents: ec1e98d
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:56:28 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/master/master.hpp         | 71 ++++++++++++++++++++++++++++++++++++++
 src/master/registry.proto     | 19 ++++++++++
 src/tests/registrar_tests.cpp | 62 +++++++++++++++++++++++++++++++++
 3 files changed, 152 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/22f72a46/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 05f8811..731619b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2287,6 +2287,77 @@ private:
 };
 
 
+// Move a slave from the list of admitted/unreachable slaves
+// to the list of gone slaves.
+class MarkSlaveGone : public Operation
+{
+public:
+  MarkSlaveGone(const SlaveID& _id, const TimeInfo& _goneTime)
+    : id(_id), goneTime(_goneTime) {}
+
+protected:
+  virtual Try<bool> perform(Registry* registry, hashset<SlaveID>* slaveIDs)
+  {
+    // Check whether the slave is already in the gone list. As currently
+    // implemented, this should not be possible: the master will not
+    // try to transition an already gone slave.
+    for (int i = 0; i < registry->gone().slaves().size(); i++) {
+      const Registry::GoneSlave& slave = registry->gone().slaves(i);
+
+      if (slave.id() == id) {
+        return Error("Agent " + stringify(id) + " already marked as gone");
+      }
+    }
+
+    // Check whether the slave is in the admitted/unreachable list.
+    bool found = false;
+    if (slaveIDs->contains(id)) {
+      found = true;
+      for (int i = 0; i < registry->slaves().slaves().size(); i++) {
+        const Registry::Slave& slave = registry->slaves().slaves(i);
+
+        if (slave.info().id() == id) {
+          registry->mutable_slaves()->mutable_slaves()->DeleteSubrange(i, 1);
+          slaveIDs->erase(id);
+          break;
+        }
+      }
+    }
+
+    if (!found) {
+      for (int i = 0; i < registry->unreachable().slaves().size(); i++) {
+        const Registry::UnreachableSlave& slave =
+          registry->unreachable().slaves(i);
+
+        if (slave.id() == id) {
+          registry->mutable_unreachable()->mutable_slaves()->DeleteSubrange(
+              i, 1);
+
+          found = true;
+          break;
+        }
+      }
+    }
+
+    if (found) {
+      Registry::GoneSlave* gone = registry->mutable_gone()->add_slaves();
+
+      gone->mutable_id()->CopyFrom(id);
+      gone->mutable_timestamp()->CopyFrom(goneTime);
+
+      return true; // Mutation;
+    }
+
+    // Should not happen.
+    return Error("Failed to find agent " + stringify(id));
+  }
+
+private:
+  const SlaveID id;
+  const TimeInfo goneTime;
+};
+
+
 inline std::ostream& operator<<(
     std::ostream& stream,
     const Framework& framework);

http://git-wip-us.apache.org/repos/asf/mesos/blob/22f72a46/src/master/registry.proto
----------------------------------------------------------------------
diff --git a/src/master/registry.proto b/src/master/registry.proto
index 362a9fa..8916ad3 100644
--- a/src/master/registry.proto
+++ b/src/master/registry.proto
@@ -54,6 +54,17 @@ message Registry {
     repeated UnreachableSlave slaves = 1;
   }
 
+  message GoneSlave {
+    required SlaveID id = 1;
+
+    // The time when the slave was marked gone by the master.
+    required TimeInfo timestamp = 2;
+  }
+
+  message GoneSlaves {
+    repeated GoneSlave slaves = 1;
+  }
+
   message Machine {
     required MachineInfo info = 1;
   }
@@ -86,6 +97,14 @@ message Registry {
   // `registry_max_agent_count` flags.
   optional UnreachableSlaves unreachable = 7;
 
+  // Slaves that have been explicitly marked as failed (no longer running)
+  // by the operator. They may or may not still be running.
+  //
+  // New entries are added to the end of this list; hence the first
+  // element of the list was added first (although if there is clock
+  // drift, it might not necessarily have the smallest timestamp).
+  optional GoneSlaves gone = 8;
+
   // Holds a list of machines and some status information about each.
   // See comments in `MachineInfo` for more information.
   optional Machines machines = 3;

http://git-wip-us.apache.org/repos/asf/mesos/blob/22f72a46/src/tests/registrar_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/registrar_tests.cpp b/src/tests/registrar_tests.cpp
index b5fc458..286aa1c 100644
--- a/src/tests/registrar_tests.cpp
+++ b/src/tests/registrar_tests.cpp
@@ -339,6 +339,68 @@ TEST_F(RegistrarTest, MarkUnreachable)
 }
 
 
+// Verify that an admitted slave can be marked as gone.
+TEST_F(RegistrarTest, MarkGone)
+{
+  Registrar registrar(flags, state);
+  AWAIT_READY(registrar.recover(master));
+
+  SlaveID id1;
+  id1.set_value("1");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(id1);
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+
+  AWAIT_TRUE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
+}
+
+
+// Verify that an unreachable slave can be marked as gone.
+TEST_F(RegistrarTest, MarkUnreachableGone)
+{
+  Registrar registrar(flags, state);
+  AWAIT_READY(registrar.recover(master));
+
+  SlaveID id1;
+  id1.set_value("1");
+
+  SlaveInfo info1;
+  info1.set_hostname("localhost");
+  info1.mutable_id()->CopyFrom(id1);
+
+  AWAIT_TRUE(registrar.apply(Owned<Operation>(new AdmitSlave(info1))));
+
+  AWAIT_TRUE(
+    registrar.apply(
+        Owned<Operation>(
+            new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
+
+  AWAIT_TRUE(
+    registrar.apply(
+        Owned<Operation>(
+            new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
+
+  // If a slave is already gone, trying to mark it gone again should fail.
+  AWAIT_FALSE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveGone(info1.id(), protobuf::getCurrentTime()))));
+
+  // If a slave is already gone, trying to mark it unreachable
+  // again should fail.
+  AWAIT_FALSE(
+      registrar.apply(
+          Owned<Operation>(
+              new MarkSlaveUnreachable(info1, protobuf::getCurrentTime()))));
+}
+
+
 TEST_F(RegistrarTest, PruneUnreachable)
 {
   Registrar registrar(flags, state);


[03/11] mesos git commit: Added the mark agent gone handler on the master.

Posted by an...@apache.org.
Added the mark agent gone handler on the master.

This change adds the necessary logic for handling the mark agent
gone call on the master. Once an agent is marked as gone, it's
not allowed to re-register with the Mesos master. GC'ing the
list of gone agents (which can otherwise grow unbounded) will be
added in a separate change.

Review: https://reviews.apache.org/r/62478/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/57873eba
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/57873eba
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/57873eba

Branch: refs/heads/master
Commit: 57873ebacdad052024586cb28994e920aa7e6123
Parents: ff07dd8
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:57:12 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/master/http.cpp       | 101 +++++++++++++++++++++++++++++++++++
 src/master/master.cpp     | 116 ++++++++++++++++++++++++++++++++++++++---
 src/master/master.hpp     |  17 ++++++
 src/master/validation.cpp |   6 +++
 4 files changed, 234 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/57873eba/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 28d0393..66a391f 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -779,6 +779,9 @@ Future<Response> Master::Http::api(
 
     case mesos::master::Call::TEARDOWN:
       return teardown(call, principal, acceptType);
+
+    case mesos::master::Call::MARK_AGENT_GONE:
+      return markAgentGone(call, principal, acceptType);
   }
 
   UNREACHABLE();
@@ -5286,6 +5289,104 @@ Future<Response> Master::Http::unreserveResources(
   return _unreserve(slaveId, resources, principal);
 }
 
+
+Future<Response> Master::Http::markAgentGone(
+    const mesos::master::Call& call,
+    const Option<Principal>& principal,
+    ContentType contentType) const
+{
+  CHECK_EQ(mesos::master::Call::MARK_AGENT_GONE, call.type());
+
+  const SlaveID& slaveId = call.mark_agent_gone().slave_id();
+
+  LOG(INFO) << "Marking agent '" << slaveId << "' as gone";
+
+  if (master->slaves.gone.contains(slaveId)) {
+    LOG(WARNING) << "Not marking agent '" << slaveId
+                 << "' as gone because it has already transitioned to gone";
+    return OK();
+  }
+
+  // We return a `ServiceUnavailable` (retryable error) if there is
+  // an ongoing registry transition to gone/removed/unreachable.
+  if (master->slaves.markingGone.contains(slaveId)) {
+    LOG(WARNING) << "Not marking agent '" << slaveId
+                 << "' as gone because another gone transition"
+                 << " is already in progress";
+
+    return ServiceUnavailable(
+        "Agent '" + stringify(slaveId) + "' is already being transitioned"
+        + " to gone");
+  }
+
+  if (master->slaves.removing.contains(slaveId)) {
+    LOG(WARNING) << "Not marking agent '" << slaveId
+                 << "' as gone because another remove transition"
+                 << " is already in progress";
+
+    return ServiceUnavailable(
+        "Agent '" + stringify(slaveId) + "' is being transitioned to removed");
+  }
+
+  if (master->slaves.markingUnreachable.contains(slaveId)) {
+    LOG(WARNING) << "Not marking agent '" << slaveId
+                 << "' as gone because another unreachable transition"
+                 << " is already in progress";
+
+    return ServiceUnavailable(
+        "Agent '" + stringify(slaveId) + "' is being transitioned to"
+        + " unreachable");
+  }
+
+  // We currently support marking an agent gone if the agent
+  // is present in the list of active, unreachable or recovered agents.
+  bool found = false;
+
+  if (master->slaves.registered.contains(slaveId)) {
+    found = true;
+  } else if(master->slaves.recovered.contains(slaveId)) {
+    found = true;
+  } else if (master->slaves.unreachable.contains(slaveId)) {
+    found = true;
+  }
+
+  if (!found) {
+    return NotFound("Agent '" + stringify(slaveId) + "' not found");
+  }
+
+  master->slaves.markingGone.insert(slaveId);
+
+  TimeInfo goneTime = protobuf::getCurrentTime();
+
+  Future<bool> gone = master->registrar->apply(Owned<Operation>(
+      new MarkSlaveGone(slaveId, goneTime)));
+
+  gone.onAny(defer(
+      master->self(), [this, slaveId, goneTime](Future<bool> registrarResult) {
+    CHECK(!registrarResult.isDiscarded());
+
+    if (registrarResult.isFailed()) {
+      LOG(FATAL) << "Failed to mark agent " << slaveId
+                 << " as gone in the registry: "
+                 << registrarResult.failure();
+    }
+
+    Slave* slave = master->slaves.registered.get(slaveId);
+
+    // This can happen if the agent that is being marked as
+    // gone is not currently registered (unreachable/recovered).
+    if (slave == nullptr) {
+      return;
+    }
+
+    master->markGone(slave, goneTime);
+  }));
+
+  return gone.then([]() -> Future<Response> {
+    return OK();
+  });
+}
+
 } // namespace master {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/57873eba/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index ee71c1d..0e0fb1d 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -1723,6 +1723,11 @@ Future<Nothing> Master::_recover(const Registry& registry)
     slaves.unreachable[unreachable.id()] = unreachable.timestamp();
   }
 
+  foreach (const Registry::GoneSlave& gone,
+           registry.gone().slaves()) {
+    slaves.gone[gone.id()] = gone.timestamp();
+  }
+
   // Set up a timer for age-based registry GC.
   scheduleRegistryGc();
 
@@ -2013,6 +2018,26 @@ Nothing Master::markUnreachableAfterFailover(const SlaveInfo& slave)
     return Nothing();
   }
 
+  if (slaves.markingGone.contains(slave.id())) {
+    LOG(INFO) << "Canceling transition of agent "
+              << slave.id() << " (" << slave.hostname() << ")"
+              << " to unreachable because an agent gone"
+              << " operation is in progress";
+
+    ++metrics->slave_unreachable_canceled;
+    return Nothing();
+  }
+
+  if (slaves.gone.contains(slave.id())) {
+    LOG(INFO) << "Canceling transition of agent "
+              << slave.id() << " (" << slave.hostname() << ")"
+              << " to unreachable because the agent has"
+              << " been marked gone";
+
+    ++metrics->slave_unreachable_canceled;
+    return Nothing();
+  }
+
   LOG(WARNING) << "Agent " << slave.id()
                << " (" << slave.hostname() << ") did not re-register"
                << " within " << flags.agent_reregister_timeout
@@ -6056,6 +6081,24 @@ void Master::reregisterSlave(
     return;
   }
 
+  if (slaves.markingGone.contains(slaveInfo.id())) {
+    LOG(INFO)
+      << "Ignoring re-register agent message from agent "
+      << slaveInfo.id() << " at " << from << " ("
+      << slaveInfo.hostname() << ") as a gone operation is already in progress";
+    return;
+  }
+
+  if (slaves.gone.contains(slaveInfo.id())) {
+    LOG(WARNING) << "Refusing re-registration of agent at " << from
+                 << " because it is already marked gone";
+
+    ShutdownMessage message;
+    message.set_message("Agent has been marked gone");
+    send(from, message);
+    return;
+  }
+
   Option<Error> error = validation::master::message::reregisterSlave(
       slaveInfo, tasks, checkpointedResources, executorInfos, frameworks);
 
@@ -7122,6 +7165,20 @@ void Master::markUnreachable(const SlaveID& slaveId, const string& message)
     return;
   }
 
+  if (slaves.markingGone.contains(slaveId)) {
+    LOG(INFO) << "Canceling transition of agent " << slaveId
+              << " to unreachable because an agent gone"
+              << " operation is in progress";
+    return;
+  }
+
+  if (slaves.gone.contains(slaveId)) {
+    LOG(INFO) << "Canceling transition of agent " << slaveId
+              << " to unreachable because the agent has"
+              << " been marked gone";
+    return;
+  }
+
   LOG(INFO) << "Marking agent " << *slave
             << " unreachable: " << message;
 
@@ -7181,6 +7238,23 @@ void Master::_markUnreachable(
 }
 
 
+void Master::markGone(Slave* slave, const TimeInfo& goneTime)
+{
+  CHECK_NOTNULL(slave);
+  CHECK(slaves.markingGone.contains(slave->info.id()));
+  slaves.markingGone.erase(slave->info.id());
+
+  slaves.gone[slave->id] = goneTime;
+
+  // Shutdown the agent if it transitioned to gone.
+  ShutdownMessage message;
+  message.set_message("Agent has been marked gone");
+  send(slave->pid, message);
+
+  __removeSlave(slave, "Agent has been marked gone", None());
+}
+
+
 void Master::reconcile(
     Framework* framework,
     const scheduler::Call::Reconcile& reconcile)
@@ -7312,9 +7386,10 @@ void Master::_reconcileTasks(
   //   (3) Task is unknown, slave is registered: TASK_UNKNOWN.
   //   (4) Task is unknown, slave is transitioning: no-op.
   //   (5) Task is unknown, slave is unreachable: TASK_UNREACHABLE.
-  //   (6) Task is unknown, slave is unknown: TASK_UNKNOWN.
+  //   (6) Task is unknown, slave is gone: TASK_GONE_BY_OPERATOR.
+  //   (7) Task is unknown, slave is unknown: TASK_UNKNOWN.
   //
-  // For cases (3), (5), and (6), TASK_LOST is sent instead if the
+  // For cases (3), (5), (6) and (7) TASK_LOST is sent instead if the
   // framework has not opted-in to the PARTITION_AWARE capability.
   foreach (const TaskStatus& status, statuses) {
     Option<SlaveID> slaveId = None();
@@ -7411,8 +7486,26 @@ void Master::_reconcileTasks(
           None(),
           None(),
           unreachableTime);
+    } else if (slaveId.isSome() && slaves.gone.contains(slaveId.get())) {
+      // (6) Slave is gone: TASK_GONE_BY_OPERATOR. If the framework
+      // does not have the PARTITION_AWARE capability, send TASK_LOST
+      // for backward compatibility.
+      TaskState taskState = TASK_GONE_BY_OPERATOR;
+      if (!framework->capabilities.partitionAware) {
+        taskState = TASK_LOST;
+      }
+
+      update = protobuf::createStatusUpdate(
+          framework->id(),
+          slaveId.get(),
+          status.task_id(),
+          taskState,
+          TaskStatus::SOURCE_MASTER,
+          None(),
+          "Reconciliation: Task is gone",
+          TaskStatus::REASON_RECONCILIATION);
     } else {
-      // (6) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
+      // (7) Task is unknown, slave is unknown: TASK_UNKNOWN. If the
       // framework does not have the PARTITION_AWARE capability, send
       // TASK_LOST for backward compatibility.
       TaskState taskState = TASK_UNKNOWN;
@@ -8665,6 +8758,12 @@ void Master::removeSlave(
     return;
   }
 
+  if (slaves.markingGone.contains(slave->id)) {
+    LOG(WARNING) << "Ignoring removal of agent " << *slave
+                 << " that is in the process of being marked gone";
+    return;
+  }
+
   // This should not be possible, but we protect against it anyway for
   // the sake of paranoia.
   if (slaves.removing.contains(slave->id)) {
@@ -8837,8 +8936,8 @@ void Master::__removeSlave(
   // the slave is already removed.
   allocator->removeSlave(slave->id);
 
-  // Transition tasks to TASK_UNREACHABLE/TASK_LOST
-  // and remove them. We only use TASK_UNREACHABLE if
+  // Transition tasks to TASK_UNREACHABLE/TASK_GONE_BY_OPERATOR/TASK_LOST
+  // and remove them. We only use TASK_UNREACHABLE/TASK_GONE_BY_OPERATOR if
   // the framework has opted in to the PARTITION_AWARE capability.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
     Framework* framework = getFramework(frameworkId);
@@ -8850,7 +8949,12 @@ void Master::__removeSlave(
     if (!framework->capabilities.partitionAware) {
       newTaskState = TASK_LOST;
     } else {
-      newTaskState = TASK_UNREACHABLE;
+      if (unreachableTime.isSome()) {
+        newTaskState = TASK_UNREACHABLE;
+      } else {
+        newTaskState = TASK_GONE_BY_OPERATOR;
+        newTaskReason = TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR;
+      }
     }
 
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/57873eba/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8cbf940..a17378e 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -491,6 +491,8 @@ public:
       const SlaveID& slaveId,
       const std::string& message);
 
+  void markGone(Slave* slave, const TimeInfo& goneTime);
+
   void authenticate(
       const process::UPID& from,
       const process::UPID& pid);
@@ -1665,6 +1667,11 @@ private:
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
 
+    process::Future<process::http::Response> markAgentGone(
+        const mesos::master::Call& call,
+        const Option<process::http::authentication::Principal>& principal,
+        ContentType contentType) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored
@@ -1829,6 +1836,9 @@ private:
     // Slaves that are in the process of being marked unreachable.
     hashset<SlaveID> markingUnreachable;
 
+    // Slaves that are in the process of being marked gone.
+    hashset<SlaveID> markingGone;
+
     // This collection includes agents that have gracefully shutdown,
     // as well as those that have been marked unreachable. We keep a
     // cache here to prevent this from growing in an unbounded manner.
@@ -1849,6 +1859,13 @@ private:
     // `registry_max_agent_age`, and `registry_max_agent_count` flags.
     LinkedHashMap<SlaveID, TimeInfo> unreachable;
 
+    // Slaves that have been marked gone. We recover this from the
+    // registry, so it includes slaves marked as gone by other instances
+    // of the master. Note that we use a LinkedHashMap to ensure the order
+    // of elements here matches the order in the registry's gone list, which
+    // matches the order in which agents are marked gone.
+    LinkedHashMap<SlaveID, TimeInfo> gone;
+
     // This rate limiter is used to limit the removal of slaves failing
     // health checks.
     // NOTE: Using a 'shared_ptr' here is OK because 'RateLimiter' is

http://git-wip-us.apache.org/repos/asf/mesos/blob/57873eba/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index a6e6a90..01bc2e0 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -228,6 +228,12 @@ Option<Error> validate(
         return Error("Expecting 'teardown' to be present");
       }
       return None();
+
+    case mesos::master::Call::MARK_AGENT_GONE:
+      if (!call.has_mark_agent_gone()) {
+        return Error("Expecting 'mark_agent_gone' to be present");
+      }
+      return None();
   }
 
   UNREACHABLE();


[10/11] mesos git commit: Added authorization for 'MARK_AGENT_GONE' call.

Posted by an...@apache.org.
Added authorization for 'MARK_AGENT_GONE' call.

This change adds the relevant ACLs for authorizing this call (either
ANY or NONE access).

Review: https://reviews.apache.org/r/62531/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/11dbb52a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/11dbb52a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/11dbb52a

Branch: refs/heads/master
Commit: 11dbb52a6b9339e1ad243f00c92601e108499997
Parents: 6001e8a
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:58:41 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 include/mesos/authorizer/acls.proto       | 11 +++++
 include/mesos/authorizer/authorizer.proto |  4 ++
 src/authorizer/local/authorizer.cpp       | 15 +++++++
 src/master/http.cpp                       | 29 +++++++++++++
 src/master/master.hpp                     |  3 ++
 src/tests/api_tests.cpp                   | 57 ++++++++++++++++++++++++++
 6 files changed, 119 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/include/mesos/authorizer/acls.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/acls.proto b/include/mesos/authorizer/acls.proto
index 9109283..587b714 100644
--- a/include/mesos/authorizer/acls.proto
+++ b/include/mesos/authorizer/acls.proto
@@ -404,6 +404,16 @@ message ACL {
     // access.
     required Entity machines = 2;
   }
+
+  // Which principals are authorized to mark an agent as gone.
+  message MarkAgentGone {
+    // Subjects: HTTP Username.
+    required Entity principals = 1;
+
+    // Objects: Given implicitly. Use Entity type ANY to allow or NONE to
+    // deny marking any agent as gone.
+    required Entity agents = 2;
+  }
 }
 
 
@@ -474,4 +484,5 @@ message ACLs {
   repeated ACL.StartMaintenance start_maintenances = 37;
   repeated ACL.StopMaintenance stop_maintenances = 38;
   repeated ACL.GetMaintenanceStatus get_maintenance_statuses = 39;
+  repeated ACL.MarkAgentGone mark_agents_gone = 40;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/include/mesos/authorizer/authorizer.proto
----------------------------------------------------------------------
diff --git a/include/mesos/authorizer/authorizer.proto b/include/mesos/authorizer/authorizer.proto
index 38f0e0b..87a8057 100644
--- a/include/mesos/authorizer/authorizer.proto
+++ b/include/mesos/authorizer/authorizer.proto
@@ -213,6 +213,10 @@ enum Action {
 
   // This action will set objects of type `MachineID`.
   GET_MAINTENANCE_STATUS = 33;
+
+  // This action will not fill in any object fields, since a principal is
+  // either allowed to mark an agent as gone or is unauthorized.
+  MARK_AGENT_GONE = 34;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/src/authorizer/local/authorizer.cpp
----------------------------------------------------------------------
diff --git a/src/authorizer/local/authorizer.cpp b/src/authorizer/local/authorizer.cpp
index 82ae846..2fe7b87 100644
--- a/src/authorizer/local/authorizer.cpp
+++ b/src/authorizer/local/authorizer.cpp
@@ -398,6 +398,7 @@ public:
           break;
         case authorization::GET_MAINTENANCE_SCHEDULE:
         case authorization::GET_MAINTENANCE_STATUS:
+        case authorization::MARK_AGENT_GONE:
         case authorization::REGISTER_AGENT:
         case authorization::SET_LOG_LEVEL:
         case authorization::START_MAINTENANCE:
@@ -666,6 +667,7 @@ public:
         case authorization::KILL_NESTED_CONTAINER:
         case authorization::LAUNCH_NESTED_CONTAINER:
         case authorization::LAUNCH_NESTED_CONTAINER_SESSION:
+        case authorization::MARK_AGENT_GONE:
         case authorization::REGISTER_AGENT:
         case authorization::REMOVE_NESTED_CONTAINER:
         case authorization::RUN_TASK:
@@ -876,6 +878,7 @@ public:
       case authorization::KILL_NESTED_CONTAINER:
       case authorization::LAUNCH_NESTED_CONTAINER:
       case authorization::LAUNCH_NESTED_CONTAINER_SESSION:
+      case authorization::MARK_AGENT_GONE:
       case authorization::REGISTER_AGENT:
       case authorization::REMOVE_NESTED_CONTAINER:
       case authorization::RUN_TASK:
@@ -1040,6 +1043,7 @@ public:
       case authorization::GET_MAINTENANCE_SCHEDULE:
       case authorization::GET_MAINTENANCE_STATUS:
       case authorization::KILL_NESTED_CONTAINER:
+      case authorization::MARK_AGENT_GONE:
       case authorization::REGISTER_AGENT:
       case authorization::REMOVE_NESTED_CONTAINER:
       case authorization::RUN_TASK:
@@ -1333,6 +1337,17 @@ private:
         }
 
         return acls_;
+      case authorization::MARK_AGENT_GONE:
+        foreach (const ACL::MarkAgentGone& acl,
+                 acls.mark_agents_gone()) {
+          GenericACL acl_;
+          acl_.subjects = acl.principals();
+          acl_.objects = acl.agents();
+
+          acls_.push_back(acl_);
+        }
+
+        return acls_;
       case authorization::REGISTER_FRAMEWORK:
       case authorization::CREATE_VOLUME:
       case authorization::RESERVE_RESOURCES:

http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 66a391f..42139be 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -5297,8 +5297,37 @@ Future<Response> Master::Http::markAgentGone(
 {
   CHECK_EQ(mesos::master::Call::MARK_AGENT_GONE, call.type());
 
+  Future<Owned<ObjectApprover>> approver;
+
+  if (master->authorizer.isSome()) {
+    Option<authorization::Subject> subject = createSubject(principal);
+
+    approver = master->authorizer.get()->getObjectApprover(
+        subject, authorization::MARK_AGENT_GONE);
+  } else {
+    approver = Owned<ObjectApprover>(new AcceptingObjectApprover());
+  }
+
   const SlaveID& slaveId = call.mark_agent_gone().slave_id();
 
+  return approver.then(defer(master->self(),
+      [this, slaveId](const Owned<ObjectApprover>& approver)
+          -> Future<Response> {
+    Try<bool> approved = approver->approved((ObjectApprover::Object()));
+
+    if (approved.isError()) {
+      return InternalServerError("Authorization error: " + approved.error());
+    } else if (!approved.get()) {
+      return Forbidden();
+    }
+
+    return _markAgentGone(slaveId);
+  }));
+}
+
+
+Future<Response> Master::Http::_markAgentGone(const SlaveID& slaveId) const
+{
   LOG(INFO) << "Marking agent '" << slaveId << "' as gone";
 
   if (master->slaves.gone.contains(slaveId)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index a17378e..23b864d 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -1672,6 +1672,9 @@ private:
         const Option<process::http::authentication::Principal>& principal,
         ContentType contentType) const;
 
+    process::Future<process::http::Response> _markAgentGone(
+        const SlaveID& slaveId) const;
+
     Master* master;
 
     // NOTE: The quota specific pieces of the Operator API are factored

http://git-wip-us.apache.org/repos/asf/mesos/blob/11dbb52a/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 9e2945c..3d0db3b 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -3583,6 +3583,63 @@ TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest, MarkRegisteredAgentGone)
 }
 
 
+// This test verifies that a principal denied by the 'MarkAgentGone' ACL
+// gets '403 Forbidden' when trying to mark an agent as gone.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest,
+                                MarkRegisteredAgentGoneUnauthorized)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+
+  {
+    // Deny the default principal permission to mark any agent as gone.
+    mesos::ACL::MarkAgentGone* acl =
+      masterFlags.acls.get().add_mark_agents_gone();
+
+    acl->mutable_principals()->add_values(DEFAULT_CREDENTIAL.principal());
+    acl->mutable_agents()->set_type(mesos::ACL::Entity::NONE);
+  }
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the agent is registered successfully with the master
+  // before trying to mark it as gone.
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Try to mark the agent as gone; the ACL above should cause a 403.
+
+  ContentType contentType = GetParam();
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+  v1::master::Call::MarkAgentGone* markAgentGone =
+    v1Call.mutable_mark_agent_gone();
+
+  markAgentGone->mutable_agent_id()->CopyFrom(
+      evolve(slaveRegisteredMessage->slave_id()));
+
+  Future<http::Response> response = http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Forbidden().status, response);
+}
+
+
 // This test verifies that the master correctly sends 'TASK_GONE_BY_OPERATOR'
 // status updates when an agent running the tasks is marked as gone.
 //


[07/11] mesos git commit: Removed the logic for removing the latest symlink on the agent.

Posted by an...@apache.org.
Removed the logic for removing the latest symlink on the agent.

This change removes the logic that deleted the "latest" symlink when
the agent received a shutdown message from the Mesos master. This ensures
that agents come back with the same agent ID after a shutdown, similar
to their behavior when they come back after a reboot.

Review: https://reviews.apache.org/r/62479/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3da4f8a6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3da4f8a6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3da4f8a6

Branch: refs/heads/master
Commit: 3da4f8a6525a903187f25489d699c46af6120d3c
Parents: 57873eb
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:57:50 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 9 ---------
 1 file changed, 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3da4f8a6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 75e2e25..bf85baf 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -886,15 +886,6 @@ void Slave::finalize()
       shutdownFramework(UPID(), frameworkId);
     }
   }
-
-  if (state == TERMINATING) {
-    // We remove the "latest" symlink in meta directory, so that the
-    // slave doesn't recover the state when it restarts and registers
-    // as a new slave with the master.
-    if (os::exists(paths::getLatestSlavePath(metaDir))) {
-      CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
-    }
-  }
 }
 
 


[09/11] mesos git commit: Added tests for the agent gone operation.

Posted by an...@apache.org.
Added tests for the agent gone operation.

This change adds a test for marking an active agent as gone, and
another test ensuring that running tasks on an agent marked as gone
are correctly transitioned to 'TASK_GONE_BY_OPERATOR'.

Review: https://reviews.apache.org/r/62480/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b491239f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b491239f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b491239f

Branch: refs/heads/master
Commit: b491239f1b177c1e5ca70ddeb302f4c388debe93
Parents: 3da4f8a
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:57:59 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 src/tests/api_tests.cpp | 195 ++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 194 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b491239f/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index d260a1c..9e2945c 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -355,7 +355,7 @@ TEST_P(MasterAPITest, GetExecutors)
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
-  SlaveID slaveId = slaveRegisteredMessage->slave_id();
+  const SlaveID& slaveId = slaveRegisteredMessage->slave_id();
 
   MockScheduler sched;
   MesosSchedulerDriver driver(
@@ -3516,6 +3516,199 @@ TEST_P(MasterAPITest, Teardown)
 }
 
 
+// This test verifies that a registered agent can be marked as gone and
+// is subsequently shut down by the master. Upon restarting, the agent
+// should not be able to reregister with the master.
+//
+// TODO(andschwa): Enable this when MESOS-7604 is fixed.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest, MarkRegisteredAgentGone)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Ensure that the agent is registered successfully with the master
+  // before marking it as gone.
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Mark the agent as gone. This should result in the agent being shut down.
+
+  Future<ShutdownMessage> shutdownMessage =
+    FUTURE_PROTOBUF(ShutdownMessage(), master.get()->pid, _);
+
+  ContentType contentType = GetParam();
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+  v1::master::Call::MarkAgentGone* markAgentGone =
+    v1Call.mutable_mark_agent_gone();
+
+  markAgentGone->mutable_agent_id()->CopyFrom(
+      evolve(slaveRegisteredMessage->slave_id()));
+
+  Future<http::Response> response = http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_READY(shutdownMessage);
+
+  // Restart the agent. Since it is marked gone, the master should shut
+  // it down again (a second 'ShutdownMessage') instead of re-registering it.
+
+  slave.get()->terminate();
+  slave->reset();
+
+  Future<ShutdownMessage> shutdownMessage2 =
+    FUTURE_PROTOBUF(ShutdownMessage(), master.get()->pid, _);
+
+  slave = StartSlave(detector.get(), slaveFlags);
+
+  AWAIT_READY(shutdownMessage2);
+}
+
+
+// This test verifies that a PARTITION_AWARE framework receives
+// 'TASK_GONE_BY_OPERATOR' updates when its task's agent is marked as gone.
+//
+// TODO(andschwa): Enable this when MESOS-7604 is fixed.
+TEST_P_TEMP_DISABLED_ON_WINDOWS(MasterAPITest, TaskUpdatesUponAgentGone)
+{
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+  TestContainerizer containerizer(&exec);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), &containerizer, slaveFlags);
+
+  ASSERT_SOME(slave);
+
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.add_capabilities()->set_type(
+      FrameworkInfo::Capability::PARTITION_AWARE);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<Nothing> registered;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureSatisfy(&registered));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(registered);
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  TaskInfo task = createTask(offers.get()[0], "", DEFAULT_EXECUTOR_ID);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> update;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update));
+
+  driver.launchTasks(offers.get()[0].id(), {task});
+
+  AWAIT_READY(update);
+  ASSERT_EQ(TASK_RUNNING, update->state());
+
+  Future<TaskStatus> update2;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update2));
+
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  // Mark the agent as gone. This should result in the master sending
+  // a 'TASK_GONE_BY_OPERATOR' update for the running task.
+
+  ContentType contentType = GetParam();
+
+  SlaveID slaveId = offers.get()[0].slave_id();
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::MARK_AGENT_GONE);
+
+  v1::master::Call::MarkAgentGone* markAgentGone =
+    v1Call.mutable_mark_agent_gone();
+
+  markAgentGone->mutable_agent_id()->CopyFrom(evolve(slaveId));
+
+  Future<http::Response> response = http::post(
+      master.get()->pid,
+      "api/v1",
+      createBasicAuthHeaders(DEFAULT_CREDENTIAL),
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+
+  AWAIT_READY(update2);
+
+  ASSERT_EQ(TASK_GONE_BY_OPERATOR, update2->state());
+  ASSERT_EQ(TaskStatus::REASON_SLAVE_REMOVED_BY_OPERATOR, update2->reason());
+
+  // Explicitly reconciling an unknown task on the gone agent should also
+  // yield 'TASK_GONE_BY_OPERATOR', since the framework is PARTITION_AWARE.
+
+  Future<TaskStatus> update3;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&update3));
+
+  vector<TaskStatus> statuses;
+
+  TaskStatus status;
+  status.mutable_slave_id()->CopyFrom(slaveId);
+  status.mutable_task_id()->set_value("dummy-task");
+
+  statuses.push_back(status);
+
+  driver.reconcileTasks(statuses);
+
+  AWAIT_READY(update3);
+  ASSERT_EQ(TASK_GONE_BY_OPERATOR, update3->state());
+
+  driver.stop();
+  driver.join();
+}
+
+
 class AgentAPITest
   : public MesosTest,
     public WithParamInterface<ContentType>


[02/11] mesos git commit: Added documentation about 'MARK_AGENT_GONE' call.

Posted by an...@apache.org.
Added documentation about 'MARK_AGENT_GONE' call.

Review: https://reviews.apache.org/r/62529


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6001e8ae
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6001e8ae
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6001e8ae

Branch: refs/heads/master
Commit: 6001e8aefed87864b6ab289cbacb9dcf394fe538
Parents: c0468b2
Author: Anand Mazumdar <an...@apache.org>
Authored: Fri Sep 29 11:58:34 2017 -0700
Committer: Anand Mazumdar <an...@apache.org>
Committed: Fri Sep 29 13:31:08 2017 -0700

----------------------------------------------------------------------
 docs/operator-http-api.md | 35 +++++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6001e8ae/docs/operator-http-api.md
----------------------------------------------------------------------
diff --git a/docs/operator-http-api.md b/docs/operator-http-api.md
index f6cfcf1..37593b3 100644
--- a/docs/operator-http-api.md
+++ b/docs/operator-http-api.md
@@ -2324,6 +2324,41 @@ HTTP/1.1 202 Accepted
 
 ```
 
+### MARK_AGENT_GONE
+
+This call can be used by operators to assert that an agent instance has
+failed and is never coming back (e.g., an ephemeral instance from a cloud
+provider). The master will shut down the agent and send `TASK_GONE_BY_OPERATOR`
+updates for all of its running tasks. Stateful frameworks can use this signal
+to reschedule their workloads (volumes, reservations, etc.) onto other agent
+instances. If the operator's assertion was wrong and the agent was merely
+partitioned away from the master, its tasks might still be running; the agent
+will be shut down when it tries to re-register with the master after the
+partition heals. This call is idempotent.
+
+```
+MARK_AGENT_GONE HTTP Request (JSON):
+
+POST /api/v1  HTTP/1.1
+
+Host: masterhost:5050
+Content-Type: application/json
+Accept: application/json
+
+{
+  "type": "MARK_AGENT_GONE",
+  "mark_agent_gone": {
+    "agent_id": {
+      "value": "3192b9d1-db71-4699-ae25-e28dfbf42de1"
+    }
+  }
+}
+
+MARK_AGENT_GONE HTTP Response (JSON):
+
+HTTP/1.1 200 OK
+```
+
 ## Events
 
 Currently, the only call that results in a streaming response is the `SUBSCRIBE` call sent to the master API.