You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/08/18 21:53:01 UTC
mesos git commit: Added a heartbeat event to the operator API.
Repository: mesos
Updated Branches:
refs/heads/master 55678b41f -> 4d0e692e7
Added a heartbeat event to the operator API.
Added the 'HEARTBEAT' event for the operator API and modified
related test cases to accept heartbeats.
Review: https://reviews.apache.org/r/61262/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4d0e692e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4d0e692e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4d0e692e
Branch: refs/heads/master
Commit: 4d0e692e72e56e827d2136b78311db0b72ada6a2
Parents: 55678b4
Author: Quinn Leng <qu...@gmail.com>
Authored: Fri Aug 18 14:50:58 2017 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Aug 18 14:51:11 2017 -0700
----------------------------------------------------------------------
include/mesos/master/master.proto | 9 ++
include/mesos/v1/master/master.proto | 9 ++
src/master/http.cpp | 4 +
src/master/master.hpp | 133 ++++++++++++++++++++----------
src/tests/api_tests.cpp | 91 ++++++++++++++++++++
5 files changed, 201 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/include/mesos/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/master/master.proto b/include/mesos/master/master.proto
index fc5bd89..7dc5881 100644
--- a/include/mesos/master/master.proto
+++ b/include/mesos/master/master.proto
@@ -484,6 +484,15 @@ message Event {
FRAMEWORK_UPDATED = 7; // See `FrameworkUpdated` below.
FRAMEWORK_REMOVED = 8; // See `FrameworkRemoved` below.
+ // Periodic message sent by the master to the subscriber according to
+ // 'Subscribed.heartbeat_interval_seconds'. If the subscriber does not
+ // receive any events (including heartbeats) for an extended period of
+ // time (e.g., 5 x heartbeat_interval_seconds), it is likely that the
+ // connection is lost or there is a network partition. In that case,
+ // the subscriber should close the existing subscription connection and
+ // resubscribe using a backoff strategy.
+ HEARTBEAT = 9;
+
// TODO(vinod): Fill in more events.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/include/mesos/v1/master/master.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/master/master.proto b/include/mesos/v1/master/master.proto
index 4ded0a8..db19c5c 100644
--- a/include/mesos/v1/master/master.proto
+++ b/include/mesos/v1/master/master.proto
@@ -482,6 +482,15 @@ message Event {
FRAMEWORK_UPDATED = 7; // See `FrameworkUpdated` below.
FRAMEWORK_REMOVED = 8; // See `FrameworkRemoved` below.
+ // Periodic message sent by the master to the subscriber according to
+ // 'Subscribed.heartbeat_interval_seconds'. If the subscriber does not
+ // receive any events (including heartbeats) for an extended period of
+ // time (e.g., 5 x heartbeat_interval_seconds), it is likely that the
+ // connection is lost or there is a network partition. In that case,
+ // the subscriber should close the existing subscription connection and
+ // resubscribe using a backoff strategy.
+ HEARTBEAT = 9;
+
// TODO(vinod): Fill in more events.
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 5654e7e..b09a9d0 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -855,6 +855,10 @@ Future<Response> Master::Http::subscribe(
http.send<mesos::master::Event, v1::master::Event>(event);
+ mesos::master::Event heartbeatEvent;
+ heartbeatEvent.set_type(mesos::master::Event::HEARTBEAT);
+ http.send<mesos::master::Event, v1::master::Event>(heartbeatEvent);
+
return ok;
}));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b802fd1..d9cfc42 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -306,6 +306,63 @@ struct HttpConnection
};
+// This process periodically sends heartbeats to a given HTTP connection.
+// The `Message` template parameter is the type of the heartbeat event passed
+// into the heartbeater during construction, while the `Event` template
+// parameter is the versioned event type which is sent to the client.
+// The optional `delay` parameter specifies how long to wait before sending
+// the first heartbeat; when unset, the first heartbeat is sent immediately.
+template <typename Message, typename Event>
+class Heartbeater : public process::Process<Heartbeater<Message, Event>>
+{
+public:
+ Heartbeater(const std::string& _logMessage,
+ const Message& _heartbeatMessage,
+ const HttpConnection& _http,
+ const Duration& _interval,
+ const Option<Duration>& _delay = None())
+ : process::ProcessBase(process::ID::generate("heartbeater")),
+ logMessage(_logMessage),
+ heartbeatMessage(_heartbeatMessage),
+ http(_http),
+ interval(_interval),
+ delay(_delay) {}
+
+protected:
+ virtual void initialize() override
+ {
+ if (delay.isSome()) {
+ process::delay(
+ delay.get(),
+ this,
+ &Heartbeater<Message, Event>::heartbeat);
+ } else {
+ heartbeat();
+ }
+ }
+
+private:
+ void heartbeat()
+ {
+ // Only send a heartbeat if the connection is not closed.
+ if (http.closed().isPending()) {
+ VLOG(1) << "Sending heartbeat to " << logMessage;
+
+ Message message(heartbeatMessage);
+ http.send<Message, Event>(message);
+ }
+
+ process::delay(interval, this, &Heartbeater<Message, Event>::heartbeat);
+ }
+
+ const std::string logMessage;
+ const Message heartbeatMessage;
+ HttpConnection http;
+ const Duration interval;
+ const Option<Duration> delay;
+};
+
+
class Master : public ProtobufProcess<Master>
{
public:
@@ -1829,8 +1886,22 @@ private:
// might only be interested in a subset of events.
struct Subscriber
{
- Subscriber(const HttpConnection& _http)
- : http(_http) {}
+ Subscriber(const HttpConnection& _http) : http(_http)
+ {
+ mesos::master::Event event;
+ event.set_type(mesos::master::Event::HEARTBEAT);
+
+ heartbeater =
+ process::Owned<Heartbeater<mesos::master::Event, v1::master::Event>>(
+ new Heartbeater<mesos::master::Event, v1::master::Event>(
+ "subscriber " + stringify(http.streamId),
+ event,
+ http,
+ DEFAULT_HEARTBEAT_INTERVAL,
+ DEFAULT_HEARTBEAT_INTERVAL));
+
+ process::spawn(heartbeater.get());
+ }
// Not copyable, not assignable.
Subscriber(const Subscriber&) = delete;
@@ -1843,9 +1914,14 @@ private:
// after passing ownership to the `Subscriber` object. See MESOS-5843
// for more details.
http.close();
+
+ terminate(heartbeater.get());
+ wait(heartbeater.get());
}
HttpConnection http;
+ process::Owned<Heartbeater<mesos::master::Event, v1::master::Event>>
+ heartbeater;
};
// Sends the event to all subscribers connected to the 'api/vX' endpoint.
@@ -2190,47 +2266,6 @@ inline std::ostream& operator<<(
const Framework& framework);
-// This process periodically sends heartbeats to a scheduler on the
-// given HTTP connection.
-class Heartbeater : public process::Process<Heartbeater>
-{
-public:
- Heartbeater(const FrameworkID& _frameworkId,
- const HttpConnection& _http,
- const Duration& _interval)
- : process::ProcessBase(process::ID::generate("heartbeater")),
- frameworkId(_frameworkId),
- http(_http),
- interval(_interval) {}
-
-protected:
- virtual void initialize() override
- {
- heartbeat();
- }
-
-private:
- void heartbeat()
- {
- // Only send a heartbeat if the connection is not closed.
- if (http.closed().isPending()) {
- VLOG(1) << "Sending heartbeat to " << frameworkId;
-
- scheduler::Event event;
- event.set_type(scheduler::Event::HEARTBEAT);
-
- http.send(event);
- }
-
- process::delay(interval, self(), &Self::heartbeat);
- }
-
- const FrameworkID frameworkId;
- HttpConnection http;
- const Duration interval;
-};
-
-
// TODO(bmahler): Keeping the task and executor information in sync
// across the Slave and Framework structs is error prone!
struct Framework
@@ -2717,8 +2752,15 @@ struct Framework
// TODO(vinod): Make heartbeat interval configurable and include
// this information in the SUBSCRIBED response.
+ scheduler::Event event;
+ event.set_type(scheduler::Event::HEARTBEAT);
+
heartbeater =
- new Heartbeater(info.id(), http.get(), DEFAULT_HEARTBEAT_INTERVAL);
+ new Heartbeater<scheduler::Event, v1::scheduler::Event>(
+ "framework " + stringify(info.id()),
+ event,
+ http.get(),
+ DEFAULT_HEARTBEAT_INTERVAL);
process::spawn(heartbeater.get().get());
}
@@ -2819,7 +2861,8 @@ struct Framework
hashmap<SlaveID, Resources> offeredResources;
// This is only set for HTTP frameworks.
- Option<process::Owned<Heartbeater>> heartbeater;
+ Option<process::Owned<Heartbeater<scheduler::Event, v1::scheduler::Event>>>
+ heartbeater;
private:
Framework(Master* const _master,
http://git-wip-us.apache.org/repos/asf/mesos/blob/4d0e692e/src/tests/api_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/api_tests.cpp b/src/tests/api_tests.cpp
index 3ab4740..34480ea 100644
--- a/src/tests/api_tests.cpp
+++ b/src/tests/api_tests.cpp
@@ -73,6 +73,8 @@ using mesos::slave::ContainerTermination;
using mesos::internal::devolve;
using mesos::internal::evolve;
+using mesos::internal::master::DEFAULT_HEARTBEAT_INTERVAL;
+
using mesos::internal::recordio::Reader;
using mesos::internal::slave::Fetcher;
@@ -1590,6 +1592,12 @@ TEST_P(MasterAPITest, SubscribeAgentEvents)
EXPECT_TRUE(getState.get_tasks().tasks().empty());
EXPECT_TRUE(getState.get_executors().executors().empty());
+ event = decoder.read();
+
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
// Start one agent.
Future<SlaveRegisteredMessage> agentRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
@@ -2089,6 +2097,12 @@ TEST_P(MasterAPITest, Subscribe)
EXPECT_TRUE(getState.get_executors().executors().empty());
event = decoder.read();
+
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ event = decoder.read();
EXPECT_TRUE(event.isPending());
const v1::Offer& offer = offers->offers(0);
@@ -2223,6 +2237,12 @@ TEST_P(MasterAPITest, FrameworksEvent)
EXPECT_TRUE(getState.get_executors().executors().empty());
event = decoder.read();
+
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ event = decoder.read();
EXPECT_TRUE(event.isPending());
// Start a scheduler. The subscriber will receive a 'FRAMEWORK_ADDED' event
@@ -2358,6 +2378,77 @@ TEST_P(MasterAPITest, FrameworksEvent)
}
+// Verifies that 'HEARTBEAT' events are sent at the correct times.
+TEST_P(MasterAPITest, Heartbeat)
+{
+ ContentType contentType = GetParam();
+
+ Try<Owned<cluster::Master>> master = this->StartMaster();
+ ASSERT_SOME(master);
+
+ v1::master::Call v1Call;
+ v1Call.set_type(v1::master::Call::SUBSCRIBE);
+
+ http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+
+ headers["Accept"] = stringify(contentType);
+
+ Future<http::Response> response = http::streaming::post(
+ master.get()->pid,
+ "api/v1",
+ headers,
+ serialize(contentType, v1Call),
+ stringify(contentType));
+
+ AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
+ AWAIT_EXPECT_RESPONSE_HEADER_EQ("chunked", "Transfer-Encoding", response);
+ ASSERT_EQ(http::Response::PIPE, response->type);
+ ASSERT_SOME(response->reader);
+
+ http::Pipe::Reader reader = response->reader.get();
+
+ auto deserializer =
+ lambda::bind(deserialize<v1::master::Event>, contentType, lambda::_1);
+
+ Reader<v1::master::Event> decoder(
+ Decoder<v1::master::Event>(deserializer), reader);
+
+ Future<Result<v1::master::Event>> event = decoder.read();
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::SUBSCRIBED, event->get().type());
+ const v1::master::Response::GetState& getState =
+ event->get().subscribed().get_state();
+
+ EXPECT_EQ(0u, getState.get_frameworks().frameworks_size());
+ EXPECT_EQ(0u, getState.get_agents().agents_size());
+ EXPECT_EQ(0u, getState.get_tasks().tasks_size());
+ EXPECT_EQ(0u, getState.get_executors().executors_size());
+
+ event = decoder.read();
+
+ AWAIT_READY(event);
+
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ event = decoder.read();
+ EXPECT_TRUE(event.isPending());
+
+ // Expects a heartbeat event after every heartbeat interval.
+ for (int i = 0; i < 10; i++) {
+ // Advance the clock to receive another heartbeat.
+ Clock::pause();
+ Clock::advance(DEFAULT_HEARTBEAT_INTERVAL);
+
+ AWAIT_READY(event);
+ EXPECT_EQ(v1::master::Event::HEARTBEAT, event->get().type());
+
+ event = decoder.read();
+ EXPECT_TRUE(event.isPending());
+ }
+}
+
+
// This test verifies if we can retrieve the current quota status through
// `GET_QUOTA` call, after we set quota resources through `SET_QUOTA` call.
TEST_P(MasterAPITest, GetQuota)