You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/06 16:41:14 UTC

arrow git commit: ARROW-406: [C++] Set explicit 64K HDFS buffer size, test large reads

Repository: arrow
Updated Branches:
  refs/heads/master 82575ca3c -> 4b72329fe


ARROW-406: [C++] Set explicit 64K HDFS buffer size, test large reads

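Previously, reads larger than the default HDFS buffer size (typically 64K) were not supported. Reads are now issued in buffer-sized chunks, and callers can pass an explicit buffer size when opening a file for reading (the default is 64K).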

Author: Wes McKinney <we...@twosigma.com>

Closes #226 from wesm/ARROW-406 and squashes the following commits:

d09b645 [Wes McKinney] cpplint
0028e90 [Wes McKinney] Set explicit 64K HDFS buffer size, test large reads using buffered chunks


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4b72329f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4b72329f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4b72329f

Branch: refs/heads/master
Commit: 4b72329fe2d731f445e44925783b9489f4e0d0d5
Parents: 82575ca
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Dec 6 11:41:08 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 6 11:41:08 2016 -0500

----------------------------------------------------------------------
 cpp/src/arrow/io/hdfs.cc         | 33 ++++++++++++++++++++++++++-------
 cpp/src/arrow/io/hdfs.h          |  3 +++
 cpp/src/arrow/io/io-hdfs-test.cc | 33 +++++++++++++++++++++++++++++++++
 3 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 13491e7..8c6d49f 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -17,6 +17,7 @@
 
 #include <hdfs.h>
 
+#include <algorithm>
 #include <cstdint>
 #include <sstream>
 #include <string>
@@ -51,6 +52,8 @@ static Status CheckReadResult(int ret) {
   return Status::OK();
 }
 
+static constexpr int kDefaultHdfsBufferSize = 1 << 16;
+
 // ----------------------------------------------------------------------
 // File reading
 
@@ -124,9 +127,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
   }
 
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
-    tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
-    RETURN_NOT_OK(CheckReadResult(ret));
-    *bytes_read = ret;
+    int64_t total_bytes = 0;
+    while (total_bytes < nbytes) {
+      tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes),
+          std::min<int64_t>(buffer_size_, nbytes - total_bytes));
+      RETURN_NOT_OK(CheckReadResult(ret));
+      total_bytes += ret;
+      if (ret == 0) { break; }
+    }
+
+    *bytes_read = total_bytes;
     return Status::OK();
   }
 
@@ -136,7 +146,6 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 
     int64_t bytes_read = 0;
     RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
-
     if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }
 
     *out = buffer;
@@ -154,8 +163,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
 
   void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
 
+  void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }
+
  private:
   MemoryPool* pool_;
+  int32_t buffer_size_;
 };
 
 HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
@@ -384,8 +396,9 @@ class HdfsClient::HdfsClientImpl {
     return Status::OK();
   }
 
-  Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
-    hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0);
+  Status OpenReadable(const std::string& path, int32_t buffer_size,
+      std::shared_ptr<HdfsReadableFile>* file) {
+    hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
 
     if (handle == nullptr) {
       // TODO(wesm): determine cause of failure
@@ -397,6 +410,7 @@ class HdfsClient::HdfsClientImpl {
     // std::make_shared does not work with private ctors
     *file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
     (*file)->impl_->set_members(path, fs_, handle);
+    (*file)->impl_->set_buffer_size(buffer_size);
 
     return Status::OK();
   }
@@ -490,9 +504,14 @@ Status HdfsClient::ListDirectory(
   return impl_->ListDirectory(path, listing);
 }
 
+Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size,
+    std::shared_ptr<HdfsReadableFile>* file) {
+  return impl_->OpenReadable(path, buffer_size, file);
+}
+
 Status HdfsClient::OpenReadable(
     const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
-  return impl_->OpenReadable(path, file);
+  return OpenReadable(path, kDefaultHdfsBufferSize, file);
 }
 
 Status HdfsClient::OpenWriteable(const std::string& path, bool append,

http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 48699c9..1c76f15 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -128,6 +128,9 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
   // status if the file is not found.
   //
   // @param path complete file path
+  Status OpenReadable(const std::string& path, int32_t buffer_size,
+      std::shared_ptr<HdfsReadableFile>* file);
+
   Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);
 
   // FileMode::WRITE options

http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 7901932..8338de6 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -293,6 +293,39 @@ TEST_F(TestHdfsClient, ReadableMethods) {
   ASSERT_EQ(60, position);
 }
 
+TEST_F(TestHdfsClient, LargeFile) {
+  SKIP_IF_NO_LIBHDFS();
+
+  ASSERT_OK(MakeScratchDir());
+
+  auto path = ScratchPath("test-large-file");
+  const int size = 1000000;
+
+  std::vector<uint8_t> data = RandomData(size);
+  ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+  std::shared_ptr<HdfsReadableFile> file;
+  ASSERT_OK(client_->OpenReadable(path, &file));
+
+  auto buffer = std::make_shared<PoolBuffer>();
+  ASSERT_OK(buffer->Resize(size));
+  int64_t bytes_read = 0;
+
+  ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size));
+  ASSERT_EQ(size, bytes_read);
+
+  // explicit buffer size
+  std::shared_ptr<HdfsReadableFile> file2;
+  ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2));
+
+  auto buffer2 = std::make_shared<PoolBuffer>();
+  ASSERT_OK(buffer2->Resize(size));
+  ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
+  ASSERT_EQ(size, bytes_read);
+}
+
 TEST_F(TestHdfsClient, RenameFile) {
   SKIP_IF_NO_LIBHDFS();