You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2016/12/14 16:50:23 UTC

[2/2] mesos git commit: Used process::loop in infinitely recursive functions.

Used process::loop in infinitely recursive functions.

Review: https://reviews.apache.org/r/54751


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a3a65509
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a3a65509
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a3a65509

Branch: refs/heads/master
Commit: a3a65509acebefa285d09079d39a0ebf7b5f086b
Parents: fe6c3e4
Author: Benjamin Hindman <be...@gmail.com>
Authored: Wed Dec 14 08:32:08 2016 -0800
Committer: Benjamin Hindman <be...@gmail.com>
Committed: Wed Dec 14 08:45:22 2016 -0800

----------------------------------------------------------------------
 src/common/recordio.hpp | 45 ++++++++++++++++++++++++--------------------
 src/slave/http.cpp      | 34 ++++++++++++++++-----------------
 2 files changed, 42 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/common/recordio.hpp
----------------------------------------------------------------------
diff --git a/src/common/recordio.hpp b/src/common/recordio.hpp
index 0f6b47b..5a22d06 100644
--- a/src/common/recordio.hpp
+++ b/src/common/recordio.hpp
@@ -26,6 +26,7 @@
 #include <process/defer.hpp>
 #include <process/dispatch.hpp>
 #include <process/http.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
@@ -113,26 +114,30 @@ process::Future<Nothing> transform(
     const std::function<std::string(const T&)>& func,
     process::http::Pipe::Writer writer)
 {
-  return reader->read()
-    .then([=](const Result<T>& record) mutable -> process::Future<Nothing> {
-      // This could happen if EOF is sent by the writer.
-      if (record.isNone()) {
-        return Nothing();
-      }
-
-      // This could happen if there is a de-serialization error.
-      if (record.isError()) {
-        return process::Failure(record.error());
-      }
-
-      // TODO(vinod): Instead of detecting that the reader went away only
-      // after attempting a write, leverage `writer.readerClosed` future.
-      if (!writer.write(func(record.get()))) {
-        return process::Failure("Write failed to the pipe");
-      }
-
-      return transform(std::move(reader), func, writer);
-  });
+  return process::loop(
+      None(),
+      [=]() {
+        return reader->read();
+      },
+      [=](const Result<T>& record) mutable -> process::Future<bool> {
+        // This could happen if EOF is sent by the writer.
+        if (record.isNone()) {
+          return false;
+        }
+
+        // This could happen if there is a de-serialization error.
+        if (record.isError()) {
+          return process::Failure(record.error());
+        }
+
+        // TODO(vinod): Instead of detecting that the reader went away only
+        // after attempting a write, leverage `writer.readerClosed` future.
+        if (!writer.write(func(record.get()))) {
+          return process::Failure("Write failed to the pipe");
+        }
+
+        return true;
+      });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a3a65509/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 4cd352f..ecec24a 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -42,6 +42,7 @@
 #include <process/http.hpp>
 #include <process/limiter.hpp>
 #include <process/logging.hpp>
+#include <process/loop.hpp>
 #include <process/owned.hpp>
 
 #include <process/metrics/metrics.hpp>
@@ -85,6 +86,7 @@ using process::Failure;
 using process::Future;
 using process::HELP;
 using process::Logging;
+using process::loop;
 using process::Owned;
 using process::TLDR;
 
@@ -2582,25 +2584,23 @@ Future<Response> Slave::Http::attachContainerInput(
 // TODO(vinod): Move this to libprocess if this is more generally useful.
 Future<Nothing> connect(Pipe::Reader reader, Pipe::Writer writer)
 {
-  return reader.read()
-    .then([reader, writer](const Future<string>& chunk) mutable
-        -> Future<Nothing> {
-      if (!chunk.isReady()) {
-        return process::Failure(
-            chunk.isFailed() ? chunk.failure() : "discarded");
-      }
-
-      if (chunk->empty()) {
-        // EOF case.
-        return Nothing();
-      }
+  return loop(
+      None(),
+      [=]() mutable {
+        return reader.read();
+      },
+      [=](const string& chunk) mutable -> Future<bool> {
+        if (chunk.empty()) {
+          // EOF case.
+          return false;
+        }
 
-      if (!writer.write(chunk.get())) {
-        return process::Failure("Write failed to the pipe");
-      }
+        if (!writer.write(chunk)) {
+          return Failure("Write failed to the pipe");
+        }
 
-      return connect(reader, writer);
-    });
+        return true;
+      });
 }