You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/12/15 19:56:00 UTC

[1/7] mesos git commit: Added tests for HDFS client.

Repository: mesos
Updated Branches:
  refs/heads/master 925e99ea7 -> 293639961


Added tests for HDFS client.

Review: https://reviews.apache.org/r/41384


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/29363996
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/29363996
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/29363996

Branch: refs/heads/master
Commit: 293639961525e49e17dccd8d380684b8d668840d
Parents: 00f6693
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 17:19:31 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/Makefile.am          |   1 +
 src/tests/hdfs_tests.cpp | 210 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 211 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/29363996/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index acd17de..8f6b98b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1688,6 +1688,7 @@ mesos_tests_SOURCES =						\
   tests/files_tests.cpp						\
   tests/flags.cpp						\
   tests/gc_tests.cpp						\
+  tests/hdfs_tests.cpp						\
   tests/health_check_tests.cpp					\
   tests/hierarchical_allocator_tests.cpp			\
   tests/hook_tests.cpp						\

http://git-wip-us.apache.org/repos/asf/mesos/blob/29363996/src/tests/hdfs_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hdfs_tests.cpp b/src/tests/hdfs_tests.cpp
new file mode 100644
index 0000000..29f1560
--- /dev/null
+++ b/src/tests/hdfs_tests.cpp
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <process/gtest.hpp>
+#include <process/owned.hpp>
+
+#include <stout/check.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+
+#include <stout/os/touch.hpp>
+#include <stout/os/write.hpp>
+
+#include <stout/tests/utils.hpp>
+
+#include "hdfs/hdfs.hpp"
+
+using std::string;
+
+using process::Future;
+using process::Owned;
+
+namespace mesos {
+namespace internal {
+namespace tests {
+
+class HdfsTest : public TemporaryDirectoryTest
+{
+public:
+  virtual void SetUp()
+  {
+    TemporaryDirectoryTest::SetUp();
+
+    // Create a fake hadoop command line tool. The tests serialize
+    // bash scripts into this file which emulates the hadoop client's
+    // logic while operating on the local filesystem.
+    hadoop = path::join(os::getcwd(), "hadoop");
+
+    ASSERT_SOME(os::touch(hadoop));
+
+    // Make sure the script has execution permission.
+    ASSERT_SOME(os::chmod(
+        hadoop,
+        S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH));
+  }
+
+protected:
+  string hadoop;
+};
+
+
+// This test verifies the 'HDFS::exists(path)' method. We emulate the
+// hadoop client by testing the existence of a local file.
+TEST_F(HdfsTest, Exists)
+{
+  // The script emulating 'hadoop fs -test -e <path>'.
+  // NOTE: We emulate a version call here which is exercised when
+  // creating the HDFS client.
+  ASSERT_SOME(os::write(
+      hadoop,
+      "#!/bin/sh\n"
+      "if [ \"$1\" = \"version\" ]; then\n"
+      "  exit 0\n"
+      "fi\n"
+      "test -e $4\n"));
+
+  Try<Owned<HDFS>> hdfs = HDFS::create(hadoop);
+  ASSERT_SOME(hdfs);
+
+  Future<bool> exists = hdfs.get()->exists(hadoop);
+  AWAIT_READY(exists);
+  EXPECT_TRUE(exists.get());
+
+  exists = hdfs.get()->exists(path::join(os::getcwd(), "NotExists"));
+  AWAIT_READY(exists);
+  EXPECT_FALSE(exists.get());
+}
+
+
+// This test verifies the 'HDFS::du(path)' method. We emulate the
+// hadoop client by doing a 'du' on the local filesystem.
+TEST_F(HdfsTest, Du)
+{
+  // The script emulating 'hadoop fs -du <path>'.
+  // NOTE: We emulate a version call here which is exercised when
+  // creating the HDFS client.
+  ASSERT_SOME(os::write(
+      hadoop,
+      "#!/bin/sh\n"
+      "if [ \"$1\" = \"version\" ]; then\n"
+      "  exit 0\n"
+      "fi\n"
+      "du $3\n"));
+
+  Try<Owned<HDFS>> hdfs = HDFS::create(hadoop);
+  ASSERT_SOME(hdfs);
+
+  Future<Bytes> bytes = hdfs.get()->du(hadoop);
+  AWAIT_READY(bytes);
+
+  bytes = hdfs.get()->du(path::join(os::getcwd(), "Invalid"));
+  AWAIT_FAILED(bytes);
+}
+
+
+// This test verifies the 'HDFS::rm(path)' method. We emulate the
+// hadoop client by removing a file on the local filesystem.
+TEST_F(HdfsTest, Rm)
+{
+  // The script emulating 'hadoop fs -rm <path>'.
+  // NOTE: We emulate a version call here which is exercised when
+  // creating the HDFS client.
+  ASSERT_SOME(os::write(
+      hadoop,
+      "#!/bin/sh\n"
+      "if [ \"$1\" = \"version\" ]; then\n"
+      "  exit 0\n"
+      "fi\n"
+      "rm $3\n"));
+
+  Try<Owned<HDFS>> hdfs = HDFS::create(hadoop);
+  ASSERT_SOME(hdfs);
+
+  string file = path::join(os::getcwd(), "file");
+
+  ASSERT_SOME(os::touch(file));
+
+  Future<Nothing> rm = hdfs.get()->rm(file);
+  AWAIT_READY(rm);
+
+  rm = hdfs.get()->rm(path::join(os::getcwd(), "Invalid"));
+  AWAIT_FAILED(rm);
+}
+
+
+// This test verifies the 'HDFS::copyFromLocal(from, to)' method. We
+// emulate the hadoop client by doing a 'cp' on the local filesystem.
+TEST_F(HdfsTest, CopyFromLocal)
+{
+  // The script emulating 'hadoop fs -copyFromLocal <from> <to>'.
+  // NOTE: We emulate a version call here which is exercised when
+  // creating the HDFS client.
+  ASSERT_SOME(os::write(
+      hadoop,
+      "#!/bin/sh\n"
+      "if [ \"$1\" = \"version\" ]; then\n"
+      "  exit 0\n"
+      "fi\n"
+      "cp $3 $4"));
+
+  Try<Owned<HDFS>> hdfs = HDFS::create(hadoop);
+  ASSERT_SOME(hdfs);
+
+  string file1 = path::join(os::getcwd(), "file1");
+  string file2 = path::join(os::getcwd(), "file2");
+
+  ASSERT_SOME(os::write(file1, "abc"));
+
+  Future<Nothing> copy = hdfs.get()->copyFromLocal(file1, file2);
+  AWAIT_READY(copy);
+
+  EXPECT_SOME_EQ("abc", os::read(file2));
+}
+
+
+// This test verifies the 'HDFS::copyToLocal(from, to)' method. We
+// emulate the hadoop client by doing a 'cp' on the local filesystem.
+TEST_F(HdfsTest, CopyToLocal)
+{
+  // The script emulating 'hadoop fs -copyToLocal <from> <to>'.
+  // NOTE: We emulate a version call here which is exercised when
+  // creating the HDFS client.
+  ASSERT_SOME(os::write(
+      hadoop,
+      "#!/bin/sh\n"
+      "if [ \"$1\" = \"version\" ]; then\n"
+      "  exit 0\n"
+      "fi\n"
+      "cp $3 $4"));
+
+  Try<Owned<HDFS>> hdfs = HDFS::create(hadoop);
+  ASSERT_SOME(hdfs);
+
+  string file1 = path::join(os::getcwd(), "file1");
+  string file2 = path::join(os::getcwd(), "file2");
+
+  ASSERT_SOME(os::write(file1, "abc"));
+
+  Future<Nothing> copy = hdfs.get()->copyToLocal(file1, file2);
+  AWAIT_READY(copy);
+
+  EXPECT_SOME_EQ("abc", os::read(file2));
+}
+
+} // namespace tests {
+} // namespace internal {
+} // namespace mesos {


[7/7] mesos git commit: Made HDFS::copyFromLocal asynchronous.

Posted by ji...@apache.org.
Made HDFS::copyFromLocal asynchronous.

Review: https://reviews.apache.org/r/40943/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/905d5fb4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/905d5fb4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/905d5fb4

Branch: refs/heads/master
Commit: 905d5fb4dc433c9922fb3965a1bb4c40805f3b92
Parents: c08d348
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:36 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/cli/execute.cpp |  9 ++++++---
 src/hdfs/hdfs.cpp   | 37 +++++++++++++++++++++++++------------
 src/hdfs/hdfs.hpp   |  6 +++++-
 3 files changed, 36 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index fec2ad4..a2b610f 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -408,9 +408,12 @@ int main(int argc, char** argv)
       return EXIT_FAILURE;
     }
 
-    Try<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path);
-    if (copy.isError()) {
-      cerr << "Failed to copy package: " << copy.error() << endl;
+    Future<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path);
+    copy.await();
+
+    if (!copy.isReady()) {
+      cerr << "Failed to copy package: "
+           << (copy.isFailed() ? copy.failure() : "discarded") << endl;
       return EXIT_FAILURE;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index ed7ab63..3c3f867 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -236,26 +236,39 @@ Future<Nothing> HDFS::rm(const string& path)
 }
 
 
-Try<Nothing> HDFS::copyFromLocal(const string& from, const string& _to)
+Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to)
 {
   if (!os::exists(from)) {
-    return Error("Failed to find " + from);
+    return Failure("Failed to find '" + from + "'");
   }
 
-  const string to = absolutePath(_to);
-
-  Try<string> command = strings::format(
-      "%s fs -copyFromLocal '%s' '%s'", hadoop, from, to);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-copyFromLocal", from, absolutePath(to)},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 13ec02c..efe6e1d 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -52,7 +52,11 @@ public:
   process::Future<bool> exists(const std::string& path);
   Try<Bytes> du(const std::string& path);
   process::Future<Nothing> rm(const std::string& path);
-  Try<Nothing> copyFromLocal(const std::string& from, const std::string& to);
+
+  process::Future<Nothing> copyFromLocal(
+      const std::string& from,
+      const std::string& to);
+
   Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
 
 private:


[2/7] mesos git commit: Made HDFS::copyToLocal asynchronous.

Posted by ji...@apache.org.
Made HDFS::copyToLocal asynchronous.

Review: https://reviews.apache.org/r/40945/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d5e42af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d5e42af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d5e42af

Branch: refs/heads/master
Commit: 3d5e42af8d4cb3d1d03db505b0140ff35f37d912
Parents: 905d5fb
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:46 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp        | 35 ++++++++++++++++++++++++-----------
 src/hdfs/hdfs.hpp        |  4 +++-
 src/launcher/fetcher.cpp |  9 ++++++---
 3 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 3c3f867..2b7a58e 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -272,22 +272,35 @@ Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to)
 }
 
 
-Try<Nothing> HDFS::copyToLocal(const string& _from, const string& to)
+Future<Nothing> HDFS::copyToLocal(const string& from, const string& to)
 {
-  const string from = absolutePath(_from);
-
-  Try<string> command = strings::format(
-      "%s fs -copyToLocal '%s' '%s'", hadoop, from, to);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-copyToLocal", absolutePath(from), to},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index efe6e1d..24d3ffc 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -57,7 +57,9 @@ public:
       const std::string& from,
       const std::string& to);
 
-  Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
+  process::Future<Nothing> copyToLocal(
+      const std::string& from,
+      const std::string& to);
 
 private:
   explicit HDFS(const std::string& _hadoop)

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index eafdda3..0ff8598 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -109,9 +109,12 @@ static Try<string> downloadWithHadoopClient(
   LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
             << "' to '" << destinationPath << "'";
 
-  Try<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
-  if (result.isError()) {
-    return Error("HDFS copyToLocal failed: " + result.error());
+  Future<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
+  result.await();
+
+  if (!result.isReady()) {
+    return Error("HDFS copyToLocal failed: " +
+                 (result.isFailed() ? result.failure() : "discarded"));
   }
 
   return destinationPath;


[4/7] mesos git commit: Added a helper for HDFS client to shell out commands.

Posted by ji...@apache.org.
Added a helper for HDFS client to shell out commands.

Code is copied from https://reviews.apache.org/r/40559/.

Review: https://reviews.apache.org/r/40941/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b420b396
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b420b396
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b420b396

Branch: refs/heads/master
Commit: b420b396d1fa1e0adac4ecde2e72788b81bf5ad5
Parents: 925e99e
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:02 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b420b396/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index c32c2ae..e5e7097 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -15,8 +15,13 @@
 // limitations under the License
 
 #include <string>
+#include <tuple>
 #include <vector>
 
+#include <process/collect.hpp>
+#include <process/io.hpp>
+#include <process/subprocess.hpp>
+
 #include <stout/bytes.hpp>
 #include <stout/check.hpp>
 #include <stout/error.hpp>
@@ -37,6 +42,58 @@ using std::string;
 using std::vector;
 
 
+struct CommandResult
+{
+  Option<int> status;
+  string out;
+  string err;
+};
+
+
+static Future<CommandResult> result(const Subprocess& s)
+{
+  CHECK_SOME(s.out());
+  CHECK_SOME(s.err());
+
+  return await(
+      s.status(),
+      io::read(s.out().get()),
+      io::read(s.err().get()))
+    .then([](const std::tuple<
+        Future<Option<int>>,
+        Future<string>,
+        Future<string>>& t) -> Future<CommandResult> {
+      Future<Option<int>> status = std::get<0>(t);
+      if (!status.isReady()) {
+        return Failure(
+            "Failed to get the exit status of the subprocess: " +
+            (status.isFailed() ? status.failure() : "discarded"));
+      }
+
+      Future<string> output = std::get<1>(t);
+      if (!output.isReady()) {
+        return Failure(
+            "Failed to read stdout from the subprocess: " +
+            (output.isFailed() ? output.failure() : "discarded"));
+      }
+
+      Future<string> error = std::get<2>(t);
+      if (!error.isReady()) {
+        return Failure(
+            "Failed to read stderr from the subprocess: " +
+            (error.isFailed() ? error.failure() : "discarded"));
+      }
+
+      CommandResult result;
+      result.status = status.get();
+      result.out = output.get();
+      result.err = error.get();
+
+      return result;
+    });
+}
+
+
 Try<Owned<HDFS>> HDFS::create(const Option<string>& _hadoop)
 {
   // Determine the hadoop client to use. If the user has specified


[5/7] mesos git commit: Made HDFS::rm asynchronous.

Posted by ji...@apache.org.
Made HDFS::rm asynchronous.

Review: https://reviews.apache.org/r/40942/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c08d3488
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c08d3488
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c08d3488

Branch: refs/heads/master
Commit: c08d3488bcb3b849d96f6466b40d37e55a480a79
Parents: 3424b1f
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:25 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/cli/execute.cpp |  9 ++++++---
 src/hdfs/hdfs.cpp   | 33 ++++++++++++++++++++++++---------
 src/hdfs/hdfs.hpp   |  2 +-
 3 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c08d3488/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index eef3b91..fec2ad4 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -395,9 +395,12 @@ int main(int argc, char** argv)
            << (exists.isFailed() ? exists.failure() : "discarded") << endl;
       return EXIT_FAILURE;
     } else if (exists.get() && flags.overwrite) {
-      Try<Nothing> rm = hdfs.get()->rm(path);
-      if (rm.isError()) {
-        cerr << "Failed to remove existing file: " << rm.error() << endl;
+      Future<Nothing> rm = hdfs.get()->rm(path);
+      rm.await();
+
+      if (!rm.isReady()) {
+        cerr << "Failed to remove existing file: "
+             << (rm.isFailed() ? rm.failure() : "discarded") << endl;
         return EXIT_FAILURE;
       }
     } else if (exists.get()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/c08d3488/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 1038cc3..ed7ab63 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -204,20 +204,35 @@ Try<Bytes> HDFS::du(const string& _path)
 }
 
 
-Try<Nothing> HDFS::rm(const string& path)
+Future<Nothing> HDFS::rm(const string& path)
 {
-  Try<string> command = strings::format(
-      "%s fs -rm '%s'", hadoop, absolutePath(path));
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-rm", absolutePath(path)},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/c08d3488/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index aa44903..13ec02c 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -51,7 +51,7 @@ public:
 
   process::Future<bool> exists(const std::string& path);
   Try<Bytes> du(const std::string& path);
-  Try<Nothing> rm(const std::string& path);
+  process::Future<Nothing> rm(const std::string& path);
   Try<Nothing> copyFromLocal(const std::string& from, const std::string& to);
   Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
 


[3/7] mesos git commit: Made HDFS::exists asynchronous.

Posted by ji...@apache.org.
Made HDFS::exists asynchronous.

This also fixed a bug in the original code: the original code never returns false.

Review: https://reviews.apache.org/r/40806/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3424b1f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3424b1f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3424b1f7

Branch: refs/heads/master
Commit: 3424b1f702da1d0c33d0a3e32569477e31dbb4e8
Parents: b420b39
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:13 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/cli/execute.cpp | 11 ++++++++---
 src/hdfs/hdfs.cpp   | 42 ++++++++++++++++++++++++++++++------------
 src/hdfs/hdfs.hpp   |  3 ++-
 3 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3424b1f7/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index ac933b6..eef3b91 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -21,6 +21,7 @@
 #include <mesos/scheduler.hpp>
 #include <mesos/type_utils.hpp>
 
+#include <process/future.hpp>
 #include <process/owned.hpp>
 #include <process/pid.hpp>
 
@@ -40,6 +41,7 @@
 using namespace mesos;
 using namespace mesos::internal;
 
+using process::Future;
 using process::Owned;
 using process::UPID;
 
@@ -385,9 +387,12 @@ int main(int argc, char** argv)
     string path = path::join("/", user.get(), flags.package.get());
 
     // Check if the file exists and remove it if we're overwriting.
-    Try<bool> exists = hdfs.get()->exists(path);
-    if (exists.isError()) {
-      cerr << "Failed to check if file exists: " << exists.error() << endl;
+    Future<bool> exists = hdfs.get()->exists(path);
+    exists.await();
+
+    if (!exists.isReady()) {
+      cerr << "Failed to check if file exists: "
+           << (exists.isFailed() ? exists.failure() : "discarded") << endl;
       return EXIT_FAILURE;
     } else if (exists.get() && flags.overwrite) {
       Try<Nothing> rm = hdfs.get()->rm(path);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3424b1f7/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index e5e7097..1038cc3 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -122,22 +122,40 @@ Try<Owned<HDFS>> HDFS::create(const Option<string>& _hadoop)
 }
 
 
-Try<bool> HDFS::exists(const string& path)
+Future<bool> HDFS::exists(const string& path)
 {
-  Try<string> command = strings::format(
-      "%s fs -test -e '%s'", hadoop, absolutePath(path));
-
-  CHECK_SOME(command);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-test", "-e", absolutePath(path)},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
+
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  // We are piping stderr to stdout so that we can see the error (if
-  // any) in the logs emitted by `os::shell()` in case of failure.
-  Try<string> out = os::shell(command.get() + " 2>&1");
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<bool> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (WIFEXITED(result.status.get())) {
+        int exitCode = WEXITSTATUS(result.status.get());
+        if (exitCode == 0) {
+          return true;
+        } else if (exitCode == 1) {
+          return false;
+        }
+      }
 
-  return true;
+      return Failure(
+          "Unexpected result from the subprocess: "
+          "status='" + stringify(result.status.get()) + "', " +
+          "stdout='" + result.out + "', " +
+          "stderr='" + result.err + "'");
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3424b1f7/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 6bdeedf..aa44903 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -19,6 +19,7 @@
 
 #include <string>
 
+#include <process/future.hpp>
 #include <process/owned.hpp>
 
 #include <stout/bytes.hpp>
@@ -48,7 +49,7 @@ public:
   static Try<process::Owned<HDFS>> create(
       const Option<std::string>& hadoop = None());
 
-  Try<bool> exists(const std::string& path);
+  process::Future<bool> exists(const std::string& path);
   Try<Bytes> du(const std::string& path);
   Try<Nothing> rm(const std::string& path);
   Try<Nothing> copyFromLocal(const std::string& from, const std::string& to);


[6/7] mesos git commit: Made HDFS::du asynchronous.

Posted by ji...@apache.org.
Made HDFS::du asynchronous.

Review: https://reviews.apache.org/r/40946/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00f6693a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00f6693a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00f6693a

Branch: refs/heads/master
Commit: 00f6693a6d9d71d122cee066a9cbb5d25bb67e1a
Parents: 3d5e42a
Author: Jie Yu <yu...@gmail.com>
Authored: Mon Dec 14 11:42:58 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp                   | 72 +++++++++++++++++---------------
 src/hdfs/hdfs.hpp                   |  2 +-
 src/slave/containerizer/fetcher.cpp |  9 ++--
 3 files changed, 46 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 2b7a58e..51f016b 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -159,48 +159,54 @@ Future<bool> HDFS::exists(const string& path)
 }
 
 
-Try<Bytes> HDFS::du(const string& _path)
+Future<Bytes> HDFS::du(const string& _path)
 {
   const string path = absolutePath(_path);
 
-  Try<string> command = strings::format(
-      "%s fs -du '%s'", hadoop, path);
-
-  CHECK_SOME(command);
-
-  // We are piping stderr to stdout so that we can see the error (if
-  // any) in the logs emitted by `os::shell()` in case of failure.
-  //
-  // TODO(marco): this was the existing logic, but not sure it is
-  // actually needed.
-  Try<string> out = os::shell(command.get() + " 2>&1");
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-du", path},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  if (out.isError()) {
-    return Error("HDFS du failed: " + out.error());
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
   }
 
-  // We expect 2 space-separated output fields; a number of bytes then
-  // the name of the path we gave. The 'hadoop' command can emit
-  // various WARN or other log messages, so we make an effort to scan
-  // for the field we want.
-  foreach (const string& line, strings::tokenize(out.get(), "\n")) {
-    // Note that we use tokenize() rather than split() since fields
-    // can be delimited by multiple spaces.
-    vector<string> fields = strings::tokenize(line, " ");
-
-    if (fields.size() == 2 && fields[1] == path) {
-      Result<size_t> size = numify<size_t>(fields[0]);
-      if (size.isError()) {
-        return Error("HDFS du returned unexpected format: " + size.error());
-      } else if (size.isNone()) {
-        return Error("HDFS du returned unexpected format");
+  return result(s.get())
+    .then([path](const CommandResult& result) -> Future<Bytes> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
+
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
       }
 
-      return Bytes(size.get());
-    }
-  }
+      // We expect 2 space-separated output fields; a number of bytes
+      // then the name of the path we gave. The 'hadoop' command can
+      // emit various WARN or other log messages, so we make an effort
+      // to scan for the field we want.
+      foreach (const string& line, strings::tokenize(result.out, "\n")) {
+        // Note that we use tokenize() rather than split() since
+        // fields can be delimited by multiple spaces.
+        vector<string> fields = strings::tokenize(line, " \t");
+
+        if (fields.size() == 2 && fields[1] == path) {
+          Result<size_t> size = numify<size_t>(fields[0]);
+          if (size.isSome()) {
+            return Bytes(size.get());
+          }
+        }
+      }
 
-  return Error("HDFS du returned an unexpected format: '" + out.get() + "'");
+      return Failure("Unexpected output format: '" + result.out + "'");
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 24d3ffc..abdb9b9 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -50,7 +50,7 @@ public:
       const Option<std::string>& hadoop = None());
 
   process::Future<bool> exists(const std::string& path);
-  Try<Bytes> du(const std::string& path);
+  process::Future<Bytes> du(const std::string& path);
   process::Future<Nothing> rm(const std::string& path);
 
   process::Future<Nothing> copyFromLocal(

http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/slave/containerizer/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp
index e479bd3..4ac9149 100644
--- a/src/slave/containerizer/fetcher.cpp
+++ b/src/slave/containerizer/fetcher.cpp
@@ -281,9 +281,12 @@ static Try<Bytes> fetchSize(
     return Error("Failed to create HDFS client: " + hdfs.error());
   }
 
-  Try<Bytes> size = hdfs.get()->du(uri);
-  if (size.isError()) {
-    return Error("Hadoop client could not determine size: " + size.error());
+  Future<Bytes> size = hdfs.get()->du(uri);
+  size.await();
+
+  if (!size.isReady()) {
+    return Error("Hadoop client could not determine size: " +
+                 (size.isFailed() ? size.failure() : "discarded"));
   }
 
   return size.get();