You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/05/24 21:20:46 UTC
arrow git commit: ARROW-424: [C++] Make ReadAt, Write HDFS functions threadsafe
Repository: arrow
Updated Branches:
refs/heads/master 40b1b6431 -> 85ef000c5
ARROW-424: [C++] Make ReadAt, Write HDFS functions threadsafe
This also fixes the HDFS test suite so that it actually uses libhdfs3; previously, by accident, it did not.
Author: Wes McKinney <we...@twosigma.com>
Closes #712 from wesm/ARROW-424 and squashes the following commits:
0894719 [Wes McKinney] Make ReadAt, Write HDFS functions threadsafe
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/85ef000c
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/85ef000c
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/85ef000c
Branch: refs/heads/master
Commit: 85ef000c57176df9d92b0b71a831d5c8f36b2949
Parents: 40b1b64
Author: Wes McKinney <we...@twosigma.com>
Authored: Wed May 24 17:20:40 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Wed May 24 17:20:40 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/array.h | 1 -
cpp/src/arrow/io/hdfs.cc | 5 +++
cpp/src/arrow/io/io-hdfs-test.cc | 68 ++++++++++++++++++++++++++++++-----
3 files changed, 64 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 331c6c3..2c96ce0 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -508,7 +508,6 @@ ARROW_EXTERN_TEMPLATE NumericArray<Time32Type>;
ARROW_EXTERN_TEMPLATE NumericArray<Time64Type>;
ARROW_EXTERN_TEMPLATE NumericArray<TimestampType>;
-
/// \brief Perform any validation checks to determine obvious inconsistencies
/// with the array's internal data
///
http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index a27e132..ba9c2c2 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -89,6 +89,9 @@ class HdfsAnyFileImpl {
LibHdfsShim* driver_;
+ // For threadsafety
+ std::mutex lock_;
+
// These are pointers in libhdfs, so OK to copy
hdfsFS fs_;
hdfsFile file_;
@@ -116,6 +119,7 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
ret = driver_->Pread(fs_, file_, static_cast<tOffset>(position),
reinterpret_cast<void*>(buffer), static_cast<tSize>(nbytes));
} else {
+ std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, buffer);
}
@@ -253,6 +257,7 @@ class HdfsOutputStream::HdfsOutputStreamImpl : public HdfsAnyFileImpl {
}
Status Write(const uint8_t* buffer, int64_t nbytes, int64_t* bytes_written) {
+ std::lock_guard<std::mutex> guard(lock_);
tSize ret = driver_->Write(
fs_, file_, reinterpret_cast<const void*>(buffer), static_cast<tSize>(nbytes));
CHECK_FAILURE(ret, "Write");
http://git-wip-us.apache.org/repos/asf/arrow/blob/85ef000c/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 0fdb897..74f8042 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -19,6 +19,7 @@
#include <iostream>
#include <sstream>
#include <string>
+#include <thread>
#include "gtest/gtest.h"
@@ -38,6 +39,14 @@ std::vector<uint8_t> RandomData(int64_t size) {
return buffer;
}
+struct JNIDriver {
+ static HdfsDriver type;
+};
+
+struct PivotalDriver {
+ static HdfsDriver type;
+};
+
template <typename DRIVER>
class TestHdfsClient : public ::testing::Test {
public:
@@ -112,6 +121,7 @@ class TestHdfsClient : public ::testing::Test {
conf_.host = host == nullptr ? "localhost" : host;
conf_.user = user;
conf_.port = port == nullptr ? 20500 : atoi(port);
+ conf_.driver = DRIVER::type;
ASSERT_OK(HdfsClient::Connect(&conf_, &client_));
}
@@ -133,20 +143,19 @@ class TestHdfsClient : public ::testing::Test {
std::shared_ptr<HdfsClient> client_;
};
+template <>
+std::string TestHdfsClient<PivotalDriver>::HdfsAbsPath(const std::string& relpath) {
+ std::stringstream ss;
+ ss << relpath;
+ return ss.str();
+}
+
#define SKIP_IF_NO_DRIVER() \
if (!this->loaded_driver_) { \
std::cout << "Driver not loaded, skipping" << std::endl; \
return; \
}
-struct JNIDriver {
- static HdfsDriver type;
-};
-
-struct PivotalDriver {
- static HdfsDriver type;
-};
-
HdfsDriver JNIDriver::type = HdfsDriver::LIBHDFS;
HdfsDriver PivotalDriver::type = HdfsDriver::LIBHDFS3;
@@ -364,7 +373,6 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
TYPED_TEST(TestHdfsClient, RenameFile) {
SKIP_IF_NO_DRIVER();
-
ASSERT_OK(this->MakeScratchDir());
auto src_path = this->ScratchPath("src-file");
@@ -380,5 +388,47 @@ TYPED_TEST(TestHdfsClient, RenameFile) {
ASSERT_TRUE(this->client_->Exists(dst_path));
}
+TYPED_TEST(TestHdfsClient, ThreadSafety) {
+ SKIP_IF_NO_DRIVER();
+ ASSERT_OK(this->MakeScratchDir());
+
+ auto src_path = this->ScratchPath("threadsafety");
+
+ std::string data = "foobar";
+ ASSERT_OK(this->WriteDummyFile(src_path, reinterpret_cast<const uint8_t*>(data.c_str()),
+ static_cast<int64_t>(data.size())));
+
+ std::shared_ptr<HdfsReadableFile> file;
+ ASSERT_OK(this->client_->OpenReadable(src_path, &file));
+
+ std::atomic<int> correct_count(0);
+ const int niter = 1000;
+
+ auto ReadData = [&file, &correct_count, &data, niter]() {
+ for (int i = 0; i < niter; ++i) {
+ std::shared_ptr<Buffer> buffer;
+ if (i % 2 == 0) {
+ ASSERT_OK(file->ReadAt(3, 3, &buffer));
+ if (0 == memcmp(data.c_str() + 3, buffer->data(), 3)) { correct_count += 1; }
+ } else {
+ ASSERT_OK(file->ReadAt(0, 4, &buffer));
+ if (0 == memcmp(data.c_str() + 0, buffer->data(), 4)) { correct_count += 1; }
+ }
+ }
+ };
+
+ std::thread thread1(ReadData);
+ std::thread thread2(ReadData);
+ std::thread thread3(ReadData);
+ std::thread thread4(ReadData);
+
+ thread1.join();
+ thread2.join();
+ thread3.join();
+ thread4.join();
+
+ ASSERT_EQ(niter * 4, correct_count);
+}
+
} // namespace io
} // namespace arrow