You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/12/15 19:56:05 UTC
[6/7] mesos git commit: Made HDFS::du asynchronous.
Made HDFS::du asynchronous.
Review: https://reviews.apache.org/r/40946/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00f6693a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00f6693a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00f6693a
Branch: refs/heads/master
Commit: 00f6693a6d9d71d122cee066a9cbb5d25bb67e1a
Parents: 3d5e42a
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:58 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800
----------------------------------------------------------------------
src/hdfs/hdfs.cpp | 72 +++++++++++++++++---------------
src/hdfs/hdfs.hpp | 2 +-
src/slave/containerizer/fetcher.cpp | 9 ++--
3 files changed, 46 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 2b7a58e..51f016b 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -159,48 +159,54 @@ Future<bool> HDFS::exists(const string& path)
}
-Try<Bytes> HDFS::du(const string& _path)
+Future<Bytes> HDFS::du(const string& _path)
{
const string path = absolutePath(_path);
- Try<string> command = strings::format(
- "%s fs -du '%s'", hadoop, path);
-
- CHECK_SOME(command);
-
- // We are piping stderr to stdout so that we can see the error (if
- // any) in the logs emitted by `os::shell()` in case of failure.
- //
- // TODO(marco): this was the existing logic, but not sure it is
- // actually needed.
- Try<string> out = os::shell(command.get() + " 2>&1");
+ Try<Subprocess> s = subprocess(
+ hadoop,
+ {"hadoop", "fs", "-du", path},
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PIPE(),
+ Subprocess::PIPE());
- if (out.isError()) {
- return Error("HDFS du failed: " + out.error());
+ if (s.isError()) {
+ return Failure("Failed to execute the subprocess: " + s.error());
}
- // We expect 2 space-separated output fields; a number of bytes then
- // the name of the path we gave. The 'hadoop' command can emit
- // various WARN or other log messages, so we make an effort to scan
- // for the field we want.
- foreach (const string& line, strings::tokenize(out.get(), "\n")) {
- // Note that we use tokenize() rather than split() since fields
- // can be delimited by multiple spaces.
- vector<string> fields = strings::tokenize(line, " ");
-
- if (fields.size() == 2 && fields[1] == path) {
- Result<size_t> size = numify<size_t>(fields[0]);
- if (size.isError()) {
- return Error("HDFS du returned unexpected format: " + size.error());
- } else if (size.isNone()) {
- return Error("HDFS du returned unexpected format");
+ return result(s.get())
+ .then([path](const CommandResult& result) -> Future<Bytes> {
+ if (result.status.isNone()) {
+ return Failure("Failed to reap the subprocess");
+ }
+
+ if (result.status.get() != 0) {
+ return Failure(
+ "Unexpected result from the subprocess: "
+ "status='" + stringify(result.status.get()) + "', " +
+ "stdout='" + result.out + "', " +
+ "stderr='" + result.err + "'");
}
- return Bytes(size.get());
- }
- }
+ // We expect 2 space-separated output fields; a number of bytes
+ // then the name of the path we gave. The 'hadoop' command can
+ // emit various WARN or other log messages, so we make an effort
+ // to scan for the field we want.
+ foreach (const string& line, strings::tokenize(result.out, "\n")) {
+ // Note that we use tokenize() rather than split() since
+ // fields can be delimited by multiple spaces.
+ vector<string> fields = strings::tokenize(line, " \t");
+
+ if (fields.size() == 2 && fields[1] == path) {
+ Result<size_t> size = numify<size_t>(fields[0]);
+ if (size.isSome()) {
+ return Bytes(size.get());
+ }
+ }
+ }
- return Error("HDFS du returned an unexpected format: '" + out.get() + "'");
+ return Failure("Unexpected output format: '" + result.out + "'");
+ });
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 24d3ffc..abdb9b9 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -50,7 +50,7 @@ public:
const Option<std::string>& hadoop = None());
process::Future<bool> exists(const std::string& path);
- Try<Bytes> du(const std::string& path);
+ process::Future<Bytes> du(const std::string& path);
process::Future<Nothing> rm(const std::string& path);
process::Future<Nothing> copyFromLocal(
http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index e479bd3..4ac9149 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -281,9 +281,12 @@ static Try<Bytes> fetchSize(
return Error("Failed to create HDFS client: " + hdfs.error());
}
- Try<Bytes> size = hdfs.get()->du(uri);
- if (size.isError()) {
- return Error("Hadoop client could not determine size: " + size.error());
+ Future<Bytes> size = hdfs.get()->du(uri);
+ size.await();
+
+ if (!size.isReady()) {
+ return Error("Hadoop client could not determine size: " +
+ (size.isFailed() ? size.failure() : "discarded"));
}
return size.get();