You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/12/14 21:32:07 UTC
[mesos] 03/06: Added heartbeaters for agent and HTTP executors.
This is an automated email from the ASF dual-hosted git repository.
grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit a47c7dea6ccb3558464219f3c6edf376b2f55086
Author: Joseph Wu <jo...@mesosphere.io>
AuthorDate: Thu Dec 13 16:34:36 2018 -0800
Added heartbeaters for agent and HTTP executors.
This implements two separate heartbeaters: one for Executor Events (agent
to executor) and one for Executor Calls (executor to agent). Both use a
fixed, non-configurable interval of 30 minutes, which should be frequent
enough to keep the connections alive without flooding the logs with
warnings when the receiving executor/agent does not have this patch.
Review: https://reviews.apache.org/r/69473/
---
src/common/validation.cpp | 4 ++++
src/executor/executor.cpp | 20 ++++++++++++++++++++
src/executor/v0_v1executor.cpp | 6 ++++++
src/launcher/default_executor.cpp | 4 ++++
src/launcher/executor.cpp | 4 ++++
src/slave/constants.hpp | 3 +++
src/slave/http.cpp | 4 ++++
src/slave/slave.cpp | 12 ++++++++++++
src/slave/slave.hpp | 4 ++++
9 files changed, 61 insertions(+)
diff --git a/src/common/validation.cpp b/src/common/validation.cpp
index 5f8f288..4a64fc8 100644
--- a/src/common/validation.cpp
+++ b/src/common/validation.cpp
@@ -602,6 +602,10 @@ Option<Error> validateExecutorCall(const mesos::executor::Call& call)
return None();
}
+ case mesos::executor::Call::HEARTBEAT: {
+ return None();
+ }
+
case mesos::executor::Call::UNKNOWN: {
return None();
}
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index d00ea32..e9439da 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -90,6 +90,11 @@ namespace mesos {
namespace v1 {
namespace executor {
+// TODO(josephw): Move this default into a header which can be loaded
+// by tests. Also, consider making this heartbeat interval configurable.
+extern const Duration DEFAULT_HEARTBEAT_CALL_INTERVAL = Minutes(30);
+
+
class ShutdownProcess : public process::Process<ShutdownProcess>
{
public:
@@ -363,6 +368,7 @@ protected:
void initialize() override
{
connect();
+ heartbeat();
}
void connect()
@@ -781,6 +787,20 @@ protected:
LOG(WARNING) << "Dropping " << call.type() << ": " << message;
}
+ void heartbeat()
+ {
+ if (connections.isSome()) {
+ Call call;
+ call.set_type(Call::HEARTBEAT);
+ call.mutable_executor_id()->set_value("unused");
+ call.mutable_framework_id()->set_value("unused");
+
+ send(call);
+ }
+
+ delay(DEFAULT_HEARTBEAT_CALL_INTERVAL, self(), &Self::heartbeat);
+ }
+
private:
struct Callbacks
{
diff --git a/src/executor/v0_v1executor.cpp b/src/executor/v0_v1executor.cpp
index aebdbe7..de73f0e 100644
--- a/src/executor/v0_v1executor.cpp
+++ b/src/executor/v0_v1executor.cpp
@@ -229,6 +229,12 @@ public:
break;
}
+ case Call::HEARTBEAT: {
+ // NOTE: Heartbeat calls were added to HTTP executors only.
+ // There is no equivalent method for PID-based executors.
+ break;
+ }
+
case Call::UNKNOWN: {
EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
<< " call";
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index cc7b6b7..5837cfa 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -295,6 +295,10 @@ public:
break;
}
+ case Event::HEARTBEAT: {
+ break;
+ }
+
case Event::UNKNOWN: {
LOG(WARNING) << "Received an UNKNOWN event and ignored";
break;
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 6b1204d..f962e80 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -258,6 +258,10 @@ public:
break;
}
+ case Event::HEARTBEAT: {
+ break;
+ }
+
case Event::UNKNOWN: {
LOG(WARNING) << "Received an UNKNOWN event and ignored";
break;
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index fdc21a3..1a20113 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -48,6 +48,9 @@ constexpr Duration MAX_EXECUTOR_REREGISTRATION_TIMEOUT = Seconds(15);
// shut down before destroying the container.
constexpr Duration DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5);
+// The default amount of time between heartbeats sent to HTTP executors.
+constexpr Duration DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL = Minutes(30);
+
constexpr Duration RECOVERY_TIMEOUT = Minutes(15);
// TODO(gkleiman): Move this to a different file once `TaskStatusUpdateManager`
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4b0347f..6ef99fb 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -859,6 +859,10 @@ Future<Response> Http::executor(
return Accepted();
}
+ case executor::Call::HEARTBEAT: {
+ return Accepted();
+ }
+
case executor::Call::UNKNOWN: {
LOG(WARNING) << "Received 'UNKNOWN' call";
return NotImplemented();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2d79ee6..ad3b693 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4692,6 +4692,18 @@ void Slave::subscribe(
executor->http = http;
executor->pid = None();
+ // Create a heartbeater for HTTP executors.
+ executor::Event heartbeatEvent;
+ heartbeatEvent.set_type(executor::Event::HEARTBEAT);
+
+ executor->heartbeater.reset(
+ new ResponseHeartbeater<executor::Event, v1::executor::Event>(
+ "executor " + stringify(executor->id),
+ heartbeatEvent,
+ http,
+ DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL,
+ DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL));
+
if (framework->info.checkpoint()) {
// Write a marker file to indicate that this executor
// is HTTP based.
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 469f682..2eadf5f 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -65,6 +65,7 @@
#include <stout/recordio.hpp>
#include <stout/uuid.hpp>
+#include "common/heartbeater.hpp"
#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/recordio.hpp"
@@ -986,6 +987,9 @@ public:
// * * None Some Libprocess
// * * Some None HTTP
Option<StreamingHttpConnection<v1::executor::Event>> http;
+ process::Owned<ResponseHeartbeater<executor::Event, v1::executor::Event>>
+ heartbeater;
+
Option<process::UPID> pid;
// Tasks can be found in one of the following four data structures: