You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/12/14 21:32:07 UTC

[mesos] 03/06: Added heartbeaters for agent and HTTP executors.

This is an automated email from the ASF dual-hosted git repository.

grag pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit a47c7dea6ccb3558464219f3c6edf376b2f55086
Author: Joseph Wu <jo...@mesosphere.io>
AuthorDate: Thu Dec 13 16:34:36 2018 -0800

    Added heartbeaters for agent and HTTP executors.
    
    This implements two separate heartbeaters for Executor Events (agent
    to executor) and Executor Calls (executor to agent).  Both are set to
    non-configurable intervals of 30 minutes, which should be sufficient
    to keep the connections alive while not flooding logs with warnings
    if the executor/agent does not have this patch.
    
    Review: https://reviews.apache.org/r/69473/
---
 src/common/validation.cpp         |  4 ++++
 src/executor/executor.cpp         | 20 ++++++++++++++++++++
 src/executor/v0_v1executor.cpp    |  6 ++++++
 src/launcher/default_executor.cpp |  4 ++++
 src/launcher/executor.cpp         |  4 ++++
 src/slave/constants.hpp           |  3 +++
 src/slave/http.cpp                |  4 ++++
 src/slave/slave.cpp               | 12 ++++++++++++
 src/slave/slave.hpp               |  4 ++++
 9 files changed, 61 insertions(+)

diff --git a/src/common/validation.cpp b/src/common/validation.cpp
index 5f8f288..4a64fc8 100644
--- a/src/common/validation.cpp
+++ b/src/common/validation.cpp
@@ -602,6 +602,10 @@ Option<Error> validateExecutorCall(const mesos::executor::Call& call)
       return None();
     }
 
+    case mesos::executor::Call::HEARTBEAT: {
+      return None();
+    }
+
     case mesos::executor::Call::UNKNOWN: {
       return None();
     }
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index d00ea32..e9439da 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -90,6 +90,11 @@ namespace mesos {
 namespace v1 {
 namespace executor {
 
+// TODO(josephw): Move this default into a header which can be loaded
+// by tests. Also, consider making this heartbeat interval configurable.
+extern const Duration DEFAULT_HEARTBEAT_CALL_INTERVAL = Minutes(30);
+
+
 class ShutdownProcess : public process::Process<ShutdownProcess>
 {
 public:
@@ -363,6 +368,7 @@ protected:
   void initialize() override
   {
     connect();
+    heartbeat();
   }
 
   void connect()
@@ -781,6 +787,20 @@ protected:
     LOG(WARNING) << "Dropping " << call.type() << ": " << message;
   }
 
+  void heartbeat()
+  {
+    if (connections.isSome()) {
+      Call call;
+      call.set_type(Call::HEARTBEAT);
+      call.mutable_executor_id()->set_value("unused");
+      call.mutable_framework_id()->set_value("unused");
+
+      send(call);
+    }
+
+    delay(DEFAULT_HEARTBEAT_CALL_INTERVAL, self(), &Self::heartbeat);
+  }
+
 private:
   struct Callbacks
   {
diff --git a/src/executor/v0_v1executor.cpp b/src/executor/v0_v1executor.cpp
index aebdbe7..de73f0e 100644
--- a/src/executor/v0_v1executor.cpp
+++ b/src/executor/v0_v1executor.cpp
@@ -229,6 +229,12 @@ public:
         break;
       }
 
+      case Call::HEARTBEAT: {
+        // NOTE: Heartbeat calls were added to HTTP executors only.
+        // There is no equivalent method for PID-based executors.
+        break;
+      }
+
       case Call::UNKNOWN: {
         EXIT(EXIT_FAILURE) << "Received an unexpected " << call.type()
                            << " call";
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index cc7b6b7..5837cfa 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -295,6 +295,10 @@ public:
         break;
       }
 
+      case Event::HEARTBEAT: {
+        break;
+      }
+
       case Event::UNKNOWN: {
         LOG(WARNING) << "Received an UNKNOWN event and ignored";
         break;
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 6b1204d..f962e80 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -258,6 +258,10 @@ public:
         break;
       }
 
+      case Event::HEARTBEAT: {
+        break;
+      }
+
       case Event::UNKNOWN: {
         LOG(WARNING) << "Received an UNKNOWN event and ignored";
         break;
diff --git a/src/slave/constants.hpp b/src/slave/constants.hpp
index fdc21a3..1a20113 100644
--- a/src/slave/constants.hpp
+++ b/src/slave/constants.hpp
@@ -48,6 +48,9 @@ constexpr Duration MAX_EXECUTOR_REREGISTRATION_TIMEOUT = Seconds(15);
 // shut down before destroying the container.
 constexpr Duration DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD = Seconds(5);
 
+// The default amount of time between heartbeats sent to HTTP executors.
+constexpr Duration DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL = Minutes(30);
+
 constexpr Duration RECOVERY_TIMEOUT = Minutes(15);
 
 // TODO(gkleiman): Move this to a different file once `TaskStatusUpdateManager`
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4b0347f..6ef99fb 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -859,6 +859,10 @@ Future<Response> Http::executor(
       return Accepted();
     }
 
+    case executor::Call::HEARTBEAT: {
+      return Accepted();
+    }
+
     case executor::Call::UNKNOWN: {
       LOG(WARNING) << "Received 'UNKNOWN' call";
       return NotImplemented();
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2d79ee6..ad3b693 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -4692,6 +4692,18 @@ void Slave::subscribe(
       executor->http = http;
       executor->pid = None();
 
+      // Create a heartbeater for HTTP executors.
+      executor::Event heartbeatEvent;
+      heartbeatEvent.set_type(executor::Event::HEARTBEAT);
+
+      executor->heartbeater.reset(
+          new ResponseHeartbeater<executor::Event, v1::executor::Event>(
+              "executor " + stringify(executor->id),
+              heartbeatEvent,
+              http,
+              DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL,
+              DEFAULT_EXECUTOR_HEARTBEAT_INTERVAL));
+
       if (framework->info.checkpoint()) {
         // Write a marker file to indicate that this executor
         // is HTTP based.
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 469f682..2eadf5f 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -65,6 +65,7 @@
 #include <stout/recordio.hpp>
 #include <stout/uuid.hpp>
 
+#include "common/heartbeater.hpp"
 #include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
 #include "common/recordio.hpp"
@@ -986,6 +987,9 @@ public:
   //           *                 *       None       Some      Libprocess
   //           *                 *       Some       None            HTTP
   Option<StreamingHttpConnection<v1::executor::Event>> http;
+  process::Owned<ResponseHeartbeater<executor::Event, v1::executor::Event>>
+    heartbeater;
+
   Option<process::UPID> pid;
 
   // Tasks can be found in one of the following four data structures: