You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/01/27 00:22:36 UTC

[1/6] mesos git commit: Introduced a field for specifying grace period in shutdown event.

Repository: mesos
Updated Branches:
  refs/heads/master 5e618ac21 -> d62c3a311


Introduced a field for specifying grace period in shutdown event.

This change introduces a field `grace_period_seconds` that specifies
the time an agent would wait for the executor to terminate before
forcefully destroying the container.

Review: https://reviews.apache.org/r/41275/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6826582c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6826582c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6826582c

Branch: refs/heads/master
Commit: 6826582cd369f155cd79bbc53d572695f137e852
Parents: 5e618ac
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:20:10 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:20:10 2016 -0800

----------------------------------------------------------------------
 include/mesos/executor/executor.proto    | 18 +++++++++++++-----
 include/mesos/v1/executor/executor.proto | 18 +++++++++++++-----
 2 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6826582c/include/mesos/executor/executor.proto
----------------------------------------------------------------------
diff --git a/include/mesos/executor/executor.proto b/include/mesos/executor/executor.proto
index cc6b4f6..e905a12 100644
--- a/include/mesos/executor/executor.proto
+++ b/include/mesos/executor/executor.proto
@@ -37,10 +37,7 @@ message Event {
     KILL = 3;         // See 'Kill' below.
     ACKNOWLEDGED = 4; // See 'Acknowledged' below.
     MESSAGE = 5;      // See 'Message' below.
-
-    // Received when the slave asks the executor to shutdown/kill itself.
-    SHUTDOWN = 6;
-
+    SHUTDOWN = 6;     // See 'Shutdown' below.
     ERROR = 7;        // See 'Error' below.
   }
 
@@ -87,6 +84,16 @@ message Event {
     required bytes data = 1;
   }
 
+  // Received when the agent asks the executor to shutdown/kill itself. The
+  // executor is then required to kill all its active tasks, send 'TASK_KILLED'
+  // status updates and gracefully exit. If an executor does not terminate
+  // within a certain period ('grace_period_seconds'), the agent will forcefully
+  // destroy the container where the executor is running. The agent would then
+  // send 'TASK_LOST' updates for any remaining active tasks of this executor.
+  message Shutdown {
+    optional double grace_period_seconds = 1 [default = 5.0];
+  }
+
   // Received in case the executor sends invalid calls (e.g.,
   // required values not set).
   // TODO(arojas): Remove this once the old executor driver is no
@@ -105,7 +112,8 @@ message Event {
   optional Launch launch = 4;
   optional Kill kill = 5;
   optional Message message = 6;
-  optional Error error = 7;
+  optional Shutdown shutdown = 7;
+  optional Error error = 8;
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6826582c/include/mesos/v1/executor/executor.proto
----------------------------------------------------------------------
diff --git a/include/mesos/v1/executor/executor.proto b/include/mesos/v1/executor/executor.proto
index 4ecdaa0..dbbc265 100644
--- a/include/mesos/v1/executor/executor.proto
+++ b/include/mesos/v1/executor/executor.proto
@@ -37,10 +37,7 @@ message Event {
     KILL = 3;         // See 'Kill' below.
     ACKNOWLEDGED = 4; // See 'Acknowledged' below.
     MESSAGE = 5;      // See 'Message' below.
-
-    // Received when the agent asks the executor to shutdown/kill itself.
-    SHUTDOWN = 6;
-
+    SHUTDOWN = 6;     // See 'Shutdown' below.
     ERROR = 7;        // See 'Error' below.
   }
 
@@ -87,6 +84,16 @@ message Event {
     required bytes data = 1;
   }
 
+  // Received when the agent asks the executor to shutdown/kill itself. The
+  // executor is then required to kill all its active tasks, send 'TASK_KILLED'
+  // status updates and gracefully exit. If an executor does not terminate
+  // within a certain period ('grace_period_seconds'), the agent will forcefully
+  // destroy the container where the executor is running. The agent would then
+  // send 'TASK_LOST' updates for any remaining active tasks of this executor.
+  message Shutdown {
+    optional double grace_period_seconds = 1 [default = 5.0];
+  }
+
   // Received in case the executor sends invalid calls (e.g.,
   // required values not set).
   // TODO(arojas): Remove this once the old executor driver is no
@@ -105,7 +112,8 @@ message Event {
   optional Launch launch = 4;
   optional Kill kill = 5;
   optional Message message = 6;
-  optional Error error = 7;
+  optional Shutdown shutdown = 7;
+  optional Error error = 8;
 }
 
 


[6/6] mesos git commit: Introduced an Executor Library based on the new executor HTTP API.

Posted by vi...@apache.org.
Introduced an Executor Library based on the new executor HTTP API.

This uses the new HTTP Connection interface to ensure calls are properly
pipelined.

Review: https://reviews.apache.org/r/41283/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d62c3a31
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d62c3a31
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d62c3a31

Branch: refs/heads/master
Commit: d62c3a3113c40fab0636e7178f1998117b623848
Parents: e4ec512
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:21:45 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:21:45 2016 -0800

----------------------------------------------------------------------
 src/Makefile.am           |   1 +
 src/executor/executor.cpp | 683 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 684 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d62c3a31/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a508f30..bdb3402 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -583,6 +583,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   docker/docker.cpp							\
   docker/spec.cpp							\
   exec/exec.cpp								\
+  executor/executor.cpp							\
   files/files.cpp							\
   hdfs/hdfs.cpp								\
   hook/manager.cpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/d62c3a31/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
new file mode 100644
index 0000000..92334ff
--- /dev/null
+++ b/src/executor/executor.cpp
@@ -0,0 +1,683 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <queue>
+#include <string>
+
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
+#include <process/async.hpp>
+#include <process/delay.hpp>
+#include <process/dispatch.hpp>
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/id.hpp>
+#include <process/mutex.hpp>
+#include <process/owned.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+
+#include "common/http.hpp"
+#include "common/recordio.hpp"
+
+#include "internal/devolve.hpp"
+
+#include "logging/flags.hpp"
+#include "logging/logging.hpp"
+
+#include "slave/validation.hpp"
+
+#include "version/version.hpp"
+
+using namespace mesos;
+using namespace mesos::internal;
+
+using std::queue;
+using std::string;
+
+using mesos::internal::recordio::Reader;
+
+using mesos::internal::slave::validation::executor::call::validate;
+
+using process::async;
+using process::delay;
+using process::dispatch;
+using process::Future;
+using process::Mutex;
+using process::Owned;
+
+using process::ID::generate;
+
+using process::http::Connection;
+using process::http::Headers;
+using process::http::Pipe;
+using process::http::post;
+using process::http::Request;
+using process::http::Response;
+using process::http::URL;
+
+using process::UPID;
+
+using ::recordio::Decoder;
+
+namespace mesos {
+namespace v1 {
+namespace executor {
+
+// Forcefully terminates the executor once a grace period has elapsed.
+//
+// On spawn, `initialize()` schedules `kill()` to run after the grace
+// period; `kill()` then sends SIGKILL to our own process group.
+class ShutdownProcess : public process::Process<ShutdownProcess>
+{
+public:
+  // `_gracePeriod` is handed to `Duration::create()` in `initialize()`.
+  //
+  // The constructor is `explicit` so that a bare `double` can never be
+  // implicitly converted into a `ShutdownProcess`.
+  explicit ShutdownProcess(double _gracePeriod)
+    : gracePeriod(_gracePeriod) {}
+
+protected:
+  // Arms the timer that fires `kill()` after the grace period.
+  virtual void initialize()
+  {
+    Try<Duration> gracePeriod_ = Duration::create(gracePeriod);
+    CHECK_SOME(gracePeriod_);
+
+    VLOG(1) << "Scheduling shutdown of the executor with grace period: "
+            << gracePeriod_.get();
+
+    delay(gracePeriod_.get(), self(), &Self::kill);
+  }
+
+  // Kills the whole process group and, as a fallback, exits abnormally.
+  void kill()
+  {
+    VLOG(1) << "Committing suicide by killing the process group";
+
+    // TODO(vinod): Invoke killtree without killing ourselves.
+    // Kill the process group (including ourself).
+    killpg(0, SIGKILL);
+
+    // The signal might not get delivered immediately, so sleep for a
+    // few seconds. Worst case scenario, exit abnormally.
+    os::sleep(Seconds(5));
+    exit(-1);
+  }
+
+private:
+  const double gracePeriod;
+};
+
+
+// Bundles the two persistent connections the library keeps to the agent.
+struct Connections
+{
+  // Used for subscribe call/response.
+  Connection subscribe;
+
+  // Used for all other calls/responses.
+  Connection nonSubscribe;
+
+  // Two bundles are equal only when both of their connections are equal.
+  bool operator==(const Connections& that) const
+  {
+    if (!(subscribe == that.subscribe)) {
+      return false;
+    }
+
+    return nonSubscribe == that.nonSubscribe;
+  }
+};
+
+
+// The process (below) is responsible for receiving messages (via events)
+// from the agent and sending messages (via calls) to the agent.
+class MesosProcess : public ProtobufProcess<MesosProcess>
+{
+public:
+  // Builds the process and configures it from the environment.
+  //
+  // The three callbacks are stored and later invoked for connection,
+  // disconnection and received events respectively. All of them are taken
+  // by const reference; they are copied exactly once, into `callbacks`.
+  //
+  // Environment variables read here:
+  //   - logging flags with the `MESOS_` prefix;
+  //   - `MESOS_LOCAL`: its mere presence marks a local run;
+  //   - `MESOS_SLAVE_PID` (required): used to build the agent URL;
+  //   - `MESOS_CHECKPOINT`: checkpointing is on only when it equals "1";
+  //   - `MESOS_RECOVERY_TIMEOUT` and `MESOS_SUBSCRIPTION_BACKOFF_MAX`:
+  //     required when checkpointing is on.
+  //
+  // A missing required variable or a flag loading failure ends the
+  // process through `EXIT(1)`; unparsable values trip a `CHECK`.
+  MesosProcess(
+      ContentType _contentType,
+      const lambda::function<void(void)>& connected,
+      const lambda::function<void(void)>& disconnected,
+      const lambda::function<void(const queue<Event>&)>& received)
+    : ProcessBase(generate("executor")),
+      state(DISCONNECTED),
+      contentType(_contentType),
+      callbacks {connected, disconnected, received}
+  {
+    GOOGLE_PROTOBUF_VERIFY_VERSION;
+
+    // Load any logging flags from the environment.
+    logging::Flags flags;
+
+    Try<Nothing> load = flags.load("MESOS_");
+
+    if (load.isError()) {
+      EXIT(1) << "Failed to load flags: " << load.error();
+    }
+
+    // Initialize libprocess.
+    process::initialize();
+
+    // Initialize logging.
+    if (flags.initialize_driver_logging) {
+      logging::initialize("mesos", flags);
+    } else {
+      VLOG(1) << "Disabling initialization of GLOG logging";
+    }
+
+    LOG(INFO) << "Version: " << MESOS_VERSION;
+
+    spawn(new VersionProcess(), true);
+
+    // Check if this is local (for example, for testing).
+    local = os::getenv("MESOS_LOCAL").isSome();
+
+    Option<string> value;
+
+    // Get agent PID from environment.
+    value = os::getenv("MESOS_SLAVE_PID");
+    if (value.isNone()) {
+      EXIT(1) << "Expecting 'MESOS_SLAVE_PID' to be set in the environment";
+    }
+
+    UPID upid(value.get());
+    CHECK(upid) << "Failed to parse MESOS_SLAVE_PID '" << value.get() << "'";
+
+    // The executor API endpoint lives under the agent's libprocess id.
+    agent = ::URL(
+        "http",
+        upid.address.ip,
+        upid.address.port,
+        upid.id +
+        "/api/v1/executor");
+
+    // Get checkpointing status from environment.
+    value = os::getenv("MESOS_CHECKPOINT");
+    checkpoint = value.isSome() && value.get() == "1";
+
+    if (checkpoint) {
+      // Get recovery timeout from environment.
+      value = os::getenv("MESOS_RECOVERY_TIMEOUT");
+      if (value.isSome()) {
+        Try<Duration> _recoveryTimeout = Duration::parse(value.get());
+
+        CHECK_SOME(_recoveryTimeout)
+            << "Failed to parse MESOS_RECOVERY_TIMEOUT '" << value.get()
+            << "': " << _recoveryTimeout.error();
+
+        recoveryTimeout = _recoveryTimeout.get();
+      } else {
+        EXIT(1) << "Expecting 'MESOS_RECOVERY_TIMEOUT' to be set in the "
+                << "environment";
+      }
+
+      // Get maximum backoff factor from environment.
+      value = os::getenv("MESOS_SUBSCRIPTION_BACKOFF_MAX");
+      if (value.isSome()) {
+        Try<Duration> _maxBackoff = Duration::parse(value.get());
+
+        CHECK_SOME(_maxBackoff)
+            << "Failed to parse MESOS_SUBSCRIPTION_BACKOFF_MAX '"
+            << value.get() << "': " << _maxBackoff.error();
+
+        maxBackoff = _maxBackoff.get();
+      } else {
+        EXIT(1) << "Expecting 'MESOS_SUBSCRIPTION_BACKOFF_MAX' to be set "
+                << "in the environment";
+      }
+    }
+  }
+
+  // Sends `call` to the agent as an HTTP POST.
+  //
+  // The call is dropped (with a warning) when we are disconnected or when
+  // it fails validation. SUBSCRIBE goes over the subscribe connection as a
+  // streaming request; every other call uses the non-subscribe connection.
+  // The eventual response is handled by `_send()`.
+  void send(const Call& call)
+  {
+    if (state == DISCONNECTED) {
+      drop(call, "Disconnected");
+      return;
+    }
+
+    // Validate locally so that malformed calls never reach the agent.
+    Option<Error> error = validate(devolve(call));
+    if (error.isSome()) {
+      drop(call, error->message);
+      return;
+    }
+
+    VLOG(1) << "Sending " << call.type() << " call to " << agent;
+
+    ::Request request;
+    request.method = "POST";
+    request.url = agent;
+    request.body = serialize(contentType, call);
+    request.keepAlive = true;
+    request.headers = {{"Accept", stringify(contentType)},
+                       {"Content-Type", stringify(contentType)}};
+
+    CHECK_SOME(connections);
+
+    Future<Response> response;
+    if (call.type() == Call::SUBSCRIBE) {
+      // Send a streaming request for Subscribe call.
+      response = connections->subscribe.send(request, true);
+    } else {
+      response = connections->nonSubscribe.send(request);
+    }
+
+    response.onAny(defer(self(), &Self::_send, call, lambda::_1));
+  }
+
+  // Closes the subscribe response reader, if one is still open.
+  ~MesosProcess()
+  {
+    close();
+  }
+
+protected:
+  // Kicks off the first connection attempt to the agent.
+  // NOTE(review): presumably invoked by libprocess once the process has
+  // been spawned -- confirm against `process::Process`.
+  virtual void initialize()
+  {
+    connect();
+  }
+
+  // Starts an asynchronous attempt to open both connections to the agent.
+  // Does nothing when we are already connected.
+  void connect()
+  {
+    if (state == CONNECTED) {
+      return;
+    }
+
+    // We create two persistent connections here, one for subscribe
+    // call/streaming response and another for non-subscribe calls/responses.
+    //
+    // The second attempt is only started once the first one has completed
+    // (whatever its outcome); `connected()` then receives both futures.
+    process::http::connect(agent)
+      .onAny(defer(self(), [this](const Future<Connection>& connection) {
+        process::http::connect(agent)
+          .onAny(defer(self(), &Self::connected, connection, lambda::_1));
+      }));
+  }
+
+  // Completion handler for the pair of connection attempts started by
+  // `connect()`: `connection1` becomes the subscribe connection and
+  // `connection2` the non-subscribe one.
+  //
+  // If either attempt did not succeed, the failure is forwarded to
+  // `disconnected()`. Otherwise the connections are stored, disconnection
+  // handlers are installed on both, and the user's `connected` callback
+  // is invoked (serialized with the other callbacks through `mutex`).
+  void connected(
+      const Future<Connection>& connection1,
+      const Future<Connection>& connection2)
+  {
+    // `backoff()` keeps calling `connect()` until we are connected, so
+    // several attempts can be in flight at once. Once one of them has
+    // succeeded, the outcome of any other attempt is stale: acting on it
+    // would either replace the live connections (and invoke the `connected`
+    // callback a second time) or, on failure, tear down the live
+    // connections through `disconnected()`.
+    if (state == CONNECTED) {
+      VLOG(1) << "Ignoring result of a stale connection attempt";
+      return;
+    }
+
+    if (!connection1.isReady()) {
+      disconnected(connections,
+                   connection1.isFailed()
+                     ? connection1.failure()
+                     : "Subscribe future discarded");
+      return;
+    }
+
+    if (!connection2.isReady()) {
+      disconnected(connections,
+                   connection2.isFailed()
+                     ? connection2.failure()
+                     : "Non-subscribe future discarded");
+      return;
+    }
+
+    VLOG(1) << "Connected with the agent";
+
+    state = CONNECTED;
+
+    connections = Connections {connection1.get(), connection2.get()};
+
+    // The current `connections` value is bound into each handler so that
+    // `disconnected()` can tell a stale notification from a current one.
+    connections->subscribe.disconnected()
+      .onAny(defer(self(),
+                   &Self::disconnected,
+                   connections,
+                   "Subscribe connection interrupted"));
+
+    connections->nonSubscribe.disconnected()
+      .onAny(defer(self(),
+                   &Self::disconnected,
+                   connections,
+                   "Non-subscribe connection interrupted"));
+
+    // Invoke the connected callback once we have established both subscribe
+    // and non-subscribe connections with the agent.
+    mutex.lock()
+      .then(defer(self(), [this]() {
+        return async(callbacks.connected);
+      }))
+      .onAny(lambda::bind(&Mutex::unlock, mutex));
+  }
+
+  // Handles the loss of the connections identified by `_connections`;
+  // `failure` is a human readable reason used for logging only.
+  //
+  // The notification is ignored when we are already disconnected or when
+  // it refers to connections other than the current ones. Otherwise both
+  // connections are torn down, the subscribe stream is closed, and the
+  // user's `disconnected` callback is invoked. With checkpointing enabled
+  // a recovery timeout is armed and reconnection starts; without it a
+  // shutdown is injected.
+  void disconnected(
+      const Option<Connections>& _connections,
+      const string& failure)
+  {
+    // Ignore if we are already disconnected from the agent.
+    if (state == DISCONNECTED) {
+      return;
+    }
+
+    // Ignore if the disconnection happened from an old stale connection.
+    if (connections != _connections) {
+      return;
+    }
+
+    VLOG(1) << "Disconnected from agent: " << failure;
+
+    state = DISCONNECTED;
+
+    // NOTE: We will be here if either subscribe or non-subscribe connection is
+    // disconnected. We explicitly disconnect both the connections here for
+    // simplicity.
+    CHECK_SOME(connections);
+
+    connections->subscribe.disconnect();
+    connections->nonSubscribe.disconnect();
+
+    // Close the old subscribed response stream.
+    close();
+
+    // Invoke the disconnected callback the first time we disconnect from
+    // the agent.
+    mutex.lock()
+      .then(defer(self(), [this]() {
+        return async(callbacks.disconnected);
+      }))
+      .onAny(lambda::bind(&Mutex::unlock, mutex));
+
+    if (checkpoint) {
+      CHECK_SOME(recoveryTimeout);
+
+      CHECK_SOME(connections);
+
+      // Set up recovery timeout upon disconnection.
+      // The current connections are passed along so that the timeout can
+      // be recognized as stale if we reconnect in the meantime.
+      delay(recoveryTimeout.get(),
+            self(),
+            &Self::_recoveryTimeout,
+            connections.get());
+
+      // Backoff and reconnect only if framework checkpointing is enabled.
+      backoff();
+    } else {
+      shutdown();
+    }
+  }
+
+  // Retries connecting to the agent until we are connected again.
+  //
+  // Each invocation starts a connection attempt right away and then
+  // re-schedules itself after a random delay; the chain stops once
+  // `state` is `CONNECTED`.
+  void backoff()
+  {
+    if (state == CONNECTED) {
+      return;
+    }
+
+    CHECK(checkpoint);
+    CHECK_SOME(maxBackoff);
+
+    // Pick a random duration between 0 and `maxBackoff` as the delay before
+    // the next attempt. The delay does not grow with the number of attempts.
+    Duration backoff = maxBackoff.get() * ((double) ::random() / RAND_MAX);
+
+    VLOG(1) << "Will retry connecting with the agent again in " << backoff;
+
+    connect();
+
+    delay(backoff, self(), &Self::backoff);
+  }
+
+  // Hands the queued events to the user's `received` callback through
+  // `async` and resets the queue. Returns the future produced by `async`.
+  Future<Nothing> _receive()
+  {
+    // NOTE(review): the queue is reset right below, so this relies on
+    // `async` taking its own copy of `events` -- confirm.
+    Future<Nothing> future = async(callbacks.received, events);
+    events = queue<Event>();
+    return future;
+  }
+
+  // Runs when the recovery timeout armed in `disconnected()` expires and
+  // injects a shutdown unless the timeout has become irrelevant.
+  void _recoveryTimeout(const Connections& _connections)
+  {
+    // Nothing to do if we have reconnected in the meantime, or if this
+    // timeout was armed for connections that are no longer the current ones.
+    const bool reconnected = (state == CONNECTED);
+    if (reconnected || connections != _connections) {
+      return;
+    }
+
+    CHECK_SOME(recoveryTimeout);
+
+    const Duration& timeout = recoveryTimeout.get();
+    LOG(INFO) << "Recovery timeout of " << timeout
+              << " exceeded; Shutting down";
+
+    shutdown();
+  }
+
+  // Helper for injecting an ERROR event.
+  void error(const string& message)
+  {
+    Event event;
+    event.set_type(Event::ERROR);
+
+    Event::Error* error = event.mutable_error();
+    error->set_message(message);
+
+    receive(event, true);
+  }
+
+  // Handles the HTTP response to a previously sent `call`.
+  //
+  //   - failed request: logged and otherwise ignored;
+  //   - 200 OK: expected for SUBSCRIBE only; the streaming body becomes
+  //     the new event stream and reading starts;
+  //   - 202 Accepted: expected for every other call; nothing to do;
+  //   - 503 Service Unavailable: logged as a warning;
+  //   - anything else: reported to the user as an ERROR event.
+  void _send(const Call& call, const Future<Response>& response)
+  {
+    CHECK(!response.isDiscarded());
+
+    // This can happen if the agent process is restarted or a network blip
+    // caused the socket to timeout. Eventually, the executor would
+    // detect the socket disconnection via the disconnected callback.
+    if (response.isFailed()) {
+      LOG(ERROR) << "Request for call type " << call.type() << " failed: "
+                 << response.failure();
+      return;
+    }
+
+    if (response->code == process::http::Status::OK) {
+      // Only SUBSCRIBE call should get a "200 OK" response.
+      CHECK_EQ(Call::SUBSCRIBE, call.type());
+      CHECK_EQ(response->type, Response::PIPE);
+      CHECK_SOME(response->reader);
+
+      Pipe::Reader reader = response->reader.get();
+
+      auto deserializer =
+        lambda::bind(deserialize<Event>, contentType, lambda::_1);
+
+      Owned<Reader<Event>> decoder(
+          new Reader<Event>(Decoder<Event>(deserializer), reader));
+
+      // Close previous reader.
+      close();
+
+      subscribed = SubscribedResponse {reader, decoder};
+
+      read();
+      return;
+    }
+
+    if (response->code == process::http::Status::ACCEPTED) {
+      // Only non SUBSCRIBE calls should get a "202 Accepted" response.
+      CHECK_NE(Call::SUBSCRIBE, call.type());
+      return;
+    }
+
+    if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
+      // This could happen if the agent is still in the process of recovery.
+      LOG(WARNING) << "Received '" << response->status << "' ("
+                   << response->body << ") for " << call.type();
+      return;
+    }
+
+    // We should not be able to get here since we already do validation
+    // of calls before sending them to the agent.
+    error("Received unexpected '" + response->status + "' (" +
+          response->body + ") for " + stringify(call.type()));
+  }
+
+  // Requests the next event from the subscribe stream; the outcome is
+  // delivered to `_read()`.
+  void read()
+  {
+    CHECK_SOME(subscribed);
+
+    // The current reader is bound into the continuation so that `_read()`
+    // can detect results that belong to an older stream.
+    subscribed->decoder->read()
+      .onAny(defer(self(),
+                   &Self::_read,
+                   subscribed->reader,
+                   lambda::_1));
+  }
+
+  // Handles the outcome of one read from the subscribe stream.
+  //
+  // `reader` identifies the stream the read was issued on; results from a
+  // stream that is no longer current are ignored. A failed read or an
+  // end-of-file is treated as a disconnection, a de-serialization error
+  // is surfaced as an ERROR event, and a decoded event is passed to
+  // `receive()` before the next read is issued.
+  void _read(const Pipe::Reader& reader, const Future<Result<Event>>& event)
+  {
+    CHECK(!event.isDiscarded());
+
+    // Ignore enqueued events from the previous Subscribe call reader.
+    if (subscribed.isNone() || subscribed->reader != reader) {
+      VLOG(1) << "Ignoring event from old stale connection";
+      return;
+    }
+
+    // This could happen if the agent process died while sending a response.
+    if (event.isFailed()) {
+      LOG(ERROR) << "Failed to decode the stream of events: "
+                 << event.failure();
+
+      disconnected(connections, event.failure());
+      return;
+    }
+
+    if (event->isNone()) {
+      const string error =  "End-Of-File received from agent. The agent closed "
+                            "the event stream";
+      LOG(ERROR) << error;
+
+      disconnected(connections, error);
+      return;
+    }
+
+    // Note that no further read is issued after a de-serialization error.
+    if (event->isError()) {
+      error("Failed to de-serialize event: " + event->error());
+      return;
+    }
+
+    receive(event.get().get(), false);
+    read();
+  }
+
+  // Closes the reader of the current subscribe response stream, if there
+  // is one, and forgets the stream.
+  void close()
+  {
+    if (subscribed.isNone()) {
+      return;
+    }
+
+    subscribed->reader.close();
+    subscribed = None();
+  }
+
+  // Queues `event` for delivery to the user's `received` callback.
+  //
+  // `isLocallyInjected` is true for events generated by this library (as
+  // opposed to events read from the agent); only those are accepted while
+  // disconnected. A SHUTDOWN event additionally triggers the shutdown
+  // procedure with the grace period carried by the event.
+  void receive(const Event& event, bool isLocallyInjected)
+  {
+    // Check if we're disconnected but received an event.
+    if (!isLocallyInjected && state == DISCONNECTED) {
+      LOG(WARNING) << "Ignoring " << stringify(event.type())
+                   << " event because we're disconnected";
+      return;
+    }
+
+    if (isLocallyInjected) {
+      VLOG(1) << "Enqueuing locally injected event " << stringify(event.type());
+    } else {
+      VLOG(1) << "Enqueuing event " << stringify(event.type()) << " received"
+              << " from " << agent;
+    }
+
+    // Queue up the event and invoke the `received` callback if this
+    // is the first event (between now and when the `received`
+    // callback actually gets invoked more events might get queued).
+    events.push(event);
+
+    if (events.size() == 1) {
+      mutex.lock()
+        .then(defer(self(), &Self::_receive))
+        .onAny(lambda::bind(&Mutex::unlock, mutex));
+    }
+
+    // The event stays queued for the user; the shutdown timer (or local
+    // termination) is started in addition to delivering it.
+    if (event.type() == Event::SHUTDOWN) {
+      shutdown(event.shutdown().grace_period_seconds());
+    }
+  }
+
+  // Injects a SHUTDOWN event locally. `receive()` then reads the grace
+  // period from the event's `shutdown` sub-message, which is created here
+  // without setting any of its fields.
+  void shutdown()
+  {
+    Event shutdownEvent;
+    shutdownEvent.set_type(Event::SHUTDOWN);
+    shutdownEvent.mutable_shutdown();
+    receive(shutdownEvent, true);
+  }
+
+  // Starts the actual shutdown: a local run simply terminates this
+  // process, any other run spawns a `ShutdownProcess` with `gracePeriod`.
+  void shutdown(double gracePeriod)
+  {
+    if (local) {
+      // Process any pending received events from agent and then terminate.
+      terminate(this, false);
+      return;
+    }
+
+    spawn(new ShutdownProcess(gracePeriod), true);
+  }
+
+  // Logs a warning that `call` is being dropped, together with the reason.
+  void drop(const Call& call, const string& message)
+  {
+    LOG(WARNING)
+      << "Dropping " << call.type() << ": " << message;
+  }
+
+private:
+  // The user-supplied callbacks, stored at construction time.
+  struct Callbacks
+  {
+    lambda::function<void(void)> connected;
+    lambda::function<void(void)> disconnected;
+    lambda::function<void(const queue<Event>&)> received;
+  };
+
+  // The streaming response of the current subscription: the pipe reader
+  // of the response body and the decoder that reads events from it.
+  struct SubscribedResponse
+  {
+    Pipe::Reader reader;
+    process::Owned<Reader<Event>> decoder;
+  };
+
+  enum
+  {
+    CONNECTED,   // Established subscribe and non-subscribe connection.
+    DISCONNECTED // Either of subscribe/non-subscribe connection is broken.
+  } state;
+
+  // Content type used for request bodies and the Accept/Content-Type headers.
+  ContentType contentType;
+  Callbacks callbacks;
+  Mutex mutex; // Used to serialize the callback invocations.
+  // Events waiting to be handed to the `received` callback.
+  queue<Event> events;
+  // True when `MESOS_LOCAL` is set in the environment.
+  bool local;
+  // The current pair of connections; none until the first connect succeeds.
+  Option<Connections> connections;
+  // Set while a subscribe response stream is open.
+  Option<SubscribedResponse> subscribed;
+  // URL of the agent's executor API endpoint.
+  ::URL agent;
+  // True when `MESOS_CHECKPOINT` is "1".
+  bool checkpoint;
+  // Only set when `checkpoint` is true.
+  Option<Duration> recoveryTimeout;
+  // Only set when `checkpoint` is true.
+  Option<Duration> maxBackoff;
+};
+
+
+// Creates the underlying `MesosProcess` with the given content type and
+// callbacks, and spawns it.
+Mesos::Mesos(
+    ContentType contentType,
+    const lambda::function<void(void)>& connected,
+    const lambda::function<void(void)>& disconnected,
+    const lambda::function<void(const queue<Event>&)>& received)
+  : process(new MesosProcess(contentType, connected, disconnected, received))
+{
+  spawn(process.get());
+}
+
+
+// Terminates the underlying process and blocks until it has exited.
+Mesos::~Mesos()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+// Forwards `call` to the underlying process by dispatching
+// `MesosProcess::send` on it.
+void Mesos::send(const Call& call)
+{
+  dispatch(process.get(), &MesosProcess::send, call);
+}
+
+} // namespace executor {
+} // namespace v1 {
+} // namespace mesos {


[2/6] mesos git commit: Added environment variables to be used by HTTP executors.

Posted by vi...@apache.org.
Added environment variables to be used by HTTP executors.

This change adds some environment variables that HTTP executors read
upon startup in order to connect to the agent and to retry the
connection when it is lost. These variables are discussed in more
detail in the Executor HTTP API Design Doc.

Review: https://reviews.apache.org/r/41277/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b338845c
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b338845c
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b338845c

Branch: refs/heads/master
Commit: b338845c2b7cd744169a86fe684a32394ba7f2db
Parents: 6826582
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:20:29 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:20:29 2016 -0800

----------------------------------------------------------------------
 src/slave/containerizer/containerizer.cpp | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b338845c/src/slave/containerizer/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/containerizer.cpp b/src/slave/containerizer/containerizer.cpp
index fa6e279..5990468 100644
--- a/src/slave/containerizer/containerizer.cpp
+++ b/src/slave/containerizer/containerizer.cpp
@@ -327,10 +327,16 @@ map<string, string> executorEnvironment(
   environment["MESOS_DIRECTORY"] = directory;
   environment["MESOS_SLAVE_ID"] = slaveId.value();
   environment["MESOS_SLAVE_PID"] = stringify(slavePid);
+  environment["MESOS_AGENT_ENDPOINT"] = stringify(slavePid.address);
   environment["MESOS_CHECKPOINT"] = checkpoint ? "1" : "0";
 
   if (checkpoint) {
     environment["MESOS_RECOVERY_TIMEOUT"] = stringify(flags.recovery_timeout);
+
+    // The maximum backoff duration to be used by an executor between two
+    // retries when disconnected.
+    environment["MESOS_SUBSCRIPTION_BACKOFF_MAX"] =
+      stringify(EXECUTOR_REREGISTER_TIMEOUT);
   }
 
   if (HookManager::hooksAvailable()) {


[4/6] mesos git commit: Added output operator to output call/event types in the Executor V1 API.

Posted by vi...@apache.org.
Added output operator to output call/event types in the Executor V1 API.

Review: https://reviews.apache.org/r/41281/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a383303e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a383303e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a383303e

Branch: refs/heads/master
Commit: a383303ed2dc3c1d1a55a46c97d313d1addc7cd7
Parents: 6cd85e9
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:21:21 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:21:21 2016 -0800

----------------------------------------------------------------------
 include/mesos/v1/executor/executor.hpp | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a383303e/include/mesos/v1/executor/executor.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/executor/executor.hpp b/include/mesos/v1/executor/executor.hpp
index a052b10..6e7b412 100644
--- a/include/mesos/v1/executor/executor.hpp
+++ b/include/mesos/v1/executor/executor.hpp
@@ -21,4 +21,23 @@
 // and generating the equivalent .ph.h files
 #include <mesos/v1/executor/executor.pb.h>
 
+namespace mesos {
+namespace v1 {
+namespace executor {
+
+// Streams the symbolic name of a `Call::Type` value.
+inline std::ostream& operator<<(std::ostream& stream, const Call::Type& type)
+{
+  stream << Call::Type_Name(type);
+  return stream;
+}
+
+
+// Streams the symbolic name of an `Event::Type` value.
+inline std::ostream& operator<<(std::ostream& stream, const Event::Type& type)
+{
+  stream << Event::Type_Name(type);
+  return stream;
+}
+
+} // namespace executor {
+} // namespace v1 {
+} // namespace mesos {
+
 #endif // __MESOS_V1_EXECUTOR_PROTO_HPP__


[5/6] mesos git commit: Added the interface for the V1 Executor HTTP Library.

Posted by vi...@apache.org.
Added the interface for the V1 Executor HTTP Library.

This change introduces an interface for the V1 Executor HTTP Library.
The interface is similar in semantics to the corresponding scheduler
library.

Review: https://reviews.apache.org/r/41282/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e4ec5122
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e4ec5122
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e4ec5122

Branch: refs/heads/master
Commit: e4ec5122fd420d0ce928fc15794c3aafc6e66126
Parents: a383303
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:21:30 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:21:30 2016 -0800

----------------------------------------------------------------------
 include/mesos/v1/executor.hpp | 75 ++++++++++++++++++++++++++++++++++++++
 src/Makefile.am               |  1 +
 2 files changed, 76 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e4ec5122/include/mesos/v1/executor.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/executor.hpp b/include/mesos/v1/executor.hpp
new file mode 100644
index 0000000..adca287
--- /dev/null
+++ b/include/mesos/v1/executor.hpp
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __MESOS_V1_EXECUTOR_HPP__
+#define __MESOS_V1_EXECUTOR_HPP__
+
+#include <functional>
+#include <queue>
+#include <string>
+
+#include <mesos/http.hpp>
+
+#include <mesos/v1/mesos.hpp>
+
+#include <mesos/v1/executor/executor.hpp>
+
+#include <process/owned.hpp>
+
+namespace mesos {
+namespace v1 {
+namespace executor {
+
+class MesosProcess; // Forward declaration.
+
+// Interface to Mesos for an executor.
+//
+// Expects three callbacks, 'connected', 'disconnected', and
+// 'received' which will get invoked _serially_ when it's determined
+// that we've connected (i.e., established TCP connection), disconnected
+// (i.e., connection is broken), or received events from the agent.
+// Note that we drop events while disconnected.
+class Mesos
+{
+public:
+  Mesos(ContentType contentType,
+        const std::function<void(void)>& connected,
+        const std::function<void(void)>& disconnected,
+        const std::function<void(const std::queue<Event>&)>& received);
+
+  // Delete copy constructor.
+  Mesos(const Mesos& other) = delete;
+
+  // Delete assignment operator.
+  Mesos& operator=(const Mesos& other) = delete;
+
+  virtual ~Mesos();
+
+  // Attempts to send a call to the agent.
+  //
+  // Some local validation of calls is performed which may result in dropped
+  // calls without ever being sent to the agent.
+  virtual void send(const Call& call);
+
+private:
+  process::Owned<MesosProcess> process;
+};
+
+} // namespace executor {
+} // namespace v1 {
+} // namespace mesos {
+
+#endif // __MESOS_V1_EXECUTOR_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/e4ec5122/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 8657a86..a508f30 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -518,6 +518,7 @@ v1dir = $(pkgincludedir)/v1
 
 v1_HEADERS =								\
   $(top_srcdir)/include/mesos/v1/attributes.hpp				\
+  $(top_srcdir)/include/mesos/v1/executor.hpp				\
   $(top_srcdir)/include/mesos/v1/mesos.hpp				\
   $(top_srcdir)/include/mesos/v1/mesos.proto				\
   $(top_srcdir)/include/mesos/v1/resources.hpp				\


[3/6] mesos git commit: Fixed header include order in src/tests/containerizer.hpp.

Posted by vi...@apache.org.
Fixed header include order in src/tests/containerizer.hpp.

Review: https://reviews.apache.org/r/41280/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6cd85e93
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6cd85e93
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6cd85e93

Branch: refs/heads/master
Commit: 6cd85e9370ebe8d7772080123ba531fc0e2f3ec8
Parents: b338845
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Tue Jan 26 15:20:51 2016 -0800
Committer: Vinod Kone <vi...@gmail.com>
Committed: Tue Jan 26 15:20:51 2016 -0800

----------------------------------------------------------------------
 src/tests/containerizer.hpp | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6cd85e93/src/tests/containerizer.hpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.hpp b/src/tests/containerizer.hpp
index 25b64da..bd9ee2c 100644
--- a/src/tests/containerizer.hpp
+++ b/src/tests/containerizer.hpp
@@ -22,10 +22,14 @@
 #include <map>
 #include <string>
 
-#include <gmock/gmock.h>
+#include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
+#include <mesos/resources.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <process/dispatch.hpp>
 #include <process/future.hpp>
+#include <process/gmock.hpp>
 #include <process/pid.hpp>
 
 #include <stout/hashmap.hpp>
@@ -33,11 +37,6 @@
 #include <stout/try.hpp>
 #include <stout/uuid.hpp>
 
-#include "mesos/executor.hpp"
-#include "mesos/mesos.hpp"
-#include "mesos/resources.hpp"
-#include "mesos/type_utils.hpp"
-
 #include "slave/containerizer/containerizer.hpp"
 
 #include "slave/slave.hpp"