You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/15 16:45:39 UTC

[arrow] branch master updated: ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that returns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c808a2  ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that returns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads
2c808a2 is described below

commit 2c808a2cbd62300a36d682ebd7bd25ad8b6cd500
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 15 11:45:24 2019 -0500

    ARROW-6180: [C++][Parquet] Add RandomAccessFile::GetStream that returns InputStream that reads a file segment independent of the file's state, fix concurrent buffered Parquet column reads
    
    This enables different functions to read portions of a `RandomAccessFile` as an InputStream without interfering with each other.
    
    This also addresses PARQUET-1636 and adds a unit test for buffered column chunk reads. In the refactor to use the Arrow IO interfaces, I broke this by allowing the raw RandomAccessFile to be passed into multiple `BufferedInputStream` instances at once, so the file position was being manipulated by different column readers. We didn't catch the problem because we didn't have any unit tests, so this patch addresses that deficiency.
    
    Closes #5085 from wesm/ARROW-6180 and squashes the following commits:
    
    e4ad370d5 <Wes McKinney> Code review comments
    2645bec64 <Wes McKinney> Add unit test that exhibits PARQUET-1636
    76dc71c4f <Wes McKinney> stub
    3eb0136d1 <Wes McKinney> Finish basic unit tests
    4fd3d610a <Wes McKinney> Start implementation
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/io/interfaces.cc  | 66 ++++++++++++++++++++++++++++
 cpp/src/arrow/io/interfaces.h   | 10 +++++
 cpp/src/arrow/io/memory-test.cc | 67 ++++++++++++++++++++++++++++
 cpp/src/arrow/testing/random.h  | 33 +++++++-------
 cpp/src/parquet/properties.cc   |  7 ++-
 cpp/src/parquet/properties.h    |  2 +-
 cpp/src/parquet/reader-test.cc  | 96 +++++++++++++++++++++++++++++++++++++++++
 7 files changed, 262 insertions(+), 19 deletions(-)

diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 06acb99..8c4f480 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -17,11 +17,15 @@
 
 #include "arrow/io/interfaces.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <memory>
 #include <mutex>
+#include <utility>
 
+#include "arrow/buffer.h"
 #include "arrow/status.h"
+#include "arrow/util/logging.h"
 #include "arrow/util/string_view.h"
 
 namespace arrow {
@@ -70,5 +74,67 @@ Status Writable::Write(const std::string& data) {
 
 Status Writable::Flush() { return Status::OK(); }
 
+class FileSegmentReader : public InputStream {  // stream over one byte range of file_
+ public:
+  FileSegmentReader(std::shared_ptr<RandomAccessFile> file, int64_t file_offset,
+                    int64_t nbytes)
+      : file_(std::move(file)),
+        closed_(false),
+        position_(0),
+        file_offset_(file_offset),
+        nbytes_(nbytes) {
+    FileInterface::set_mode(FileMode::READ);
+  }
+  // Returns IOError("Stream is closed") once Close() has been called, else OK.
+  Status CheckOpen() const {
+    if (closed_) {
+      return Status::IOError("Stream is closed");
+    }
+    return Status::OK();
+  }
+  // Only marks this stream as closed; nothing is called on file_ here.
+  Status Close() override {
+    closed_ = true;
+    return Status::OK();
+  }
+  // Reports the position relative to the start of the segment (initially 0).
+  Status Tell(int64_t* position) const override {
+    RETURN_NOT_OK(CheckOpen());
+    *position = position_;
+    return Status::OK();
+  }
+  // True once Close() has been called.
+  bool closed() const override { return closed_; }
+  // Reads at most nbytes into out, clamped to what is left of the segment.
+  Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override {
+    RETURN_NOT_OK(CheckOpen());
+    int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
+    RETURN_NOT_OK(
+        file_->ReadAt(file_offset_ + position_, bytes_to_read, bytes_read, out));
+    position_ += *bytes_read;  // advance by what ReadAt reported
+    return Status::OK();
+  }
+  // Buffer-returning overload; the request is clamped to the segment the same way.
+  Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override {
+    RETURN_NOT_OK(CheckOpen());
+    int64_t bytes_to_read = std::min(nbytes, nbytes_ - position_);
+    RETURN_NOT_OK(file_->ReadAt(file_offset_ + position_, bytes_to_read, out));
+    position_ += (*out)->size();  // advance by the size of the returned buffer
+    return Status::OK();
+  }
+
+ private:
+  std::shared_ptr<RandomAccessFile> file_;  // file that serves every ReadAt call
+  bool closed_;                             // set by Close(), checked by CheckOpen()
+  int64_t position_;     // bytes consumed so far, relative to file_offset_
+  int64_t file_offset_;  // offset in file_ at which the segment starts
+  int64_t nbytes_;       // length of the segment in bytes
+};
+// Wraps file in a FileSegmentReader covering [file_offset, file_offset + nbytes).
+std::shared_ptr<InputStream> RandomAccessFile::GetStream(
+    std::shared_ptr<RandomAccessFile> file, int64_t file_offset, int64_t nbytes) {
+  return std::make_shared<FileSegmentReader>(std::move(file), file_offset, nbytes);
+}
+
 }  // namespace io
 }  // namespace arrow
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 678366b..95022e3 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -144,6 +144,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
   /// Necessary because we hold a std::unique_ptr
   ~RandomAccessFile() override;
 
+  /// \brief Create an isolated InputStream that reads a segment of a
+  /// RandomAccessFile. Multiple such streams can be created and used
+  /// independently without interference
+  /// \param[in] file a file instance
+  /// \param[in] file_offset the starting position in the file
+  /// \param[in] nbytes the extent of bytes to read. The file should have
+  /// sufficient bytes available
+  static std::shared_ptr<InputStream> GetStream(std::shared_ptr<RandomAccessFile> file,
+                                                int64_t file_offset, int64_t nbytes);
+
   virtual Status GetSize(int64_t* size) = 0;
 
   /// \brief Read nbytes at position, provide default implementations using
diff --git a/cpp/src/arrow/io/memory-test.cc b/cpp/src/arrow/io/memory-test.cc
index 6fc0d05..d545520 100644
--- a/cpp/src/arrow/io/memory-test.cc
+++ b/cpp/src/arrow/io/memory-test.cc
@@ -218,6 +218,73 @@ TEST(TestBufferReader, RetainParentReference) {
   ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6));
 }
 
+TEST(TestRandomAccessFile, GetStream) {
+  std::string data = "data1data2data3data4data5";  // 25 bytes
+
+  auto buf = std::make_shared<Buffer>(data);
+  auto file = std::make_shared<BufferReader>(buf);
+
+  std::shared_ptr<InputStream> stream1, stream2;
+  // Two overlapping segments of the same file: bytes [0, 10) and [9, 25).
+  stream1 = RandomAccessFile::GetStream(file, 0, 10);
+  stream2 = RandomAccessFile::GetStream(file, 9, 16);
+  // A freshly created stream reports position 0.
+  int64_t position = -1;
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(0, position);
+
+  std::shared_ptr<Buffer> buf2;
+  uint8_t buf3[20];
+  // Interleave reads on the two streams; each keeps its own position.
+  int64_t bytes_read = -1;
+  ASSERT_OK(stream2->Read(4, &bytes_read, buf3));
+  ASSERT_EQ(4, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "2dat", 4));
+  ASSERT_OK(stream2->Tell(&position));
+  ASSERT_EQ(4, position);
+
+  ASSERT_OK(stream1->Read(6, &bytes_read, buf3));
+  ASSERT_EQ(6, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "data1d", 6));
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(6, position);
+  // The Buffer-returning overload continues from stream1's position (6).
+  ASSERT_OK(stream1->Read(2, &buf2));
+  ASSERT_TRUE(SliceBuffer(buf, 6, 2)->Equals(*buf2));
+
+  // Read to end of each stream
+  ASSERT_OK(stream1->Read(4, &bytes_read, buf3));
+  ASSERT_EQ(2, bytes_read);
+  ASSERT_EQ(0, std::memcmp(buf3, "a2", 2));
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(10, position);
+  // A read at the end of the segment succeeds and returns zero bytes.
+  ASSERT_OK(stream1->Read(1, &bytes_read, buf3));
+  ASSERT_EQ(0, bytes_read);
+  ASSERT_OK(stream1->Tell(&position));
+  ASSERT_EQ(10, position);
+
+  // stream2 had its extent limited: 20 bytes requested, only 12 remain
+  ASSERT_OK(stream2->Read(20, &buf2));
+  ASSERT_TRUE(SliceBuffer(buf, 13, 12)->Equals(*buf2));
+  // At the end of the segment the Buffer overload returns an empty buffer.
+  ASSERT_OK(stream2->Read(1, &buf2));
+  ASSERT_EQ(0, buf2->size());
+  ASSERT_OK(stream2->Tell(&position));
+  ASSERT_EQ(16, position);
+
+  ASSERT_OK(stream1->Close());
+
+  // idempotent
+  ASSERT_OK(stream1->Close());
+  ASSERT_TRUE(stream1->closed());
+
+  // Tell and both Read overloads must raise IOError on a closed stream
+  ASSERT_RAISES(IOError, stream1->Tell(&position));
+  ASSERT_RAISES(IOError, stream1->Read(1, &buf2));
+  ASSERT_RAISES(IOError, stream1->Read(1, &bytes_read, buf3));
+}
+
 TEST(TestMemcopy, ParallelMemcopy) {
 #if defined(ARROW_VALGRIND)
   // Compensate for Valgrind's slowness
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index 75f6bdf..cb73e63 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -50,7 +50,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Boolean(int64_t size, double probability,
-                                        double null_probability);
+                                        double null_probability = 0);
 
   /// \brief Generates a random UInt8Array
   ///
@@ -61,7 +61,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> UInt8(int64_t size, uint8_t min, uint8_t max,
-                                      double null_probability);
+                                      double null_probability = 0);
 
   /// \brief Generates a random Int8Array
   ///
@@ -72,7 +72,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Int8(int64_t size, int8_t min, int8_t max,
-                                     double null_probability);
+                                     double null_probability = 0);
 
   /// \brief Generates a random UInt16Array
   ///
@@ -83,7 +83,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> UInt16(int64_t size, uint16_t min, uint16_t max,
-                                       double null_probability);
+                                       double null_probability = 0);
 
   /// \brief Generates a random Int16Array
   ///
@@ -94,7 +94,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Int16(int64_t size, int16_t min, int16_t max,
-                                      double null_probability);
+                                      double null_probability = 0);
 
   /// \brief Generates a random UInt32Array
   ///
@@ -105,7 +105,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> UInt32(int64_t size, uint32_t min, uint32_t max,
-                                       double null_probability);
+                                       double null_probability = 0);
 
   /// \brief Generates a random Int32Array
   ///
@@ -116,7 +116,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Int32(int64_t size, int32_t min, int32_t max,
-                                      double null_probability);
+                                      double null_probability = 0);
 
   /// \brief Generates a random UInt64Array
   ///
@@ -127,7 +127,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> UInt64(int64_t size, uint64_t min, uint64_t max,
-                                       double null_probability);
+                                       double null_probability = 0);
 
   /// \brief Generates a random Int64Array
   ///
@@ -138,7 +138,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Int64(int64_t size, int64_t min, int64_t max,
-                                      double null_probability);
+                                      double null_probability = 0);
 
   /// \brief Generates a random FloatArray
   ///
@@ -149,7 +149,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Float32(int64_t size, float min, float max,
-                                        double null_probability);
+                                        double null_probability = 0);
 
   /// \brief Generates a random DoubleArray
   ///
@@ -160,11 +160,11 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> Float64(int64_t size, double min, double max,
-                                        double null_probability);
+                                        double null_probability = 0);
 
   template <typename ArrowType, typename CType = typename ArrowType::c_type>
   std::shared_ptr<arrow::Array> Numeric(int64_t size, CType min, CType max,
-                                        double null_probability) {
+                                        double null_probability = 0) {
     switch (ArrowType::type_id) {
       case Type::UINT8:
         return UInt8(size, static_cast<uint8_t>(min), static_cast<uint8_t>(max),
@@ -212,7 +212,7 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> String(int64_t size, int32_t min_length,
-                                       int32_t max_length, double null_probability);
+                                       int32_t max_length, double null_probability = 0);
 
   /// \brief Generates a random LargeStringArray
   ///
@@ -225,7 +225,8 @@ class ARROW_EXPORT RandomArrayGenerator {
   ///
   /// \return a generated Array
   std::shared_ptr<arrow::Array> LargeString(int64_t size, int32_t min_length,
-                                            int32_t max_length, double null_probability);
+                                            int32_t max_length,
+                                            double null_probability = 0);
 
   /// \brief Generates a random StringArray with repeated values
   ///
@@ -241,12 +242,12 @@ class ARROW_EXPORT RandomArrayGenerator {
   /// \return a generated Array
   std::shared_ptr<arrow::Array> StringWithRepeats(int64_t size, int64_t unique,
                                                   int32_t min_length, int32_t max_length,
-                                                  double null_probability);
+                                                  double null_probability = 0);
 
   /// \brief Like StringWithRepeats but return BinaryArray
   std::shared_ptr<arrow::Array> BinaryWithRepeats(int64_t size, int64_t unique,
                                                   int32_t min_length, int32_t max_length,
-                                                  double null_probability);
+                                                  double null_probability = 0);
 
   SeedType seed() { return seed_distribution_(seed_rng_); }
 
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index e8ffad4..67b9ad4 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -26,10 +26,13 @@ namespace parquet {
 std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
     std::shared_ptr<ArrowInputFile> source, int64_t start, int64_t num_bytes) {
   if (buffered_stream_enabled_) {
+    // ARROW-6180 / PARQUET-1636 Create isolated reader that references segment
+    // of source
+    std::shared_ptr<::arrow::io::InputStream> safe_stream =
+        ::arrow::io::RandomAccessFile::GetStream(source, start, num_bytes);
     std::shared_ptr<::arrow::io::BufferedInputStream> stream;
-    PARQUET_THROW_NOT_OK(source->Seek(start));
     PARQUET_THROW_NOT_OK(::arrow::io::BufferedInputStream::Create(
-        buffer_size_, pool_, source, &stream, num_bytes));
+        buffer_size_, pool_, safe_stream, &stream, num_bytes));
     return std::move(stream);
   } else {
     std::shared_ptr<Buffer> data;
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 40a358c..b7e55f0 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -37,7 +37,7 @@ struct ParquetVersion {
   enum type { PARQUET_1_0, PARQUET_2_0 };
 };
 
-static int64_t DEFAULT_BUFFER_SIZE = 0;
+static int64_t DEFAULT_BUFFER_SIZE = 1024;
 static bool DEFAULT_USE_BUFFERED_STREAM = false;
 
 class PARQUET_EXPORT ReaderProperties {
diff --git a/cpp/src/parquet/reader-test.cc b/cpp/src/parquet/reader-test.cc
index ce4197f..2651bf3 100644
--- a/cpp/src/parquet/reader-test.cc
+++ b/cpp/src/parquet/reader-test.cc
@@ -23,11 +23,16 @@
 #include <memory>
 #include <string>
 
+#include "arrow/array.h"
+#include "arrow/buffer.h"
 #include "arrow/io/file.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/random.h"
 
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
 #include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/printer.h"
@@ -35,6 +40,9 @@
 
 namespace parquet {
 
+using schema::GroupNode;
+using schema::PrimitiveNode;
+
 using ReadableFile = ::arrow::io::ReadableFile;
 
 std::string data_file(const char* file) {
@@ -383,4 +391,92 @@ TEST(TestJSONWithLocalFile, JSONOutput) {
   ASSERT_EQ(json_output, ss.str());
 }
 
+TEST(TestFileReader, BufferedReads) {
+  // PARQUET-1636: Buffered reads were broken before introduction of
+  // RandomAccessFile::GetStream
+
+  const int num_columns = 10;
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  for (int i = 0; i < num_columns; ++i) {
+    fields.push_back(PrimitiveNode::Make("field" + std::to_string(i),
+                                         Repetition::REQUIRED, Type::DOUBLE,
+                                         ConvertedType::NONE));
+  }
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props =
+      WriterProperties::Builder().write_batch_size(64)->data_pagesize(128)->build();
+  // The Parquet file is written to an in-memory output stream.
+  std::shared_ptr<arrow::io::BufferOutputStream> out_file;
+  ASSERT_OK(arrow::io::BufferOutputStream::Create(1024, arrow::default_memory_pool(),
+                                                  &out_file));
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+  // All columns go into a single row group.
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  std::vector<std::shared_ptr<arrow::Array>> column_data;
+  ::arrow::random::RandomArrayGenerator rag(0);
+
+  // Scratch space for reads
+  std::vector<std::shared_ptr<Buffer>> scratch_space;
+
+  // write columns
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+    std::shared_ptr<arrow::Array> col = rag.Float64(num_rows, 0, 100);
+    const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+    writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+    column_data.push_back(col);
+
+    // We use this later for reading back the columns
+    scratch_space.push_back(
+        AllocateBuffer(::arrow::default_memory_pool(), num_rows * sizeof(double)));
+  }
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  std::shared_ptr<Buffer> file_buf;
+  ASSERT_OK(out_file->Finish(&file_buf));
+  auto in_file = std::make_shared<arrow::io::BufferReader>(file_buf);
+  // Enable buffered column reads with a small buffer size (64).
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+  // Create every column reader up front so that all of them are live at once.
+  auto row_group = file_reader->RowGroup(0);
+  std::vector<std::shared_ptr<DoubleReader>> col_readers;
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    col_readers.push_back(
+        std::static_pointer_cast<DoubleReader>(row_group->Column(col_index)));
+  }
+  // Interleave single-value reads across the columns (the PARQUET-1636 scenario).
+  for (int row_index = 0; row_index < num_rows; ++row_index) {
+    for (int col_index = 0; col_index < num_columns; ++col_index) {
+      double* out =
+          reinterpret_cast<double*>(scratch_space[col_index]->mutable_data()) + row_index;
+      int64_t values_read = 0;
+      int64_t levels_read =
+          col_readers[col_index]->ReadBatch(1, nullptr, nullptr, out, &values_read);
+
+      ASSERT_EQ(1, levels_read);
+      ASSERT_EQ(1, values_read);
+    }
+  }
+
+  // Check the results
+  for (int col_index = 0; col_index < num_columns; ++col_index) {
+    ASSERT_TRUE(
+        scratch_space[col_index]->Equals(*column_data[col_index]->data()->buffers[1]));
+  }
+}
+
 }  // namespace parquet