You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/12/14 16:50:23 UTC
[2/2] mesos git commit: Used process::loop in infinitely recursive
functions.
Used process::loop in infinitely recursive functions.
Review: https://reviews.apache.org/r/54751
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3a65509
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3a65509
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3a65509
Branch: refs/heads/master
Commit: a3a65509acebefa285d09079d39a0ebf7b5f086b
Parents: fe6c3e4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 14 08:32:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:45:22 2016 -0800
----------------------------------------------------------------------
src/common/recordio.hpp | 45 ++++++++++++++++++++++++--------------------
src/slave/http.cpp | 34 ++++++++++++++++-----------------
2 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 0f6b47b..5a22d06 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -26,6 +26,7 @@
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/http.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
@@ -113,26 +114,30 @@ process::Future<Nothing> transform(
const std::function<std::string(const T&)>& func,
process::http::Pipe::Writer writer)
{
- return reader->read()
- .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
- // This could happen if EOF is sent by the writer.
- if (record.isNone()) {
- return Nothing();
- }
-
- // This could happen if there is a de-serialization error.
- if (record.isError()) {
- return process::Failure(record.error());
- }
-
- // TODO(vinod): Instead of detecting that the reader went away only
- // after attempting a write, leverage `writer.readerClosed` future.
- if (!writer.write(func(record.get()))) {
- return process::Failure("Write failed to the pipe");
- }
-
- return transform(std::move(reader), func, writer);
- });
+ return process::loop(
+ None(),
+ [=]() {
+ return reader->read();
+ },
+ [=](const Result<T>& record) mutable -> process::Future<bool> {
+ // This could happen if EOF is sent by the writer.
+ if (record.isNone()) {
+ return false;
+ }
+
+ // This could happen if there is a de-serialization error.
+ if (record.isError()) {
+ return process::Failure(record.error());
+ }
+
+ // TODO(vinod): Instead of detecting that the reader went away only
+ // after attempting a write, leverage `writer.readerClosed` future.
+ if (!writer.write(func(record.get()))) {
+ return process::Failure("Write failed to the pipe");
+ }
+
+ return true;
+ });
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4cd352f..ecec24a 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -42,6 +42,7 @@
#include <process/http.hpp>
#include <process/limiter.hpp>
#include <process/logging.hpp>
+#include <process/loop.hpp>
#include <process/owned.hpp>
#include <process/metrics/metrics.hpp>
@@ -85,6 +86,7 @@ using process::Failure;
using process::Future;
using process::HELP;
using process::Logging;
+using process::loop;
using process::Owned;
using process::TLDR;
@@ -2582,25 +2584,23 @@ Future<Response> Slave::Http::attachContainerInput(
// TODO(vinod): Move this to libprocess if this is more generally useful.
Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
{
- return reader.read()
- .then([reader, writer](const Future<string>& chunk) mutable
- -> Future<Nothing> {
- if (!chunk.isReady()) {
- return process::Failure(
- chunk.isFailed() ? chunk.failure() : "discarded");
- }
-
- if (chunk->empty()) {
- // EOF case.
- return Nothing();
- }
+ return loop(
+ None(),
+ [=]() mutable {
+ return reader.read();
+ },
+ [=](const string& chunk) mutable -> Future<bool> {
+ if (chunk.empty()) {
+ // EOF case.
+ return false;
+ }
- if (!writer.write(chunk.get())) {
- return process::Failure("Write failed to the pipe");
- }
+ if (!writer.write(chunk)) {
+ return Failure("Write failed to the pipe");
+ }
- return connect(reader, writer);
- });
+ return true;
+ });
}