You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/15 16:45:39 UTC
[arrow] branch master updated: ARROW-6180: [C++][Parquet] Add
RandomAccessFile::GetStream that returns InputStream that reads a file
segment independent of the file's state,
fix concurrent buffered Parquet column reads
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 2c808a2 ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that returns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads
2c808a2 is described below
commit 2c808a2cbd62300a36d682ebd7bd25ad8b6cd500
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 15 11:45:24 2019 -0500
ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that returns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads
This enables different functions to read portions of a `RandomAccessFile` as an InputStream without interfering with each other.
This also addresses PARQUET-1636 and adds a unit test for buffered column chunk reads. In the refactor to use the Arrow IO interfaces, I broke buffered reads by passing the same raw RandomAccessFile into multiple `BufferedInputStream` instances at once, so the shared file position was being moved by different column readers. We didn't catch the problem because there were no unit tests for buffered reads, so this patch also addresses that deficiency.
Closes #5085 from wesm/ARROW-6180 and squashes the following commits:
e4ad370d5 <Wes McKinney> Code review comments
2645bec64 <Wes McKinney> Add unit test that exhibits PARQUET-1636
76dc71c4f <Wes McKinney> stub
3eb0136d1 <Wes McKinney> Finish basic unit tests
4fd3d610a <Wes McKinney> Start implementation
Authored-by: Wes McKinney <we...@apache.org>
Signed-off-by: Wes McKinney <we...@apache.org>
---
cpp/src/arrow/io/interfaces.cc | 66 ++++++++++++++++++++++++++++
cpp/src/arrow/io/interfaces.h | 10 +++++
cpp/src/arrow/io/memory-test.cc | 67 ++++++++++++++++++++++++++++
cpp/src/arrow/testing/random.h | 33 +++++++-------
cpp/src/parquet/properties.cc | 7 ++-
cpp/src/parquet/properties.h | 2 +-
cpp/src/parquet/reader-test.cc | 96 +++++++++++++++++++++++++++++++++++++++++
7 files changed, 262 insertions(+), 19 deletions(-)
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 06acb99..8c4f480 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -17,11 +17,15 @@
#include "arrow/io/interfaces.h"
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <mutex>
+#include <utility>
+#include "arrow/buffer.h"
#include "arrow/status.h"
+#include "arrow/util/logging.h"
#include "arrow/util/string_view.h"
namespace arrow {
@@ -70,5 +74,67 @@ Status Writable::Write(const std::string& data) {
Status Writable::Flush() { return Status::OK(); }
+namespace {
+
+/// \brief InputStream over the byte range [file_offset, file_offset + nbytes)
+/// of a shared RandomAccessFile.
+///
+/// Each reader keeps its own logical position and reads only through
+/// RandomAccessFile::ReadAt at absolute offsets (never Seek + Read), so several
+/// readers can share one file object. Whether ReadAt itself leaves the file's
+/// own position untouched is up to the concrete RandomAccessFile implementation.
+/// Reads are clamped to the segment: once all `nbytes` bytes are consumed,
+/// further reads ask the file for 0 bytes. Closing the reader only marks it
+/// closed; the underlying file stays open.
+///
+/// Defined in an unnamed namespace so the helper's name cannot clash with
+/// other translation units; it is reached through RandomAccessFile::GetStream.
+class FileSegmentReader final : public InputStream {
+ public:
+  FileSegmentReader(std::shared_ptr<RandomAccessFile> file, int64_t file_offset,
+                    int64_t nbytes)
+      : file_(std::move(file)),
+        closed_(false),
+        position_(0),
+        file_offset_(file_offset),
+        nbytes_(nbytes) {
+    // Negative bounds would turn into negative ReadAt offsets/lengths below.
+    DCHECK_GE(file_offset, 0);
+    DCHECK_GE(nbytes, 0);
+    FileInterface::set_mode(FileMode::READ);
+  }
+
+  Status CheckOpen() const {
+    if (closed_) {
+      return Status::IOError("Stream is closed");
+    }
+    return Status::OK();
+  }
+
+  /// Marks this stream closed. Repeated calls succeed; `file_` is not closed.
+  Status Close() override {
+    closed_ = true;
+    return Status::OK();
+  }
+
+  /// Position relative to the start of the segment, not of the file.
+  Status Tell(int64_t* position) const override {
+    RETURN_NOT_OK(CheckOpen());
+    *position = position_;
+    return Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override {
+    RETURN_NOT_OK(CheckOpen());
+    // Never read past the end of the segment
+    int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
+    RETURN_NOT_OK(
+        file_->ReadAt(file_offset_ + position_, bytes_to_read, bytes_read, out));
+    // Advance by what was actually read, which may be less than requested
+    position_ += *bytes_read;
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override {
+    RETURN_NOT_OK(CheckOpen());
+    // Never read past the end of the segment
+    int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
+    RETURN_NOT_OK(file_->ReadAt(file_offset_ + position_, bytes_to_read, out));
+    position_ += (*out)->size();
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<RandomAccessFile> file_;
+  bool closed_;
+  int64_t position_;     // bytes consumed so far, relative to file_offset_
+  int64_t file_offset_;  // absolute start of the segment within file_
+  int64_t nbytes_;       // length of the segment
+};
+
+}  // namespace
+
+std::shared_ptr<InputStream> RandomAccessFile::GetStream(
+    std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes) {
+  // Every call builds a fresh FileSegmentReader with its own read position;
+  // only the RandomAccessFile handle is shared between the returned streams.
+  std::shared_ptr<InputStream> segment_stream =
+      std::make_shared<FileSegmentReader>(std::move(file), file_offset, nbytes);
+  return segment_stream;
+}
+
} // namespace io
} // namespace arrow
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 678366b..95022e3 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -144,6 +144,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
/// Necessary because we hold a std::unique_ptr
~RandomAccessFile() override;
+  /// \brief Create an isolated InputStream that reads a segment of a
+  /// RandomAccessFile. Multiple such streams can be created over the same file
+  /// and used independently without interfering with each other.
+  ///
+  /// Each returned stream keeps its own position, starting at 0 relative to
+  /// `file_offset`, and never reads beyond `file_offset + nbytes`: once the
+  /// segment is exhausted, reads yield no further data. Closing the returned
+  /// stream does not close `file`.
+  ///
+  /// \param[in] file a file instance, shared by the returned stream
+  /// \param[in] file_offset the starting position in the file
+  /// \param[in] nbytes the extent of bytes to read. The file should have
+  /// sufficient bytes available
+  /// \return a new InputStream over the requested segment
+  static std::shared_ptr<InputStream> GetStream(std::shared_ptr<RandomAccessFile> file,
+                                                int64_t file_offset, int64_t nbytes);
+
virtual Status GetSize(int64_t* size) = 0;
/// \brief Read nbytes at position, provide default implementations using
diff --git a/cpp/src/arrow/io/memory-test.cc b/cpp/src/arrow/io/memory-test.cc
index 6fc0d05..d545520 100644
--- a/cpp/src/arrow/io/memory-test.cc
+++ b/cpp/src/arrow/io/memory-test.cc
@@ -218,6 +218,73 @@ TEST(TestBufferReader, RetainParentReference) {
ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
}
+TEST(TestRandomAccessFile, GetStream) {
+  // Two overlapping segment streams over one BufferReader: [0, 10) and [9, 25).
+  // Each must keep its own position, clamp reads to its segment, and leave the
+  // parent file's position and open state untouched.
+  std::string data = "data1data2data3data4data5";
+
+  auto buf = std::make_shared<Buffer>(data);
+  auto file = std::make_shared<BufferReader>(buf);
+
+  std::shared_ptr<InputStream> stream1, stream2;
+
+  stream1 = RandomAccessFile::GetStream(file, 0, 10);
+  stream2 = RandomAccessFile::GetStream(file, 9, 16);
+
+  int64_t position = -1;
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(0, position);
+
+  std::shared_ptr<Buffer> buf2;
+  uint8_t buf3[20];
+
+  int64_t bytes_read = -1;
+  ASSERT_OK(stream2->Read(4, &bytes_read, buf3));
+  ASSERT_EQ(4, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "2dat", 4));
+  ASSERT_OK(stream2->Tell(&position));
+  ASSERT_EQ(4, position);
+
+  ASSERT_OK(stream1->Read(6, &bytes_read, buf3));
+  ASSERT_EQ(6, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "data1d", 6));
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(6, position);
+
+  ASSERT_OK(stream1->Read(2, &buf2));
+  ASSERT_TRUE(SliceBuffer(buf, 6, 2)->Equals(*buf2));
+
+  // Read to end of each stream
+  ASSERT_OK(stream1->Read(4, &bytes_read, buf3));
+  ASSERT_EQ(2, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "a2", 2));
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(10, position);
+
+  ASSERT_OK(stream1->Read(1, &bytes_read, buf3));
+  ASSERT_EQ(0, bytes_read);
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(10, position);
+
+  // stream2 had its extent limited
+  ASSERT_OK(stream2->Read(20, &buf2));
+  ASSERT_TRUE(SliceBuffer(buf, 13, 12)->Equals(*buf2));
+
+  ASSERT_OK(stream2->Read(1, &buf2));
+  ASSERT_EQ(0, buf2->size());
+  ASSERT_OK(stream2->Tell(&position));
+  ASSERT_EQ(16, position);
+
+  // The segment streams read through ReadAt, so the parent file's own
+  // position must not have moved
+  ASSERT_OK(file->Tell(&position));
+  ASSERT_EQ(0, position);
+
+  ASSERT_OK(stream1->Close());
+
+  // idempotent
+  ASSERT_OK(stream1->Close());
+  ASSERT_TRUE(stream1->closed());
+
+  // Closing one segment stream neither closes the parent file nor affects
+  // the other segment stream
+  ASSERT_FALSE(file->closed());
+  ASSERT_FALSE(stream2->closed());
+  ASSERT_OK(stream2->Tell(&position));
+  ASSERT_EQ(16, position);
+
+  // Check whether closed
+  ASSERT_RAISES(IOError, stream1->Tell(&position));
+  ASSERT_RAISES(IOError, stream1->Read(1, &buf2));
+  ASSERT_RAISES(IOError, stream1->Read(1, &bytes_read, buf3));
+}
+
TEST(TestMemcopy, ParallelMemcopy) {
#if defined(ARROW_VALGRIND)
// Compensate for Valgrind's slowness
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index 75f6bdf..cb73e63 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -50,7 +50,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
+ /// \note null_probability defaults to 0 for this and the following generators,
+ /// so callers may omit it (presumably yielding arrays without nulls -- confirm in random.cc)
std::shared_ptr<arrow::Array> Boolean(int64_t size, double probability,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random UInt8Array
///
@@ -61,7 +61,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt8(int64_t size, uint8_t min, uint8_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random Int8Array
///
@@ -72,7 +72,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int8(int64_t size, int8_t min, int8_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random UInt16Array
///
@@ -83,7 +83,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt16(int64_t size, uint16_t min, uint16_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random Int16Array
///
@@ -94,7 +94,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int16(int64_t size, int16_t min, int16_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random UInt32Array
///
@@ -105,7 +105,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt32(int64_t size, uint32_t min, uint32_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random Int32Array
///
@@ -116,7 +116,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int32(int64_t size, int32_t min, int32_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random UInt64Array
///
@@ -127,7 +127,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> UInt64(int64_t size, uint64_t min, uint64_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random Int64Array
///
@@ -138,7 +138,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Int64(int64_t size, int64_t min, int64_t max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random FloatArray
///
@@ -149,7 +149,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Float32(int64_t size, float min, float max,
- double null_probability);
+ double null_probability = 0);
/// \brief Generates a random DoubleArray
///
@@ -160,11 +160,11 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> Float64(int64_t size, double min, double max,
- double null_probability);
+ double null_probability = 0);
template <typename ArrowType, typename CType = typename ArrowType::c_type>
std::shared_ptr<arrow::Array> Numeric(int64_t size, CType min, CType max,
- double null_probability) {
+ double null_probability = 0) {
switch (ArrowType::type_id) {
case Type::UINT8:
return UInt8(size, static_cast<uint8_t>(min), static_cast<uint8_t>(max),
@@ -212,7 +212,7 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
+ /// \note null_probability defaults to 0 for this and the following generators,
+ /// so callers may omit it (presumably yielding arrays without nulls -- confirm in random.cc)
std::shared_ptr<arrow::Array> String(int64_t size, int32_t min_length,
- int32_t max_length, double null_probability);
+ int32_t max_length, double null_probability = 0);
/// \brief Generates a random LargeStringArray
///
@@ -225,7 +225,8 @@ class ARROW_EXPORT RandomArrayGenerator {
///
/// \return a generated Array
std::shared_ptr<arrow::Array> LargeString(int64_t size, int32_t min_length,
- int32_t max_length, double null_probability);
+ int32_t max_length,
+ double null_probability = 0);
/// \brief Generates a random StringArray with repeated values
///
@@ -241,12 +242,12 @@ class ARROW_EXPORT RandomArrayGenerator {
/// \return a generated Array
std::shared_ptr<arrow::Array> StringWithRepeats(int64_t size, int64_t unique,
int32_t min_length, int32_t max_length,
- double null_probability);
+ double null_probability = 0);
/// \brief Like StringWithRepeats but return BinaryArray
std::shared_ptr<arrow::Array> BinaryWithRepeats(int64_t size, int64_t unique,
int32_t min_length, int32_t max_length,
- double null_probability);
+ double null_probability = 0);
+ /// \brief Return a seed drawn from seed_distribution_ using seed_rng_
SeedType seed() { return seed_distribution_(seed_rng_); }
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index e8ffad4..67b9ad4 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -26,10 +26,13 @@ namespace parquet {
std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
std::shared_ptr<ArrowInputFile> source, int64_t start, int64_t num_bytes) {
if (buffered_stream_enabled_) {
+ // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
+ // of source
+ std::shared_ptr<::arrow::io::InputStream> safe_stream =
+ ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes);
std::shared_ptr<::arrow::io::BufferedInputStream> stream;
- PARQUET_THROW_NOT_OK(source->Seek(start));
PARQUET_THROW_NOT_OK(::arrow::io::BufferedInputStream::Create(
- buffer_size_, pool_, source, &stream, num_bytes));
+ buffer_size_, pool_, safe_stream, &stream, num_bytes));
return std::move(stream);
} else {
std::shared_ptr<Buffer> data;
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 40a358c..b7e55f0 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -37,7 +37,7 @@ struct ParquetVersion {
enum type { PARQUET_1_0, PARQUET_2_0 };
};
-static int64_t DEFAULT_BUFFER_SIZE = 0;
-static bool DEFAULT_USE_BUFFERED_STREAM = false;
+// Reader defaults. Declared constexpr rather than as mutable `static` variables so
+// every translation unit including this header sees an immutable constant.
+static constexpr int64_t DEFAULT_BUFFER_SIZE = 1024;
+static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;
class PARQUET_EXPORT ReaderProperties {
diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc
index ce4197f..2651bf3 100644
--- a/cpp/src/parquet/reader-test.cc
+++ b/cpp/src/parquet/reader-test.cc
@@ -23,11 +23,16 @@
#include <memory>
#include <string>
+#include "arrow/array.h"
+#include "arrow/buffer.h"
#include "arrow/io/file.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
#include "parquet/column_reader.h"
#include "parquet/column_scanner.h"
#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
#include "parquet/metadata.h"
#include "parquet/platform.h"
#include "parquet/printer.h"
@@ -35,6 +40,9 @@
namespace parquet {
+using schema::GroupNode;
+using schema::PrimitiveNode;
+
using ReadableFile = ::arrow::io::ReadableFile;
std::string data_file(const char* file) {
@@ -383,4 +391,92 @@ TEST(TestJSONWithLocalFile, JSONOutput) {
ASSERT_EQ(json_output, ss.str());
}
+TEST(TestFileReader, BufferedReads) {
+  // PARQUET-1636: Buffered reads were broken before introduction of
+  // RandomAccessFile::GetStream. This test writes several columns into one
+  // in-memory file, then reads them back in lockstep (one value per column at
+  // a time) with a small read buffer, so the columns' buffered streams are
+  // refilled in an interleaved order.
+
+  const int num_columns = 10;
+  const int num_rows = 1000;
+
+  // Make schema: num_columns REQUIRED DOUBLE fields named "field<i>"
+  schema::NodeVector fields;
+  for (int i = 0; i < num_columns; ++i) {
+    fields.push_back(PrimitiveNode::Make("field" + std::to_string(i),
+                                         Repetition::REQUIRED, Type::DOUBLE,
+                                         ConvertedType::NONE));
+  }
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props =
+      WriterProperties::Builder().write_batch_size(64)->data_pagesize(128)->build();
+
+  std::shared_ptr<::arrow::io::BufferOutputStream> out_file;
+  ASSERT_OK(::arrow::io::BufferOutputStream::Create(
+      1024, ::arrow::default_memory_pool(), &out_file));
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // Expected values per column, compared against what is read back
+  std::vector<std::shared_ptr<::arrow::Array>> column_data;
+  column_data.reserve(num_columns);
+  ::arrow::random::RandomArrayGenerator rag(0);
+
+  // Scratch space for reads: one buffer of num_rows doubles per column
+  std::vector<std::shared_ptr<Buffer>> scratch_space;
+  scratch_space.reserve(num_columns);
+
+  // write columns
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+    std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
+    const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+    writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+    column_data.push_back(col);
+
+    // We use this later for reading back the columns
+    scratch_space.push_back(
+        AllocateBuffer(::arrow::default_memory_pool(), num_rows * sizeof(double)));
+  }
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader with buffered streams and a 64-byte read buffer
+  std::shared_ptr<Buffer> file_buf;
+  ASSERT_OK(out_file->Finish(&file_buf));
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  std::vector<std::shared_ptr<DoubleReader>> col_readers;
+  col_readers.reserve(num_columns);
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    col_readers.push_back(
+        std::static_pointer_cast<DoubleReader>(row_group->Column(col_index)));
+  }
+
+  // Interleave the column readers: row by row, one value per ReadBatch call
+  for (int row_index = 0; row_index < num_rows; ++row_index) {
+    for (int col_index = 0; col_index < num_columns; ++col_index) {
+      double* out =
+          reinterpret_cast<double*>(scratch_space[col_index]->mutable_data()) + row_index;
+      int64_t values_read = 0;
+      int64_t levels_read =
+          col_readers[col_index]->ReadBatch(1, nullptr, nullptr, out, &values_read);
+
+      ASSERT_EQ(1, levels_read);
+      ASSERT_EQ(1, values_read);
+    }
+  }
+
+  // Check the results against each written array's values buffer
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    ASSERT_TRUE(
+        scratch_space[col_index]->Equals(*column_data[col_index]->data()->buffers[1]));
+  }
+}
+
} // namespace parquet