You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/12/31 20:42:01 UTC
[1/2] mesos git commit: Added HDFS URI fetcher plugin.
Repository: mesos
Updated Branches:
refs/heads/master 4568e584d -> f05eb74b9
Added HDFS URI fetcher plugin.
Review: https://reviews.apache.org/r/41713
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0ed4155a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0ed4155a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0ed4155a
Branch: refs/heads/master
Commit: 0ed4155a8efc8fe113def7f24fb78b466440f508
Parents: 4568e58
Author: Jie Yu <yu...@gmail.com>
Authored: Tue Dec 15 14:26:14 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 30 21:23:28 2015 -0800
----------------------------------------------------------------------
include/mesos/uri/fetcher.hpp | 27 +++++++----
include/mesos/uri/uri.hpp | 6 +--
src/Makefile.am | 3 ++
src/tests/uri_fetcher_tests.cpp | 13 +++---
src/uri/fetcher.cpp | 54 ++++++++++++++++------
src/uri/fetcher.hpp | 52 ++++++++++++++++++++++
src/uri/fetchers/curl.cpp | 9 +++-
src/uri/fetchers/curl.hpp | 7 ++-
src/uri/fetchers/hadoop.cpp | 86 ++++++++++++++++++++++++++++++++++++
src/uri/fetchers/hadoop.hpp | 65 +++++++++++++++++++++++++++
10 files changed, 290 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/include/mesos/uri/fetcher.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/uri/fetcher.hpp b/include/mesos/uri/fetcher.hpp
index 6cf7b37..4223f9e 100644
--- a/include/mesos/uri/fetcher.hpp
+++ b/include/mesos/uri/fetcher.hpp
@@ -14,9 +14,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#ifndef __URI_FETCHER_HPP__
-#define __URI_FETCHER_HPP__
+#ifndef __MESOS_URI_FETCHER_HPP__
+#define __MESOS_URI_FETCHER_HPP__
+#include <set>
#include <string>
#include <process/future.hpp>
@@ -24,13 +25,16 @@
#include <stout/hashmap.hpp>
#include <stout/nothing.hpp>
-#include <stout/try.hpp>
#include <mesos/uri/uri.hpp>
namespace mesos {
namespace uri {
+// Forward declarations.
+class Flags;
+
+
/**
* Provides an abstraction for fetching URIs. It is pluggable through
* plugins. Each plugin is responsible for one or more URI schemes,
@@ -50,6 +54,11 @@ public:
virtual ~Plugin() {}
/**
+ * Returns the URI schemes that this plugin handles.
+ */
+ virtual std::set<std::string> schemes() = 0;
+
+ /**
* Fetches a URI to the given directory. To avoid blocking or
* crashing the current thread, this method might choose to fork
* subprocesses for third party commands.
@@ -63,9 +72,12 @@ public:
};
/**
- * Factory method for creating a Fetcher instance.
+ * Create the Fetcher instance with the given plugins.
+ *
+ * @param _plugins a URI scheme to plugin map
*/
- static Try<process::Owned<Fetcher>> create();
+ Fetcher(const hashmap<std::string, process::Owned<Plugin>>& _plugins)
+ : plugins(_plugins) {}
/**
* Fetches a URI to the given directory. This method will dispatch
@@ -79,9 +91,6 @@ public:
const std::string& directory);
private:
- Fetcher(const hashmap<std::string, process::Owned<Plugin>>& _plugins)
- : plugins(_plugins) {}
-
Fetcher(const Fetcher&) = delete; // Not copyable.
Fetcher& operator=(const Fetcher&) = delete; // Not assignable.
@@ -91,4 +100,4 @@ private:
} // namespace uri {
} // namespace mesos {
-#endif // __URI_FETCHER_HPP__
+#endif // __MESOS_URI_FETCHER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/include/mesos/uri/uri.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/uri/uri.hpp b/include/mesos/uri/uri.hpp
index aa3ab5d..75f08c8 100644
--- a/include/mesos/uri/uri.hpp
+++ b/include/mesos/uri/uri.hpp
@@ -14,8 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#ifndef __URI_PROTO_HPP__
-#define __URI_PROTO_HPP__
+#ifndef __MESOS_URI_PROTO_HPP__
+#define __MESOS_URI_PROTO_HPP__
#include <ostream>
@@ -28,4 +28,4 @@ std::ostream& operator<<(std::ostream& stream, const URI& uri);
} // namespace mesos {
-#endif // __URI_PROTO_HPP__
+#endif // __MESOS_URI_PROTO_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index bf71fe6..1f6c737 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -636,6 +636,7 @@ libmesos_no_3rdparty_la_SOURCES += \
uri/fetcher.cpp \
uri/utils.cpp \
uri/fetchers/curl.cpp \
+ uri/fetchers/hadoop.cpp \
usage/usage.cpp \
v1/attributes.cpp \
v1/mesos.cpp \
@@ -761,8 +762,10 @@ libmesos_no_3rdparty_la_SOURCES += \
tests/containerizer/rootfs.hpp \
tests/containerizer/setns_test_helper.hpp \
tests/containerizer/store.hpp \
+ uri/fetcher.hpp \
uri/utils.hpp \
uri/fetchers/curl.hpp \
+ uri/fetchers/hadoop.hpp \
uri/schemes/file.hpp \
uri/schemes/http.hpp \
usage/usage.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/tests/uri_fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/uri_fetcher_tests.cpp b/src/tests/uri_fetcher_tests.cpp
index d54a7e7..7f54927 100644
--- a/src/tests/uri_fetcher_tests.cpp
+++ b/src/tests/uri_fetcher_tests.cpp
@@ -29,16 +29,19 @@
#include <stout/tests/utils.hpp>
-#include <mesos/uri/fetcher.hpp>
-#include <mesos/uri/uri.hpp>
+#include "uri/fetcher.hpp"
#include "uri/schemes/http.hpp"
-using namespace process;
+namespace http = process::http;
using testing::_;
using testing::Return;
+using process::Future;
+using process::Owned;
+using process::Process;
+
namespace mesos {
namespace internal {
namespace tests {
@@ -87,7 +90,7 @@ TEST_F(CurlFetcherPluginTest, CURL_ValidUri)
EXPECT_CALL(server, test(_))
.WillOnce(Return(http::OK("test")));
- Try<Owned<uri::Fetcher>> fetcher = uri::Fetcher::create();
+ Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
ASSERT_SOME(fetcher);
AWAIT_READY(fetcher.get()->fetch(uri, os::getcwd()));
@@ -106,7 +109,7 @@ TEST_F(CurlFetcherPluginTest, CURL_InvalidUri)
EXPECT_CALL(server, test(_))
.WillOnce(Return(http::NotFound()));
- Try<Owned<uri::Fetcher>> fetcher = uri::Fetcher::create();
+ Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create();
ASSERT_SOME(fetcher);
AWAIT_FAILED(fetcher.get()->fetch(uri, os::getcwd()));
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetcher.cpp b/src/uri/fetcher.cpp
index 8bd7cf3..ac13fbd 100644
--- a/src/uri/fetcher.cpp
+++ b/src/uri/fetcher.cpp
@@ -14,38 +14,66 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-#include <mesos/uri/fetcher.hpp>
+#include <string>
-#include "uri/fetchers/curl.hpp"
+#include <stout/foreach.hpp>
+#include <stout/lambda.hpp>
+#include <stout/hashmap.hpp>
-using namespace process;
+#include "uri/fetcher.hpp"
using std::string;
+using process::Failure;
+using process::Future;
+using process::Owned;
+
namespace mesos {
namespace uri {
+namespace fetcher {
-Try<Owned<Fetcher>> Fetcher::create()
+Try<Owned<Fetcher>> create(const Option<Flags>& _flags)
{
- hashmap<string, Owned<Plugin>> plugins;
+ // Use the default flags if not specified.
+ Flags flags;
+ if (_flags.isSome()) {
+ flags = _flags.get();
+ }
+
+ // Load built-in plugins.
+ typedef lambda::function<Try<Owned<Fetcher::Plugin>>()> Creator;
+
+ hashmap<string, Creator> creators;
+ creators.put("curl", lambda::bind(&CurlFetcherPlugin::create, flags));
+ creators.put("hadoop", lambda::bind(&HadoopFetcherPlugin::create, flags));
- hashmap<string, Try<Owned<Plugin>>(*)()> creators;
- creators.put("http", &CurlFetcherPlugin::create);
+ hashmap<string, Owned<Fetcher::Plugin>> plugins;
- foreachkey (const string& scheme, creators) {
- Try<Owned<Plugin>> plugin = creators[scheme]();
+ foreachpair (const string& name, const Creator& creator, creators) {
+ Try<Owned<Fetcher::Plugin>> plugin = creator();
if (plugin.isError()) {
- return Error(
- "Failed to create plugin for scheme '" +
- scheme + "': " + plugin.error());
+ // NOTE: We skip the plugin if it cannot be created, instead of
+ // returning an Error so that we can still use other plugins.
+ LOG(ERROR) << "Failed to create URI fetcher plugin "
+ << "'" << name << "': " << plugin.error();
+ continue;
}
- plugins.put(scheme, plugin.get());
+ foreach (const string& scheme, plugin.get()->schemes()) {
+ if (plugins.contains(scheme)) {
+ LOG(WARNING) << "Multiple URI fetcher plugins register "
+ << "URI scheme '" << scheme << "'";
+ }
+
+ plugins.put(scheme, plugin.get());
+ }
}
return Owned<Fetcher>(new Fetcher(plugins));
}
+} // namespace fetcher {
+
Future<Nothing> Fetcher::fetch(
const URI& uri,
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetcher.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetcher.hpp b/src/uri/fetcher.hpp
new file mode 100644
index 0000000..d5182a5
--- /dev/null
+++ b/src/uri/fetcher.hpp
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_FETCHER_HPP__
+#define __URI_FETCHER_HPP__
+
+#include <process/owned.hpp>
+
+#include <stout/none.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+#include <mesos/uri/fetcher.hpp>
+
+#include "uri/fetchers/curl.hpp"
+#include "uri/fetchers/hadoop.hpp"
+
+namespace mesos {
+namespace uri {
+namespace fetcher {
+
+/**
+ * The combined flags for all built-in plugins.
+ */
+class Flags :
+ public CurlFetcherPlugin::Flags,
+ public HadoopFetcherPlugin::Flags {};
+
+
+/**
+ * Factory method for creating a Fetcher instance.
+ */
+Try<process::Owned<Fetcher>> create(const Option<Flags>& flags = None());
+
+} // namespace fetcher {
+} // namespace uri {
+} // namespace mesos {
+
+#endif // __URI_FETCHER_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/curl.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/curl.cpp b/src/uri/fetchers/curl.cpp
index 6c607a6..0ff9833 100644
--- a/src/uri/fetchers/curl.cpp
+++ b/src/uri/fetchers/curl.cpp
@@ -35,6 +35,7 @@
using namespace process;
+using std::set;
using std::string;
using std::vector;
@@ -48,7 +49,7 @@ static Future<Nothing> _fetch(const Future<std::tuple<
Future<string>>>& future);
-Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create()
+Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create(const Flags& flags)
{
// TODO(jieyu): Make sure curl is available.
@@ -56,6 +57,12 @@ Try<Owned<Fetcher::Plugin>> CurlFetcherPlugin::create()
}
+set<string> CurlFetcherPlugin::schemes()
+{
+ return {"http", "https", "ftp", "ftps"};
+}
+
+
Future<Nothing> CurlFetcherPlugin::fetch(
const URI& uri,
const string& directory)
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/curl.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/curl.hpp b/src/uri/fetchers/curl.hpp
index 401829d..447e01b 100644
--- a/src/uri/fetchers/curl.hpp
+++ b/src/uri/fetchers/curl.hpp
@@ -19,6 +19,7 @@
#include <process/owned.hpp>
+#include <stout/flags.hpp>
#include <stout/try.hpp>
#include <mesos/uri/fetcher.hpp>
@@ -29,9 +30,13 @@ namespace uri {
class CurlFetcherPlugin : public Fetcher::Plugin
{
public:
+ class Flags : public virtual flags::FlagsBase {};
+
+ static Try<process::Owned<Fetcher::Plugin>> create(const Flags& flags);
+
virtual ~CurlFetcherPlugin() {}
- static Try<process::Owned<Fetcher::Plugin>> create();
+ virtual std::set<std::string> schemes();
virtual process::Future<Nothing> fetch(
const URI& uri,
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/hadoop.cpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/hadoop.cpp b/src/uri/fetchers/hadoop.cpp
new file mode 100644
index 0000000..69da5b5
--- /dev/null
+++ b/src/uri/fetchers/hadoop.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <stout/path.hpp>
+
+#include <stout/os/mkdir.hpp>
+
+#include "uri/fetchers/hadoop.hpp"
+
+using std::set;
+using std::string;
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+
+namespace mesos {
+namespace uri {
+
+HadoopFetcherPlugin::Flags::Flags()
+{
+ add(&Flags::hadoop,
+ "hadoop",
+ "The path to the hadoop client\n");
+}
+
+
+Try<Owned<Fetcher::Plugin>> HadoopFetcherPlugin::create(const Flags& flags)
+{
+ Try<Owned<HDFS>> hdfs = HDFS::create(flags.hadoop);
+ if (hdfs.isError()) {
+ return Error("Failed to create HDFS client: " + hdfs.error());
+ }
+
+ return Owned<Fetcher::Plugin>(new HadoopFetcherPlugin(hdfs.get()));
+}
+
+
+set<string> HadoopFetcherPlugin::schemes()
+{
+ return {"hdfs", "hftp", "s3", "s3n"};
+}
+
+
+Future<Nothing> HadoopFetcherPlugin::fetch(
+ const URI& uri,
+ const string& directory)
+{
+ // TODO(jieyu): Validate the given URI.
+
+ if (!uri.has_path()) {
+ return Failure("URI path is not specified");
+ }
+
+ Try<Nothing> mkdir = os::mkdir(directory);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create directory '" +
+ directory + "': " + mkdir.error());
+ }
+
+  // NOTE: We drop the scheme prefix and pass only the path if the
+  // host in the URI is not specified. This is the case when the host
+  // is set using the hadoop configuration file.
+ //
+ // TODO(jieyu): Allow user to specify the name of the output file.
+ return hdfs.get()->copyToLocal(
+ (uri.has_host() ? stringify(uri) : uri.path()),
+ path::join(directory, Path(uri.path()).basename()));
+}
+
+} // namespace uri {
+} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/0ed4155a/src/uri/fetchers/hadoop.hpp
----------------------------------------------------------------------
diff --git a/src/uri/fetchers/hadoop.hpp b/src/uri/fetchers/hadoop.hpp
new file mode 100644
index 0000000..0665684
--- /dev/null
+++ b/src/uri/fetchers/hadoop.hpp
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_FETCHERS_HADOOP_HPP__
+#define __URI_FETCHERS_HADOOP_HPP__
+
+#include <process/owned.hpp>
+
+#include <stout/flags.hpp>
+#include <stout/option.hpp>
+#include <stout/try.hpp>
+
+#include <mesos/mesos.hpp>
+
+#include <mesos/uri/fetcher.hpp>
+
+#include "hdfs/hdfs.hpp"
+
+namespace mesos {
+namespace uri {
+
+class HadoopFetcherPlugin : public Fetcher::Plugin
+{
+public:
+ class Flags : public virtual flags::FlagsBase
+ {
+ public:
+ Flags();
+
+ Option<std::string> hadoop;
+ };
+
+ static Try<process::Owned<Fetcher::Plugin>> create(const Flags& flags);
+
+ virtual ~HadoopFetcherPlugin() {}
+
+ virtual std::set<std::string> schemes();
+
+ virtual process::Future<Nothing> fetch(
+ const URI& uri,
+ const std::string& directory);
+
+private:
+ HadoopFetcherPlugin(process::Owned<HDFS> _hdfs) : hdfs(_hdfs) {}
+
+ process::Owned<HDFS> hdfs;
+};
+
+} // namespace uri {
+} // namespace mesos {
+
+#endif // __URI_FETCHERS_HADOOP_HPP__
[2/2] mesos git commit: Added tests for HDFS URI fetcher plugin.
Posted by ji...@apache.org.
Added tests for HDFS URI fetcher plugin.
Review: https://reviews.apache.org/r/41714
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f05eb74b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f05eb74b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f05eb74b
Branch: refs/heads/master
Commit: f05eb74b905ebb1eec2db0b694e2ff0c34413d26
Parents: 0ed4155
Author: Jie Yu <yu...@gmail.com>
Authored: Thu Dec 24 21:29:37 2015 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed Dec 30 21:27:31 2015 -0800
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/tests/uri_fetcher_tests.cpp | 88 +++++++++++++++++++++++++++++++++++-
src/uri/schemes/hdfs.hpp | 41 +++++++++++++++++
3 files changed, 128 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 1f6c737..b58d6b5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -767,6 +767,7 @@ libmesos_no_3rdparty_la_SOURCES += \
uri/fetchers/curl.hpp \
uri/fetchers/hadoop.hpp \
uri/schemes/file.hpp \
+ uri/schemes/hdfs.hpp \
uri/schemes/http.hpp \
usage/usage.hpp \
version/version.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/tests/uri_fetcher_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/uri_fetcher_tests.cpp b/src/tests/uri_fetcher_tests.cpp
index 7f54927..ca1710c 100644
--- a/src/tests/uri_fetcher_tests.cpp
+++ b/src/tests/uri_fetcher_tests.cpp
@@ -21,27 +21,33 @@
#include <process/http.hpp>
#include <process/process.hpp>
+#include <stout/check.hpp>
#include <stout/gtest.hpp>
+#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/os/exists.hpp>
#include <stout/os/getcwd.hpp>
+#include <stout/os/write.hpp>
#include <stout/tests/utils.hpp>
#include "uri/fetcher.hpp"
+#include "uri/schemes/hdfs.hpp"
#include "uri/schemes/http.hpp"
namespace http = process::http;
-using testing::_;
-using testing::Return;
+using std::string;
using process::Future;
using process::Owned;
using process::Process;
+using testing::_;
+using testing::Return;
+
namespace mesos {
namespace internal {
namespace tests {
@@ -115,6 +121,84 @@ TEST_F(CurlFetcherPluginTest, CURL_InvalidUri)
AWAIT_FAILED(fetcher.get()->fetch(uri, os::getcwd()));
}
+
+class HadoopFetcherPluginTest : public TemporaryDirectoryTest
+{
+public:
+ virtual void SetUp()
+ {
+ TemporaryDirectoryTest::SetUp();
+
+ // Create a fake hadoop command line tool. It emulates the hadoop
+ // client's logic while operating on the local filesystem.
+ hadoop = path::join(os::getcwd(), "hadoop");
+
+    // The script emulates 'hadoop fs -copyToLocal <from> <to>'.
+    // NOTE: We also emulate the 'version' call, which is exercised
+    // when creating the HDFS client. We do not expect any command
+    // other than these two to be called.
+ ASSERT_SOME(os::write(
+ hadoop,
+ "#!/bin/sh\n"
+ "if [ \"$1\" = \"version\" ]; then\n"
+ " exit 0\n"
+ "fi\n"
+ "if [ \"$1\" != \"fs\" ]; then\n"
+ " exit 1\n"
+ "fi\n"
+ "if [ \"$2\" != \"-copyToLocal\" ]; then\n"
+ " exit 1\n"
+ "fi\n"
+ "cp $3 $4\n"));
+
+ // Make sure the script has execution permission.
+ ASSERT_SOME(os::chmod(
+ hadoop,
+ S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH));
+ }
+
+protected:
+ string hadoop;
+};
+
+
+TEST_F(HadoopFetcherPluginTest, FetchExistingFile)
+{
+ string file = path::join(os::getcwd(), "file");
+
+ ASSERT_SOME(os::write(file, "abc"));
+
+ URI uri = uri::hdfs(file);
+
+ uri::fetcher::Flags flags;
+ flags.hadoop = hadoop;
+
+ Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(flags);
+ ASSERT_SOME(fetcher);
+
+ string dir = path::join(os::getcwd(), "dir");
+
+ AWAIT_READY(fetcher.get()->fetch(uri, dir));
+
+ EXPECT_SOME_EQ("abc", os::read(path::join(dir, "file")));
+}
+
+
+TEST_F(HadoopFetcherPluginTest, FetchNonExistingFile)
+{
+ URI uri = uri::hdfs(path::join(os::getcwd(), "non-exist"));
+
+ uri::fetcher::Flags flags;
+ flags.hadoop = hadoop;
+
+ Try<Owned<uri::Fetcher>> fetcher = uri::fetcher::create(flags);
+ ASSERT_SOME(fetcher);
+
+ string dir = path::join(os::getcwd(), "dir");
+
+ AWAIT_FAILED(fetcher.get()->fetch(uri, dir));
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f05eb74b/src/uri/schemes/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/uri/schemes/hdfs.hpp b/src/uri/schemes/hdfs.hpp
new file mode 100644
index 0000000..46b9055
--- /dev/null
+++ b/src/uri/schemes/hdfs.hpp
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __URI_SCHEMES_HDFS_HPP__
+#define __URI_SCHEMES_HDFS_HPP__
+
+#include <string>
+
+#include <mesos/uri/uri.hpp>
+
+#include "uri/utils.hpp"
+
+namespace mesos {
+namespace uri {
+
+/**
+ * Creates an hdfs URI with the given path.
+ */
+inline URI hdfs(const std::string& path)
+{
+ return construct("hdfs", path);
+}
+
+} // namespace uri {
+} // namespace mesos {
+
+
+#endif // __URI_SCHEMES_HDFS_HPP__