You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2015/12/16 16:57:11 UTC

hadoop git commit: HDFS-9524. libhdfs++ deadlocks in Filesystem::New if NN connection fails. Contributed by Bob Hansen.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-8707 0cf5e66ab -> 522610d95


HDFS-9524. libhdfs++ deadlocks in Filesystem::New if NN connection fails. Contributed by Bob Hansen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/522610d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/522610d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/522610d9

Branch: refs/heads/HDFS-8707
Commit: 522610d95c202bc139d9ad132205f0609011d337
Parents: 0cf5e66
Author: James <jh...@apache.org>
Authored: Wed Dec 16 10:56:21 2015 -0500
Committer: James <jh...@apache.org>
Committed: Wed Dec 16 10:56:21 2015 -0500

----------------------------------------------------------------------
 .../libhdfs-tests/test_libhdfs_threaded.c       |  4 ++
 .../native/libhdfspp/include/libhdfspp/hdfs.h   | 19 +++---
 .../native/libhdfspp/lib/bindings/c/hdfs.cc     |  8 ++-
 .../main/native/libhdfspp/lib/fs/filesystem.cc  | 64 ++++++++------------
 .../main/native/libhdfspp/lib/fs/filesystem.h   | 11 ++--
 5 files changed, 52 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
index 032acbf..a922ff8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs-tests/test_libhdfs_threaded.c
@@ -266,6 +266,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
     fprintf(stderr, "testHdfsOperations(threadIdx=%d): starting\n",
         ti->threadIdx);
     EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    if (!fs)
+        return 1;
     EXPECT_ZERO(setupPaths(ti, &paths));
     // test some operations
     EXPECT_ZERO(doTestHdfsOperations(ti, fs, &paths));
@@ -276,6 +278,8 @@ static int testHdfsOperationsImpl(struct tlhThreadInfo *ti)
     EXPECT_ZERO(hdfsDisconnect(fs));
     // reconnect to do the final delete.
     EXPECT_ZERO(hdfsSingleNameNodeConnect(tlhCluster, &fs, NULL));
+    if (!fs)
+        return 1;
     EXPECT_ZERO(hdfsDelete(fs, paths.prefix, 1));
     EXPECT_ZERO(hdfsDisconnect(fs));
     return 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
index dfff20b..7dc3f88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h
@@ -115,15 +115,16 @@ class FileSystem {
    * initializes the RPC connections to the NameNode and returns an
    * FileSystem object.
    **/
-  static void New(
-      IoService *io_service, const Options &options, const std::string &server,
+  static FileSystem * New(
+      IoService *&io_service, const Options &options);
+
+  virtual void Connect(const std::string &server,
       const std::string &service,
-      const std::function<void(const Status &, FileSystem *)> &handler);
+      const std::function<void(const Status &, FileSystem *)> &&handler) = 0;
 
-  /* Synchronous call of New*/
-  static FileSystem *
-  New(IoService *io_service, const Options &options, const std::string &server,
-      const std::string &service);
+  /* Synchronous call of Connect */
+  virtual Status Connect(const std::string &server,
+      const std::string &service) = 0;
 
   /**
    * Open a file on HDFS. The call issues an RPC to the NameNode to
@@ -135,6 +136,10 @@ class FileSystem {
        const std::function<void(const Status &, FileHandle *)> &handler) = 0;
   virtual Status Open(const std::string &path, FileHandle **handle) = 0;
 
+  /**
+   * Note that it is an error to destroy the filesystem from within a filesystem
+   * callback.  It will lead to a deadlock and the termination of the process.
+   */
   virtual ~FileSystem() {};
 
 };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
index 802b3ea..d23c7b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc
@@ -112,8 +112,12 @@ int hdfsFileIsOpenForRead(hdfsFile file) {
 hdfsFS hdfsConnect(const char *nn, tPort port) {
   std::string port_as_string = std::to_string(port);
   IoService * io_service = IoService::New();
-  FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string);
+  FileSystem *fs = FileSystem::New(io_service, Options());
   if (!fs) {
+    return nullptr;
+  }
+
+  if (!fs->Connect(nn, port_as_string).ok()) {
     ReportError(ENODEV, "Unable to connect to NameNode.");
 
     // FileSystem's ctor might take ownership of the io_service; if it does,
@@ -121,6 +125,8 @@ hdfsFS hdfsConnect(const char *nn, tPort port) {
     if (io_service)
       delete io_service;
 
+    delete fs;
+
     return nullptr;
   }
   return new hdfs_internal(fs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
index 1808b85..1a1163b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc
@@ -26,6 +26,7 @@
 #include <limits>
 #include <future>
 #include <tuple>
+#include <iostream>
 
 namespace hdfs {
 
@@ -41,7 +42,7 @@ using ::asio::ip::tcp;
 
 void NameNodeOperations::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &handler) {
+                             std::function<void(const Status &)> &&handler) {
   using namespace asio_continuation;
   typedef std::vector<tcp::endpoint> State;
   auto m = Pipeline<State>::Create();
@@ -106,41 +107,9 @@ void NameNodeOperations::GetBlockLocations(const std::string & path,
  *                    FILESYSTEM BASE CLASS
  ****************************************************************************/
 
-void FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service,
-    const std::function<void(const Status &, FileSystem *)> &handler) {
-  FileSystemImpl *impl = new FileSystemImpl(io_service, options);
-  impl->Connect(server, service, [impl, handler](const Status &stat) {
-    if (stat.ok()) {
-      handler(stat, impl);
-    } else {
-      delete impl;
-      handler(stat, nullptr);
-    }
-  });
-}
-
 FileSystem * FileSystem::New(
-    IoService *io_service, const Options &options, const std::string &server,
-    const std::string &service) {
-  auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>();
-  std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future());
-
-  auto callback = [callstate](const Status &s, FileSystem * fs) {
-    callstate->set_value(std::make_tuple(s, fs));
-  };
-
-  New(io_service, options, server, service, callback);
-
-  /* block until promise is set */
-  auto returnstate = future.get();
-
-  if (std::get<0>(returnstate).ok()) {
-    return std::get<1>(returnstate);
-  } else {
-    return nullptr;
-  }
+    IoService *&io_service, const Options &options) {
+  return new FileSystemImpl(io_service, options);
 }
 
 /*****************************************************************************
@@ -175,12 +144,15 @@ FileSystemImpl::~FileSystemImpl() {
 
 void FileSystemImpl::Connect(const std::string &server,
                              const std::string &service,
-                             std::function<void(const Status &)> &&handler) {
+                             const std::function<void(const Status &, FileSystem * fs)> &&handler) {
   /* IoService::New can return nullptr */
   if (!io_service_) {
-    handler (Status::Error("Null IoService"));
+    handler (Status::Error("Null IoService"), this);
   }
-  nn_.Connect(server, service, handler);
+
+  nn_.Connect(server, service, [this, handler](const Status & s) {
+    handler(s, this);
+  });
 }
 
 Status FileSystemImpl::Connect(const std::string &server, const std::string &service) {
@@ -188,7 +160,8 @@ Status FileSystemImpl::Connect(const std::string &server, const std::string &ser
   auto stat = std::make_shared<std::promise<Status>>();
   std::future<Status> future = stat->get_future();
 
-  auto callback = [stat](const Status &s) {
+  auto callback = [stat](const Status &s, FileSystem *fs) {
+    (void)fs;
     stat->set_value(s);
   };
 
@@ -247,4 +220,17 @@ Status FileSystemImpl::Open(const std::string &path,
   return stat;
 }
 
+void FileSystemImpl::WorkerDeleter::operator()(std::thread *t) {
+  // It is far too easy to destroy the filesystem (and thus the threadpool)
+  //     from within one of the worker threads, leading to a deadlock.  Let's
+  //     provide some explicit protection.
+  if(t->get_id() == std::this_thread::get_id()) {
+    //TODO: When we get good logging support, add it in here
+    std::cerr << "FATAL: Attempted to destroy a thread pool from within a "
+                 "callback of the thread pool.\n";
+  }
+  t->join();
+  delete t;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/522610d9/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
index 772f93b..d78df81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h
@@ -55,7 +55,7 @@ public:
 
   void Connect(const std::string &server,
                const std::string &service,
-               std::function<void(const Status &)> &handler);
+               std::function<void(const Status &)> &&handler);
 
   void GetBlockLocations(const std::string & path,
     std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler);
@@ -85,9 +85,9 @@ public:
 
   /* attempt to connect to namenode, return bad status on failure */
   void Connect(const std::string &server, const std::string &service,
-               std::function<void(const Status &)> &&handler);
+               const std::function<void(const Status &, FileSystem *)> &&handler) override;
   /* attempt to connect to namenode, return bad status on failure */
-  Status Connect(const std::string &server, const std::string &service);
+  Status Connect(const std::string &server, const std::string &service) override;
 
 
   virtual void Open(const std::string &path,
@@ -116,10 +116,7 @@ private:
   std::shared_ptr<BadDataNodeTracker> bad_node_tracker_;
 
   struct WorkerDeleter {
-    void operator()(std::thread *t) {
-      t->join();
-      delete t;
-    }
+    void operator()(std::thread *t);
   };
   typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr;
   std::vector<WorkerPtr> worker_threads_;