You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/24 21:20:46 UTC

arrow git commit: ARROW-424: [C++] Make ReadAt, Write HDFS functions threadsafe

Repository: arrow
Updated Branches:
  refs/heads/master 40b1b6431 -> 85ef000c5


ARROW-424: [C++] Make ReadAt, Write HDFS functions threadsafe

This also fixes the HDFS test suite to actually use libhdfs3 (by accident, it previously did not).

Author: Wes McKinney <we...@twosigma.com>

Closes #712 from wesm/ARROW-424 and squashes the following commits:

0894719 [Wes McKinney] Make ReadAt, Write HDFS functions threadsafe


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/85ef000c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/85ef000c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/85ef000c

Branch: refs/heads/master
Commit: 85ef000c57176df9d92b0b71a831d5c8f36b2949
Parents: 40b1b64
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed May 24 17:20:40 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed May 24 17:20:40 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/array.h            |  1 -
 cpp/src/arrow/io/hdfs.cc         |  5 +++
 cpp/src/arrow/io/io-hdfs-test.cc | 68 ++++++++++++++++++++++++++++++-----
 3 files changed, 64 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 331c6c3..2c96ce0 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -508,7 +508,6 @@ ARROW_EXTERN_TEMPLATE NumericArray<Time32Type>;
 ARROW_EXTERN_TEMPLATE NumericArray<Time64Type>;
 ARROW_EXTERN_TEMPLATE NumericArray<TimestampType>;
 
-
 /// \brief Perform any validation checks to determine obvious inconsistencies
 /// with the array's internal data
 ///

http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index a27e132..ba9c2c2 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -89,6 +89,9 @@ class HdfsAnyFileImpl {
 
   LibHdfsShim* driver_;
 
+  // For threadsafety
+  std::mutex lock_;
+
   // These are pointers in libhdfs, so OK to copy
   hdfsFS fs_;
   hdfsFile file_;
@@ -116,6 +119,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
       ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position),
           reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes));
     } else {
+      std::lock_guard<std::mutex> guard(lock_);
       RETURN_NOT_OK(Seek(position));
       return Read(nbytes, bytes_read, buffer);
     }
@@ -253,6 +257,7 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
   }
 
   Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
+    std::lock_guard<std::mutex> guard(lock_);
     tSize ret = driver_->Write(
         fs_, file_, reinterpret_cast<const void*>(buffer), static_cast<tSize>(nbytes));
     CHECK_FAILURE(ret, "Write");

http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 0fdb897..74f8042 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -19,6 +19,7 @@
 #include <iostream>
 #include <sstream>
 #include <string>
+#include <thread>
 
 #include "gtest/gtest.h"
 
@@ -38,6 +39,14 @@ std::vector<uint8_t> RandomData(int64_t size) {
   return buffer;
 }
 
+struct JNIDriver {
+  static HdfsDriver type;
+};
+
+struct PivotalDriver {
+  static HdfsDriver type;
+};
+
 template <typename DRIVER>
 class TestHdfsClient : public ::testing::Test {
  public:
@@ -112,6 +121,7 @@ class TestHdfsClient : public ::testing::Test {
     conf_.host = host == nullptr ? "localhost" : host;
     conf_.user = user;
     conf_.port = port == nullptr ? 20500 : atoi(port);
+    conf_.driver = DRIVER::type;
 
     ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
   }
@@ -133,20 +143,19 @@ class TestHdfsClient : public ::testing::Test {
   std::shared_ptr<HdfsClient> client_;
 };
 
+template <>
+std::string TestHdfsClient<PivotalDriver>::HdfsAbsPath(const std::string& relpath) {
+  std::stringstream ss;
+  ss << relpath;
+  return ss.str();
+}
+
 #define SKIP_IF_NO_DRIVER()                                  \
   if (!this->loaded_driver_) {                               \
     std::cout << "Driver not loaded, skipping" << std::endl; \
     return;                                                  \
   }
 
-struct JNIDriver {
-  static HdfsDriver type;
-};
-
-struct PivotalDriver {
-  static HdfsDriver type;
-};
-
 HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
 HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;
 
@@ -364,7 +373,6 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
 
 TYPED_TEST(TestHdfsClient, RenameFile) {
   SKIP_IF_NO_DRIVER();
-
   ASSERT_OK(this->MakeScratchDir());
 
   auto src_path = this->ScratchPath("src-file");
@@ -380,5 +388,47 @@ TYPED_TEST(TestHdfsClient, RenameFile) {
   ASSERT_TRUE(this->client_->Exists(dst_path));
 }
 
+TYPED_TEST(TestHdfsClient, ThreadSafety) {
+  SKIP_IF_NO_DRIVER();
+  ASSERT_OK(this->MakeScratchDir());
+
+  auto src_path = this->ScratchPath("threadsafety");
+
+  std::string data = "foobar";
+  ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
+      static_cast<int64_t>(data.size())));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(this->client_->OpenReadable(src_path, &file));
+
+  std::atomic<int> correct_count(0);
+  const int niter = 1000;
+
+  auto ReadData = [&file, &correct_count, &data, niter]() {
+    for (int i = 0; i < niter; ++i) {
+      std::shared_ptr<Buffer> buffer;
+      if (i % 2 == 0) {
+        ASSERT_OK(file->ReadAt(3, 3, &buffer));
+        if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) { correct_count += 1; }
+      } else {
+        ASSERT_OK(file->ReadAt(0, 4, &buffer));
+        if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) { correct_count += 1; }
+      }
+    }
+  };
+
+  std::thread thread1(ReadData);
+  std::thread thread2(ReadData);
+  std::thread thread3(ReadData);
+  std::thread thread4(ReadData);
+
+  thread1.join();
+  thread2.join();
+  thread3.join();
+  thread4.join();
+
+  ASSERT_EQ(niter * 4, correct_count);
+}
+
 }  // namespace io
 }  // namespace arrow