Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/04/29 17:44:50 UTC

[GitHub] [arrow] pitrou opened a new pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

pitrou opened a new pull request #10204:
URL: https://github.com/apache/arrow/pull/10204


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] pitrou commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-829464949


   @wesm @bkietz @westonpace This is still quite draft-ish, but you may want to look out for potential gotchas.





[GitHub] [arrow] bkietz commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

bkietz commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-833803501


   I think we can defer support for multiple consumers for the moment; I'm refactoring to nodes with a single output.





[GitHub] [arrow] bkietz commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

bkietz commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-834542335


   I would work around this with an API for producing conjoined split nodes:
   
   ```c++
   /// Split the output of `to_be_split` for consumption by multiple downstream nodes
   ///
   /// Each split node will list `to_be_split` as its only input, though `to_be_split` will only
   /// consider the first split node as its output. Whenever `to_be_split` pushes to the first split
   /// node, that data will be replicated to the outputs of each split node. Back pressure
   /// on any split node will also be felt by `to_be_split`.
   std::vector<std::unique_ptr<ExecNode>> MakeSplit(ExecNode* to_be_split, int n_splits);
   ```
   
   I think that the multiple-consumer case will be sufficiently uncommon that making it
   a first-class citizen of the API will be more confusing than dedicated split nodes.
   This is doubly so since there's a semantic distinction between inputs: a FilterNode would
   have one input for values-to-be-filtered and a second for masks/selection vectors,
   whereas any node with multiple outputs would be pushing identical batches to each.
   
   @michalursa
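A rough sketch of the split idea, for illustration only: the upstream producer sees a single output, every batch it pushes is replicated to each downstream consumer, and the producer stops pushing as soon as any consumer is paused. `Batch`, `Consumer`, `SplitFanOut`, `BoundedConsumer` and `ProduceInto` are hypothetical stand-ins (not Arrow's `ExecNode` or `compute::ExecBatch`), and the N conjoined split nodes proposed above are collapsed into one fan-out object to keep it short.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch.
using Batch = std::vector<int>;

// Hypothetical minimal downstream interface (not ExecNode).
class Consumer {
 public:
  virtual ~Consumer() = default;
  virtual void InputReceived(const Batch& batch) = 0;
  // True when this consumer wants its producer to stop pushing for now.
  virtual bool paused() const = 0;
};

// The producer pushes into a single SplitFanOut; each batch is replicated
// to every downstream consumer.
class SplitFanOut {
 public:
  explicit SplitFanOut(std::vector<Consumer*> consumers)
      : consumers_(std::move(consumers)) {}

  void Push(const Batch& batch) {
    for (Consumer* c : consumers_) c->InputReceived(batch);
  }

  // Back pressure on ANY consumer is felt by the producer.
  bool ShouldPause() const {
    for (const Consumer* c : consumers_) {
      if (c->paused()) return true;
    }
    return false;
  }

 private:
  std::vector<Consumer*> consumers_;
};

// A consumer that pauses once it has buffered `capacity` batches.
class BoundedConsumer : public Consumer {
 public:
  explicit BoundedConsumer(std::size_t capacity) : capacity_(capacity) {}
  void InputReceived(const Batch& batch) override { received_.push_back(batch); }
  bool paused() const override { return received_.size() >= capacity_; }
  std::size_t num_received() const { return received_.size(); }

 private:
  std::size_t capacity_;
  std::vector<Batch> received_;
};

// Producer loop: push batches {0}, {1}, ... until `total` is reached or the
// split reports back pressure. Returns the number of batches pushed.
int ProduceInto(SplitFanOut& split, int total) {
  int pushed = 0;
  while (pushed < total && !split.ShouldPause()) {
    split.Push(Batch{pushed});
    ++pushed;
  }
  return pushed;
}
```

With consumers of capacity 3 and 5, `ProduceInto(split, 10)` stops after 3 batches: the slower consumer's back pressure reaches the producer, matching what the `MakeSplit` doc comment describes, while both consumers see the same 3 batches.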





[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623789186



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() is called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have a ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node

Review comment:
       Expected. They're mostly there for validation and debugging. I'm not sure we'll keep them, but introspectability can be useful.
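If the counts stay, the validation use could look like the following sketch, which compares a node kind's expected arity with the edges actually wired up during plan construction. `Node`, `expected_inputs`/`expected_outputs` and `ValidateArity` are hypothetical simplifications of `num_inputs()`/`num_outputs()` and `ExecNode::Validate()`, not the API in the diff.

```cpp
#include <string>
#include <vector>

// Hypothetical simplified node: the expected arity comes from the node kind
// (what num_inputs()/num_outputs() would report), while `inputs`/`outputs`
// hold the edges actually wired up during plan construction.
struct Node {
  std::string kind_name;
  int expected_inputs;
  int expected_outputs;
  std::vector<Node*> inputs;
  std::vector<Node*> outputs;
};

// Returns an empty string if the wiring matches the expected arity,
// otherwise a human-readable diagnostic naming the node kind.
std::string ValidateArity(const Node& node) {
  const int num_in = static_cast<int>(node.inputs.size());
  const int num_out = static_cast<int>(node.outputs.size());
  if (num_in != node.expected_inputs) {
    return node.kind_name + ": expected " + std::to_string(node.expected_inputs) +
           " input(s), got " + std::to_string(num_in);
  }
  if (num_out != node.expected_outputs) {
    return node.kind_name + ": expected " + std::to_string(node.expected_outputs) +
           " output(s), got " + std::to_string(num_out);
  }
  return "";
}
```

Keeping the kind name in the message is where the introspection pays off for debugging: a dangling filter reports "filter: expected 1 output(s), got 0" rather than a bare failure.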







[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r628278986



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {

Review comment:
       +1
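For reference, a standalone sketch of how the `AddInput`/`AddOutput`/`Bind` trio in this header wires the graph, plus one way to derive the start order that `ExecPlan::StartProducing()` documents (every node started before its inputs, i.e. reverse topological order) using a depth-first walk over `inputs`. `GraphNode` and `StartOrder` are hypothetical names, and the walk assumes the graph is acyclic; it does no cycle detection.

```cpp
#include <functional>
#include <set>
#include <string>
#include <vector>

// Hypothetical simplified node mirroring AddInput/AddOutput/Bind (not ExecNode).
struct GraphNode {
  struct Output {
    GraphNode* node;
    int input_index;  // index of this edge among `node`'s inputs
  };

  std::string label;
  std::vector<GraphNode*> inputs;
  std::vector<Output> outputs;

  // Returns the index the new input occupies in this node.
  int AddInput(GraphNode* n) {
    inputs.push_back(n);
    return static_cast<int>(inputs.size() - 1);
  }

  void AddOutput(GraphNode* n, int input_index) { outputs.push_back({n, input_index}); }

  // Wire `input` -> `output`, recording on the input side which input slot
  // of `output` the edge landed in.
  static void Bind(GraphNode* input, GraphNode* output) {
    input->AddOutput(output, output->AddInput(input));
  }
};

// Nodes in the order StartProducing() would start them: every node appears
// before all of its inputs (sinks first, sources last). Assumes a DAG.
std::vector<GraphNode*> StartOrder(const std::vector<GraphNode*>& nodes) {
  std::vector<GraphNode*> topo;  // post-order over inputs: inputs before consumers
  std::set<const GraphNode*> visited;
  std::function<void(GraphNode*)> visit = [&](GraphNode* n) {
    if (!visited.insert(n).second) return;  // already placed
    for (GraphNode* in : n->inputs) visit(in);
    topo.push_back(n);
  };
  for (GraphNode* n : nodes) visit(n);
  return std::vector<GraphNode*>(topo.rbegin(), topo.rend());
}
```

The `input_index` recorded by `Bind` is what lets a multi-input node such as a join tell which side an incoming batch belongs to: binding `left` then `right` into the same join gives them indices 0 and 1.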







[GitHub] [arrow] westonpace commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

westonpace commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623505500



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?

Review comment:
       Or you could use a stop token.
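A minimal sketch of the stop-token alternative, assuming a hand-rolled `StopSource`/`StopToken` pair (hypothetical names, loosely modeled on C++20 `std::stop_source`/`std::stop_token`): the plan would own the source, and each producing node polls the shared token between batches instead of receiving a `StopProducing()` call. `ProduceUntilStopped` is likewise hypothetical.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical read-only view of a shared stop flag.
class StopToken {
 public:
  explicit StopToken(std::shared_ptr<std::atomic<bool>> flag) : flag_(std::move(flag)) {}
  bool IsStopRequested() const { return flag_->load(std::memory_order_acquire); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical owner of the flag (e.g. held by the plan); any thread may
// call RequestStop().
class StopSource {
 public:
  StopSource() : flag_(std::make_shared<std::atomic<bool>>(false)) {}
  void RequestStop() { flag_->store(true, std::memory_order_release); }
  StopToken token() const { return StopToken(flag_); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// A producing loop that polls the token between batches instead of exposing
// a StopProducing() method. Returns the number of batches handed to `on_batch`.
int ProduceUntilStopped(const StopToken& token, int max_batches,
                        const std::function<void(int)>& on_batch) {
  int produced = 0;
  while (produced < max_batches && !token.IsStopRequested()) {
    on_batch(produced);  // deliver batch number `produced`
    ++produced;
  }
  return produced;
}
```

One possible appeal of this design is that cancellation no longer has to recurse through the node graph: a consumer that has seen enough simply requests a stop, and every producer observes it at its next poll.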

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node

Review comment:
       Expected?  Or current?  What's the purpose of these fields?

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impeding
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
+
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;
+
+  /// Mark the inputs finished after the given number of batches.
+  ///
+  /// This may be called before all inputs are received.  This simply fixes
+  /// the total number of incoming batches for an input, so that the ExecNode
+  /// knows when it has received all input, regardless of order.
+  virtual void InputFinished(int input_index, int seq_stop) = 0;
+
+  /// Lifecycle API:
+  /// - start / stop to initiate and terminate production
+  /// - pause / resume to apply backpressure
+  ///
+  /// Implementation rules:
+  /// - StartProducing() should not recurse into the inputs, as it is
+  ///   handled by ExecPlan::StartProducing()
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   concurrently (but only after StartProducing() has returned successfully)
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
+  ///   methods
+  /// - StopProducing() should recurse into the inputs
+  /// - StopProducing() must be idempotent
+
+  // XXX What happens if StartProducing() calls an output's InputReceived()
+  // synchronously, and InputReceived() decides to call back into StopProducing()
+  // (or PauseProducing()) because it received enough data?
+  //
+  // Right now, since synchronous calls happen in both directions (input to
+  // output and then output to input), a node must be careful to be reentrant

Review comment:
       What is the output -> input direction?  Backpressure?
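To make the question concrete, here is a sketch of the scenario in the XXX comment above, under simplified assumptions: `Source::StartProducing()` pushes batches synchronously, and `Sink::InputReceived()` calls back upstream into `StopProducing()` once it has enough data, so control flows from output to input while the input is still on the stack. `Source` and `Sink` are hypothetical classes, not `ExecNode` subclasses.

```cpp
#include <atomic>
#include <vector>

class Sink;

// Hypothetical source: StartProducing() pushes every batch synchronously.
class Source {
 public:
  void set_output(Sink* output) { output_ = output; }
  void StartProducing(int num_batches);  // defined below, after Sink
  // May be called re-entrantly from the output's InputReceived().
  void StopProducing() { stopped_.store(true); }
  int produced() const { return produced_; }

 private:
  Sink* output_ = nullptr;
  std::atomic<bool> stopped_{false};
  int produced_ = 0;
};

// Hypothetical sink that calls back upstream (output -> input direction)
// from inside InputReceived() once it has received `limit` batches.
class Sink {
 public:
  Sink(Source* input, int limit) : input_(input), limit_(limit) {}
  void InputReceived(int batch) {
    received_.push_back(batch);
    if (static_cast<int>(received_.size()) == limit_) input_->StopProducing();
  }
  const std::vector<int>& received() const { return received_; }

 private:
  Source* input_;
  int limit_;
  std::vector<int> received_;
};

void Source::StartProducing(int num_batches) {
  for (int i = 0; i < num_batches; ++i) {
    // Re-check before every push: the previous push may have re-entered
    // StopProducing() synchronously. No lock is held across InputReceived(),
    // so that re-entrant call cannot deadlock against this loop.
    if (stopped_.load()) break;
    output_->InputReceived(i);
    ++produced_;
  }
}
```

With `limit` set to 2, the source stops after 2 of 10 batches. The sketch stays correct only because `StopProducing()` merely sets a flag that the loop re-checks; if `StartProducing()` held a non-recursive mutex across `InputReceived()` and `StopProducing()` also took it, the same re-entrant call would deadlock.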

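Relatedly, the `InputFinished(input_index, seq_stop)` contract quoted above can be met with a simple counter: batches may arrive in any order and `seq_stop` may be known before the last batch arrives, so an input is complete once the number of received batches reaches `seq_stop`. This sketch assumes every `seq_num` in `[0, seq_stop)` is delivered exactly once; `InputCounter` and its methods are hypothetical.

```cpp
#include <mutex>

// Hypothetical per-input completion tracker. InputReceived() and
// InputFinished() may be called concurrently and in any relative order.
class InputCounter {
 public:
  // Call from InputReceived(); returns true if this batch completed the input.
  bool BatchReceived() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++num_received_;
    return MarkDoneIfComplete();
  }

  // Call from InputFinished(); returns true if all batches had already arrived.
  bool Finished(int seq_stop) {
    std::lock_guard<std::mutex> lock(mutex_);
    seq_stop_ = seq_stop;
    return MarkDoneIfComplete();
  }

 private:
  // Caller must hold mutex_. Reports completion exactly once.
  bool MarkDoneIfComplete() {
    if (done_ || seq_stop_ < 0 || num_received_ < seq_stop_) return false;
    done_ = true;
    return true;
  }

  std::mutex mutex_;
  int num_received_ = 0;
  int seq_stop_ = -1;  // -1 until Finished() is called
  bool done_ = false;
};
```

Whichever call completes the input (a late batch or a late `InputFinished`) is the only one that gets `true`, so exactly one thread would go on to signal completion downstream.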
##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }

Review comment:
       When starting a scan, Arrow doesn't know the physical schema at all.  Reading through this, it seems like the execution plan cannot be created without knowing the physical schema.  Is the plan to do some initial scanning before creating an execution plan?

##########
File path: cpp/src/arrow/testing/future_util.h
##########
@@ -119,4 +119,14 @@ void AssertFailed(const Future<T>& fut) {
   }
 }
 
+template <typename T>
+Future<T> DelayAsync(Future<T> fut, double seconds) {

Review comment:
       I'm not sure what this is adding over `SleepAsync`, but it's not used, so I assume it's just part of the WIP.

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() is called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have a ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impending
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
+
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;
+
+  /// Mark the inputs finished after the given number of batches.
+  ///
+  /// This may be called before all inputs are received.  This simply fixes
+  /// the total number of incoming batches for an input, so that the ExecNode
+  /// knows when it has received all input, regardless of order.
+  virtual void InputFinished(int input_index, int seq_stop) = 0;
+
+  /// Lifecycle API:
+  /// - start / stop to initiate and terminate production
+  /// - pause / resume to apply backpressure
+  ///
+  /// Implementation rules:
+  /// - StartProducing() should not recurse into the inputs, as it is
+  ///   handled by ExecPlan::StartProducing()
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   concurrently (but only after StartProducing() has returned successfully)
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
+  ///   methods
+  /// - StopProducing() should recurse into the inputs
+  /// - StopProducing() must be idempotent
+
+  // XXX What happens if StartProducing() calls an output's InputReceived()

Review comment:
       It might be good to create a map node to test these rules out. Right now there is a source node, which doesn't have to worry about input callbacks, and a sink node, which doesn't have to worry about output callbacks, but no intermediate node.
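A minimal sketch of the kind of intermediate node suggested above, written against hypothetical stand-ins (`Batch` for `compute::ExecBatch`, a two-method `Consumer` interface) rather than the real `ExecNode` class. It exercises three of the header's rules: upstream calls may be concurrent, `InputFinished()` may arrive before the batches it counts, and `StopProducing()` must be idempotent.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch: a batch of ints.
using Batch = std::vector<int>;

// Hypothetical downstream interface mirroring two of the upstream callbacks.
struct Consumer {
  virtual ~Consumer() = default;
  virtual void InputReceived(int input_index, int seq_num, Batch batch) = 0;
  virtual void InputFinished(int input_index, int seq_stop) = 0;
};

// Hypothetical map node: one input, one output, applies `fn` to every value.
class MapNode : public Consumer {
 public:
  MapNode(std::function<int(int)> fn, Consumer* output, int output_input_index)
      : fn_(std::move(fn)), output_(output), output_input_index_(output_input_index) {}

  // Safe to call concurrently: each batch is mapped independently and keeps
  // its sequence number, so this node imposes no ordering of its own.
  void InputReceived(int /*input_index*/, int seq_num, Batch batch) override {
    if (stopped_.load()) return;  // batches arriving after StopProducing() are dropped
    for (int& v : batch) v = fn_(v);
    output_->InputReceived(output_input_index_, seq_num, std::move(batch));
  }

  // One output batch per input batch, so the total count passes through unchanged.
  void InputFinished(int /*input_index*/, int seq_stop) override {
    output_->InputFinished(output_input_index_, seq_stop);
  }

  // Idempotent: only the first call takes effect. A real node would also
  // recurse into its inputs here.
  void StopProducing() {
    bool expected = false;
    if (stopped_.compare_exchange_strong(expected, true)) ++stop_count_;
  }
  int stop_count() const { return stop_count_.load(); }

 private:
  std::function<int(int)> fn_;
  Consumer* output_;
  int output_input_index_;
  std::atomic<bool> stopped_{false};
  std::atomic<int> stop_count_{0};
};

// Hypothetical sink that knows it is done once it has seen `seq_stop`
// batches, whatever order the batches and InputFinished() arrive in.
class CollectingSink : public Consumer {
 public:
  void InputReceived(int /*input_index*/, int seq_num, Batch batch) override {
    std::lock_guard<std::mutex> lock(mutex_);
    received_.emplace_back(seq_num, std::move(batch));
  }
  void InputFinished(int /*input_index*/, int seq_stop) override {
    std::lock_guard<std::mutex> lock(mutex_);
    seq_stop_ = seq_stop;
  }
  bool finished() {
    std::lock_guard<std::mutex> lock(mutex_);
    return seq_stop_ >= 0 && static_cast<int>(received_.size()) == seq_stop_;
  }
  std::vector<std::pair<int, Batch>> received() {
    std::lock_guard<std::mutex> lock(mutex_);
    return received_;
  }

 private:
  std::mutex mutex_;
  int seq_stop_ = -1;
  std::vector<std::pair<int, Batch>> received_;
};
```

Because a map node emits exactly one batch per input batch, it can forward `seq_stop` unchanged; a node that drops or splits batches could not, which is the kind of question such a test node would likely surface.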

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  // XXX What happens if StartProducing() calls an output's InputReceived()

Review comment:
       Also, there are no examples of a node with two inputs. For example, what happens if one input arrives faster than the other? I suppose you would queue up to some limit and then apply backpressure?
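One way to answer the two-input question, sketched under assumptions: the hypothetical `PairingNode` below pairs batches by arrival order (ignoring `seq_num` for brevity), buffers whichever input runs ahead, calls `PauseProducing()` on it once its queue reaches a limit, and calls `ResumeProducing()` when the queue drains. `Producer` is a stand-in for the upstream node's lifecycle API, not the real `ExecNode`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch.
using Batch = std::vector<int>;

// Hypothetical upstream handle exposing only the backpressure calls.
struct Producer {
  bool paused = false;
  void PauseProducing() { paused = true; }
  void ResumeProducing() { paused = false; }
};

// Hypothetical two-input node pairing the i-th batch of each input.
// The faster input is buffered up to `limit` batches and then paused.
class PairingNode {
 public:
  PairingNode(Producer* left, Producer* right, std::size_t limit)
      : inputs_{left, right}, limit_(limit) {}

  void InputReceived(int input_index, Batch batch) {
    assert(input_index == 0 || input_index == 1);
    std::lock_guard<std::mutex> lock(mutex_);
    queues_[input_index].push_back(std::move(batch));
    // Emit as many pairs as both queues currently allow.
    while (!queues_[0].empty() && !queues_[1].empty()) {
      pairs_.emplace_back(std::move(queues_[0].front()), std::move(queues_[1].front()));
      queues_[0].pop_front();
      queues_[1].pop_front();
    }
    // Pause an input whose queue reached the limit; resume it once it drains.
    // (Simplification: upstream is called while holding the lock.)
    for (int i = 0; i < 2; ++i) {
      if (queues_[i].size() >= limit_ && !inputs_[i]->paused) {
        inputs_[i]->PauseProducing();
      } else if (queues_[i].size() < limit_ && inputs_[i]->paused) {
        inputs_[i]->ResumeProducing();
      }
    }
  }

  std::size_t queued(int input_index) {
    std::lock_guard<std::mutex> lock(mutex_);
    return queues_[input_index].size();
  }
  std::size_t num_pairs() {
    std::lock_guard<std::mutex> lock(mutex_);
    return pairs_.size();
  }

 private:
  Producer* inputs_[2];
  std::size_t limit_;
  std::mutex mutex_;
  std::deque<Batch> queues_[2];
  std::vector<std::pair<Batch, Batch>> pairs_;
};
```

Bounding each queue keeps memory proportional to the limit rather than to the speed difference between inputs; a real join would also have to consider `seq_num` ordering and how `InputFinished()` on one side interacts with a paused other side.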







[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r628278889



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;

Review comment:
       Hmm, no preference from me.







[GitHub] [arrow] pitrou commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-834550693


   Regardless of whether splitting is delegated to a specialized node (which I agree is a good idea), I'm not sure what we gain by removing multiple outputs from the API. Nodes with a single output don't have to deal with the complication in either case.
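To make the cost argument concrete, here is a hypothetical fan-out helper over an `OutputNode`-like struct mirroring the one in the header (`Consumer`, `Batch`, `EmitToOutputs` and `RecordingSink` are illustrative stand-ins). With a single output the loop body runs once and the batch is moved, so a single-output node does essentially no extra work; only genuine multi-output nodes pay for copies.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch.
using Batch = std::vector<int>;

struct Consumer {
  virtual ~Consumer() = default;
  virtual void InputReceived(int input_index, int seq_num, Batch batch) = 0;
};

// Mirrors ExecNode::OutputNode from the header.
struct OutputNode {
  Consumer* output;
  int input_index;  // index of this node among `output`'s inputs
};

// Forwards a batch to every output: copies for all but the last, which
// receives the moved batch.
void EmitToOutputs(const std::vector<OutputNode>& outputs, int seq_num, Batch batch) {
  for (std::size_t i = 0; i < outputs.size(); ++i) {
    const OutputNode& out = outputs[i];
    if (i + 1 == outputs.size()) {
      out.output->InputReceived(out.input_index, seq_num, std::move(batch));
    } else {
      out.output->InputReceived(out.input_index, seq_num, batch);
    }
  }
}

// Hypothetical sink recording (input_index, batch) pairs.
struct RecordingSink : Consumer {
  std::vector<std::pair<int, Batch>> seen;
  void InputReceived(int input_index, int /*seq_num*/, Batch batch) override {
    seen.emplace_back(input_index, std::move(batch));
  }
};
```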





[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r628279589



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;

Review comment:
       That would certainly work, API-wise, but I'm a bit wary of introducing hash lookups out of convenience (unless you have another implementation in mind?).







[GitHub] [arrow] github-actions[bot] commented on pull request #10204: ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-837391595


   https://issues.apache.org/jira/browse/ARROW-11928





[GitHub] [arrow] bkietz commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r628309858



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;

Review comment:
       I was thinking of a simple linear search. I wouldn't expect nodes to deal with a large number of inputs, and for vectors of integers shorter than ~256 elements I'd expect a linear search to be faster than a hash lookup anyway.
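A sketch of the linear search described above: map an input node pointer back to its position with `std::find` over the node's inputs vector. `ExecNode` is an empty stand-in here and `InputIndexOf` is a hypothetical helper name; the performance expectation is the reviewer's, not something this snippet measures.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical stand-in: only pointer identity matters for the lookup.
struct ExecNode {};

// Returns the position of `input` among `inputs`, or -1 if it is not bound.
// A linear scan over a short vector of pointers avoids hashing entirely.
int InputIndexOf(const std::vector<ExecNode*>& inputs, const ExecNode* input) {
  auto it = std::find(inputs.begin(), inputs.end(), input);
  return it == inputs.end() ? -1 : static_cast<int>(it - inputs.begin());
}
```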







[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r628278589



##########
File path: cpp/CMakeLists.txt
##########
@@ -343,6 +343,10 @@ if(ARROW_CUDA
   set(ARROW_IPC ON)
 endif()
 
+if(ARROW_ENGINE)

Review comment:
       Is there a risk of circular dependency (e.g. dataset -> compute + engine -> dataset)? If not, then agreed it's reasonable.







[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623789998



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }

Review comment:
       Ah! I hadn't thought about that... Like `num_inputs()` and `num_outputs()`, those should mostly be useful for validation, so perhaps we can remove them.
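One way to act on this (a sketch, not code from the PR) is to answer the `XXX` question literally: derive the expected arity from the descriptor vectors and let validation compare it against the actual wiring. `Descr`, `SketchNode` and `ValidateWiring()` are hypothetical stand-ins for `ValueDescr`, `ExecNode` and `ExecNode::Validate()`, and an empty `std::string` stands in for `Status::OK()`.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for arrow::ValueDescr: just a type name here.
struct Descr {
  std::string type_name;
};

// Hypothetical, stripped-down node. The expected arity is implied by the
// number of batch descriptors instead of separate virtual methods.
struct SketchNode {
  std::vector<std::vector<Descr>> input_descrs;   // one BatchDescr per input
  std::vector<std::vector<Descr>> output_descrs;  // one BatchDescr per output
  std::vector<SketchNode*> inputs;                // wired predecessors
  std::vector<SketchNode*> outputs;               // wired successors

  int num_inputs() const { return static_cast<int>(input_descrs.size()); }
  int num_outputs() const { return static_cast<int>(output_descrs.size()); }
};

// Returns an empty string if the wiring matches the descriptors,
// otherwise a message describing the first mismatch found.
std::string ValidateWiring(const SketchNode& node) {
  if (static_cast<int>(node.inputs.size()) != node.num_inputs()) {
    return "expected " + std::to_string(node.num_inputs()) + " inputs, got " +
           std::to_string(node.inputs.size());
  }
  if (static_cast<int>(node.outputs.size()) != node.num_outputs()) {
    return "expected " + std::to_string(node.num_outputs()) + " outputs, got " +
           std::to_string(node.outputs.size());
  }
  return "";
}
```

With this shape, a forgotten `Bind()` shows up as an arity mismatch at validation time rather than as a silent stall during execution.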







[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623789563



##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?

Review comment:
       Right. I think a StopToken would be used at a higher level, but I'm not sure.
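If stopping is driven from a higher level, one possible shape is a token that the plan's owner triggers and that source nodes poll between batches. The sketch below is modelled loosely on the StopToken idea mentioned above; `SketchStopSource`, `SketchStopToken` (built on `std::atomic<bool>`) and the `ProduceBatches()` source loop are hypothetical, with ints standing in for batches.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical read-only view handed to nodes: they can only poll it.
class SketchStopToken {
 public:
  explicit SketchStopToken(std::shared_ptr<std::atomic<bool>> flag)
      : flag_(std::move(flag)) {}
  bool IsStopRequested() const { return flag_->load(std::memory_order_acquire); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Hypothetical owner side, kept by whoever drives the plan.
class SketchStopSource {
 public:
  SketchStopSource() : flag_(std::make_shared<std::atomic<bool>>(false)) {}
  void RequestStop() { flag_->store(true, std::memory_order_release); }
  SketchStopToken token() const { return SketchStopToken(flag_); }

 private:
  std::shared_ptr<std::atomic<bool>> flag_;
};

// Illustrative source loop: polls the token between batches and returns
// the number of batches it emitted before stopping or running out.
int ProduceBatches(int num_batches, const SketchStopToken& token,
                   const std::function<void(int)>& emit) {
  int emitted = 0;
  for (int i = 0; i < num_batches; ++i) {
    if (token.IsStopRequested()) break;  // cooperative cancellation point
    emit(i);
    ++emitted;
  }
  return emitted;
}
```

Cancellation here is cooperative: the check happens only between batches, so a batch already in flight is not interrupted.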







[GitHub] [arrow] bkietz closed pull request #10204: ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
bkietz closed pull request #10204:
URL: https://github.com/apache/arrow/pull/10204


   





[GitHub] [arrow] pitrou commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-834490550


   > I think we can defer support for multiple consumers for the moment, I'm refactoring to singular output nodes
   
   Hmm, I'm not convinced about this. Say you have a RecordBatchReader with several columns of interest. You want each column to feed a separate execution node (say, one different scalar function per column). How do you do that if you can only have singular output nodes? Implement a "supernode" that applies multiple scalar functions to multiple columns?
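To make the scenario above concrete: with multiple outputs, the per-column fan-out can be a small generic node rather than a "supernode". A sketch under simplifying assumptions: `SketchBatch` (a vector of int columns), `Consumer` (a callback standing in for a downstream `InputReceived()`) and `ColumnSplitNode` are all hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch: one vector per column.
using Column = std::vector<int>;
struct SketchBatch {
  std::vector<Column> columns;
};

// Hypothetical stand-in for a downstream node's InputReceived().
using Consumer = std::function<void(int seq_num, SketchBatch batch)>;

// A node with one output per column: output i receives a one-column batch
// holding column i, so each column can feed a different downstream function.
class ColumnSplitNode {
 public:
  explicit ColumnSplitNode(std::vector<Consumer> outputs)
      : outputs_(std::move(outputs)) {}

  void InputReceived(int seq_num, const SketchBatch& batch) {
    assert(batch.columns.size() == outputs_.size());
    for (std::size_t i = 0; i < outputs_.size(); ++i) {
      outputs_[i](seq_num, SketchBatch{{batch.columns[i]}});
    }
  }

 private:
  std::vector<Consumer> outputs_;
};
```

With singular outputs only, the same effect would require either one reader per column or a single node applying every per-column function itself.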





[GitHub] [arrow] westonpace commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
westonpace commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-834711745


   This article has an interesting description of how DAGs (which imply multiple outputs) are used by Materialize to optimize query plans: https://scattered-thoughts.net/writing/materialize-decorrelation
   
   I don't know nearly enough to know how common or essential this is.
   
   As for complications, multiple outputs introduce buffering (in both pull and push models).  While you are delivering a result to consumer 1, you have to buffer the result so you can later deliver it to consumer 2.  If your query plan's bottleneck is on the consumer 1 path, results could accumulate in the multicasting operator, which would then need to trigger backpressure.
   
   That's the main complication that jumps to mind.  That being said, this "multicasting" is one of the more confusing points of Rx (Reactive), but that may just come from the dynamic and linear way in which observers are chained.  Since you're already building a graph (which presumably does not change for the duration of the execution), that shouldn't be a problem.
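The buffering cost described above can be seen in a minimal multicast buffer: each batch is retained until the slowest consumer has read it, and the producer is told to pause once too many batches are held. This sketch assumes pull-style consumers and uses ints for batches; `MulticastBuffer` and its pause threshold are hypothetical, not part of the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical multicast buffer: one producer, N consumers reading at their
// own pace. A batch is retained until every consumer has read it.
class MulticastBuffer {
 public:
  MulticastBuffer(std::size_t num_consumers, std::size_t pause_threshold)
      : positions_(num_consumers, 0), pause_threshold_(pause_threshold) {}

  // Producer side. Returns true if the producer should pause (backpressure).
  bool Push(int batch) {
    buffer_.push_back(batch);
    return buffer_.size() >= pause_threshold_;
  }

  // Consumer side. Returns false if consumer `i` has nothing left to read.
  bool Pop(std::size_t i, int* out) {
    std::size_t offset = positions_[i] - base_;
    if (offset >= buffer_.size()) return false;
    *out = buffer_[offset];
    ++positions_[i];
    Trim();
    return true;
  }

  std::size_t buffered() const { return buffer_.size(); }

 private:
  // Drop batches that every consumer has already read.
  void Trim() {
    std::size_t slowest = positions_[0];
    for (std::size_t p : positions_) slowest = p < slowest ? p : slowest;
    while (base_ < slowest) {
      buffer_.pop_front();
      ++base_;
    }
  }

  std::deque<int> buffer_;               // batches some consumer still needs
  std::vector<std::size_t> positions_;   // absolute read position per consumer
  std::size_t base_ = 0;                 // absolute position of buffer_.front()
  std::size_t pause_threshold_;
};
```

The slow consumer alone determines how much is retained, which is why backpressure has to be triggered from the multicasting point rather than from the fast consumer.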





[GitHub] [arrow] bkietz commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
bkietz commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623930014



##########
File path: cpp/CMakeLists.txt
##########
@@ -343,6 +343,10 @@ if(ARROW_CUDA
   set(ARROW_IPC ON)
 endif()
 
+if(ARROW_ENGINE)

Review comment:
       I think we can just keep this in the compute:: namespace, in the compute/exec subdirectory. We can extract a separate component later if necessary.

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impending
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
+
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;

Review comment:
       Maybe too magic, but:
   ```suggestion
     virtual void Receive(int input_index, int seq_num, Result<compute::ExecBatch> batch) = 0;
   ```
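With the suggested signature, a consumer would handle data and errors through one entry point. A rough sketch, where `SketchResult` is a hypothetical minimal stand-in for `arrow::Result<compute::ExecBatch>` (ok flag, batch, error message) and `SummingNode` is an illustrative consumer; whether `InputFinished()` would also be folded in is left open by the suggestion.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::ExecBatch.
struct SketchBatch {
  std::vector<int> values;
};

// Hypothetical minimal stand-in for arrow::Result<compute::ExecBatch>.
struct SketchResult {
  bool ok;
  SketchBatch batch;  // meaningful only if ok
  std::string error;  // meaningful only if !ok

  static SketchResult Ok(SketchBatch b) { return {true, std::move(b), ""}; }
  static SketchResult Error(std::string msg) {
    return {false, SketchBatch{}, std::move(msg)};
  }
};

// Illustrative consumer: data and errors arrive through one method, as in
// the suggested Receive(input_index, seq_num, Result<ExecBatch>).
class SummingNode {
 public:
  void Receive(int input_index, int seq_num, SketchResult result) {
    (void)input_index;  // unused in this single-input sketch
    (void)seq_num;      // ordering is irrelevant for a sum
    if (!result.ok) {
      errors_.push_back(std::move(result.error));
      return;
    }
    for (int v : result.batch.values) sum_ += v;
  }

  long sum() const { return sum_; }
  const std::vector<std::string>& errors() const { return errors_; }

 private:
  long sum_ = 0;
  std::vector<std::string> errors_;
};
```

The trade-off is one fewer virtual method against a branch at the top of every implementation.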

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {

Review comment:
       ```suggestion
     int AddInput(ExecNode* node, int output_index_in_producer) {
   ```
   I think consumers will need to know their index within a producer.
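One way to give consumers that index is to have `Bind()` record it on both sides of each edge, so a consumer can later address a specific output of its producer (the header's TODO about `PauseProducing()` taking the calling output's index points in the same direction). `SketchNode`, `InputEdge` and `OutputEdge` are hypothetical simplifications of the PR's `ExecNode`/`OutputNode`.

```cpp
#include <cassert>
#include <vector>

// Hypothetical simplified node showing two-way index bookkeeping.
struct SketchNode {
  struct InputEdge {
    SketchNode* producer;
    int output_index_in_producer;  // which of the producer's outputs we are
  };
  struct OutputEdge {
    SketchNode* consumer;
    int input_index_in_consumer;  // which of the consumer's inputs we are
  };

  std::vector<InputEdge> inputs;
  std::vector<OutputEdge> outputs;

  int AddInput(SketchNode* node, int output_index_in_producer) {
    inputs.push_back({node, output_index_in_producer});
    return static_cast<int>(inputs.size()) - 1;
  }

  int AddOutput(SketchNode* node, int input_index_in_consumer) {
    outputs.push_back({node, input_index_in_consumer});
    return static_cast<int>(outputs.size()) - 1;
  }

  // Wire producer -> consumer so that each side knows its index in the other.
  static void Bind(SketchNode* producer, SketchNode* consumer) {
    int output_index = static_cast<int>(producer->outputs.size());
    int input_index = consumer->AddInput(producer, output_index);
    int added = producer->AddOutput(consumer, input_index);
    assert(added == output_index);
    (void)added;  // silence unused-variable warnings when NDEBUG is set
  }
};
```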

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impending
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
+
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;
+
+  /// Mark the inputs finished after the given number of batches.
+  ///
+  /// This may be called before all inputs are received.  This simply fixes
+  /// the total number of incoming batches for an input, so that the ExecNode
+  /// knows when it has received all input, regardless of order.
+  virtual void InputFinished(int input_index, int seq_stop) = 0;
+
+  /// Lifecycle API:
+  /// - start / stop to initiate and terminate production
+  /// - pause / resume to apply backpressure
+  ///
+  /// Implementation rules:
+  /// - StartProducing() should not recurse into the inputs, as it is
+  ///   handled by ExecPlan::StartProducing()
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   concurrently (but only after StartProducing() has returned successfully)
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
+  ///   methods
+  /// - StopProducing() should recurse into the inputs
+  /// - StopProducing() must be idempotent
+
+  // XXX What happens if StartProducing() calls an output's InputReceived()
+  // synchronously, and InputReceived() decides to call back into StopProducing()
+  // (or PauseProducing()) because it received enough data?
+  //
+  // Right now, since synchronous calls happen in both directions (input to
+  // output and then output to input), a node must be careful to be reentrant
+  // against synchronous calls from its output, *and* also concurrent calls from
+  // other threads.  The most reliable solution is to update the internal state
+  // first, and notify outputs only at the end.
+  //
+  // Alternate rules:
+  // - StartProducing(), ResumeProducing() can call synchronously into
+  //   its outputs' consuming methods (InputReceived() etc.)
+  // - InputReceived(), ErrorReceived(), InputFinished() can call asynchronously
+  //   into its inputs' PauseProducing(), StopProducing()
+  //
+  // Alternate API:
+  // - InputReceived(), ErrorReceived(), InputFinished() return a ProductionHint
+  //   enum: either None (default), PauseProducing, ResumeProducing, StopProducing
+  // - A method allows passing a ProductionHint asynchronously from an output node
+  //   (replacing PauseProducing(), ResumeProducing(), StopProducing())
+
+  // TODO PauseProducing() etc. should probably take the index of the output which calls
+  // them?
+
+  /// \brief Start producing
+  ///
+  /// This must only be called once.  If this fails, then other lifecycle
+  /// methods must not be called.
+  ///
+  /// This is typically called automatically by ExecPlan::StartProducing().
+  virtual Status StartProducing() = 0;
+
+  /// \brief Pause producing temporarily
+  ///
+  /// This call is a hint that an output node is currently not willing
+  /// to receive data.
+  ///
+  /// This may be called any number of times after StartProducing() succeeds.
+  /// However, the node is still free to produce data (which may be difficult
+  /// to prevent anyway if data is producer using multiple threads).

Review comment:
       ```suggestion
     /// to prevent anyway if data is produced using multiple threads).
   ```
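Regarding the `InputFinished()` contract quoted in the hunk above (it "fixes the total number of incoming batches for an input, so that the ExecNode knows when it has received all input, regardless of order"), a node might track completion per input as below. This is a sketch assuming each `seq_num` in `[0, seq_stop)` is delivered exactly once; `InputCompletion` is hypothetical, and a mutex is used for clarity where atomics could also work.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical per-input completion tracker. Batch arrivals and the
// InputFinished() notification may come in any order and from different
// threads; the input is complete once `seq_stop` batches have been received.
class InputCompletion {
 public:
  // Each method returns true exactly once: on the call that completes the input.
  bool OnBatchReceived() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++num_received_;
    return CheckDoneLocked();
  }

  bool OnInputFinished(int seq_stop) {
    std::lock_guard<std::mutex> lock(mutex_);
    seq_stop_ = seq_stop;
    return CheckDoneLocked();
  }

 private:
  bool CheckDoneLocked() {
    if (done_ || seq_stop_ < 0 || num_received_ < seq_stop_) return false;
    done_ = true;
    return true;
  }

  std::mutex mutex_;
  int num_received_ = 0;
  int seq_stop_ = -1;  // -1 until InputFinished() has been called
  bool done_ = false;
};
```

Whichever call happens to complete the input (the last batch or the finish notification) is the one that should go on to notify downstream nodes.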

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impending
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;

Review comment:
       Would it be possible to identify inputs and outputs by pointer rather than index?
   
   ```suggestion
     virtual void InputReceived(ExecNode* input, int seq_num, compute::ExecBatch batch) = 0;
   ```
   
   I don't think we'd gain a significant performance benefit from using indices at the top level, and they add a lot of cognitive overhead: each node has to keep track of its identifying indices within other nodes. If we identified nodes by pointer instead, a node could push output with:
   
   ```c++
     auto batch = ...;
     // push to outputs
     for (ExecNode* output : outputs_) {
       output->InputReceived(this, seq_num, batch);
     }
   ```
   
   Instead of:
   
   ```c++
     auto batch = ...;
     // push to outputs
     for (OutputNode output : outputs_) {
       output.output->InputReceived(output.input_index, seq_num, batch);
     }
   ```
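Identifying inputs by pointer does not lose information for multi-input nodes such as joins: the consumer can recover the index from its own input list. A sketch with a hypothetical `SketchNode` (ints stand in for `compute::ExecBatch`), showing both the push loop from the suggestion and the lookup on the receiving side.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical simplified node: producers identify themselves by pointer,
// and the consumer recovers the input index from its own inputs_ list.
class SketchNode {
 public:
  void AddInput(SketchNode* input) { inputs_.push_back(input); }
  void AddOutput(SketchNode* output) { outputs_.push_back(output); }

  // Push a batch to every output, identifying ourselves by pointer.
  void Emit(int seq_num, int batch) {
    for (SketchNode* output : outputs_) {
      output->InputReceived(this, seq_num, batch);
    }
  }

  void InputReceived(SketchNode* input, int seq_num, int batch) {
    (void)seq_num;
    // A linear search is cheap for the handful of inputs a node usually has.
    auto it = std::find(inputs_.begin(), inputs_.end(), input);
    assert(it != inputs_.end() && "batch from a node that is not an input");
    received_.push_back({static_cast<int>(it - inputs_.begin()), batch});
  }

  struct Received {
    int input_index;
    int batch;
  };
  const std::vector<Received>& received() const { return received_; }

 private:
  std::vector<SketchNode*> inputs_;
  std::vector<SketchNode*> outputs_;
  std::vector<Received> received_;
};
```

The cost is a lookup per batch on the consumer side; index-based dispatch avoids it but requires every producer to store the consumer-side index.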

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }
+
+  /// This node's successors in the exec plan
+  const OutputNodeVector& outputs() const { return outputs_; }
+
+  /// The datatypes for each output
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& output_descrs() const { return output_descrs_; }
+
+  /// This node's exec plan
+  ExecPlan* plan() { return plan_; }
+  std::shared_ptr<ExecPlan> plan_ref() { return plan_->shared_from_this(); }
+
+  /// \brief An optional label, for display and debugging
+  ///
+  /// There is no guarantee that this value is non-empty or unique.
+  const std::string& label() const { return label_; }
+
+  int AddInput(ExecNode* node) {
+    inputs_.push_back(node);
+    return static_cast<int>(inputs_.size() - 1);
+  }
+
+  void AddOutput(ExecNode* node, int input_index) {
+    outputs_.push_back({node, input_index});
+  }
+
+  static void Bind(ExecNode* input, ExecNode* output) {
+    input->AddOutput(output, output->AddInput(input));
+  }
+
+  Status Validate() const;
+
+  /// Upstream API:
+  /// These functions are called by input nodes that want to inform this node
+  /// about an updated condition (a new input batch, an error, an impending
+  /// end of stream).
+  ///
+  /// Implementation rules:
+  /// - these may be called anytime after StartProducing() has succeeded
+  ///   (and even during or after StopProducing())
+  /// - these may be called concurrently
+  /// - these are allowed to call back into PauseProducing(), ResumeProducing()
+  ///   and StopProducing()
+
+  /// Transfer input batch to ExecNode
+  virtual void InputReceived(int input_index, int seq_num, compute::ExecBatch batch) = 0;
+
+  /// Signal error to ExecNode
+  virtual void ErrorReceived(int input_index, Status error) = 0;
+
+  /// Mark the inputs finished after the given number of batches.
+  ///
+  /// This may be called before all inputs are received.  This simply fixes
+  /// the total number of incoming batches for an input, so that the ExecNode
+  /// knows when it has received all input, regardless of order.
+  virtual void InputFinished(int input_index, int seq_stop) = 0;
+
+  /// Lifecycle API:
+  /// - start / stop to initiate and terminate production
+  /// - pause / resume to apply backpressure
+  ///
+  /// Implementation rules:
+  /// - StartProducing() should not recurse into the inputs, as it is
+  ///   handled by ExecPlan::StartProducing()
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   concurrently (but only after StartProducing() has returned successfully)
+  /// - PauseProducing(), ResumeProducing(), StopProducing() may be called
+  ///   by the downstream nodes' InputReceived(), ErrorReceived(), InputFinished()
+  ///   methods
+  /// - StopProducing() should recurse into the inputs
+  /// - StopProducing() must be idempotent
+
+  // XXX What happens if StartProducing() calls an output's InputReceived()
+  // synchronously, and InputReceived() decides to call back into StopProducing()
+  // (or PauseProducing()) because it received enough data?
+  //
+  // Right now, since synchronous calls happen in both directions (input to
+  // output and then output to input), a node must be careful to be reentrant
+  // against synchronous calls from its output, *and* also concurrent calls from
+  // other threads.  The most reliable solution is to update the internal state
+  // first, and notify outputs only at the end.
+  //
+  // Alternate rules:
+  // - StartProducing(), ResumeProducing() can call synchronously into
+  //   its outputs' consuming methods (InputReceived() etc.)
+  // - InputReceived(), ErrorReceived(), InputFinished() can call asynchronously
+  //   into its inputs' PauseProducing(), StopProducing()
+  //
+  // Alternate API:
+  // - InputReceived(), ErrorReceived(), InputFinished() return a ProductionHint
+  //   enum: either None (default), PauseProducing, ResumeProducing, StopProducing
+  // - A method allows passing a ProductionHint asynchronously from an output node
+  //   (replacing PauseProducing(), ResumeProducing(), StopProducing())
+
+  // TODO PauseProducing() etc. should probably take the index of the output which calls

Review comment:
       I think StopProducing() should also take the index of an output; one consumer of a producer may finish before the others and that shouldn't cause the whole node to stop.
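Here is a rough sketch of per-output stopping. It assumes a hypothetical `FanOutProducer` class (not part of the proposed API) that identifies its outputs by index. The class records which outputs have asked it to stop and only shuts down once all of them have. A mutex guards the bookkeeping because, per the lifecycle rules in the header, `StopProducing()` may be called concurrently. A repeated call from the same output is ignored, which keeps the method idempotent.

```c++
#include <cassert>
#include <mutex>
#include <vector>

// Hypothetical producer with several downstream consumers, each identified by
// the index of its output slot.
class FanOutProducer {
 public:
  explicit FanOutProducer(int num_outputs)
      : output_stopped_(num_outputs, false) {}

  // Called by output `output_index` when it no longer wants data. Repeated
  // calls from the same output are ignored, keeping this idempotent.
  void StopProducing(int output_index) {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(output_index >= 0 &&
           output_index < static_cast<int>(output_stopped_.size()));
    if (output_stopped_[output_index]) return;
    output_stopped_[output_index] = true;
    ++num_stopped_;
    if (num_stopped_ == static_cast<int>(output_stopped_.size())) {
      // Every consumer is done: only now does this node stop. A real node
      // would also propagate StopProducing() to its own inputs here.
      stopped_ = true;
    }
  }

  // Whether batches should still be sent to output `output_index`.
  bool WantsData(int output_index) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return !output_stopped_[output_index];
  }

  // Whether the node as a whole has stopped producing.
  bool stopped() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return stopped_;
  }

 private:
  mutable std::mutex mutex_;
  std::vector<bool> output_stopped_;
  int num_stopped_ = 0;
  bool stopped_ = false;
};
```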

##########
File path: cpp/src/arrow/engine/exec_plan.h
##########
@@ -0,0 +1,242 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/compute/type_fwd.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+// NOTES:
+// - ExecBatches only have arrays or scalars
+// - data streams may be ordered, so add input number?
+// - node to combine input needs to reorder
+
+namespace arrow {
+namespace engine {
+
+class ExecNode;
+
+class ARROW_EXPORT ExecPlan : public std::enable_shared_from_this<ExecPlan> {
+ public:
+  using NodeVector = std::vector<ExecNode*>;
+
+  virtual ~ExecPlan() = default;
+
+  /// Make an empty exec plan
+  static Result<std::shared_ptr<ExecPlan>> Make();
+
+  void AddNode(std::unique_ptr<ExecNode> node);
+
+  /// The initial inputs
+  const NodeVector& sources() const;
+
+  /// The final outputs
+  const NodeVector& sinks() const;
+
+  // XXX API question:
+  // There are clearly two phases in the ExecPlan lifecycle:
+  // - one construction phase where AddNode() and ExecNode::Bind() are called
+  //   (with optional validation at the end)
+  // - one execution phase where the nodes are topo-sorted and then started
+  //
+  // => Should we separate out those APIs? e.g. have an ExecPlanBuilder
+  // for the first phase.
+
+  Status Validate();
+
+  /// Start producing on all nodes
+  ///
+  /// Nodes are started in reverse topological order, such that any node
+  /// is started before all of its inputs.
+  Status StartProducing();
+
+  // XXX should we also have `void StopProducing()`?
+
+ protected:
+  ExecPlan() = default;
+};
+
+class ARROW_EXPORT ExecNode {
+ public:
+  struct OutputNode {
+    ExecNode* output;
+    // Index of corresponding input in `output` node
+    int input_index;
+  };
+
+  using NodeVector = std::vector<ExecNode*>;
+  using OutputNodeVector = std::vector<OutputNode>;
+  using BatchDescr = std::vector<ValueDescr>;
+
+  virtual ~ExecNode();
+
+  virtual const char* kind_name() = 0;
+  // The number of inputs and outputs expected by this node
+  // XXX should these simply return `input_descrs_.size()`
+  // (`output_descrs_.size()` respectively)?
+  virtual int num_inputs() const = 0;
+  virtual int num_outputs() const = 0;
+
+  /// This node's predecessors in the exec plan
+  const NodeVector& inputs() const { return inputs_; }
+
+  /// The datatypes for each input
+  // XXX Should it be std::vector<DataType>?
+  const std::vector<BatchDescr>& input_descrs() const { return input_descrs_; }

Review comment:
       The `dataset_schema` (the reader schema, in Avro parlance) is always known when we construct a ScanNode and add it to a plan; inferring the dataset schema from the physical schema is deferred to the dataset factories in `discovery.h`. I believe it's safe to rely on having a known schema along all edges when constructing these nodes.
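The sketch below shows one thing a known schema on every edge buys: edges can be checked when they are bound, before any batch flows. It uses hypothetical `Schema` and `PlanNode` types, with a schema reduced to (field name, type name) pairs rather than `arrow::Schema`.

```c++
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified schema: ordered (field name, type name) pairs.
using Field = std::pair<std::string, std::string>;
using Schema = std::vector<Field>;

// Hypothetical plan node with schemas fixed at construction time.
struct PlanNode {
  std::string label;
  Schema input_schema;   // empty for sources such as a scan
  Schema output_schema;  // e.g. the dataset schema for a scan
};

// Returns an empty string if `producer` can feed `consumer`, otherwise a
// description of the first mismatch.
std::string ValidateEdge(const PlanNode& producer, const PlanNode& consumer) {
  const std::string edge = producer.label + " -> " + consumer.label;
  if (producer.output_schema.size() != consumer.input_schema.size()) {
    return edge + ": field count mismatch";
  }
  for (std::size_t i = 0; i < producer.output_schema.size(); ++i) {
    if (producer.output_schema[i] != consumer.input_schema[i]) {
      return edge + ": field " + std::to_string(i) + " mismatch";
    }
  }
  return "";
}
```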

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -370,6 +370,12 @@ Iterator<T> MakeErrorIterator(Status s) {
   });
 }
 
+template <typename It,
+          typename T = typename decltype(std::declval<It>().Next())::ValueType>
+Iterator<T> MakePointerIterator(It* it) {

Review comment:
       Could use `MakeIteratorFromReader` instead.
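The `MakePointerIterator` in the diff wraps a raw pointer, so the resulting iterator does not own the reader. The standalone analogue below shows both the return-type deduction trick and that lifetime caveat. Its `Iterator<T>` is hypothetical and built on `std::optional`; it is not Arrow's `Iterator<T>`, which reports the end of the stream differently. Whether `MakeIteratorFromReader` avoids the caveat depends on its exact signature, which is worth checking before switching.

```c++
#include <functional>
#include <optional>
#include <utility>

// Hypothetical, simplified Iterator<T>: Next() returns std::nullopt at the
// end of the stream.
template <typename T>
class Iterator {
 public:
  explicit Iterator(std::function<std::optional<T>()> next)
      : next_(std::move(next)) {}

  std::optional<T> Next() { return next_(); }

 private:
  std::function<std::optional<T>()> next_;
};

// Non-owning adapter: deduces T from the return type of It::Next(), mirroring
// the default template argument in the diff. The caller must keep *it alive
// for as long as the returned Iterator is used.
template <typename It,
          typename T = typename decltype(std::declval<It>().Next())::value_type>
Iterator<T> MakePointerIterator(It* it) {
  return Iterator<T>([it] { return it->Next(); });
}

// Example reader yielding 0, 1, 2 and then end of stream.
struct CountingReader {
  int next_value = 0;
  std::optional<int> Next() {
    if (next_value >= 3) return std::nullopt;
    return next_value++;
  }
};
```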





[GitHub] [arrow] pitrou commented on pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#issuecomment-834713526


   It's worth repeating that multiple consumers _will_ exist one way or another. They can be exposed in the API, or they can be buried inside the implementation of a special node. Personally, I would find it slightly more logical to expose them in the API, and it would help give the user a better view of what happens (think visualizing the execution graph, for instance).
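Here is a sketch of the "special node" alternative, using a hypothetical tee node. The `TeeNode`, `Batch` and `Consumer` names are illustrative, not Arrow API. Upstream nodes keep a single output and the fan-out lives inside the tee. As a result, a visualization of the execution graph would show a single edge into the tee rather than one edge per consumer.

```c++
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical batch type and consumer callback, not Arrow's API.
struct Batch {
  std::vector<int> values;
};
using Consumer = std::function<void(int seq_num, const Batch&)>;

// Upstream nodes see one output (this node); the fan-out to several
// consumers is hidden inside it.
class TeeNode {
 public:
  void AddConsumer(Consumer consumer) {
    assert(consumer && "consumer must be callable");
    consumers_.push_back(std::move(consumer));
  }

  // Called by the single upstream node; every consumer sees every batch.
  void InputReceived(int seq_num, const Batch& batch) {
    for (const Consumer& consumer : consumers_) {
      consumer(seq_num, batch);
    }
  }

  std::size_t num_consumers() const { return consumers_.size(); }

 private:
  std::vector<Consumer> consumers_;
};
```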



[GitHub] [arrow] pitrou commented on a change in pull request #10204: [WIP] ARROW-11928: [C++] Execution engine API

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10204:
URL: https://github.com/apache/arrow/pull/10204#discussion_r623790383



##########
File path: cpp/src/arrow/testing/future_util.h
##########
@@ -119,4 +119,14 @@ void AssertFailed(const Future<T>& fut) {
   }
 }
 
+template <typename T>
+Future<T> DelayAsync(Future<T> fut, double seconds) {

Review comment:
       `SleepAsync` returns a `Future<>`. That said, I ended up doing something else, so I will probably remove this.



