You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/12/15 19:56:05 UTC

[6/7] mesos git commit: Made HDFS::du asynchronous.

Made HDFS::du asynchronous.

Review: https://reviews.apache.org/r/40946/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00f6693a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00f6693a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00f6693a

Branch: refs/heads/master
Commit: 00f6693a6d9d71d122cee066a9cbb5d25bb67e1a
Parents: 3d5e42a
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:58 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp                   | 72 +++++++++++++++++---------------
 src/hdfs/hdfs.hpp                   |  2 +-
 src/slave/containerizer/fetcher.cpp |  9 ++--
 3 files changed, 46 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 2b7a58e..51f016b 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -159,48 +159,54 @@ Future<bool> HDFS::exists(const string& path)
 }
 
 
-Try<Bytes> HDFS::du(const string& _path)
+Future<Bytes> HDFS::du(const string& _path)
 {
   const string path = absolutePath(_path);
 
-  Try<string> command = strings::format(
-      "%s fs -du '%s'", hadoop, path);
-
-  CHECK_SOME(command);
-
-  // We are piping stderr to stdout so that we can see the error (if
-  // any) in the logs emitted by `os::shell()` in case of failure.
-  //
-  // TODO(marco): this was the existing logic, but not sure it is
-  // actually needed.
-  Try<string> out = os::shell(command.get() + " 2>&1");
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-du", path},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  if (out.isError()) {
-    return Error("HDFS du failed: " + out.error());
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
   }
 
-  // We expect 2 space-separated output fields; a number of bytes then
-  // the name of the path we gave. The 'hadoop' command can emit
-  // various WARN or other log messages, so we make an effort to scan
-  // for the field we want.
-  foreach (const string& line, strings::tokenize(out.get(), "\n")) {
-    // Note that we use tokenize() rather than split() since fields
-    // can be delimited by multiple spaces.
-    vector<string> fields = strings::tokenize(line, " ");
-
-    if (fields.size() == 2 && fields[1] == path) {
-      Result<size_t> size = numify<size_t>(fields[0]);
-      if (size.isError()) {
-        return Error("HDFS du returned unexpected format: " + size.error());
-      } else if (size.isNone()) {
-        return Error("HDFS du returned unexpected format");
+  return result(s.get())
+    .then([path](const CommandResult& result) -> Future<Bytes> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
+
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
       }
 
-      return Bytes(size.get());
-    }
-  }
+      // We expect 2 space-separated output fields; a number of bytes
+      // then the name of the path we gave. The 'hadoop' command can
+      // emit various WARN or other log messages, so we make an effort
+      // to scan for the field we want.
+      foreach (const string& line, strings::tokenize(result.out, "\n")) {
+        // Note that we use tokenize() rather than split() since
+        // fields can be delimited by multiple spaces.
+        vector<string> fields = strings::tokenize(line, " \t");
+
+        if (fields.size() == 2 && fields[1] == path) {
+          Result<size_t> size = numify<size_t>(fields[0]);
+          if (size.isSome()) {
+            return Bytes(size.get());
+          }
+        }
+      }
 
-  return Error("HDFS du returned an unexpected format: '" + out.get() + "'");
+      return Failure("Unexpected output format: '" + result.out + "'");
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 24d3ffc..abdb9b9 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -50,7 +50,7 @@ public:
       const Option<std::string>& hadoop = None());
 
   process::Future<bool> exists(const std::string& path);
-  Try<Bytes> du(const std::string& path);
+  process::Future<Bytes> du(const std::string& path);
   process::Future<Nothing> rm(const std::string& path);
 
   process::Future<Nothing> copyFromLocal(

http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index e479bd3..4ac9149 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -281,9 +281,12 @@ static Try<Bytes> fetchSize(
     return Error("Failed to create HDFS client: " + hdfs.error());
   }
 
-  Try<Bytes> size = hdfs.get()->du(uri);
-  if (size.isError()) {
-    return Error("Hadoop client could not determine size: " + size.error());
+  Future<Bytes> size = hdfs.get()->du(uri);
+  size.await();
+
+  if (!size.isReady()) {
+    return Error("Hadoop client could not determine size: " +
+                 (size.isFailed() ? size.failure() : "discarded"));
   }
 
   return size.get();