You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/23 10:59:52 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #6744: PARQUET-1820: [C++] pre-buffer specified columns of row group

pitrou commented on a change in pull request #6744:
URL: https://github.com/apache/arrow/pull/6744#discussion_r413715323



##########
File path: cpp/src/arrow/filesystem/s3fs_benchmark.cc
##########
@@ -331,10 +358,64 @@ BENCHMARK_DEFINE_F(MinioFixture, ReadCoalesced500Mib)(benchmark::State& st) {
 }
 BENCHMARK_REGISTER_F(MinioFixture, ReadCoalesced500Mib)->UseRealTime();
 
-BENCHMARK_DEFINE_F(MinioFixture, ReadParquet250K)(benchmark::State& st) {
-  ParquetRead(st, fs_.get(), bucket_ + "/pq_c100_r250k");
-}
-BENCHMARK_REGISTER_F(MinioFixture, ReadParquet250K)->UseRealTime();
+// Helpers to generate various multiple benchmarks for a given Parquet file.
+
+// NAME: the base name of the benchmark.
+// ROWS: the number of rows in the Parquet file.
+// COLS: the number of columns in the Parquet file.
+// STRATEGY: how to read the file (ReadTable or GetRecordBatchReader)
+#define PQ_BENCHMARK_IMPL(NAME, ROWS, COLS, STRATEGY)                                 \
+  BENCHMARK_DEFINE_F(MinioFixture, NAME##STRATEGY##AllNaive)(benchmark::State & st) { \
+    std::vector<int> column_indices(COLS);                                            \
+    std::iota(column_indices.begin(), column_indices.end(), 0);                       \
+    std::stringstream ss;                                                             \
+    ss << bucket_ << "/pq_c" << COLS << "_r" << ROWS << "k";                          \
+    ParquetRead(st, fs_.get(), ss.str(), column_indices, false, #STRATEGY);           \

Review comment:
       From a code maintainability standpoint, can we avoid putting so much logic in C macros? Ideally, the macro can call into a templated function.

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -78,14 +79,46 @@ std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
 // Returns the rowgroup metadata
 const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
 
+/// Compute the section of the file that should be read for the given
+/// row group and column chunk.
+arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata,
+                                             int64_t source_size, int row_group_index,
+                                             int column_index) {
+  auto row_group_metadata = file_metadata->RowGroup(row_group_index);
+  auto column_metadata = row_group_metadata->ColumnChunk(column_index);
+
+  int64_t col_start = column_metadata->data_page_offset();
+  if (column_metadata->has_dictionary_page() &&
+      column_metadata->dictionary_page_offset() > 0 &&
+      col_start > column_metadata->dictionary_page_offset()) {
+    col_start = column_metadata->dictionary_page_offset();
+  }
+
+  int64_t col_length = column_metadata->total_compressed_size();
+  // PARQUET-816 workaround for old files created by older parquet-mr
+  const ApplicationVersion& version = file_metadata->writer_version();
+  if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) {
+    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+    // dictionary page header size in total_compressed_size and total_uncompressed_size
+    // (see IMPALA-694). We add padding to compensate.
+    int64_t bytes_remaining = source_size - (col_start + col_length);
+    int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
+    col_length += padding;
+  }
+
+  return {col_start, col_length};
+}
+
 // RowGroupReader::Contents implementation for the Parquet file specification
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
-  SerializedRowGroup(std::shared_ptr<ArrowInputFile> source, int64_t source_size,
-                     FileMetaData* file_metadata, int row_group_number,
-                     const ReaderProperties& props,
+  SerializedRowGroup(std::shared_ptr<ArrowInputFile> source,
+                     std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source,
+                     int64_t source_size, FileMetaData* file_metadata,
+                     int row_group_number, const ReaderProperties& props,
                      std::shared_ptr<InternalFileDecryptor> file_decryptor = nullptr)
       : source_(std::move(source)),
+        cached_source_(cached_source ? std::move(cached_source) : nullptr),

Review comment:
       I don't think the ternary expression is needed. You should be able to move a null `shared_ptr` alright.

##########
File path: cpp/src/parquet/file_reader.cc
##########
@@ -212,6 +237,21 @@ class SerializedFile : public ParquetFileReader::Contents {
     file_metadata_ = std::move(metadata);
   }
 
+  void PreBuffer(const std::vector<int>& row_groups,
+                 const std::vector<int>& column_indices,
+                 const ::arrow::io::CacheOptions& options) {
+    cached_source_ =
+        std::make_shared<arrow::io::internal::ReadRangeCache>(source_, options);

Review comment:
       Be careful: the cache is unbounded. Does it mean you're willing to let all these row groups survive in memory until the reader gets destroyed?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org