You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/01 02:35:26 UTC

[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796225973



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }

Review comment:
       I think it is not needed. It won't half the functionality. 

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }

Review comment:
       I think it is not needed. It won't halm the functionality. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org