You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/03/13 01:32:00 UTC
[1/3] mesos git commit: Pass `received` argument by const ref in the
executor library.
Repository: mesos
Updated Branches:
refs/heads/master a34bd48ef -> 295b58022
Pass `received` argument by const ref in the executor library.
Review: https://reviews.apache.org/r/44578/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2aff8f7f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2aff8f7f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2aff8f7f
Branch: refs/heads/master
Commit: 2aff8f7fb8c6b55bf234294abdd5784659632d5c
Parents: a34bd48
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sat Mar 12 19:31:22 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Mar 12 19:31:22 2016 -0500
----------------------------------------------------------------------
src/executor/executor.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2aff8f7f/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index c3e95ea..5d00b11 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -139,7 +139,7 @@ public:
ContentType _contentType,
const lambda::function<void(void)>& connected,
const lambda::function<void(void)>& disconnected,
- lambda::function<void(const queue<Event>&)> received)
+ const lambda::function<void(const queue<Event>&)>& received)
: ProcessBase(generate("executor")),
state(DISCONNECTED),
contentType(_contentType),
[3/3] mesos git commit: Made changes to the executor library around
managing connections.
Posted by vi...@apache.org.
Made changes to the executor library around managing connections.
This change makes the following modifications to the library:
- Stops passing connection objects to `defer` callbacks, as doing so can
sometimes lead to deadlocks during destruction in the same execution
context.
- Introduces 3 additional states: `CONNECTING`, `SUBSCRIBING` and
`SUBSCRIBED`. The `CONNECTING` state helps us identify whether a
connection attempt is in progress, while the latter two states allow
us to drop subscribe calls if one is already in progress.
- Creates a random `connectionId` to demarcate a new connection
instance, allowing a stale connection attempt to be discarded.
- Sets the recovery timeout timer only once per disconnection.
This allows us to later cancel the recoveryTimeout callback
if we connect with the agent at a later point in time.
Review: https://reviews.apache.org/r/44580/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/295b5802
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/295b5802
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/295b5802
Branch: refs/heads/master
Commit: 295b58022523c636d4331af760e5d3ba6f8e4703
Parents: 441c278
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sat Mar 12 19:31:33 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Mar 12 19:31:33 2016 -0500
----------------------------------------------------------------------
src/executor/executor.cpp | 263 +++++++++++++++++++++++++++++------------
1 file changed, 187 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/295b5802/src/executor/executor.cpp
----------------------------------------------------------------------
diff --git a/src/executor/executor.cpp b/src/executor/executor.cpp
index 5d00b11..0050e22 100644
--- a/src/executor/executor.cpp
+++ b/src/executor/executor.cpp
@@ -21,6 +21,7 @@
#include <mesos/v1/mesos.hpp>
#include <process/async.hpp>
+#include <process/clock.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
@@ -30,12 +31,14 @@
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
+#include <process/timer.hpp>
#include <stout/duration.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
+#include <stout/uuid.hpp>
#include "common/http.hpp"
#include "common/recordio.hpp"
@@ -52,6 +55,7 @@
using namespace mesos;
using namespace mesos::internal;
+using std::ostream;
using std::queue;
using std::string;
@@ -60,11 +64,13 @@ using mesos::internal::recordio::Reader;
using mesos::internal::slave::validation::executor::call::validate;
using process::async;
+using process::Clock;
using process::delay;
using process::dispatch;
using process::Future;
using process::Mutex;
using process::Owned;
+using process::Timer;
using process::ID::generate;
@@ -245,17 +251,26 @@ public:
void send(const Call& call)
{
- if (state == DISCONNECTED) {
- drop(call, "Disconnected");
- return;
- }
-
Option<Error> error = validate(devolve(call));
if (error.isSome()) {
drop(call, error->message);
return;
}
+ if (call.type() == Call::SUBSCRIBE && state != CONNECTED) {
+ // It might be possible that the executor is retrying. We drop the
+ // request if we have an ongoing subscribe request in flight or if the
+ // executor is already subscribed.
+ drop(call, "Executor is in state " + stringify(state));
+ return;
+ }
+
+ if (call.type() != Call::SUBSCRIBE && state != SUBSCRIBED) {
+ // We drop all non-subscribe calls if we are not currently subscribed.
+ drop(call, "Executor is in state " + stringify(state));
+ return;
+ }
+
VLOG(1) << "Sending " << call.type() << " call to " << agent;
::Request request;
@@ -270,18 +285,25 @@ public:
Future<Response> response;
if (call.type() == Call::SUBSCRIBE) {
+ state = SUBSCRIBING;
+
// Send a streaming request for Subscribe call.
response = connections->subscribe.send(request, true);
} else {
response = connections->nonSubscribe.send(request);
}
- response.onAny(defer(self(), &Self::_send, call, lambda::_1));
+ CHECK_SOME(connectionId);
+ response.onAny(defer(self(),
+ &Self::_send,
+ connectionId.get(),
+ call,
+ lambda::_1));
}
~MesosProcess()
{
- close();
+ disconnect();
}
protected:
@@ -292,25 +314,48 @@ protected:
void connect()
{
- if (state == CONNECTED) {
- return;
- }
+ CHECK(state == DISCONNECTED || state == CONNECTING) << state;
+
+ connectionId = UUID::random();
+
+ state = CONNECTING;
+
+ // This automatic variable is needed for lambda capture. We need to
+ // create a copy here because `connectionId` might change by the time the
+ // second `http::connect()` gets called.
+ UUID connectionId_ = connectionId.get();
// We create two persistent connections here, one for subscribe
// call/streaming response and another for non-subscribe calls/responses.
process::http::connect(agent)
- .onAny(defer(self(), [this](const Future<Connection>& connection) {
+ .onAny(defer(self(), [this, connectionId_](
+ const Future<Connection>& connection) {
process::http::connect(agent)
- .onAny(defer(self(), &Self::connected, connection, lambda::_1));
+ .onAny(defer(self(),
+ &Self::connected,
+ connectionId_,
+ connection,
+ lambda::_1));
}));
}
void connected(
+ const UUID& _connectionId,
const Future<Connection>& connection1,
const Future<Connection>& connection2)
{
+ // It is possible that the agent process failed while we have an ongoing
+ // (re-)connection attempt with the agent.
+ if (connectionId != _connectionId) {
+ VLOG(1) << "Ignoring connection attempt from stale connection";
+ return;
+ }
+
+ CHECK_EQ(CONNECTING, state);
+ CHECK_SOME(connectionId);
+
if (!connection1.isReady()) {
- disconnected(connections,
+ disconnected(connectionId.get(),
connection1.isFailed()
? connection1.failure()
: "Subscribe future discarded");
@@ -318,7 +363,7 @@ protected:
}
if (!connection2.isReady()) {
- disconnected(connections,
+ disconnected(connectionId.get(),
connection2.isFailed()
? connection2.failure()
: "Non-subscribe future discarded");
@@ -334,15 +379,25 @@ protected:
connections->subscribe.disconnected()
.onAny(defer(self(),
&Self::disconnected,
- connections,
+ connectionId.get(),
"Subscribe connection interrupted"));
connections->nonSubscribe.disconnected()
.onAny(defer(self(),
&Self::disconnected,
- connections,
+ connectionId.get(),
"Non-subscribe connection interrupted"));
+ // Cancel the recovery timer if we connected after a disconnection with the
+ // agent when framework checkpointing is enabled. This ensures that we have
+ // only one active timer instance at a given point of time.
+ if (recoveryTimer.isSome()) {
+ CHECK(checkpoint);
+
+ Clock::cancel(recoveryTimer.get());
+ recoveryTimer = None();
+ }
+
// Invoke the connected callback once we have established both subscribe
// and non-subscribe connections with the agent.
mutex.lock()
@@ -353,52 +408,55 @@ protected:
}
void disconnected(
- const Option<Connections>& _connections,
+ const UUID& _connectionId,
const string& failure)
{
- // Ignore if we are already disconnected from the agent.
- if (state == DISCONNECTED) {
- return;
- }
-
// Ignore if the disconnection happened from an old stale connection.
- if (connections != _connections) {
+ if (connectionId != _connectionId) {
+ VLOG(1) << "Ignoring disconnection attempt from stale connection";
return;
}
+ CHECK_NE(DISCONNECTED, state);
+
VLOG(1) << "Disconnected from agent: " << failure;
- state = DISCONNECTED;
+ bool connected =
+ (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED);
- // NOTE: We will be here if either subscribe or non-subscribe connection is
- // disconnected. We explicitly disconnect both the connections here for
- // simplicity.
- CHECK_SOME(connections);
+ if (connected) {
+ // Invoke the disconnected callback the first time we disconnect from
+ // the agent.
+ mutex.lock()
+ .then(defer(self(), [this]() {
+ return async(callbacks.disconnected);
+ }))
+ .onAny(lambda::bind(&Mutex::unlock, mutex));
+ }
- connections->subscribe.disconnect();
- connections->nonSubscribe.disconnect();
+ // Disconnect any active connections.
+ disconnect();
- // Close the old subscribed response stream.
- close();
+ // This represents a disconnection due to a backoff attempt after being
+ // already disconnected from the agent. We had already started the
+ // recovery timer when we initially noticed the disconnection.
+ if (recoveryTimer.isSome()) {
+ CHECK(checkpoint);
- // Invoke the disconnected callback the first time we disconnect from
- // the agent.
- mutex.lock()
- .then(defer(self(), [this]() {
- return async(callbacks.disconnected);
- }))
- .onAny(lambda::bind(&Mutex::unlock, mutex));
+ return;
+ }
- if (checkpoint) {
+ if (checkpoint && connected) {
CHECK_SOME(recoveryTimeout);
+ CHECK_NONE(recoveryTimer);
- CHECK_SOME(connections);
-
- // Set up recovery timeout upon disconnection.
- delay(recoveryTimeout.get(),
- self(),
- &Self::_recoveryTimeout,
- connections.get());
+ // Set up the recovery timeout upon disconnection. We only set it once per
+ // disconnection. This ensures that when we try to (re-)connect with
+ // the agent and are unsuccessful, we don't restart the recovery timer.
+ recoveryTimer = delay(
+ recoveryTimeout.get(),
+ self(),
+ &Self::_recoveryTimeout);
// Backoff and reconnect only if framework checkpointing is enabled.
backoff();
@@ -409,10 +467,12 @@ protected:
void backoff()
{
- if (state == CONNECTED) {
+ if (state == CONNECTED || state == SUBSCRIBING || state == SUBSCRIBED) {
return;
}
+ CHECK(state == DISCONNECTED || state == CONNECTING) << state;
+
CHECK(checkpoint);
CHECK_SOME(maxBackoff);
@@ -434,26 +494,42 @@ protected:
return future;
}
- void _recoveryTimeout(const Connections& _connections)
+ void _recoveryTimeout()
{
- // If we're connected, don't do anything.
- if (state == CONNECTED) {
+ // It's possible that a new connection was established since the timeout
+ // fired and we were unable to cancel this timeout. If this occurs, don't
+ // bother trying to shutdown the executor.
+ if (recoveryTimer.isNone() || !recoveryTimer->timeout().expired()) {
return;
}
- // Ignore if this timeout is for a stale connection.
- if (connections != _connections) {
- return;
- }
+ CHECK(state == DISCONNECTED || state == CONNECTING) << state;
CHECK_SOME(recoveryTimeout);
-
LOG(INFO) << "Recovery timeout of " << recoveryTimeout.get()
<< " exceeded; Shutting down";
shutdown();
}
+ void disconnect()
+ {
+ if (connections.isSome()) {
+ connections->subscribe.disconnect();
+ connections->nonSubscribe.disconnect();
+ }
+
+ if (subscribed.isSome()) {
+ subscribed->reader.close();
+ }
+
+ state = DISCONNECTED;
+
+ connections = None();
+ connectionId = None();
+ subscribed = None();
+ }
+
// Helper for injecting an ERROR event.
void error(const string& message)
{
@@ -466,9 +542,19 @@ protected:
receive(event, true);
}
- void _send(const Call& call, const Future<Response>& response)
+ void _send(
+ const UUID& _connectionId,
+ const Call& call,
+ const Future<Response>& response)
{
+ // It is possible that the agent process failed before a response could
+ // be received.
+ if (connectionId != _connectionId) {
+ return;
+ }
+
CHECK(!response.isDiscarded());
+ CHECK(state == SUBSCRIBING || state == SUBSCRIBED) << state;
// This can happen if the agent process is restarted or a network blip
// caused the socket to timeout. Eventually, the executor would
@@ -485,6 +571,8 @@ protected:
CHECK_EQ(response->type, Response::PIPE);
CHECK_SOME(response->reader);
+ state = SUBSCRIBED;
+
Pipe::Reader reader = response->reader.get();
auto deserializer =
@@ -493,9 +581,6 @@ protected:
Owned<Reader<Event>> decoder(
new Reader<Event>(Decoder<Event>(deserializer), reader));
- // Close previous reader.
- close();
-
subscribed = SubscribedResponse {reader, decoder};
read();
@@ -508,6 +593,13 @@ protected:
return;
}
+ // We reset the state to connected if the subscribe call did not
+ // succceed (e.g., the agent has not yet set up HTTP routes). The executor
+ // can then retry the subscribe call.
+ if (call.type() == Call::SUBSCRIBE) {
+ state = CONNECTED;
+ }
+
if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
// This could happen if the agent is still in the process of recovery.
LOG(WARNING) << "Received '" << response->status << "' ("
@@ -550,21 +642,25 @@ protected:
return;
}
+ CHECK_EQ(SUBSCRIBED, state);
+ CHECK_SOME(connectionId);
+
// This could happen if the agent process died while sending a response.
if (event.isFailed()) {
LOG(ERROR) << "Failed to decode the stream of events: "
<< event.failure();
- disconnected(connections, event.failure());
+ disconnected(connectionId.get(), event.failure());
return;
}
+ // This could happen if the agent failed over after sending an event.
if (event->isNone()) {
const string error = "End-Of-File received from agent. The agent closed "
"the event stream";
LOG(ERROR) << error;
- disconnected(connections, error);
+ disconnected(connectionId.get(), error);
return;
}
@@ -577,20 +673,12 @@ protected:
read();
}
- void close()
- {
- if (subscribed.isSome()) {
- subscribed->reader.close();
- subscribed = None();
- }
- }
-
void receive(const Event& event, bool isLocallyInjected)
{
- // Check if we're disconnected but received an event.
- if (!isLocallyInjected && state == DISCONNECTED) {
+ // Check if we're no longer subscribed but received an event.
+ if (!isLocallyInjected && state != SUBSCRIBED) {
LOG(WARNING) << "Ignoring " << stringify(event.type())
- << " event because we're disconnected";
+ << " event because we're no longer subscribed";
return;
}
@@ -646,12 +734,34 @@ private:
process::Owned<Reader<Event>> decoder;
};
- enum
+ enum State
{
- CONNECTED, // Established subscribe and non-subscribe connection.
- DISCONNECTED // Either of subscribe/non-subscribe connection is broken.
+ DISCONNECTED, // Either of subscribe/non-subscribe connection is broken.
+ CONNECTING, // Trying to establish subscribe and non-subscribe connections.
+ CONNECTED, // Established subscribe and non-subscribe connections.
+ SUBSCRIBING, // Trying to subscribe with the agent.
+ SUBSCRIBED // Subscribed with the agent.
} state;
+ friend ostream& operator<<(ostream& stream, State state)
+ {
+ switch (state) {
+ case DISCONNECTED: return stream << "DISCONNECTED";
+ case CONNECTING: return stream << "CONNECTING";
+ case CONNECTED: return stream << "CONNECTED";
+ case SUBSCRIBING: return stream << "SUBSCRIBING";
+ case SUBSCRIBED: return stream << "SUBSCRIBED";
+ }
+
+ UNREACHABLE();
+ }
+
+ // There can be multiple simulataneous ongoing (re-)connection attempts with
+ // the agent (e.g., the agent process restarted while an attempt was in
+ // progress). This helps us in uniquely identifying the current connection
+ // instance and ignoring the stale instance.
+ Option<UUID> connectionId; // UUID to identify the connection instance.
+
ContentType contentType;
Callbacks callbacks;
Mutex mutex; // Used to serialize the callback invocations.
@@ -663,6 +773,7 @@ private:
bool checkpoint;
Option<Duration> recoveryTimeout;
Option<Duration> maxBackoff;
+ Option<Timer> recoveryTimer;
Duration shutdownGracePeriod;
};
[2/3] mesos git commit: Minor fix to output state during a `CHECK`
failure.
Posted by vi...@apache.org.
Minor fix to output state during a `CHECK` failure.
This change outputs the state when a check fails inside the scheduler library.
Review: https://reviews.apache.org/r/44579/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/441c2784
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/441c2784
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/441c2784
Branch: refs/heads/master
Commit: 441c278459f9dc6f35e9dff8b6be7704ed075bc9
Parents: 2aff8f7
Author: Anand Mazumdar <ma...@gmail.com>
Authored: Sat Mar 12 19:31:28 2016 -0500
Committer: Vinod Kone <vi...@gmail.com>
Committed: Sat Mar 12 19:31:28 2016 -0500
----------------------------------------------------------------------
src/scheduler/scheduler.cpp | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/441c2784/src/scheduler/scheduler.cpp
----------------------------------------------------------------------
diff --git a/src/scheduler/scheduler.cpp b/src/scheduler/scheduler.cpp
index b010a81..35f4794 100644
--- a/src/scheduler/scheduler.cpp
+++ b/src/scheduler/scheduler.cpp
@@ -482,7 +482,7 @@ protected:
}
CHECK(!response.isDiscarded());
- CHECK(state == SUBSCRIBING || state == SUBSCRIBED);
+ CHECK(state == SUBSCRIBING || state == SUBSCRIBED) << state;
// This can happen during a master failover or a network blip
// causing the socket to timeout. Eventually, the scheduler would