You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/12/06 16:41:14 UTC
arrow git commit: ARROW-406: [C++] Set explicit 64K HDFS buffer size,
test large reads
Repository: arrow
Updated Branches:
refs/heads/master 82575ca3c -> 4b72329fe
ARROW-406: [C++] Set explicit 64K HDFS buffer size, test large reads
Previously, we could not support reads in excess of the default HDFS buffer size (typically 64K).
Author: Wes McKinney <we...@twosigma.com>
Closes #226 from wesm/ARROW-406 and squashes the following commits:
d09b645 [Wes McKinney] cpplint
0028e90 [Wes McKinney] Set explicit 64K HDFS buffer size, test large reads using buffered chunks
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/4b72329f
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/4b72329f
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/4b72329f
Branch: refs/heads/master
Commit: 4b72329fe2d731f445e44925783b9489f4e0d0d5
Parents: 82575ca
Author: Wes McKinney <we...@twosigma.com>
Authored: Tue Dec 6 11:41:08 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 6 11:41:08 2016 -0500
----------------------------------------------------------------------
cpp/src/arrow/io/hdfs.cc | 33 ++++++++++++++++++++++++++-------
cpp/src/arrow/io/hdfs.h | 3 +++
cpp/src/arrow/io/io-hdfs-test.cc | 33 +++++++++++++++++++++++++++++++++
3 files changed, 62 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 13491e7..8c6d49f 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -17,6 +17,7 @@
#include <hdfs.h>
+#include <algorithm>
#include <cstdint>
#include <sstream>
#include <string>
@@ -51,6 +52,8 @@ static Status CheckReadResult(int ret) {
return Status::OK();
}
+static constexpr int kDefaultHdfsBufferSize = 1 << 16;
+
// ----------------------------------------------------------------------
// File reading
@@ -124,9 +127,16 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
}
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
- tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer), nbytes);
- RETURN_NOT_OK(CheckReadResult(ret));
- *bytes_read = ret;
+ int64_t total_bytes = 0;
+ while (total_bytes < nbytes) {
+ tSize ret = hdfsRead(fs_, file_, reinterpret_cast<void*>(buffer + total_bytes),
+ std::min<int64_t>(buffer_size_, nbytes - total_bytes));
+ RETURN_NOT_OK(CheckReadResult(ret));
+ total_bytes += ret;
+ if (ret == 0) { break; }
+ }
+
+ *bytes_read = total_bytes;
return Status::OK();
}
@@ -136,7 +146,6 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
-
if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); }
*out = buffer;
@@ -154,8 +163,11 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
void set_memory_pool(MemoryPool* pool) { pool_ = pool; }
+ void set_buffer_size(int32_t buffer_size) { buffer_size_ = buffer_size; }
+
private:
MemoryPool* pool_;
+ int32_t buffer_size_;
};
HdfsReadableFile::HdfsReadableFile(MemoryPool* pool) {
@@ -384,8 +396,9 @@ class HdfsClient::HdfsClientImpl {
return Status::OK();
}
- Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
- hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, 0, 0, 0);
+ Status OpenReadable(const std::string& path, int32_t buffer_size,
+ std::shared_ptr<HdfsReadableFile>* file) {
+ hdfsFile handle = hdfsOpenFile(fs_, path.c_str(), O_RDONLY, buffer_size, 0, 0);
if (handle == nullptr) {
// TODO(wesm): determine cause of failure
@@ -397,6 +410,7 @@ class HdfsClient::HdfsClientImpl {
// std::make_shared does not work with private ctors
*file = std::shared_ptr<HdfsReadableFile>(new HdfsReadableFile());
(*file)->impl_->set_members(path, fs_, handle);
+ (*file)->impl_->set_buffer_size(buffer_size);
return Status::OK();
}
@@ -490,9 +504,14 @@ Status HdfsClient::ListDirectory(
return impl_->ListDirectory(path, listing);
}
+Status HdfsClient::OpenReadable(const std::string& path, int32_t buffer_size,
+ std::shared_ptr<HdfsReadableFile>* file) {
+ return impl_->OpenReadable(path, buffer_size, file);
+}
+
Status HdfsClient::OpenReadable(
const std::string& path, std::shared_ptr<HdfsReadableFile>* file) {
- return impl_->OpenReadable(path, file);
+ return OpenReadable(path, kDefaultHdfsBufferSize, file);
}
Status HdfsClient::OpenWriteable(const std::string& path, bool append,
http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/hdfs.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.h b/cpp/src/arrow/io/hdfs.h
index 48699c9..1c76f15 100644
--- a/cpp/src/arrow/io/hdfs.h
+++ b/cpp/src/arrow/io/hdfs.h
@@ -128,6 +128,9 @@ class ARROW_EXPORT HdfsClient : public FileSystemClient {
// status if the file is not found.
//
// @param path complete file path
+ Status OpenReadable(const std::string& path, int32_t buffer_size,
+ std::shared_ptr<HdfsReadableFile>* file);
+
Status OpenReadable(const std::string& path, std::shared_ptr<HdfsReadableFile>* file);
// FileMode::WRITE options
http://git-wip-us.apache.org/repos/asf/arrow/blob/4b72329f/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 7901932..8338de6 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -293,6 +293,39 @@ TEST_F(TestHdfsClient, ReadableMethods) {
ASSERT_EQ(60, position);
}
+TEST_F(TestHdfsClient, LargeFile) {
+ SKIP_IF_NO_LIBHDFS();
+
+ ASSERT_OK(MakeScratchDir());
+
+ auto path = ScratchPath("test-large-file");
+ const int size = 1000000;
+
+ std::vector<uint8_t> data = RandomData(size);
+ ASSERT_OK(WriteDummyFile(path, data.data(), size));
+
+ std::shared_ptr<HdfsReadableFile> file;
+ ASSERT_OK(client_->OpenReadable(path, &file));
+
+ auto buffer = std::make_shared<PoolBuffer>();
+ ASSERT_OK(buffer->Resize(size));
+ int64_t bytes_read = 0;
+
+ ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data()));
+ ASSERT_EQ(0, std::memcmp(buffer->data(), data.data(), size));
+ ASSERT_EQ(size, bytes_read);
+
+ // explicit buffer size
+ std::shared_ptr<HdfsReadableFile> file2;
+ ASSERT_OK(client_->OpenReadable(path, 1 << 18, &file2));
+
+ auto buffer2 = std::make_shared<PoolBuffer>();
+ ASSERT_OK(buffer2->Resize(size));
+ ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data()));
+ ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
+ ASSERT_EQ(size, bytes_read);
+}
+
TEST_F(TestHdfsClient, RenameFile) {
SKIP_IF_NO_LIBHDFS();