You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/26 08:52:16 UTC

[GitHub] [arrow] vibhatha opened a new pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

vibhatha opened a new pull request #12267:
URL: https://github.com/apache/arrow/pull/12267


   This PR includes the addition of `TableSourceNode` to create a `ExecNode` easily using a table as the data source. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1050340033


   Benchmark runs are scheduled for baseline = a26506c8ce2496de6ed68f48972d53f78e19a962 and contender = fffdca2b9bf0245d5022b30eafcea6e3f1b57894. fffdca2b9bf0245d5022b30eafcea6e3f1b57894 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/bf45a6b9c3b5455ca07f055a202c41ea...ea2bed2647b548cb945312217e00f5b8/)
   [Finished :arrow_down:0.5% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8728cfa356c04f658d6946413c9bb73f...5d48286f655941d889a529dd616e56bb/)
   [Finished :arrow_down:1.07% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/9d504e0b21664c65a2e4161df19d5f89...ed507d4c5d134fce8ccb009aae63698b/)
   [Finished :arrow_down:0.13% :arrow_up:0.04%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c47c37c8a10849839b5cfd1c92ad17e4...4eadc8c85e7a4f26b6d2f51ba2da7dd2/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1050340033


   Benchmark runs are scheduled for baseline = a26506c8ce2496de6ed68f48972d53f78e19a962 and contender = fffdca2b9bf0245d5022b30eafcea6e3f1b57894. fffdca2b9bf0245d5022b30eafcea6e3f1b57894 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/bf45a6b9c3b5455ca07f055a202c41ea...ea2bed2647b548cb945312217e00f5b8/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8728cfa356c04f658d6946413c9bb73f...5d48286f655941d889a529dd616e56bb/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/9d504e0b21664c65a2e4161df19d5f89...ed507d4c5d134fce8ccb009aae63698b/)
   [Scheduled] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c47c37c8a10849839b5cfd1c92ad17e4...4eadc8c85e7a4f26b6d2f51ba2da7dd2/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796393003



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());

Review comment:
       Instead what if we add 
   
   ```c++
   auto batch_res = reader->Next();
       if (batch_res.ok()) {
         batch = batch_res.ValueOrDie();
       }
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r793143500



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t max_chunksize)
+      : table(table), max_chunksize(max_chunksize) {}
+
+  std::shared_ptr<Table> table;
+  int64_t max_chunksize;

Review comment:
       Of course. I will modify it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot commented on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
ursabot commented on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1050340033


   Benchmark runs are scheduled for baseline = a26506c8ce2496de6ed68f48972d53f78e19a962 and contender = fffdca2b9bf0245d5022b30eafcea6e3f1b57894. fffdca2b9bf0245d5022b30eafcea6e3f1b57894 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Scheduled] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/bf45a6b9c3b5455ca07f055a202c41ea...ea2bed2647b548cb945312217e00f5b8/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8728cfa356c04f658d6946413c9bb73f...5d48286f655941d889a529dd616e56bb/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/9d504e0b21664c65a2e4161df19d5f89...ed507d4c5d134fce8ccb009aae63698b/)
   [Scheduled] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c47c37c8a10849839b5cfd1c92ad17e4...4eadc8c85e7a4f26b6d2f51ba2da7dd2/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796245428



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());

Review comment:
       Yes, this is indeed neater and safe. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace closed pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
westonpace closed pull request #12267:
URL: https://github.com/apache/arrow/pull/12267


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806544545



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796225973



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }

Review comment:
       I think it is not needed. It won't half the functionality. 

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }

Review comment:
       I think it is not needed. It won't halm the functionality. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796245719



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;

Review comment:
       no these were supposed to use, but I haven't actually used them for any thing. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806541037



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,

Review comment:
       👍  

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1050340033


   Benchmark runs are scheduled for baseline = a26506c8ce2496de6ed68f48972d53f78e19a962 and contender = fffdca2b9bf0245d5022b30eafcea6e3f1b57894. fffdca2b9bf0245d5022b30eafcea6e3f1b57894 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/bf45a6b9c3b5455ca07f055a202c41ea...ea2bed2647b548cb945312217e00f5b8/)
   [Finished :arrow_down:0.5% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8728cfa356c04f658d6946413c9bb73f...5d48286f655941d889a529dd616e56bb/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/9d504e0b21664c65a2e4161df19d5f89...ed507d4c5d134fce8ccb009aae63698b/)
   [Finished :arrow_down:0.13% :arrow_up:0.04%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c47c37c8a10849839b5cfd1c92ad17e4...4eadc8c85e7a4f26b6d2f51ba2da7dd2/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r792945719



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t max_chunksize)
+      : table(table), max_chunksize(max_chunksize) {}
+
+  std::shared_ptr<Table> table;
+  int64_t max_chunksize;

Review comment:
       Can we use `batch_size` instead of `max_chunksize`?  That will help make it clear that this is an analogue of `ScanNodeOptions::ScanNode::batch_size`.  Also, in my mental model, I think of a "chunk" as a bunch of bytes (or a subsection of an array) and a "batch" as a bunch of rows (I think we're fairly consistent with this in Arrow but not sure 100%)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r794164590



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+      : table(table), batch_size(batch_size) {}
+
+  std::shared_ptr<Table> table;
+  int64_t batch_size;

Review comment:
       Can you add docstrings for these fields?  They should hopefully be fairly obvious so I don't think we need too much information but just for consistency since we are pretty good about this elsewhere in this file.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }

Review comment:
       Do these methods need to be overridden?  Will it work if we omit this?

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());

Review comment:
       Can you use `ASSIGN_OR_ABORT` and add a comment explaining that a TableBatchReader should not be able to fail but always returns an OK status to comply with the RecordBatchReader interface.
   
   Then you can change the return value to `std::vector<ExecBatch>` and get rid of the `ValueOrDie`
   
   Otherwise I don't think it would be safe to be doing this kind of work in a constructor (and we should be doing it in the `Make` method instead so we can propagate the failure).

##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -238,6 +238,34 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, TableSourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto exp_batches = MakeBasicBatches();
+      ASSERT_OK_AND_ASSIGN(auto table,
+                           TableFromExecBatches(exp_batches.schema, exp_batches.batches));
+
+      ASSERT_OK(
+          Declaration::Sequence({
+                                    {"table_source", TableSourceNodeOptions{table, 1}},

Review comment:
       Can you test two different batch sizes?  I like 1 as a test case but can you also test 4 (there are no batches of size 4 in the table but this should still be a valid setting, it won't concatenate batches and will just emit the undersized batches which is ok).

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(

Review comment:
       Can we make this a static method named something like `TableToGenerator`?  It looks like a property accessor here but it isn't accessing the instance state and it is doing too much work.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),

Review comment:
       Nit: Rather than call `ConvertTableToExecBatches` here can we call it in the `generator` method.  That will be slightly more readable.
   
   It would be good if we could get rid of the `ValueOrDie` too.

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -52,6 +52,19 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.

Review comment:
       I'm not sure how much this comment adds value.  Can we reword it?  Maybe something like:
   
   The table will be sent through the exec plan in batches.  Each batch will be submitted as a new thread task if plan->exec_context()->executor() is not null.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;

Review comment:
       Is this used?

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),

Review comment:
       We aren't great about validation in our existing nodes but since this is essentially a "public API" I think we can start being better.
   
    * What happens if `table` is `nullptr`?
    * What happens if `batch_size <= 0`?

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}

Review comment:
       ```suggestion
     TableSourceNode(ExecPlan* plan,
                     std::shared_ptr<Table> table, int64_t batch_size)
         : SourceNode(plan, table->schema(),
                      generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
           batch_size(batch_size) {}
   ```
   
   The `output_schema` should be the schema of the `table`.  We shouldn't need to take this in.  I guess that is where this comes from in `Make` but I think it would be a bit cleaner to take in three arguments instead of four.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796245930



##########
File path: cpp/src/arrow/compute/exec/plan_test.cc
##########
@@ -238,6 +238,34 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, TableSourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto exp_batches = MakeBasicBatches();
+      ASSERT_OK_AND_ASSIGN(auto table,
+                           TableFromExecBatches(exp_batches.schema, exp_batches.batches));
+
+      ASSERT_OK(
+          Declaration::Sequence({
+                                    {"table_source", TableSourceNodeOptions{table, 1}},

Review comment:
       Yes, it is good to have that. Let me add it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806541037



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,

Review comment:
       👍  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] ursabot edited a comment on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
ursabot edited a comment on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1050340033


   Benchmark runs are scheduled for baseline = a26506c8ce2496de6ed68f48972d53f78e19a962 and contender = fffdca2b9bf0245d5022b30eafcea6e3f1b57894. fffdca2b9bf0245d5022b30eafcea6e3f1b57894 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/bf45a6b9c3b5455ca07f055a202c41ea...ea2bed2647b548cb945312217e00f5b8/)
   [Scheduled] [test-mac-arm](https://conbench.ursa.dev/compare/runs/8728cfa356c04f658d6946413c9bb73f...5d48286f655941d889a529dd616e56bb/)
   [Scheduled] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/9d504e0b21664c65a2e4161df19d5f89...ed507d4c5d134fce8ccb009aae63698b/)
   [Finished :arrow_down:0.13% :arrow_up:0.04%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/c47c37c8a10849839b5cfd1c92ad17e4...4eadc8c85e7a4f26b6d2f51ba2da7dd2/)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796268181



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),

Review comment:
       Added validation and a test case to verify it. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] vibhatha commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
vibhatha commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r796379045



##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -169,12 +172,83 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())),
+        batch_size(batch_size) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table,
+                                              table_options.batch_size);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<arrow::RecordBatch> batch;
+    std::vector<std::shared_ptr<arrow::RecordBatch>> batch_vector;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());

Review comment:
       Isn't this is a part of `gtest_util.h`, should we include it? I see it's usage is only in the tests and benchmarks. 
   Did I interpret it right? Please correct me if I am wrong. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#issuecomment-1021988742


   https://issues.apache.org/jira/browse/ARROW-15258


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806208552



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+      : table(table), batch_size(batch_size) {}
+
+  // arrow table which acts as the data source
+  std::shared_ptr<Table> table;
+  // batch size which used to set the chunk_size to
+  // the table batch reader used in building the data source
+  // from the table

Review comment:
       ```suggestion
     // Size of batches to emit from this node
     // If the table is larger the node will emit multiple batches from the
     // the table to be processed in parallel.
   ```
   
   Users of this class don't need to know that we are building a data source or that there is a `chunk_size` property.  Someone using this class just  knows they are feeding a table into an execution plan.

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {

Review comment:
       Nit: It's not really harmful but there is no way we should be able to get here unless `batch_size > 0` so this check is redundant.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);

Review comment:
       ```suggestion
             "TableSourceNode requires batch_size > 0, but got batch size ", batch_size);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();
+      }
+      if (batch == NULLPTR) {
+        break;
+      }
+      ExecBatch exec_batch{*batch};
+      exec_batches.push_back(exec_batch);

Review comment:
       ```suggestion
         exec_batches.emplace_back(*batch);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");

Review comment:
       ```suggestion
         return Status::Invalid("TableSourceNode requires a table which is not null");
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,

Review comment:
       ```suggestion
     static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr<Table>& table,
   ```
   
   Thanks for including this btw, it is good to validate user options.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}

Review comment:
       ```suggestion
         : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);

Review comment:
       ```suggestion
       auto opt_batches = MapVector(
           [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, std::move(batches));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();

Review comment:
       ```suggestion
           batch = std::move(batch_res).MoveValueUnsafe();
   ```
   This will do a move instead of a copy.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;

Review comment:
       ```suggestion
       const auto& table = table_options.table;
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));

Review comment:
       ```suggestion
       RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {

Review comment:
       ```suggestion
                                                        const int64_t batch_size) {
   ```
   We don't need to pass `kind_name` in if we're just going to use a literal.  I see it was based on `ValidateExecNodeInputs` but that case is a little different since that method is implemented in the base class and it doesn't know the kind name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] westonpace commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806208552



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+      : table(table), batch_size(batch_size) {}
+
+  // arrow table which acts as the data source
+  std::shared_ptr<Table> table;
+  // batch size which used to set the chunk_size to
+  // the table batch reader used in building the data source
+  // from the table

Review comment:
       ```suggestion
     // Size of batches to emit from this node
     // If the table is larger the node will emit multiple batches from the
     // the table to be processed in parallel.
   ```
   
   Users of this class don't need to know that we are building a data source or that there is a `chunk_size` property.  Someone using this class just  knows they are feeding a table into an execution plan.

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {

Review comment:
       Nit: It's not really harmful but there is no way we should be able to get here unless `batch_size > 0` so this check is redundant.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);

Review comment:
       ```suggestion
             "TableSourceNode requires batch_size > 0, but got batch size ", batch_size);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();
+      }
+      if (batch == NULLPTR) {
+        break;
+      }
+      ExecBatch exec_batch{*batch};
+      exec_batches.push_back(exec_batch);

Review comment:
       ```suggestion
         exec_batches.emplace_back(*batch);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");

Review comment:
       ```suggestion
         return Status::Invalid("TableSourceNode requires a table which is not null");
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,

Review comment:
       ```suggestion
     static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr<Table>& table,
   ```
   
   Thanks for including this btw, it is good to validate user options.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}

Review comment:
       ```suggestion
         : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);

Review comment:
       ```suggestion
       auto opt_batches = MapVector(
           [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, std::move(batches));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();

Review comment:
       ```suggestion
           batch = std::move(batch_res).MoveValueUnsafe();
   ```
   This will do a move instead of a copy.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;

Review comment:
       ```suggestion
       const auto& table = table_options.table;
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));

Review comment:
       ```suggestion
       RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {

Review comment:
       ```suggestion
                                                        const int64_t batch_size) {
   ```
   We don't need to pass `kind_name` in if we're just going to use a literal.  I see it was based on `ValidateExecNodeInputs` but that case is a little different since that method is implemented in the base class and it doesn't know the kind name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org