You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/08/18 21:53:01 UTC

mesos git commit: Added a heartbeat event to the operator API.

Repository: mesos
Updated Branches:
  refs/heads/master 55678b41f -> 4d0e692e7


Added a heartbeat event to the operator API.

Added the 'HEARTBEAT' event for the operator API and modified
related test cases to accept heartbeats.

Review: https://reviews.apache.org/r/61262/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d0e692e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d0e692e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d0e692e

Branch: refs/heads/master
Commit: 4d0e692e72e56e827d2136b78311db0b72ada6a2
Parents: 55678b4
Author: Quinn Leng <qu...@gmail.com>
Authored: Fri Aug 18 14:50:58 2017 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Aug 18 14:51:11 2017 -0700

----------------------------------------------------------------------
 include/mesos/master/master.proto    |   9 ++
 include/mesos/v1/master/master.proto |   9 ++
 src/master/http.cpp                  |   4 +
 src/master/master.hpp                | 133 ++++++++++++++++++++----------
 src/tests/api_tests.cpp              |  91 ++++++++++++++++++++
 5 files changed, 201 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index fc5bd89..7dc5881 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -484,6 +484,15 @@ message Event {
     FRAMEWORK_UPDATED = 7; // See `FrameworkUpdated` below.
     FRAMEWORK_REMOVED = 8; // See `FrameworkRemoved` below.
 
+    // Periodic message sent by the master to the subscriber according to
+    // 'Subscribed.heartbeat_interval_seconds'. If the subscriber does not
+    // receive any events (including heartbeats) for an extended period of
+    // time (e.g., 5 x heartbeat_interval_seconds), it is likely that the
+    // connection is lost or there is a network partition. In that case,
+    // the subscriber should close the existing subscription connection and
+    // resubscribe using a backoff strategy.
+    HEARTBEAT = 9;
+
     // TODO(vinod): Fill in more events.
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 4ded0a8..db19c5c 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -482,6 +482,15 @@ message Event {
     FRAMEWORK_UPDATED = 7; // See `FrameworkUpdated` below.
     FRAMEWORK_REMOVED = 8; // See `FrameworkRemoved` below.
 
+    // Periodic message sent by the master to the subscriber according to
+    // 'Subscribed.heartbeat_interval_seconds'. If the subscriber does not
+    // receive any events (including heartbeats) for an extended period of
+    // time (e.g., 5 x heartbeat_interval_seconds), it is likely that the
+    // connection is lost or there is a network partition. In that case,
+    // the subscriber should close the existing subscription connection and
+    // resubscribe using a backoff strategy.
+    HEARTBEAT = 9;
+
     // TODO(vinod): Fill in more events.
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 5654e7e..b09a9d0 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -855,6 +855,10 @@ Future<Response> Master::Http::subscribe(
 
           http.send<mesos::master::Event, v1::master::Event>(event);
 
+          mesos::master::Event heartbeatEvent;
+          heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
+          http.send<mesos::master::Event, v1::master::Event>(heartbeatEvent);
+
           return ok;
     }));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b802fd1..d9cfc42 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -306,6 +306,63 @@ struct HttpConnection
 };
 
 
+// This process periodically sends heartbeats to a given HTTP connection.
+// The `Message` template parameter is the type of the heartbeat event passed
+// into the heartbeater during construction, while the `Event` template
+// parameter is the versioned event type which is sent to the client.
+// The optional `_delay` argument postpones the first heartbeat by that
+// duration; when it is `None()`, the first heartbeat is sent on initialization.
+template <typename Message, typename Event>
+class Heartbeater : public process::Process<Heartbeater<Message, Event>>
+{
+public:
+  Heartbeater(const std::string& _logMessage,
+              const Message& _heartbeatMessage,
+              const HttpConnection& _http,
+              const Duration& _interval,
+              const Option<Duration>& _delay = None())
+    : process::ProcessBase(process::ID::generate("heartbeater")),
+      logMessage(_logMessage),
+      heartbeatMessage(_heartbeatMessage),
+      http(_http),
+      interval(_interval),
+      delay(_delay) {}
+
+protected:
+  virtual void initialize() override
+  {
+    if (delay.isSome()) {
+      process::delay(
+          delay.get(),
+          this,
+          &Heartbeater<Message, Event>::heartbeat);
+    } else {
+      heartbeat();
+    }
+  }
+
+private:
+  void heartbeat()
+  {
+    // Only send a heartbeat if the connection is not closed.
+    if (http.closed().isPending()) {
+      VLOG(1) << "Sending heartbeat to " << logMessage;
+
+      Message message(heartbeatMessage);
+      http.send<Message, Event>(message);
+    }
+
+    process::delay(interval, this, &Heartbeater<Message, Event>::heartbeat);
+  }
+
+  const std::string logMessage;
+  const Message heartbeatMessage;
+  HttpConnection http;
+  const Duration interval;
+  const Option<Duration> delay;
+};
+
+
 class Master : public ProtobufProcess<Master>
 {
 public:
@@ -1829,8 +1886,22 @@ private:
     // might only be interested in a subset of events.
     struct Subscriber
     {
-      Subscriber(const HttpConnection& _http)
-        : http(_http) {}
+      Subscriber(const HttpConnection& _http) : http(_http)
+      {
+        mesos::master::Event event;
+        event.set_type(mesos::master::Event::HEARTBEAT);
+
+        heartbeater =
+          process::Owned<Heartbeater<mesos::master::Event, v1::master::Event>>(
+              new Heartbeater<mesos::master::Event, v1::master::Event>(
+                  "subscriber " + stringify(http.streamId),
+                  event,
+                  http,
+                  DEFAULT_HEARTBEAT_INTERVAL,
+                  DEFAULT_HEARTBEAT_INTERVAL));
+
+        process::spawn(heartbeater.get());
+      }
 
       // Not copyable, not assignable.
       Subscriber(const Subscriber&) = delete;
@@ -1843,9 +1914,14 @@ private:
         // after passing ownership to the `Subscriber` object. See MESOS-5843
         // for more details.
         http.close();
+
+        terminate(heartbeater.get());
+        wait(heartbeater.get());
       }
 
       HttpConnection http;
+      process::Owned<Heartbeater<mesos::master::Event, v1::master::Event>>
+        heartbeater;
     };
 
     // Sends the event to all subscribers connected to the 'api/vX' endpoint.
@@ -2190,47 +2266,6 @@ inline std::ostream& operator<<(
     const Framework& framework);
 
 
-// This process periodically sends heartbeats to a scheduler on the
-// given HTTP connection.
-class Heartbeater : public process::Process<Heartbeater>
-{
-public:
-  Heartbeater(const FrameworkID& _frameworkId,
-              const HttpConnection& _http,
-              const Duration& _interval)
-    : process::ProcessBase(process::ID::generate("heartbeater")),
-      frameworkId(_frameworkId),
-      http(_http),
-      interval(_interval) {}
-
-protected:
-  virtual void initialize() override
-  {
-    heartbeat();
-  }
-
-private:
-  void heartbeat()
-  {
-    // Only send a heartbeat if the connection is not closed.
-    if (http.closed().isPending()) {
-      VLOG(1) << "Sending heartbeat to " << frameworkId;
-
-      scheduler::Event event;
-      event.set_type(scheduler::Event::HEARTBEAT);
-
-      http.send(event);
-    }
-
-    process::delay(interval, self(), &Self::heartbeat);
-  }
-
-  const FrameworkID frameworkId;
-  HttpConnection http;
-  const Duration interval;
-};
-
-
 // TODO(bmahler): Keeping the task and executor information in sync
 // across the Slave and Framework structs is error prone!
 struct Framework
@@ -2717,8 +2752,15 @@ struct Framework
 
     // TODO(vinod): Make heartbeat interval configurable and include
     // this information in the SUBSCRIBED response.
+    scheduler::Event event;
+    event.set_type(scheduler::Event::HEARTBEAT);
+
     heartbeater =
-      new Heartbeater(info.id(), http.get(), DEFAULT_HEARTBEAT_INTERVAL);
+      new Heartbeater<scheduler::Event, v1::scheduler::Event>(
+          "framework " + stringify(info.id()),
+          event,
+          http.get(),
+          DEFAULT_HEARTBEAT_INTERVAL);
 
     process::spawn(heartbeater.get().get());
   }
@@ -2819,7 +2861,8 @@ struct Framework
   hashmap<SlaveID, Resources> offeredResources;
 
   // This is only set for HTTP frameworks.
-  Option<process::Owned<Heartbeater>> heartbeater;
+  Option<process::Owned<Heartbeater<scheduler::Event, v1::scheduler::Event>>>
+    heartbeater;
 
 private:
   Framework(Master* const _master,

http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 3ab4740..34480ea 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -73,6 +73,8 @@ using mesos::slave::ContainerTermination;
 using mesos::internal::devolve;
 using mesos::internal::evolve;
 
+using mesos::internal::master::DEFAULT_HEARTBEAT_INTERVAL;
+
 using mesos::internal::recordio::Reader;
 
 using mesos::internal::slave::Fetcher;
@@ -1590,6 +1592,12 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
   EXPECT_TRUE(getState.get_tasks().tasks().empty());
   EXPECT_TRUE(getState.get_executors().executors().empty());
 
+  event = decoder.read();
+
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
   // Start one agent.
   Future<SlaveRegisteredMessage> agentRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
@@ -2089,6 +2097,12 @@ TEST_P(MasterAPITest, Subscribe)
   EXPECT_TRUE(getState.get_executors().executors().empty());
 
   event = decoder.read();
+
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  event = decoder.read();
   EXPECT_TRUE(event.isPending());
 
   const v1::Offer& offer = offers->offers(0);
@@ -2223,6 +2237,12 @@ TEST_P(MasterAPITest, FrameworksEvent)
   EXPECT_TRUE(getState.get_executors().executors().empty());
 
   event = decoder.read();
+
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  event = decoder.read();
   EXPECT_TRUE(event.isPending());
 
   // Start a scheduler. The subscriber will receive a 'FRAMEWORK_ADDED' event
@@ -2358,6 +2378,77 @@ TEST_P(MasterAPITest, FrameworksEvent)
 }
 
 
+// Verifies that 'HEARTBEAT' events are sent at the correct times.
+TEST_P(MasterAPITest, Heartbeat)
+{
+  ContentType contentType = GetParam();
+
+  Try<Owned<cluster::Master>> master = this->StartMaster();
+  ASSERT_SOME(master);
+
+  v1::master::Call v1Call;
+  v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+  http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+  headers["Accept"] = stringify(contentType);
+
+  Future<http::Response> response = http::streaming::post(
+      master.get()->pid,
+      "api/v1",
+      headers,
+      serialize(contentType, v1Call),
+      stringify(contentType));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+  AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+  ASSERT_EQ(http::Response::PIPE, response->type);
+  ASSERT_SOME(response->reader);
+
+  http::Pipe::Reader reader = response->reader.get();
+
+  auto deserializer =
+    lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+  Reader<v1::master::Event> decoder(
+      Decoder<v1::master::Event>(deserializer), reader);
+
+  Future<Result<v1::master::Event>> event = decoder.read();
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+  const v1::master::Response::GetState& getState =
+      event->get().subscribed().get_state();
+
+  EXPECT_EQ(0u, getState.get_frameworks().frameworks_size());
+  EXPECT_EQ(0u, getState.get_agents().agents_size());
+  EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+  EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+  event = decoder.read();
+
+  AWAIT_READY(event);
+
+  EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+  event = decoder.read();
+  EXPECT_TRUE(event.isPending());
+
+  // Expects a heartbeat event after every heartbeat interval.
+  for (int i = 0; i < 10; i++) {
+    // Advance the clock to receive another heartbeat.
+    Clock::pause();
+    Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+
+    AWAIT_READY(event);
+    EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+    event = decoder.read();
+    EXPECT_TRUE(event.isPending());
+  }
+}
+
+
 // This test verifies if we can retrieve the current quota status through
 // `GET_QUOTA` call, after we set quota resources through `SET_QUOTA` call.
 TEST_P(MasterAPITest, GetQuota)