Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/27 01:00:39 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #9620: ARROW-11843: [C++] Provide reentrant Parquet reader

westonpace commented on a change in pull request #9620:
URL: https://github.com/apache/arrow/pull/9620#discussion_r620702796



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -325,6 +325,45 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
   return std::move(arrow_reader);
 }
 
+Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
+    const FileSource& source, ScanOptions* options) const {
+  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                            kParquetTypeName, options, default_fragment_scan_options));
+  MemoryPool* pool = options ? options->pool : default_memory_pool();
+  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  // Some ugliness needed due to having Future<unique_ptr<>> here
+  auto reader_fut =
+      parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties));
+  auto path = source.path();
+  auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
+  return reader_fut.Then(
+      [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
+      -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,
+                              reader_fut.MoveResult());
+        std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
+        auto arrow_properties = MakeArrowReaderProperties(*self, *metadata);
+        if (options) {
+          arrow_properties.set_batch_size(options->batch_size);
+        }
+        if (options && !options->use_threads) {
+          arrow_properties.set_use_threads(
+              parquet_scan_options->enable_parallel_column_conversion);

Review comment:
       I don't think we need to look at `enable_parallel_column_conversion` anymore since we're async?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -188,17 +223,51 @@ Result<std::shared_ptr<Buffer>> ReadRangeCache::Read(ReadRange range) {
       [](const RangeCacheEntry& entry, const ReadRange& range) {
         return entry.range.offset + entry.range.length < range.offset + range.length;
       });
+  std::unique_lock<std::mutex> guard = impl_->TakeGuard();
   if (it != impl_->entries.end() && it->range.Contains(range)) {
-    ARROW_ASSIGN_OR_RAISE(auto buf, it->future.result());
+    auto fut = impl_->GetFuture(&*it);

Review comment:
       `&*it` -> `it` unless I'm missing something

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -150,11 +153,44 @@ struct ReadRangeCache::Impl {
       entries = std::move(new_entries);
     }
   }
+
+  virtual std::unique_lock<std::mutex> TakeGuard() {
+    return std::unique_lock<std::mutex>();
+  }
+
+  virtual Future<std::shared_ptr<Buffer>> GetFuture(RangeCacheEntry* entry) {
+    return entry->future;
+  }
+
+  virtual RangeCacheEntry Cache(const ReadRange& range) {
+    return {range, file->ReadAsync(ctx, range.offset, range.length)};
+  }
+};
+
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  std::mutex entry_mutex;
+
+  virtual ~LazyImpl() = default;
+
+  std::unique_lock<std::mutex> TakeGuard() override {
+    return std::unique_lock<std::mutex>(entry_mutex);
+  }
+
+  Future<std::shared_ptr<Buffer>> GetFuture(RangeCacheEntry* entry) override {

Review comment:
       `GetFuture` is not a very clear name.  I'm not sure what a better one would be.  Maybe `GetOrIssueRead`?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -996,6 +1100,13 @@ Status FileReaderImpl::ReadRowGroups(const std::vector<int>& row_groups,
     END_PARQUET_CATCH_EXCEPTIONS
   }
 
+  return ReadRowGroupsImpl(row_groups, column_indices, out);
+}
+
+// Also used by RowGroupGenerator - skip bounds check/pre-buffer to avoid doing that twice
+Status FileReaderImpl::ReadRowGroupsImpl(const std::vector<int>& row_groups,

Review comment:
       Naming nit: Maybe `DecodeRowGroup`?

##########
File path: cpp/src/arrow/python/io.cc
##########
@@ -265,6 +266,14 @@ Result<std::shared_ptr<Buffer>> PyReadableFile::ReadAt(int64_t position, int64_t
   });
 }
 
+Future<std::shared_ptr<Buffer>> PyReadableFile::ReadAsync(const io::IOContext&,

Review comment:
       This could become a problem (to be addressed in ARROW-12560) if nothing actually creates a thread task.

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -150,11 +153,44 @@ struct ReadRangeCache::Impl {
       entries = std::move(new_entries);
     }
   }
+
+  virtual std::unique_lock<std::mutex> TakeGuard() {
+    return std::unique_lock<std::mutex>();
+  }
+
+  virtual Future<std::shared_ptr<Buffer>> GetFuture(RangeCacheEntry* entry) {
+    return entry->future;
+  }
+
+  virtual RangeCacheEntry Cache(const ReadRange& range) {
+    return {range, file->ReadAsync(ctx, range.offset, range.length)};
+  }
+};
+
+struct ReadRangeCache::LazyImpl : public ReadRangeCache::Impl {
+  std::mutex entry_mutex;

Review comment:
       Since this only exists on the lazy impl I'm guessing that means we are guarding `entries` because two or more threads might try to find-or-create `entry->future` at the same time but I'm not sure.
   
   If that is the case it seems like the mutex could be specific to `LazyImpl` (and not part of the interface)
   
   On the other hand, maybe you're guarding against modifying `entries` while iterating over `entries`, but I don't think you made any changes to where the size of `entries` is modified, so if that is a concern it would be a concern with the eager version as well.
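
To make the first reading concrete, here is a minimal sketch of find-or-create under a mutex that stays private to the lazy cache. It uses only the standard library; `LazyCache`, its `read` callback, and the `int` key are hypothetical stand-ins and not the actual `ReadRangeCache` types. `std::launch::deferred` is used so that the read is not started until someone waits on the future.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a lazy range cache; not Arrow's ReadRangeCache.
class LazyCache {
 public:
  explicit LazyCache(std::function<std::string(int)> read) : read_(std::move(read)) {}

  // Find the future for `key`, or create it if this is the first request.
  // The lock is taken here, so no guard has to appear in the public interface.
  std::shared_future<std::string> GetOrIssueRead(int key) {
    std::lock_guard<std::mutex> guard(mutex_);
    auto it = entries_.find(key);
    if (it == entries_.end()) {
      // Deferred launch: `read_` runs on the first get()/wait(), not here.
      auto fut = std::async(std::launch::deferred, read_, key).share();
      it = entries_.emplace(key, std::move(fut)).first;
    }
    return it->second;
  }

 private:
  std::mutex mutex_;
  std::function<std::string(int)> read_;
  std::map<int, std::shared_future<std::string>> entries_;
};
```

With this shape, two callers asking for the same key receive copies of the same future, and the underlying read is issued once.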

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -277,43 +292,56 @@ class SerializedFile : public ParquetFileReader::Contents {
     }
 
     int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
-    PARQUET_ASSIGN_OR_THROW(
-        auto footer_buffer,
-        source_->ReadAt(source_size_ - footer_read_size, footer_read_size));
-
-    // Check if all bytes are read. Check if last 4 bytes read have the magic bits
-    if (footer_buffer->size() != footer_read_size ||
-        (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 &&
-         memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) {
-      throw ParquetInvalidOrCorruptedFileException(
-          "Parquet magic bytes not found in footer. Either the file is corrupted or this "
-          "is not a parquet file.");
-    }
-
-    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
-      // Encrypted file with Encrypted footer.
-      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
-      return;
-    }
+    return source_->ReadAsync(source_size_ - footer_read_size, footer_read_size)
+        .Then([=](const std::shared_ptr<::arrow::Buffer>& footer_buffer)
+                  -> ::arrow::Future<> {
+          // Check if all bytes are read. Check if last 4 bytes read have the magic bits
+          if (footer_buffer->size() != footer_read_size ||
+              (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) !=
+                   0 &&
+               memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) !=
+                   0)) {
+            return ::arrow::Status::FromDetailAndArgs(
+                ::arrow::StatusCode::IOError,
+                ParquetInvalidOrCorruptedFileStatusDetail::Instance(),
+                "Parquet magic bytes not found in footer. Either the file is corrupted "
+                "or this is not a parquet file.");
+          }
 
-    // No encryption or encryption with plaintext footer mode.
-    std::shared_ptr<Buffer> metadata_buffer;
-    uint32_t metadata_len, read_metadata_len;
-    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
-                                 &metadata_len, &read_metadata_len);
+          if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) ==
+              0) {
+            // Encrypted file with Encrypted footer.
+            BEGIN_PARQUET_CATCH_EXCEPTIONS
+            return ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer,

Review comment:
       Does this function throw exceptions?  It looks to me like that got changed when it changed to future.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(FileReaderImpl* arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+      : arrow_reader_(arrow_reader),

Review comment:
       Do we want this to take `std::shared_ptr<FileReaderImpl>` so that we don't have to remember to keep the reader alive elsewhere?  On the other hand, if you don't want to take in `shared_ptr` then aren't you assuming `FileReaderImpl` is valid for the lifetime and you could make `SubmitRead` and `ReadOneRowGroup` non-static (and remove the `FileReaderImpl* self`)?
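
A minimal sketch of the first option, assuming a hypothetical `Reader` type in place of `FileReaderImpl` and a plain `std::function<int()>` in place of the async generator. The generator co-owns the reader through a `shared_ptr`, so the caller does not have to keep it alive separately.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical reader; stands in for FileReaderImpl by analogy only.
struct Reader {
  int ReadRowGroup(int i) const { return i * 10; }
};

// Returns a generator that yields one value per row group and -1 at the end.
// The lambda captures the shared_ptr by value, so the reader lives as long
// as the generator does.
std::function<int()> MakeRowGroupGenerator(std::shared_ptr<Reader> reader,
                                           std::vector<int> row_groups) {
  auto index = std::make_shared<std::size_t>(0);
  return [reader, row_groups, index]() -> int {
    if (*index >= row_groups.size()) return -1;  // end of iteration
    return reader->ReadRowGroup(row_groups[(*index)++]);
  };
}
```

The generator remains usable even after the caller's own `shared_ptr<Reader>` has gone out of scope, which is the property a raw `FileReaderImpl*` cannot provide.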

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;

Review comment:
       I could be sold on using `Item` for brevity since it is constrained to this file but maybe `Batches` or `Items` instead of `Item`?

##########
File path: cpp/src/arrow/io/caching.cc
##########
@@ -33,7 +34,7 @@ namespace io {
 
 CacheOptions CacheOptions::Defaults() {
   return CacheOptions{internal::ReadRangeCache::kDefaultHoleSizeLimit,
-                      internal::ReadRangeCache::kDefaultRangeSizeLimit};
+                      internal::ReadRangeCache::kDefaultRangeSizeLimit, false};

Review comment:
       `/*lazy=*/false`?  I'm not really sure what the rules are for those inline names.
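
For illustration, this is what the inline argument-name comment looks like on an aggregate initializer. `CacheOptionsSketch` and the numeric values are hypothetical placeholders, not the real `CacheOptions` definition or Arrow's defaults.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical struct for illustration; field names are assumptions.
struct CacheOptionsSketch {
  int64_t hole_size_limit;
  int64_t range_size_limit;
  bool lazy;
};

CacheOptionsSketch SketchDefaults() {
  // Placeholder values, not Arrow's actual defaults. The comments name each
  // positional argument so that a bare `false` is not left unexplained.
  return CacheOptionsSketch{/*hole_size_limit=*/8192,
                            /*range_size_limit=*/1 << 20,
                            /*lazy=*/false};
}
```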

##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -325,6 +325,45 @@ Result<std::unique_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
   return std::move(arrow_reader);
 }
 
+Future<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReaderAsync(
+    const FileSource& source, ScanOptions* options) const {
+  ARROW_ASSIGN_OR_RAISE(auto parquet_scan_options,
+                        GetFragmentScanOptions<ParquetFragmentScanOptions>(
+                            kParquetTypeName, options, default_fragment_scan_options));
+  MemoryPool* pool = options ? options->pool : default_memory_pool();
+  auto properties = MakeReaderProperties(*this, parquet_scan_options.get(), pool);
+  ARROW_ASSIGN_OR_RAISE(auto input, source.Open());
+  // Some ugliness needed due to having Future<unique_ptr<>> here
+  auto reader_fut =
+      parquet::ParquetFileReader::OpenAsync(std::move(input), std::move(properties));
+  auto path = source.path();
+  auto self = checked_pointer_cast<const ParquetFileFormat>(shared_from_this());
+  return reader_fut.Then(
+      [=](const std::unique_ptr<parquet::ParquetFileReader>&) mutable
+      -> Result<std::shared_ptr<parquet::arrow::FileReader>> {
+        ARROW_ASSIGN_OR_RAISE(std::unique_ptr<parquet::ParquetFileReader> reader,

Review comment:
       I'm a little bit surprised this works at all honestly.  Futures aren't really compatible with move-only types.  I thought you would have gotten a compile error.  This reminded me to create ARROW-12559.  As long as this works I think you're ok.  There is only one callback being added to `reader_fut` and you don't access the value that gets passed in as an arg here.
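
As an analogy only, using the standard library rather than `arrow::Future`: a future holding a move-only value can hand that value to exactly one consumer. `std::future::get()` returns the value by move and invalidates the future, whereas `std::shared_future::get()` returns a `const` reference, so a `unique_ptr` could not be moved out of it. The helper names below are hypothetical.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <utility>

// Builds an already-completed future holding a move-only value.
std::future<std::unique_ptr<int>> MakeReadyFuture(int value) {
  std::promise<std::unique_ptr<int>> promise;
  promise.set_value(std::unique_ptr<int>(new int(value)));
  return promise.get_future();
}

// Takes ownership of the result. This can happen only once: get() moves the
// value out of the shared state and leaves the future invalid.
std::unique_ptr<int> TakeResult(std::future<std::unique_ptr<int>> fut) {
  return fut.get();
}
```

This mirrors the constraint in the hunk above: a single callback, which moves the result out instead of using the reference passed to it.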

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(FileReaderImpl* arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+      : arrow_reader_(arrow_reader),
+        executor_(executor),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)),
+        index_(0) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::IterationEnd<Item>());
+    }
+    int row_group = row_groups_[index_++];
+    std::vector<int> column_indices = column_indices_;
+    auto reader = arrow_reader_;
+    if (!reader->properties().pre_buffer()) {
+      return SubmitRead(executor_, reader, row_group, column_indices);
+    }
+    BEGIN_PARQUET_CATCH_EXCEPTIONS
+    auto ready = reader->parquet_reader()->WhenBuffered({row_group}, column_indices);
+    if (executor_) ready = executor_->Transfer(ready);
+    return ready.Then(
+        [=](const ::arrow::Result<::arrow::detail::Empty>& s) -> ::arrow::Result<Item> {
+          RETURN_NOT_OK(s);

Review comment:
       I think you can just take in `::arrow::detail::Empty&` and then you don't need the `RETURN_NOT_OK`

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -968,6 +979,99 @@ Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
   return Status::OK();
 }
 
+/// Given a file reader and a list of row groups, this is a generator of record
+/// batch vectors (where each vector is the contents of a single row group).
+class RowGroupGenerator {
+ public:
+  using Item = ::arrow::AsyncGenerator<std::shared_ptr<::arrow::RecordBatch>>;
+
+  explicit RowGroupGenerator(FileReaderImpl* arrow_reader,
+                             ::arrow::internal::Executor* executor,
+                             std::vector<int> row_groups, std::vector<int> column_indices)
+      : arrow_reader_(arrow_reader),
+        executor_(executor),
+        row_groups_(std::move(row_groups)),
+        column_indices_(std::move(column_indices)),
+        index_(0) {}
+
+  ::arrow::Future<Item> operator()() {
+    if (index_ >= row_groups_.size()) {
+      return ::arrow::Future<Item>::MakeFinished(::arrow::IterationEnd<Item>());
+    }
+    int row_group = row_groups_[index_++];
+    std::vector<int> column_indices = column_indices_;
+    auto reader = arrow_reader_;
+    if (!reader->properties().pre_buffer()) {
+      return SubmitRead(executor_, reader, row_group, column_indices);
+    }
+    BEGIN_PARQUET_CATCH_EXCEPTIONS

Review comment:
       I don't know the rules around `PARQUET_CATCH_EXCEPTIONS` but will it be a problem that `ReadOneRowGroup` runs (presumably) outside of this guard?
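
The general concern can be sketched in plain C++, without asserting anything about how `ReadOneRowGroup` is actually guarded. A `try`/`catch` around the code that schedules a callback does not cover the callback body, because the body runs after the guarded scope has been left. `Status`, `ScheduleGuarded`, and `ScheduleUnguarded` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical status type for illustration.
struct Status {
  bool ok;
  std::string message;
};

// Guard around scheduling only: the returned callback runs later, outside
// the try block, so anything it throws escapes to whoever invokes it.
std::function<Status()> ScheduleUnguarded(std::function<void()> work) {
  try {
    return [work]() -> Status { work(); return Status{true, ""}; };
  } catch (const std::exception& e) {
    // Reached only if building the callback throws, not if `work` throws.
    const std::string msg = e.what();
    return [msg]() -> Status { return Status{false, msg}; };
  }
}

// Guard inside the callback: exceptions from `work` become a Status.
std::function<Status()> ScheduleGuarded(std::function<void()> work) {
  return [work]() -> Status {
    try {
      work();
      return Status{true, ""};
    } catch (const std::exception& e) {
      return Status{false, e.what()};
    }
  };
}
```

If the deferred work can throw, the guarded form is the one that turns the exception into an error result at the point where it is raised.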

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -531,14 +584,42 @@ std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
 
   if (metadata == nullptr) {
     // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
-    file->ParseMetaData();
+    auto fut = file->ParseMetaData();
+    PARQUET_THROW_NOT_OK(fut.status());
   } else {
     file->set_metadata(std::move(metadata));
   }
 
   return result;
 }
 
+::arrow::Future<std::unique_ptr<ParquetFileReader::Contents>>
+ParquetFileReader::Contents::OpenAsync(std::shared_ptr<ArrowInputFile> source,
+                                       const ReaderProperties& props,
+                                       std::shared_ptr<FileMetaData> metadata) {
+  BEGIN_PARQUET_CATCH_EXCEPTIONS
+  std::unique_ptr<ParquetFileReader::Contents> result(
+      new SerializedFile(std::move(source), props));
+  SerializedFile* file = static_cast<SerializedFile*>(result.get());
+  if (metadata == nullptr) {

Review comment:
       Is this all to allow `unique_ptr` to work?  If so, maybe add a TODO comment referencing ARROW-12559.



