You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/10 08:41:52 UTC

[GitHub] [arrow] pitrou commented on a change in pull request #10482: ARROW-12597: [C++] Enable per-row-group parallelism in async Parquet reader

pitrou commented on a change in pull request #10482:
URL: https://github.com/apache/arrow/pull/10482#discussion_r648968980



##########
File path: cpp/src/arrow/util/parallel.h
##########
@@ -44,6 +45,25 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
   return st;
 }
 
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> ParallelForAsync(
+    std::vector<T> inputs, FUNCTION&& func,

Review comment:
       Is there a particular reason for taking a vector of inputs instead of a `num_tasks` count, as `ParallelFor` above does?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1024,31 +1027,32 @@ class RowGroupGenerator {
       ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
       const int row_group, const std::vector<int>& column_indices) {
     if (!cpu_executor) {
-      return Future<RecordBatchGenerator>::MakeFinished(
-          ReadOneRowGroup(self, row_group, column_indices));
+      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
     }
     // If we have an executor, then force transfer (even if I/O was complete)
-    return ::arrow::DeferNotOk(
-        cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+                                                    row_group, column_indices));
   }
 
-  static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
-      std::shared_ptr<FileReaderImpl> self, const int row_group,
-      const std::vector<int>& column_indices) {
-    std::shared_ptr<::arrow::Table> table;
+  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
+      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+      const int row_group, const std::vector<int>& column_indices) {
     // Skips bound checks/pre-buffering, since we've done that already
-    RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
-    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
-    ::arrow::RecordBatchVector batches;
-    while (true) {
-      std::shared_ptr<::arrow::RecordBatch> batch;
-      RETURN_NOT_OK(table_reader->ReadNext(&batch));
-      if (!batch) {
-        break;
-      }
-      batches.push_back(batch);
-    }
-    return ::arrow::MakeVectorGenerator(std::move(batches));
+    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+        .Then([](const std::shared_ptr<Table>& table)
+                  -> ::arrow::Result<RecordBatchGenerator> {
+          auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
+          ::arrow::RecordBatchVector batches;
+          while (true) {

Review comment:
       Can you use `table_reader->ReadAll(&batches)` here?

##########
File path: cpp/src/arrow/util/parallel.h
##########
@@ -44,6 +45,25 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
   return st;
 }
 
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> ParallelForAsync(
+    std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  std::vector<Future<R>> futures(inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
+  }
+  return All(std::move(futures))
+      .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {

Review comment:
       or `CollectResults`

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1024,31 +1027,32 @@ class RowGroupGenerator {
       ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
       const int row_group, const std::vector<int>& column_indices) {
     if (!cpu_executor) {
-      return Future<RecordBatchGenerator>::MakeFinished(
-          ReadOneRowGroup(self, row_group, column_indices));
+      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
     }
     // If we have an executor, then force transfer (even if I/O was complete)
-    return ::arrow::DeferNotOk(
-        cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+                                                    row_group, column_indices));
   }
 
-  static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
-      std::shared_ptr<FileReaderImpl> self, const int row_group,
-      const std::vector<int>& column_indices) {
-    std::shared_ptr<::arrow::Table> table;
+  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
+      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+      const int row_group, const std::vector<int>& column_indices) {
     // Skips bound checks/pre-buffering, since we've done that already
-    RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
-    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
-    ::arrow::RecordBatchVector batches;
-    while (true) {
-      std::shared_ptr<::arrow::RecordBatch> batch;
-      RETURN_NOT_OK(table_reader->ReadNext(&batch));
-      if (!batch) {
-        break;
-      }
-      batches.push_back(batch);
-    }
-    return ::arrow::MakeVectorGenerator(std::move(batches));
+    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+        .Then([](const std::shared_ptr<Table>& table)
+                  -> ::arrow::Result<RecordBatchGenerator> {
+          auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

Review comment:
       It doesn't look like `make_shared` is useful here; just construct the `TableBatchReader` directly.

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1134,6 +1138,42 @@ Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
   return (*out)->Validate();
 }
 
+Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(

Review comment:
       Is there a way to reconcile this with the sync `DecodeRowGroups` version?

##########
File path: cpp/src/arrow/util/parallel.h
##########
@@ -44,6 +45,25 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,
   return st;
 }
 
+template <class FUNCTION, typename T,
+          typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
+Future<std::vector<R>> ParallelForAsync(
+    std::vector<T> inputs, FUNCTION&& func,
+    Executor* executor = internal::GetCpuThreadPool()) {
+  std::vector<Future<R>> futures(inputs.size());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
+  }
+  return All(std::move(futures))
+      .Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {

Review comment:
       Perhaps we want to expose a `Result<std::vector<T>> GatherResults(const std::vector<Result<T>>&)` somewhere?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1134,6 +1138,42 @@ Status FileReaderImpl::DecodeRowGroups(const std::vector<int>& row_groups,
   return (*out)->Validate();
 }
 
+Future<std::shared_ptr<Table>> FileReaderImpl::DecodeRowGroups(
+    std::shared_ptr<FileReaderImpl> self, const std::vector<int>& row_groups,
+    const std::vector<int>& column_indices, ::arrow::internal::Executor* cpu_executor) {
+  std::vector<std::shared_ptr<ColumnReaderImpl>> readers;
+  std::shared_ptr<::arrow::Schema> result_schema;
+  RETURN_NOT_OK(GetFieldReaders(column_indices, row_groups, &readers, &result_schema));
+
+  // OptionalParallelForAsync requires an executor
+  if (!cpu_executor) cpu_executor = ::arrow::internal::GetCpuThreadPool();
+  return ::arrow::internal::OptionalParallelForAsync(
+             reader_properties_.use_threads(), std::move(readers),
+             [row_groups, self](int i, std::shared_ptr<ColumnReaderImpl> reader)
+                 -> ::arrow::Result<std::shared_ptr<::arrow::ChunkedArray>> {
+               std::shared_ptr<::arrow::ChunkedArray> column;
+               RETURN_NOT_OK(self->ReadColumn(static_cast<int>(i), row_groups,
+                                              reader.get(), &column));
+               return column;
+             },
+             cpu_executor)

Review comment:
       For readability, can you factor out those two callbacks as separate lambdas?

##########
File path: cpp/src/parquet/arrow/reader.cc
##########
@@ -1024,31 +1027,32 @@ class RowGroupGenerator {
       ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
       const int row_group, const std::vector<int>& column_indices) {
     if (!cpu_executor) {
-      return Future<RecordBatchGenerator>::MakeFinished(
-          ReadOneRowGroup(self, row_group, column_indices));
+      return ReadOneRowGroup(cpu_executor, self, row_group, column_indices);
     }
     // If we have an executor, then force transfer (even if I/O was complete)
-    return ::arrow::DeferNotOk(
-        cpu_executor->Submit(ReadOneRowGroup, self, row_group, column_indices));
+    return ::arrow::DeferNotOk(cpu_executor->Submit(ReadOneRowGroup, cpu_executor, self,
+                                                    row_group, column_indices));
   }
 
-  static ::arrow::Result<RecordBatchGenerator> ReadOneRowGroup(
-      std::shared_ptr<FileReaderImpl> self, const int row_group,
-      const std::vector<int>& column_indices) {
-    std::shared_ptr<::arrow::Table> table;
+  static ::arrow::Future<RecordBatchGenerator> ReadOneRowGroup(
+      ::arrow::internal::Executor* cpu_executor, std::shared_ptr<FileReaderImpl> self,
+      const int row_group, const std::vector<int>& column_indices) {
     // Skips bound checks/pre-buffering, since we've done that already
-    RETURN_NOT_OK(self->DecodeRowGroups({row_group}, column_indices, &table));
-    auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);
-    ::arrow::RecordBatchVector batches;
-    while (true) {
-      std::shared_ptr<::arrow::RecordBatch> batch;
-      RETURN_NOT_OK(table_reader->ReadNext(&batch));
-      if (!batch) {
-        break;
-      }
-      batches.push_back(batch);
-    }
-    return ::arrow::MakeVectorGenerator(std::move(batches));
+    return self->DecodeRowGroups(self, {row_group}, column_indices, cpu_executor)
+        .Then([](const std::shared_ptr<Table>& table)
+                  -> ::arrow::Result<RecordBatchGenerator> {

Review comment:
       Hmm... so this reads an entire table and then creates a generator out of that (already read) table? Is there a point to that, or could you just return a `Future<RecordBatchVector>`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org