You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/04/12 03:24:27 UTC

[arrow] branch master updated: ARROW-15515: [C++] Update ExecPlan example code and documentation with new options

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d4e93dafd ARROW-15515: [C++] Update ExecPlan example code and documentation with new options
7d4e93dafd is described below

commit 7d4e93dafdd96c6540d822851ee3ccd10e167a4f
Author: Vibhatha Abeykoon <vi...@gmail.com>
AuthorDate: Mon Apr 11 17:24:05 2022 -1000

    ARROW-15515: [C++] Update ExecPlan example code and documentation with new options
    
    This PR includes a documentation update for streaming execution engine and a `table_sink` example.
    
    Closes #12689 from vibhatha/arrow-15515
    
    Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 .../arrow/execution_plan_documentation_examples.cc | 45 ++++++++++++++++++++++
 cpp/src/arrow/compute/exec/options.h               |  5 +--
 docs/source/cpp/streaming_execution.rst            | 21 ++++++++++
 3 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 0505af223e..1ca3d36a34 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -855,6 +855,46 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
 
 // (Doc section: Union Example)
 
+// (Doc section: Table Sink Example)
+
+/// \brief An example showing a table sink node
+/// \param exec_context The execution context to run the plan in
+/// \return Status::OK() on success, otherwise the error from the failing step
+///
+/// TableSink Example
+/// This example shows how a table_sink can be used in an execution plan.
+/// A source node receives data as batches and the table sink node collects
+/// the output into a table, which is then printed to stdout.
+arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
+
+  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
+                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
+
+  std::shared_ptr<arrow::Table> output_table;
+  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};
+
+  ARROW_RETURN_NOT_OK(
+      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
+  // Validate the ExecPlan before starting it
+  ARROW_RETURN_NOT_OK(plan->Validate());
+  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
+  // Start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // Wait for the plan to finish; output_table is only read after this point
+  auto finished = plan->finished();
+  RETURN_NOT_OK(finished.status());
+  std::cout << "Results : " << output_table->ToString() << std::endl;
+  return arrow::Status::OK();
+}
+// (Doc section: Table Sink Example)
+
 enum ExampleMode {
   SOURCE_SINK = 0,
   TABLE_SOURCE_SINK = 1,
@@ -869,6 +909,7 @@ enum ExampleMode {
   KSELECT = 10,
   WRITE = 11,
   UNION = 12,
+  TABLE_SOURCE_TABLE_SINK = 13
 };
 
 int main(int argc, char** argv) {
@@ -937,6 +978,10 @@ int main(int argc, char** argv) {
       PrintBlock("Union Example");
       status = SourceUnionSinkExample(exec_context);
       break;
+    case TABLE_SOURCE_TABLE_SINK:
+      PrintBlock("TableSink Example");
+      status = TableSinkExample(exec_context);
+      break;
     default:
       break;
   }
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 259e467d97..9e99953e87 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -305,12 +305,11 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
   /// SelectK options
   SelectKOptions select_k_options;
 };
-
 /// @}
 
-/// \brief Adapt an Table as a sink node
+/// \brief Adapt a Table as a sink node
 ///
-/// obtains the output of a execution plan to
+/// obtains the output of an execution plan to
 /// a table pointer.
 class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
  public:
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
index 5bee70eb05..649968ad43 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/streaming_execution.rst
@@ -346,6 +346,8 @@ This is the list of operations associated with the execution plan:
      - :class:`arrow::dataset::WriteNodeOptions`
    * - ``union``
      - N/A
+   * - ``table_sink``
+     - :class:`arrow::compute::TableSinkNodeOptions`
 
 .. _stream_execution_source_docs:
 
@@ -647,6 +649,25 @@ SelectK example:
 
 .. _stream_execution_scan_docs:
 
+.. _stream_execution_table_sink_docs:
+
+``table_sink``
+----------------
+
+The ``table_sink`` node provides the ability to receive the output as an in-memory table.
+This is simpler to use than the other sink nodes provided by the streaming execution engine
+but it only makes sense when the output fits comfortably in memory.
+The node is created using :class:`arrow::compute::TableSinkNodeOptions`.
+
+Example of using ``table_sink``:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+  :language: cpp
+  :start-after: (Doc section: Table Sink Example)
+  :end-before: (Doc section: Table Sink Example)
+  :linenos:
+  :lineno-match:
+
 ``scan``
 ---------