You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/04/12 03:24:27 UTC
[arrow] branch master updated: ARROW-15515: [C++] Update ExecPlan example code and documentation with new options
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7d4e93dafd ARROW-15515: [C++] Update ExecPlan example code and documentation with new options
7d4e93dafd is described below
commit 7d4e93dafdd96c6540d822851ee3ccd10e167a4f
Author: Vibhatha Abeykoon <vi...@gmail.com>
AuthorDate: Mon Apr 11 17:24:05 2022 -1000
ARROW-15515: [C++] Update ExecPlan example code and documentation with new options
This PR includes a documentation update for the streaming execution engine and a `table_sink` example.
Closes #12689 from vibhatha/arrow-15515
Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
.../arrow/execution_plan_documentation_examples.cc | 45 ++++++++++++++++++++++
cpp/src/arrow/compute/exec/options.h | 5 +--
docs/source/cpp/streaming_execution.rst | 21 ++++++++++
3 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/cpp/examples/arrow/execution_plan_documentation_examples.cc b/cpp/examples/arrow/execution_plan_documentation_examples.cc
index 0505af223e..1ca3d36a34 100644
--- a/cpp/examples/arrow/execution_plan_documentation_examples.cc
+++ b/cpp/examples/arrow/execution_plan_documentation_examples.cc
@@ -855,6 +855,46 @@ arrow::Status SourceUnionSinkExample(cp::ExecContext& exec_context) {
// (Doc section: Union Example)
+// (Doc section: Table Sink Example)
+
+/// \brief An example showing a table sink node
+/// \param exec_context The execution context to run the plan in
+/// \return Status::OK() on success, otherwise the first error raised while
+///         building, validating, starting, or running the plan
+///
+/// TableSink Example
+/// This example shows how a table_sink can be used
+/// in an execution plan. This includes a source node
+/// receiving data as batches and the table sink node
+/// which emits the output as a table.
+arrow::Status TableSinkExample(cp::ExecContext& exec_context) {
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
+
+  auto source_node_options = cp::SourceNodeOptions{basic_data.schema, basic_data.gen()};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * source,
+                        cp::MakeExecNode("source", plan.get(), {}, source_node_options));
+
+  // The table sink is handed a pointer to output_table; the table is only
+  // read below, after the plan's finished() future has completed.
+  std::shared_ptr<arrow::Table> output_table;
+  auto table_sink_options = cp::TableSinkNodeOptions{&output_table};
+
+  ARROW_RETURN_NOT_OK(
+      cp::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
+  // validate the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->Validate());
+  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
+  // start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // Wait for the plan to finish and surface any execution error. Use the
+  // public ARROW_RETURN_NOT_OK macro (as above) rather than the internal
+  // RETURN_NOT_OK shorthand, since this is user-facing example code.
+  auto finished = plan->finished();
+  ARROW_RETURN_NOT_OK(finished.status());
+  std::cout << "Results : " << output_table->ToString() << std::endl;
+  return arrow::Status::OK();
+}
+// (Doc section: Table Sink Example)
+
enum ExampleMode {
SOURCE_SINK = 0,
TABLE_SOURCE_SINK = 1,
@@ -869,6 +909,7 @@ enum ExampleMode {
KSELECT = 10,
WRITE = 11,
UNION = 12,
+ TABLE_SOURCE_TABLE_SINK = 13
};
int main(int argc, char** argv) {
@@ -937,6 +978,10 @@ int main(int argc, char** argv) {
PrintBlock("Union Example");
status = SourceUnionSinkExample(exec_context);
break;
+ case TABLE_SOURCE_TABLE_SINK:
+ PrintBlock("TableSink Example");
+ status = TableSinkExample(exec_context);
+ break;
default:
break;
}
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 259e467d97..9e99953e87 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -305,12 +305,11 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
/// SelectK options
SelectKOptions select_k_options;
};
-
/// @}
-/// \brief Adapt an Table as a sink node
+/// \brief Adapt a Table as a sink node
///
-/// obtains the output of a execution plan to
+/// obtains the output of an execution plan to
/// a table pointer.
class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
public:
diff --git a/docs/source/cpp/streaming_execution.rst b/docs/source/cpp/streaming_execution.rst
index 5bee70eb05..649968ad43 100644
--- a/docs/source/cpp/streaming_execution.rst
+++ b/docs/source/cpp/streaming_execution.rst
@@ -346,6 +346,8 @@ This is the list of operations associated with the execution plan:
- :class:`arrow::dataset::WriteNodeOptions`
* - ``union``
- N/A
+ * - ``table_sink``
+ - :class:`arrow::compute::TableSinkNodeOptions`
.. _stream_execution_source_docs:
@@ -647,6 +649,25 @@ SelectK example:
+.. _stream_execution_table_sink_docs:
+
+``table_sink``
+----------------
+
+The ``table_sink`` node provides the ability to receive the output as an in-memory table.
+This is simpler to use than the other sink nodes provided by the streaming execution engine,
+but it only makes sense when the output fits comfortably in memory.
+The node is created using :class:`arrow::compute::TableSinkNodeOptions`.
+
+Example of using ``table_sink``:
+
+.. literalinclude:: ../../../cpp/examples/arrow/execution_plan_documentation_examples.cc
+   :language: cpp
+   :start-after: (Doc section: Table Sink Example)
+   :end-before: (Doc section: Table Sink Example)
+   :linenos:
+   :lineno-match:
+
.. _stream_execution_scan_docs:

``scan``
---------