You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/27 13:26:53 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #12275: ARROW-15262: [C++] Create a ToTable sink node

lidavidm commented on a change in pull request #12275:
URL: https://github.com/apache/arrow/pull/12275#discussion_r793595631



##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),
+        pool_(pool),
+        finish_(std::move(finish)) {}
+
+  Status Consume(ExecBatch batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    if (rb) {

Review comment:
       I don't think this can ever return `nullptr`, right?

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),
+        pool_(pool),
+        finish_(std::move(finish)) {}
+
+  Status Consume(ExecBatch batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    if (rb) {
+      batch_vector.push_back(rb);
+    } else {
+      return Status::Invalid("Invalid ExecBatch consumed");
+    }
+    return Status::OK();
+  }
+
+  Future<> Finish() override {
+    ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch_vector));
+    *out_ = table;
+    return finish_;
+  }
+
+ private:
+  std::shared_ptr<Table>* out_;
+  std::shared_ptr<Schema> output_schema_;
+  MemoryPool* pool_;
+  Future<> finish_;
+  std::vector<std::shared_ptr<RecordBatch>> batch_vector;
+};
+
+static std::shared_ptr<SinkNodeConsumer> MakeTableSinkConsumer(
+    std::shared_ptr<Table>* out, std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+    Future<> finish) {
+  auto tb_consumer =
+      std::make_shared<TableSinkNodeConsumer>(out, output_schema, pool, finish);
+  return std::move(tb_consumer);
+}
+
+class TableConsumingSinkNode : public ConsumingSinkNode {

Review comment:
       You could even ditch the subclass and just register something like `static Result<ExecNode*> MakeTableConsumingSinkNode()` as a factory. There's no requirement that factories and nodes are 1:1, e.g. look at the scanner: https://github.com/apache/arrow/blob/09e4a5e04c9e39169b5264e0aa98a5cf0945ecba/cpp/src/arrow/dataset/scanner.cc#L838-L859

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -488,6 +488,43 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
   }
 }
 
+TEST(ExecPlanExecution, SourceTableConsumingSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+
+      Future<> finish = Future<>::Make();
+      std::shared_ptr<Table> out;
+
+      auto basic_data = MakeBasicBatches();
+
+      TableSinkNodeOptions options{&out, basic_data.schema, finish};
+
+      ASSERT_OK_AND_ASSIGN(
+          auto source, MakeExecNode("source", plan.get(), {},
+                                    SourceNodeOptions(basic_data.schema,
+                                                      basic_data.gen(parallel, slow))));
+      ASSERT_OK(MakeExecNode("table_sink", plan.get(), {source}, options));
+      ASSERT_OK(plan->StartProducing());
+      // Source should finish fairly quickly
+      ASSERT_FINISHES_OK(source->finished());
+      SleepABit();
+      ASSERT_OK_AND_ASSIGN(auto actual,
+                           TableFromExecBatches(basic_data.schema, basic_data.batches));
+      ASSERT_EQ(5, out->num_rows());
+      AssertTablesEqual(*actual, *out);
+      // Consumer isn't finished and so plan shouldn't have finished
+      AssertNotFinished(plan->finished());
+      // Mark consumption complete, plan should finish
+      finish.MarkFinished();
+      ASSERT_FINISHES_OK(plan->finished());

Review comment:
       This seems to be testing something irrelevant to the issue at hand. We should just `ASSERT_FINISHES_OK(plan->finished())` and then ensure the actual and expected tables are equal.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),
+        pool_(pool),
+        finish_(std::move(finish)) {}
+
+  Status Consume(ExecBatch batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
+    if (rb) {
+      batch_vector.push_back(rb);
+    } else {
+      return Status::Invalid("Invalid ExecBatch consumed");
+    }
+    return Status::OK();
+  }
+
+  Future<> Finish() override {
+    ARROW_ASSIGN_OR_RAISE(auto table, Table::FromRecordBatches(batch_vector));
+    *out_ = table;

Review comment:
       nit, but this can just be `ARROW_ASSIGN_OR_RAISE(*out_, Table::FromRecordBatches(std::move(batch_vector)));`
   
   ARROW_ASSIGN_OR_RAISE effectively lets you put anything on the LHS; there is no need to declare a variable there unless you want to.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),
+        pool_(pool),
+        finish_(std::move(finish)) {}
+
+  Status Consume(ExecBatch batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));

Review comment:
       Don't we need to lock here? ConsumingSinkNode doesn't synchronize calls to Consume.

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -270,5 +270,23 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
   SelectKOptions select_k_options;
 };
 
+/// \brief Adapt an Table as a sink node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.

Review comment:
       nit: I don't think this bit of the docstring is relevant

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -270,5 +270,23 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
   SelectKOptions select_k_options;
 };
 
+/// \brief Adapt an Table as a sink node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
+ public:
+  TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
+                       std::shared_ptr<Schema> output_schema,
+                       Future<> finish = Future<>::Make())

Review comment:
       Hmm, I realize the tests do this, but there's no reason to let the user provide `Future<> finish`, right? The consumer implementation can manage its own Future instance and mark it finished itself. (Why make the user manually mark the future finished? How is the user even supposed to know when the node finishes?) I think the tests just have this parameter to test things.

##########
File path: cpp/src/arrow/compute/exec/sink_node.cc
##########
@@ -232,6 +232,77 @@ class ConsumingSinkNode : public ExecNode {
   std::shared_ptr<SinkNodeConsumer> consumer_;
 };
 
+/**
+ * @brief This node is an extension on ConsumingSinkNode
+ * to facilitate to get the output from an execution plan
+ * as a table. We define a custom SinkNodeConsumer to
+ * enable this functionality.
+ */
+
+struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
+ public:
+  TableSinkNodeConsumer(std::shared_ptr<Table>* out,
+                        std::shared_ptr<Schema> output_schema, MemoryPool* pool,
+                        Future<> finish)
+      : out_(out),
+        output_schema_(output_schema),

Review comment:
       nit: `std::move` the schema




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org