You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2015/12/16 16:57:11 UTC
hadoop git commit: HDFS-9524. libhdfs++ deadlocks in Filesystem::New if NN connection fails. Contributed by Bob Hansen.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-8707 0cf5e66ab -> 522610d95
HDFS-9524. libhdfs++ deadlocks in Filesystem::New if NN connection fails. Contributed by Bob Hansen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/522610d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/522610d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/522610d9
Branch: refs/heads/HDFS-8707
Commit: 522610d95c202bc139d9ad132205f0609011d337
Parents: 0cf5e66
Author: James <jh...@apache.org>
Authored: Wed Dec 16 10:56:21 2015 -0500
Committer: James <jh...@apache.org>
Committed: Wed Dec 16 10:56:21 2015 -0500
----------------------------------------------------------------------
.../libhdfs-tests/test_libhdfs_threaded.c | 4 ++
.../native/libhdfspp/include/libhdfspp/hdfs.h | 19 +++---
.../native/libhdfspp/lib/bindings/c/hdfs.cc | 8 ++-
.../main/native/libhdfspp/lib/fs/filesystem.cc | 64 ++++++++------------
.../main/native/libhdfspp/lib/fs/filesystem.h | 11 ++--
5 files changed, 52 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
index 032acbf..a922ff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
@@ -266,6 +266,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
ti->threadIdx);
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+ if (!fs)
+ return 1;
EXPECT_ZERO(setupPaths(ti, &paths));
// test some operations
EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths));
@@ -276,6 +278,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
EXPECT_ZERO(hdfsDisconnect(fs));
// reconnect to do the final delete.
EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+ if (!fs)
+ return 1;
EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1));
EXPECT_ZERO(hdfsDisconnect(fs));
return 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
index dfff20b..7dc3f88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -115,15 +115,16 @@ class FileSystem {
* initializes the RPC connections to the NameNode and returns an
* FileSystem object.
**/
- static void New(
- IoService *io_service, const Options &options, const std::string &server,
+ static FileSystem * New(
+ IoService *&io_service, const Options &options);
+
+ virtual void Connect(const std::string &server,
const std::string &service,
- const std::function<void(const Status &, FileSystem *)> &handler);
+ const std::function<void(const Status &, FileSystem *)> &&handler) = 0;
- /* Synchronous call of New*/
- static FileSystem *
- New(IoService *io_service, const Options &options, const std::string &server,
- const std::string &service);
+ /* Synchronous call of Connect */
+ virtual Status Connect(const std::string &server,
+ const std::string &service) = 0;
/**
* Open a file on HDFS. The call issues an RPC to the NameNode to
@@ -135,6 +136,10 @@ class FileSystem {
const std::function<void(const Status &, FileHandle *)> &handler) = 0;
virtual Status Open(const std::string &path, FileHandle **handle) = 0;
+ /**
+ * Note that it is an error to destroy the filesystem from within a filesystem
+ * callback. It will lead to a deadlock and the termination of the process.
+ */
virtual ~FileSystem() {};
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 802b3ea..d23c7b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -112,8 +112,12 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
hdfsFS hdfsConnect(const char *nn, tPort port) {
std::string port_as_string = std::to_string(port);
IoService * io_service = IoService::New();
- FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+ FileSystem *fs = FileSystem::New(io_service, Options());
if (!fs) {
+ return nullptr;
+ }
+
+ if (!fs->Connect(nn, port_as_string).ok()) {
ReportError(ENODEV, "Unable to connect to NameNode.");
// FileSystem's ctor might take ownership of the io_service; if it does,
@@ -121,6 +125,8 @@ hdfsFS hdfsConnect(const char *nn, tPort port) {
if (io_service)
delete io_service;
+ delete fs;
+
return nullptr;
}
return new hdfs_internal(fs);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 1808b85..1a1163b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -26,6 +26,8 @@
#include <limits>
#include <future>
#include <tuple>
+#include <exception>
+#include <iostream>
namespace hdfs {
@@ -41,7 +42,7 @@ using ::asio::ip::tcp;
void NameNodeOperations::Connect(const std::string &server,
const std::string &service,
- std::function<void(const Status &)> &handler) {
+ std::function<void(const Status &)> &&handler) {
using namespace asio_continuation;
typedef std::vector<tcp::endpoint> State;
auto m = Pipeline<State>::Create();
@@ -106,41 +107,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
* FILESYSTEM BASE CLASS
****************************************************************************/
-void FileSystem::New(
- IoService *io_service, const Options &options, const std::string &server,
- const std::string &service,
- const std::function<void(const Status &, FileSystem *)> &handler) {
- FileSystemImpl *impl = new FileSystemImpl(io_service, options);
- impl->Connect(server, service, [impl, handler](const Status &stat) {
- if (stat.ok()) {
- handler(stat, impl);
- } else {
- delete impl;
- handler(stat, nullptr);
- }
- });
-}
-
+/* Factory: allocates a FileSystemImpl but does NOT connect it; callers must
+ * call Connect() themselves (as hdfsConnect does) and own the returned pointer.
+ * io_service is taken by reference -- presumably so the implementation can
+ * take ownership of it; TODO confirm against FileSystemImpl's constructor. */
FileSystem * FileSystem::New(
- IoService *io_service, const Options &options, const std::string &server,
- const std::string &service) {
- auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
- std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
-
- auto callback = [callstate](const Status &s, FileSystem * fs) {
- callstate->set_value(std::make_tuple(s, fs));
- };
-
- New(io_service, options, server, service, callback);
-
- /* block until promise is set */
- auto returnstate = future.get();
-
- if (std::get<0>(returnstate).ok()) {
- return std::get<1>(returnstate);
- } else {
- return nullptr;
- }
+ IoService *&io_service, const Options &options) {
+ return new FileSystemImpl(io_service, options);
}
/*****************************************************************************
@@ -175,12 +144,20 @@ FileSystemImpl::~FileSystemImpl() {
+/* Asynchronously connect to the NameNode at the given server and service
+ * (port). handler receives the resulting status and this FileSystem. */
void FileSystemImpl::Connect(const std::string &server,
const std::string &service,
- std::function<void(const Status &)> &&handler) {
+ const std::function<void(const Status &, FileSystem * fs)> &&handler) {
/* IoService::New can return nullptr */
if (!io_service_) {
- handler (Status::Error("Null IoService"));
+ handler (Status::Error("Null IoService"), this);
+ // Stop here: falling through would hand a null IoService to nn_.Connect and
+ // could invoke the handler twice (the synchronous Connect's promise throws).
+ return;
}
- nn_.Connect(server, service, handler);
+
+ nn_.Connect(server, service, [this, handler](const Status & s) {
+ handler(s, this);
+ });
}
Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
@@ -188,7 +160,8 @@ Status FileSystemImpl::Connect(const std::string &server, const std::string &ser
auto stat = std::make_shared<std::promise<Status>>();
std::future<Status> future = stat->get_future();
- auto callback = [stat](const Status &s) {
+ auto callback = [stat](const Status &s, FileSystem *fs) {
+ (void)fs;
stat->set_value(s);
};
@@ -247,4 +220,21 @@ Status FileSystemImpl::Open(const std::string &path,
return stat;
}
+/* unique_ptr deleter for worker threads (WorkerPtr): joins, then deletes. */
+void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
+ // It is far too easy to destroy the filesystem (and thus the threadpool)
+ // from within one of the worker threads, leading to a deadlock. Let's
+ // provide some explicit protection.
+ if(t->get_id() == std::this_thread::get_id()) {
+ //TODO: When we get good logging support, add it in here
+ std::cerr << "FATAL: Attempted to destroy a thread pool from within a "
+ "callback of the thread pool.\n";
+ // A thread cannot join itself (join() reports resource_deadlock_would_occur
+ // from inside a destructor), so terminate explicitly instead of relying on
+ // that exception escaping.
+ std::terminate();
+ }
+ t->join();
+ delete t;
+}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 772f93b..d78df81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -55,7 +55,7 @@ public:
void Connect(const std::string &server,
const std::string &service,
- std::function<void(const Status &)> &handler);
+ std::function<void(const Status &)> &&handler);
void GetBlockLocations(const std::string & path,
std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
@@ -85,9 +85,9 @@ public:
/* attempt to connect to namenode, return bad status on failure */
void Connect(const std::string &server, const std::string &service,
- std::function<void(const Status &)> &&handler);
+ const std::function<void(const Status &, FileSystem *)> &&handler) override;
/* attempt to connect to namenode, return bad status on failure */
- Status Connect(const std::string &server, const std::string &service);
+ Status Connect(const std::string &server, const std::string &service) override;
virtual void Open(const std::string &path,
@@ -116,10 +116,7 @@ private:
std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
struct WorkerDeleter {
- void operator()(std::thread *t) {
- t->join();
- delete t;
- }
+ void operator()(std::thread *t);
};
typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
std::vector<WorkerPtr> worker_threads_;