You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/12/31 20:42:01 UTC

[1/2] mesos git commit: Added HDFS URI fetcher plugin.

Repository: mesos
Updated Branches:
  refs/heads/master 4568e584d -> f05eb74b9


Added HDFS URI fetcher plugin.

Review: https://reviews.apache.org/r/41713


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0ed4155a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0ed4155a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0ed4155a

Branch: refs/heads/master
Commit: 0ed4155a8efc8fe113def7f24fb78b466440f508
Parents: 4568e58
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 15 14:26:14 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 30 21:23:28 2015 -0800

----------------------------------------------------------------------
 include/mesos/uri/fetcher.hpp   | 27 +++++++----
 include/mesos/uri/uri.hpp       |  6 +--
 src/Makefile.am                 |  3 ++
 src/tests/uri_fetcher_tests.cpp | 13 +++---
 src/uri/fetcher.cpp             | 54 ++++++++++++++++------
 src/uri/fetcher.hpp             | 52 ++++++++++++++++++++++
 src/uri/fetchers/curl.cpp       |  9 +++-
 src/uri/fetchers/curl.hpp       |  7 ++-
 src/uri/fetchers/hadoop.cpp     | 86 ++++++++++++++++++++++++++++++++++++
 src/uri/fetchers/hadoop.hpp     | 65 +++++++++++++++++++++++++++
 10 files changed, 290 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/include/mesos/uri/fetcher.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/uri/fetcher.hpp b/include/mesos/uri/fetcher.hpp
index 6cf7b37..4223f9e 100644
--- a/include/mesos/uri/fetcher.hpp
+++ b/include/mesos/uri/fetcher.hpp
@@ -14,9 +14,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#ifndef __URI_FETCHER_HPP__
-#define __URI_FETCHER_HPP__
+#ifndef __MESOS_URI_FETCHER_HPP__
+#define __MESOS_URI_FETCHER_HPP__
 
+#include <set>
 #include <string>
 
 #include <process/future.hpp>
@@ -24,13 +25,16 @@
 
 #include <stout/hashmap.hpp>
 #include <stout/nothing.hpp>
-#include <stout/try.hpp>
 
 #include <mesos/uri/uri.hpp>
 
 namespace mesos {
 namespace uri {
 
+// Forward declarations.
+class Flags;
+
+
 /**
  * Provides an abstraction for fetching URIs. It is pluggable through
  * plugins. Each plugin is responsible for one or more URI schemes,
@@ -50,6 +54,11 @@ public:
     virtual ~Plugin() {}
 
     /**
+     * Returns the URI schemes that this plugin handles.
+     */
+    virtual std::set<std::string> schemes() = 0;
+
+    /**
      * Fetches a URI to the given directory. To avoid blocking or
      * crashing the current thread, this method might choose to fork
      * subprocesses for third party commands.
@@ -63,9 +72,12 @@ public:
   };
 
   /**
-   * Factory method for creating a Fetcher instance.
+   * Creates a Fetcher instance with the given plugins.
+   *
+   * @param _plugins a map from URI scheme to the plugin handling it
    */
-  static Try<process::Owned<Fetcher>> create();
+  Fetcher(const hashmap<std::string, process::Owned<Plugin>>& _plugins)
+    : plugins(_plugins) {}
 
   /**
    * Fetches a URI to the given directory. This method will dispatch
@@ -79,9 +91,6 @@ public:
       const std::string& directory);
 
 private:
-  Fetcher(const hashmap<std::string, process::Owned<Plugin>>& _plugins)
-    : plugins(_plugins) {}
-
   Fetcher(const Fetcher&) = delete; // Not copyable.
   Fetcher& operator=(const Fetcher&) = delete; // Not assignable.
 
@@ -91,4 +100,4 @@ private:
 } // namespace uri {
 } // namespace mesos {
 
-#endif // __URI_FETCHER_HPP__
+#endif // __MESOS_URI_FETCHER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/include/mesos/uri/uri.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/uri/uri.hpp b/include/mesos/uri/uri.hpp
index aa3ab5d..75f08c8 100644
--- a/include/mesos/uri/uri.hpp
+++ b/include/mesos/uri/uri.hpp
@@ -14,8 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#ifndef __URI_PROTO_HPP__
-#define __URI_PROTO_HPP__
+#ifndef __MESOS_URI_PROTO_HPP__
+#define __MESOS_URI_PROTO_HPP__
 
 #include <ostream>
 
@@ -28,4 +28,4 @@ std::ostream& operator<<(std::ostream& stream, const URI& uri);
 
 } // namespace mesos {
 
-#endif // __URI_PROTO_HPP__
+#endif // __MESOS_URI_PROTO_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bf71fe6..1f6c737 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -636,6 +636,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   uri/fetcher.cpp							\
   uri/utils.cpp								\
   uri/fetchers/curl.cpp							\
+  uri/fetchers/hadoop.cpp						\
   usage/usage.cpp							\
   v1/attributes.cpp							\
   v1/mesos.cpp								\
@@ -761,8 +762,10 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   tests/containerizer/rootfs.hpp					\
   tests/containerizer/setns_test_helper.hpp				\
   tests/containerizer/store.hpp						\
+  uri/fetcher.hpp							\
   uri/utils.hpp								\
   uri/fetchers/curl.hpp							\
+  uri/fetchers/hadoop.hpp						\
   uri/schemes/file.hpp							\
   uri/schemes/http.hpp							\
   usage/usage.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/tests/uri_fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/uri_fetcher_tests.cpp b/src/tests/uri_fetcher_tests.cpp
index d54a7e7..7f54927 100644
--- a/src/tests/uri_fetcher_tests.cpp
+++ b/src/tests/uri_fetcher_tests.cpp
@@ -29,16 +29,19 @@
 
 #include <stout/tests/utils.hpp>
 
-#include <mesos/uri/fetcher.hpp>
-#include <mesos/uri/uri.hpp>
+#include "uri/fetcher.hpp"
 
 #include "uri/schemes/http.hpp"
 
-using namespace process;
+namespace http = process::http;
 
 using testing::_;
 using testing::Return;
 
+using process::Future;
+using process::Owned;
+using process::Process;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -87,7 +90,7 @@ TEST_F(CurlFetcherPluginTest, CURL_ValidUri)
   EXPECT_CALL(server, test(_))
     .WillOnce(Return(http::OK("test")));
 
-  Try<Owned<uri::Fetcher>> fetcher = uri::Fetcher::create();
+  Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
   ASSERT_SOME(fetcher);
 
   AWAIT_READY(fetcher.get()->fetch(uri, os::getcwd()));
@@ -106,7 +109,7 @@ TEST_F(CurlFetcherPluginTest, CURL_InvalidUri)
   EXPECT_CALL(server, test(_))
     .WillOnce(Return(http::NotFound()));
 
-  Try<Owned<uri::Fetcher>> fetcher = uri::Fetcher::create();
+  Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
   ASSERT_SOME(fetcher);
 
   AWAIT_FAILED(fetcher.get()->fetch(uri, os::getcwd()));

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetcher.cpp b/src/uri/fetcher.cpp
index 8bd7cf3..ac13fbd 100644
--- a/src/uri/fetcher.cpp
+++ b/src/uri/fetcher.cpp
@@ -14,38 +14,66 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include <mesos/uri/fetcher.hpp>
+#include <string>
 
-#include "uri/fetchers/curl.hpp"
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/hashmap.hpp>
 
-using namespace process;
+#include "uri/fetcher.hpp"
 
 using std::string;
 
+using process::Failure;
+using process::Future;
+using process::Owned;
+
 namespace mesos {
 namespace uri {
+namespace fetcher {
 
-Try<Owned<Fetcher>> Fetcher::create()
+Try<Owned<Fetcher>> create(const Option<Flags>& _flags)
 {
-  hashmap<string, Owned<Plugin>> plugins;
+  // Use the default flags if not specified.
+  Flags flags;
+  if (_flags.isSome()) {
+    flags = _flags.get();
+  }
+
+  // Load built-in plugins.
+  typedef lambda::function<Try<Owned<Fetcher::Plugin>>()> Creator;
+
+  hashmap<string, Creator> creators;
+  creators.put("curl", lambda::bind(&CurlFetcherPlugin::create, flags));
+  creators.put("hadoop", lambda::bind(&HadoopFetcherPlugin::create, flags));
 
-  hashmap<string, Try<Owned<Plugin>>(*)()> creators;
-  creators.put("http", &CurlFetcherPlugin::create);
+  hashmap<string, Owned<Fetcher::Plugin>> plugins;
 
-  foreachkey (const string& scheme, creators) {
-    Try<Owned<Plugin>> plugin = creators[scheme]();
+  foreachpair (const string& name, const Creator& creator, creators) {
+    Try<Owned<Fetcher::Plugin>> plugin = creator();
     if (plugin.isError()) {
-      return Error(
-          "Failed to create plugin for scheme '" +
-          scheme + "': " + plugin.error());
+      // NOTE: We skip the plugin if it cannot be created, instead of
+      // returning an Error so that we can still use other plugins.
+      LOG(ERROR) << "Failed to create URI fetcher plugin "
+                 << "'"  << name << "': " << plugin.error();
+      continue;
     }
 
-    plugins.put(scheme, plugin.get());
+    foreach (const string& scheme, plugin.get()->schemes()) {
+      if (plugins.contains(scheme)) {
+        LOG(WARNING) << "Multiple URI fetcher plugins register "
+                     << "URI scheme '" << scheme << "'";
+      }
+
+      plugins.put(scheme, plugin.get());
+    }
   }
 
   return Owned<Fetcher>(new Fetcher(plugins));
 }
 
+} // namespace fetcher {
+
 
 Future<Nothing> Fetcher::fetch(
     const URI& uri,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetcher.hpp b/src/uri/fetcher.hpp
new file mode 100644
index 0000000..d5182a5
--- /dev/null
+++ b/src/uri/fetcher.hpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_FETCHER_HPP__
+#define __URI_FETCHER_HPP__
+
+#include <process/owned.hpp>
+
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+#include <mesos/uri/fetcher.hpp>
+
+#include "uri/fetchers/curl.hpp"
+#include "uri/fetchers/hadoop.hpp"
+
+namespace mesos {
+namespace uri {
+namespace fetcher {
+
+/**
+ * The combined flags for all built-in plugins.
+ */
+class Flags :
+  public CurlFetcherPlugin::Flags,
+  public HadoopFetcherPlugin::Flags {};
+
+
+/**
+ * Factory method for creating a Fetcher instance.
+ */
+Try<process::Owned<Fetcher>> create(const Option<Flags>& flags = None());
+
+} // namespace fetcher {
+} // namespace uri {
+} // namespace mesos {
+
+#endif // __URI_FETCHER_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/curl.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/curl.cpp b/src/uri/fetchers/curl.cpp
index 6c607a6..0ff9833 100644
--- a/src/uri/fetchers/curl.cpp
+++ b/src/uri/fetchers/curl.cpp
@@ -35,6 +35,7 @@
 
 using namespace process;
 
+using std::set;
 using std::string;
 using std::vector;
 
@@ -48,7 +49,7 @@ static Future<Nothing> _fetch(const Future<std::tuple<
     Future<string>>>& future);
 
 
-Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create()
+Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create(const Flags& flags)
 {
   // TODO(jieyu): Make sure curl is available.
 
@@ -56,6 +57,12 @@ Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create()
 }
 
 
+set<string> CurlFetcherPlugin::schemes()
+{
+  return {"http", "https", "ftp", "ftps"};
+}
+
+
 Future<Nothing> CurlFetcherPlugin::fetch(
     const URI& uri,
     const string& directory)

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/curl.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/curl.hpp b/src/uri/fetchers/curl.hpp
index 401829d..447e01b 100644
--- a/src/uri/fetchers/curl.hpp
+++ b/src/uri/fetchers/curl.hpp
@@ -19,6 +19,7 @@
 
 #include <process/owned.hpp>
 
+#include <stout/flags.hpp>
 #include <stout/try.hpp>
 
 #include <mesos/uri/fetcher.hpp>
@@ -29,9 +30,13 @@ namespace uri {
 class CurlFetcherPlugin : public Fetcher::Plugin
 {
 public:
+  class Flags : public virtual flags::FlagsBase {};
+
+  static Try<process::Owned<Fetcher::Plugin>> create(const Flags& flags);
+
   virtual ~CurlFetcherPlugin() {}
 
-  static Try<process::Owned<Fetcher::Plugin>> create();
+  virtual std::set<std::string> schemes();
 
   virtual process::Future<Nothing> fetch(
       const URI& uri,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/hadoop.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/hadoop.cpp b/src/uri/fetchers/hadoop.cpp
new file mode 100644
index 0000000..69da5b5
--- /dev/null
+++ b/src/uri/fetchers/hadoop.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stout/path.hpp>
+
+#include <stout/os/mkdir.hpp>
+
+#include "uri/fetchers/hadoop.hpp"
+
+using std::set;
+using std::string;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+namespace mesos {
+namespace uri {
+
+HadoopFetcherPlugin::Flags::Flags()
+{
+  add(&Flags::hadoop,
+      "hadoop",
+      "The path to the hadoop client\n");
+}
+
+
+Try<Owned<Fetcher::Plugin>> HadoopFetcherPlugin::create(const Flags& flags)
+{
+  Try<Owned<HDFS>> hdfs = HDFS::create(flags.hadoop);
+  if (hdfs.isError()) {
+    return Error("Failed to create HDFS client: " + hdfs.error());
+  }
+
+  return Owned<Fetcher::Plugin>(new HadoopFetcherPlugin(hdfs.get()));
+}
+
+
+set<string> HadoopFetcherPlugin::schemes()
+{
+  return {"hdfs", "hftp", "s3", "s3n"};
+}
+
+
+Future<Nothing> HadoopFetcherPlugin::fetch(
+    const URI& uri,
+    const string& directory)
+{
+  // TODO(jieyu): Validate the given URI.
+
+  if (!uri.has_path()) {
+    return Failure("URI path is not specified");
+  }
+
+  Try<Nothing> mkdir = os::mkdir(directory);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" +
+        directory + "': " + mkdir.error());
+  }
+
+  // NOTE: If the URI has no host, only its path is passed to the
+  // client, i.e., the scheme prefix is dropped. This is the case when
+  // the host is set using the hadoop configuration file. The local
+  // target is 'directory' joined with the basename of the URI path.
+  // TODO(jieyu): Allow user to specify the name of the output file.
+  return hdfs.get()->copyToLocal(
+      (uri.has_host() ? stringify(uri) : uri.path()),
+      path::join(directory, Path(uri.path()).basename()));
+}
+
+} // namespace uri {
+} // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/hadoop.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/hadoop.hpp b/src/uri/fetchers/hadoop.hpp
new file mode 100644
index 0000000..0665684
--- /dev/null
+++ b/src/uri/fetchers/hadoop.hpp
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_FETCHERS_HADOOP_HPP__
+#define __URI_FETCHERS_HADOOP_HPP__
+
+#include <process/owned.hpp>
+
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/uri/fetcher.hpp>
+
+#include "hdfs/hdfs.hpp"
+
+namespace mesos {
+namespace uri {
+
+class HadoopFetcherPlugin : public Fetcher::Plugin
+{
+public:
+  class Flags : public virtual flags::FlagsBase
+  {
+  public:
+    Flags();
+
+    Option<std::string> hadoop;
+  };
+
+  static Try<process::Owned<Fetcher::Plugin>> create(const Flags& flags);
+
+  virtual ~HadoopFetcherPlugin() {}
+
+  virtual std::set<std::string> schemes();
+
+  virtual process::Future<Nothing> fetch(
+      const URI& uri,
+      const std::string& directory);
+
+private:
+  HadoopFetcherPlugin(process::Owned<HDFS> _hdfs) : hdfs(_hdfs) {}
+
+  process::Owned<HDFS> hdfs;
+};
+
+} // namespace uri {
+} // namespace mesos {
+
+#endif // __URI_FETCHERS_HADOOP_HPP__


[2/2] mesos git commit: Added tests for HDFS URI fetcher plugin.

Posted by ji...@apache.org.
Added tests for HDFS URI fetcher plugin.

Review: https://reviews.apache.org/r/41714


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f05eb74b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f05eb74b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f05eb74b

Branch: refs/heads/master
Commit: f05eb74b905ebb1eec2db0b694e2ff0c34413d26
Parents: 0ed4155
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Dec 24 21:29:37 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 30 21:27:31 2015 -0800

----------------------------------------------------------------------
 src/Makefile.am                 |  1 +
 src/tests/uri_fetcher_tests.cpp | 88 +++++++++++++++++++++++++++++++++++-
 src/uri/schemes/hdfs.hpp        | 41 +++++++++++++++++
 3 files changed, 128 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1f6c737..b58d6b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -767,6 +767,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   uri/fetchers/curl.hpp							\
   uri/fetchers/hadoop.hpp						\
   uri/schemes/file.hpp							\
+  uri/schemes/hdfs.hpp							\
   uri/schemes/http.hpp							\
   usage/usage.hpp							\
   version/version.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/tests/uri_fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/uri_fetcher_tests.cpp b/src/tests/uri_fetcher_tests.cpp
index 7f54927..ca1710c 100644
--- a/src/tests/uri_fetcher_tests.cpp
+++ b/src/tests/uri_fetcher_tests.cpp
@@ -21,27 +21,33 @@
 #include <process/http.hpp>
 #include <process/process.hpp>
 
+#include <stout/check.hpp>
 #include <stout/gtest.hpp>
+#include <stout/os.hpp>
 #include <stout/path.hpp>
 
 #include <stout/os/exists.hpp>
 #include <stout/os/getcwd.hpp>
+#include <stout/os/write.hpp>
 
 #include <stout/tests/utils.hpp>
 
 #include "uri/fetcher.hpp"
 
+#include "uri/schemes/hdfs.hpp"
 #include "uri/schemes/http.hpp"
 
 namespace http = process::http;
 
-using testing::_;
-using testing::Return;
+using std::string;
 
 using process::Future;
 using process::Owned;
 using process::Process;
 
+using testing::_;
+using testing::Return;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -115,6 +121,84 @@ TEST_F(CurlFetcherPluginTest, CURL_InvalidUri)
   AWAIT_FAILED(fetcher.get()->fetch(uri, os::getcwd()));
 }
 
+
+class HadoopFetcherPluginTest : public TemporaryDirectoryTest
+{
+public:
+  virtual void SetUp()
+  {
+    TemporaryDirectoryTest::SetUp();
+
+    // Create a fake hadoop command line tool in the current working
+    // directory. It emulates the hadoop client on the local filesystem.
+    hadoop = path::join(os::getcwd(), "hadoop");
+
+    // The script emulates 'hadoop fs -copyToLocal <from> <to>' with
+    // 'cp'. NOTE: It also accepts the 'version' command, which is
+    // exercised when creating the HDFS client. Any other command is
+    // unexpected and makes the script exit with status 1.
+    ASSERT_SOME(os::write(
+        hadoop,
+        "#!/bin/sh\n"
+        "if [ \"$1\" = \"version\" ]; then\n"
+        "  exit 0\n"
+        "fi\n"
+        "if [ \"$1\" != \"fs\" ]; then\n"
+        "  exit 1\n"
+        "fi\n"
+        "if [ \"$2\" != \"-copyToLocal\" ]; then\n"
+        "  exit 1\n"
+        "fi\n"
+        "cp $3 $4\n"));
+
+    // Make the script executable (mode rwxr-xr-x, i.e., 0755).
+    ASSERT_SOME(os::chmod(
+        hadoop,
+        S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH));
+  }
+
+protected:
+  string hadoop;
+};
+
+
+TEST_F(HadoopFetcherPluginTest, FetchExistingFile)
+{
+  string file = path::join(os::getcwd(), "file");
+
+  ASSERT_SOME(os::write(file, "abc"));
+
+  URI uri = uri::hdfs(file);
+
+  uri::fetcher::Flags flags;
+  flags.hadoop = hadoop;
+
+  Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(flags);
+  ASSERT_SOME(fetcher);
+
+  string dir = path::join(os::getcwd(), "dir");
+
+  AWAIT_READY(fetcher.get()->fetch(uri, dir));
+
+  EXPECT_SOME_EQ("abc", os::read(path::join(dir, "file")));
+}
+
+
+TEST_F(HadoopFetcherPluginTest, FetchNonExistingFile)
+{
+  URI uri = uri::hdfs(path::join(os::getcwd(), "non-exist"));
+
+  uri::fetcher::Flags flags;
+  flags.hadoop = hadoop;
+
+  Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(flags);
+  ASSERT_SOME(fetcher);
+
+  string dir = path::join(os::getcwd(), "dir");
+
+  AWAIT_FAILED(fetcher.get()->fetch(uri, dir));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/uri/schemes/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/uri/schemes/hdfs.hpp b/src/uri/schemes/hdfs.hpp
new file mode 100644
index 0000000..46b9055
--- /dev/null
+++ b/src/uri/schemes/hdfs.hpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_SCHEMES_HDFS_HPP__
+#define __URI_SCHEMES_HDFS_HPP__
+
+#include <string>
+
+#include <mesos/uri/uri.hpp>
+
+#include "uri/utils.hpp"
+
+namespace mesos {
+namespace uri {
+
+/**
+ * Creates an hdfs URI with the given path.
+ */
+inline URI hdfs(const std::string& path)
+{
+  return construct("hdfs", path);
+}
+
+} // namespace uri {
+} // namespace mesos {
+
+
+#endif // __URI_SCHEMES_HDFS_HPP__