You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gi...@apache.org on 2018/03/07 09:10:04 UTC
[07/10] mesos git commit: Updated discard handling in
`Docker::inspect()`.
Updated discard handling in `Docker::inspect()`.
Previously, discards of the `Future` returned by `Docker::inspect()`
were only handled at the beginning of each asynchronous continuation
in the library function's call chain. This meant that if a Docker
CLI command became stuck in between async calls, discarding the
`Future` would have no effect.
This patch adds an `onDiscard` callback to the `Future` to ensure
that any discards have the desired effect: cleanup of any spawned
subprocess, and a transition of the `Future` to the discarded state.
Since the Docker library is not a libprocess process, we must
implement this with a `shared_ptr` and a mutex, to protect against
concurrent access to the `onDiscard` callback, which must be updated
when retries are performed.
Review: https://reviews.apache.org/r/65683/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e72def49
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e72def49
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e72def49
Branch: refs/heads/1.5.x
Commit: e72def4926049d45641bbeb761faeb2613494441
Parents: 93fbdbe
Author: Greg Mann <gr...@mesosphere.io>
Authored: Wed Feb 28 15:24:17 2018 -0800
Committer: Gilbert Song <so...@gmail.com>
Committed: Wed Mar 7 01:08:12 2018 -0800
----------------------------------------------------------------------
src/docker/docker.cpp | 51 ++++++++++++++++++++++++++++++++++------------
src/docker/docker.hpp | 14 ++++++++++---
2 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/e72def49/src/docker/docker.cpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.cpp b/src/docker/docker.cpp
index 6013707..3e3b8d5 100644
--- a/src/docker/docker.cpp
+++ b/src/docker/docker.cpp
@@ -15,6 +15,8 @@
// limitations under the License.
#include <map>
+#include <mutex>
+#include <utility>
#include <vector>
#include <stout/error.hpp>
@@ -56,6 +58,9 @@ using namespace process;
using std::list;
using std::map;
+using std::mutex;
+using std::pair;
+using std::shared_ptr;
using std::string;
using std::vector;
@@ -1246,20 +1251,29 @@ Future<Docker::Container> Docker::inspect(
{
Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
+ // Holds a callback used for cleanup in case this call to 'docker inspect' is
+ // discarded, and a mutex to control access to the callback.
+ auto callback = std::make_shared<pair<lambda::function<void()>, mutex>>();
+
const string cmd = path + " -H " + socket + " inspect " + containerName;
- _inspect(cmd, promise, retryInterval);
+ _inspect(cmd, promise, retryInterval, callback);
- return promise->future();
+ return promise->future()
+ .onDiscard([callback]() {
+ synchronized (callback->second) {
+ callback->first();
+ }
+ });
}
void Docker::_inspect(
const string& cmd,
const Owned<Promise<Docker::Container>>& promise,
- const Option<Duration>& retryInterval)
+ const Option<Duration>& retryInterval,
+ shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
- promise->discard();
return;
}
@@ -1276,13 +1290,25 @@ void Docker::_inspect(
return;
}
+ // Set the `onDiscard` callback which will clean up the subprocess if the
+ // caller discards the `Future` that we returned.
+ synchronized (callback->second) {
+ callback->first = [promise, s, cmd]() {
+ promise->discard();
+ CHECK_SOME(s);
+ commandDiscarded(s.get(), cmd);
+ };
+ }
+
// Start reading from stdout so writing to the pipe won't block
// to handle cases where the output is larger than the pipe
// capacity.
const Future<string> output = io::read(s.get().out().get());
s.get().status()
- .onAny([=]() { __inspect(cmd, promise, retryInterval, output, s.get()); });
+ .onAny([=]() {
+ __inspect(cmd, promise, retryInterval, output, s.get(), callback);
+ });
}
@@ -1291,11 +1317,10 @@ void Docker::__inspect(
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
Future<string> output,
- const Subprocess& s)
+ const Subprocess& s,
+ shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
- promise->discard();
- output.discard();
return;
}
@@ -1313,7 +1338,7 @@ void Docker::__inspect(
VLOG(1) << "Retrying inspect with non-zero status code. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
- [=]() { _inspect(cmd, promise, retryInterval); } );
+ [=]() { _inspect(cmd, promise, retryInterval, callback); });
return;
}
@@ -1335,7 +1360,7 @@ void Docker::__inspect(
CHECK_SOME(s.out());
output
.onAny([=](const Future<string>& output) {
- ___inspect(cmd, promise, retryInterval, output);
+ ___inspect(cmd, promise, retryInterval, output, callback);
});
}
@@ -1344,10 +1369,10 @@ void Docker::___inspect(
const string& cmd,
const Owned<Promise<Docker::Container>>& promise,
const Option<Duration>& retryInterval,
- const Future<string>& output)
+ const Future<string>& output,
+ shared_ptr<pair<lambda::function<void()>, mutex>> callback)
{
if (promise->future().hasDiscard()) {
- promise->discard();
return;
}
@@ -1368,7 +1393,7 @@ void Docker::___inspect(
VLOG(1) << "Retrying inspect since container not yet started. cmd: '"
<< cmd << "', interval: " << stringify(retryInterval.get());
Clock::timer(retryInterval.get(),
- [=]() { _inspect(cmd, promise, retryInterval); } );
+ [=]() { _inspect(cmd, promise, retryInterval, callback); } );
return;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/e72def49/src/docker/docker.hpp
----------------------------------------------------------------------
diff --git a/src/docker/docker.hpp b/src/docker/docker.hpp
index d9e71f8..f5e4a70 100644
--- a/src/docker/docker.hpp
+++ b/src/docker/docker.hpp
@@ -19,7 +19,9 @@
#include <list>
#include <map>
+#include <mutex>
#include <string>
+#include <utility>
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -340,20 +342,26 @@ private:
static void _inspect(
const std::string& cmd,
const process::Owned<process::Promise<Container>>& promise,
- const Option<Duration>& retryInterval);
+ const Option<Duration>& retryInterval,
+ std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+ callback);
static void __inspect(
const std::string& cmd,
const process::Owned<process::Promise<Container>>& promise,
const Option<Duration>& retryInterval,
process::Future<std::string> output,
- const process::Subprocess& s);
+ const process::Subprocess& s,
+ std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+ callback);
static void ___inspect(
const std::string& cmd,
const process::Owned<process::Promise<Container>>& promise,
const Option<Duration>& retryInterval,
- const process::Future<std::string>& output);
+ const process::Future<std::string>& output,
+ std::shared_ptr<std::pair<lambda::function<void()>, std::mutex>>
+ callback);
static process::Future<std::list<Container>> _ps(
const Docker& docker,