You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/14 20:54:27 UTC

[GitHub] [arrow] westonpace commented on a change in pull request #12267: ARROW-15258: [C++] Easy options to create a source node from a table

westonpace commented on a change in pull request #12267:
URL: https://github.com/apache/arrow/pull/12267#discussion_r806208552



##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
+      : table(table), batch_size(batch_size) {}
+
+  // arrow table which acts as the data source
+  std::shared_ptr<Table> table;
+  // batch size which used to set the chunk_size to
+  // the table batch reader used in building the data source
+  // from the table

Review comment:
       ```suggestion
     // Size of batches to emit from this node
     // If the table is larger the node will emit multiple batches from
     // the table to be processed in parallel.
   ```
   
   Users of this class don't need to know that we are building a data source or that there is a `chunk_size` property.  Someone using this class just knows they are feeding a table into an execution plan.

##########
File path: cpp/src/arrow/compute/exec/options.h
##########
@@ -55,6 +55,21 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt an Table as a source node
+/// if plan->exec_context()->executor() is not null.

Review comment:
       ```suggestion
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {

Review comment:
       Nit: It's not really harmful but there is no way we should be able to get here unless `batch_size > 0` so this check is redundant.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);

Review comment:
       ```suggestion
             "TableSourceNode requires batch_size > 0, but got batch size ", batch_size);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();
+      }
+      if (batch == NULLPTR) {
+        break;
+      }
+      ExecBatch exec_batch{*batch};
+      exec_batches.push_back(exec_batch);

Review comment:
       ```suggestion
         exec_batches.emplace_back(*batch);
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");

Review comment:
       ```suggestion
         return Status::Invalid("TableSourceNode requires a table which is not null");
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,

Review comment:
       ```suggestion
     static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr<Table>& table,
   ```
   
   Thanks for including this btw, it is good to validate user options.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}

Review comment:
       ```suggestion
         : SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);

Review comment:
       ```suggestion
       auto opt_batches = MapVector(
           [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, std::move(batches));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {
+    if (table == nullptr) {
+      return Status::Invalid(kind_name, " node requires table which is not null");
+    }
+
+    if (batch_size <= 0) {
+      return Status::Invalid(
+          kind_name, " node requires, batch_size > 0 , but got batch size ", batch_size);
+    }
+
+    return Status::OK();
+  }
+
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
+      const Table& table, const int64_t batch_size) {
+    auto batches = ConvertTableToExecBatches(table, batch_size);
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
+                                                          const int64_t batch_size) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+
+    // setting chunksize for the batch reader
+    if (batch_size > 0) {
+      reader->set_chunksize(batch_size);
+    }
+
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      auto batch_res = reader->Next();
+      if (batch_res.ok()) {
+        batch = batch_res.ValueOrDie();

Review comment:
       ```suggestion
           batch = std::move(batch_res).MoveValueUnsafe();
   ```
   This will do a move instead of a copy.

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;

Review comment:
       ```suggestion
       const auto& table = table_options.table;
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));

Review comment:
       ```suggestion
       RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size));
   ```

##########
File path: cpp/src/arrow/compute/exec/source_node.cc
##########
@@ -174,12 +177,82 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
+      : SourceNode(plan, table->schema(), TableGenerator(*table.get(), batch_size)) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    const auto table = table_options.table;
+    const int64_t batch_size = table_options.batch_size;
+
+    RETURN_NOT_OK(ValidateTableSourceNodeInpute(table, batch_size, "TableSourceNode"));
+
+    return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  static arrow::Status ValidateTableSourceNodeInpute(const std::shared_ptr<Table> table,
+                                                     const int64_t batch_size,
+                                                     const char* kind_name) {

Review comment:
       ```suggestion
                                                        const int64_t batch_size) {
   ```
   We don't need to pass `kind_name` in if we're just going to use a literal.  I see it was based on `ValidateExecNodeInputs` but that case is a little different since that method is implemented in the base class and it doesn't know the kind name.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org