You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/23 13:18:00 UTC

[jira] [Commented] (PARQUET-1392) [C++] Supply row group indices to parquet::arrow::FileReader::ReadTable

    [ https://issues.apache.org/jira/browse/PARQUET-1392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590201#comment-16590201 ] 

ASF GitHub Bot commented on PARQUET-1392:
-----------------------------------------

xhochy closed pull request #492: PARQUET-1392: Read multiple RowGroups at once into an Arrow table
URL: https://github.com/apache/parquet-cpp/pull/492
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 51eb0c23..41cb88d6 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -195,6 +195,69 @@ BENCHMARK_TEMPLATE2(BM_ReadColumn, true, DoubleType);
 BENCHMARK_TEMPLATE2(BM_ReadColumn, false, BooleanType);
 BENCHMARK_TEMPLATE2(BM_ReadColumn, true, BooleanType);
 
+static void BM_ReadIndividualRowGroups(::benchmark::State& state) {
+  std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+  std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+  auto output = std::make_shared<InMemoryOutputStream>();
+  // This writes 10 RowGroups
+  EXIT_NOT_OK(
+      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+  std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+  while (state.KeepRunning()) {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+    std::vector<std::shared_ptr<::arrow::Table>> tables;
+    for (int i = 0; i < filereader.num_row_groups(); i++) {
+      // Only read the even numbered RowGroups
+      if ((i % 2) == 0) {
+        std::shared_ptr<::arrow::Table> table;
+        EXIT_NOT_OK(filereader.RowGroup(i)->ReadTable(&table));
+        tables.push_back(table);
+      }
+    }
+
+    std::shared_ptr<::arrow::Table> final_table;
+    EXIT_NOT_OK(ConcatenateTables(tables, &final_table));
+  }
+  SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadIndividualRowGroups);
+
+static void BM_ReadMultipleRowGroups(::benchmark::State& state) {
+  std::vector<int64_t> values(BENCHMARK_SIZE, 128);
+  std::shared_ptr<::arrow::Table> table = TableFromVector<Int64Type>(values, true);
+  auto output = std::make_shared<InMemoryOutputStream>();
+  // This writes 10 RowGroups
+  EXIT_NOT_OK(
+      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE / 10));
+  std::shared_ptr<Buffer> buffer = output->GetBuffer();
+
+  while (state.KeepRunning()) {
+    auto reader =
+        ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
+    FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
+
+    std::vector<std::shared_ptr<::arrow::Table>> tables;
+    std::vector<int> rgs;
+    for (int i = 0; i < filereader.num_row_groups(); i++) {
+      // Only read the even numbered RowGroups
+      if ((i % 2) == 0) {
+        rgs.push_back(i);
+      }
+    }
+
+    std::shared_ptr<::arrow::Table> table;
+    EXIT_NOT_OK(filereader.ReadRowGroups(rgs, &table));
+  }
+  SetBytesProcessed<true, Int64Type>(state);
+}
+
+BENCHMARK(BM_ReadMultipleRowGroups);
+
 }  // namespace benchmark
 
 }  // namespace parquet
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index ad78c22d..5f4e1234 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1614,14 +1614,21 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
 
   ASSERT_EQ(2, reader->num_row_groups());
 
-  std::shared_ptr<Table> r1, r2;
+  std::shared_ptr<Table> r1, r2, r3, r4;
   // Read everything
   ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
   ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2));
+  ASSERT_OK_NO_THROW(reader->ReadRowGroups({0, 1}, &r3));
+  ASSERT_OK_NO_THROW(reader->ReadRowGroups({1}, &r4));
 
   std::shared_ptr<Table> concatenated;
+
   ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+  ASSERT_TRUE(table->Equals(*concatenated));
 
+  ASSERT_TRUE(table->Equals(*r3));
+  ASSERT_TRUE(r2->Equals(*r4));
+  ASSERT_OK(ConcatenateTables({r1, r4}, &concatenated));
   ASSERT_TRUE(table->Equals(*concatenated));
 }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index d0b397f3..2e4dc815 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -228,11 +228,15 @@ class FileReader::Impl {
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
                    std::shared_ptr<::arrow::Schema>* out);
+  Status ReadRowGroup(int row_group_index, std::shared_ptr<Table>* table);
   Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
                       std::shared_ptr<::arrow::Table>* out);
   Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
   Status ReadTable(std::shared_ptr<Table>* table);
-  Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+  Status ReadRowGroups(const std::vector<int>& row_groups, std::shared_ptr<Table>* table);
+  Status ReadRowGroups(const std::vector<int>& row_groups,
+                       const std::vector<int>& indices,
+                       std::shared_ptr<::arrow::Table>* out);
 
   bool CheckForFlatColumn(const ColumnDescriptor* descr);
   bool CheckForFlatListColumn(const ColumnDescriptor* descr);
@@ -562,6 +566,29 @@ Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
   return ReadTable(indices, table);
 }
 
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+                                       const std::vector<int>& indices,
+                                       std::shared_ptr<Table>* table) {
+  // TODO(PARQUET-1393): Modify the record readers to already read this into a single,
+  // continuous array.
+  std::vector<std::shared_ptr<Table>> tables(row_groups.size(), nullptr);
+
+  for (size_t i = 0; i < row_groups.size(); ++i) {
+    RETURN_NOT_OK(ReadRowGroup(row_groups[i], indices, &tables[i]));
+  }
+  return ConcatenateTables(tables, table);
+}
+
+Status FileReader::Impl::ReadRowGroups(const std::vector<int>& row_groups,
+                                       std::shared_ptr<Table>* table) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
+
+  for (size_t i = 0; i < indices.size(); ++i) {
+    indices[i] = static_cast<int>(i);
+  }
+  return ReadRowGroups(row_groups, indices, table);
+}
+
 Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
   std::vector<int> indices(reader_->metadata()->num_columns());
 
@@ -683,6 +710,25 @@ Status FileReader::ReadRowGroup(int i, const std::vector<int>& indices,
   }
 }
 
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+                                 std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadRowGroups(row_groups, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
+Status FileReader::ReadRowGroups(const std::vector<int>& row_groups,
+                                 const std::vector<int>& indices,
+                                 std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadRowGroups(row_groups, indices, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
 std::shared_ptr<RowGroupReader> FileReader::RowGroup(int row_group_index) {
   return std::shared_ptr<RowGroupReader>(
       new RowGroupReader(impl_.get(), row_group_index));
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1e37d898..db135da5 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -182,6 +182,13 @@ class PARQUET_EXPORT FileReader {
 
   ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
 
+  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                const std::vector<int>& column_indices,
+                                std::shared_ptr<::arrow::Table>* out);
+
+  ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
+                                std::shared_ptr<::arrow::Table>* out);
+
   /// \brief Scan file contents with one thread, return number of rows
   ::arrow::Status ScanContents(std::vector<int> columns, const int32_t column_batch_size,
                                int64_t* num_rows);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> [C++] Supply row group indices to parquet::arrow::FileReader::ReadTable
> -----------------------------------------------------------------------
>
>                 Key: PARQUET-1392
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1392
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-cpp
>            Reporter: Uwe L. Korn
>            Assignee: Uwe L. Korn
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cpp-1.5.0
>
>
> By looking at the Parquet statistics, users can already determine with their own logic which RowGroups are of interest to them. Currently we only provide functions to read the whole file or individual RowGroups. By supplying {{parquet::arrow}} with all of the desired RowGroups at once, the reader can better optimize its memory allocations as well as make better use of the underlying thread pool.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)