You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/06/06 16:09:36 UTC

[arrow] branch master updated: ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0ddc785d ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)
8f0ddc785d is described below

commit 8f0ddc785dd72e950b570f3bc380deb15c124c45
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Mon Jun 6 19:09:28 2022 +0300

    ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)
    
    Replaces https://github.com/apache/arrow/pull/12601
    
    Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
    Co-authored-by: rtpsw <rt...@hotmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/options.h    | 10 +++++++--
 cpp/src/arrow/compute/exec/plan_test.cc | 39 +++++++++++++++++++++++++++++++++
 cpp/src/arrow/compute/exec/sink_node.cc | 26 ++++++++++++++++++----
 cpp/src/arrow/engine/substrait/serde.cc | 11 +++++++---
 4 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 90f1059d99..4691ad65a9 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -232,10 +232,16 @@ class ARROW_EXPORT SinkNodeConsumer {
 /// \brief Add a sink node which consumes data within the exec plan run
 class ARROW_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions {
  public:
-  explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer)
-      : consumer(std::move(consumer)) {}
+  explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer,
+                                    std::vector<std::string> names = {})
+      : consumer(std::move(consumer)), names(std::move(names)) {}
 
   std::shared_ptr<SinkNodeConsumer> consumer;
+  /// \brief Optional replacement names for the fields of the sink's input schema
+  ///
+  /// If non-empty, it must hold exactly one name per field; a count mismatch makes
+  /// starting the node fail with Status::Invalid. Flat schemas only (ARROW-15901).
+  std::vector<std::string> names;
 };
 
 /// \brief Make a node which sorts rows passed through it
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 900e3afddc..ea9d8ebf49 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -589,6 +589,45 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
   }
 }
 
+TEST(ExecPlanExecution, ConsumingSinkNames) {  // checks ConsumingSinkNodeOptions::names
+  struct SchemaKeepingConsumer : public SinkNodeConsumer {  // remembers Init()'s schema
+    std::shared_ptr<Schema> schema_;
+    Status Init(const std::shared_ptr<Schema>& schema,
+                BackpressureControl* backpressure_control) override {
+      schema_ = schema;
+      return Status::OK();
+    }
+    Status Consume(ExecBatch batch) override { return Status::OK(); }
+    Future<> Finish() override { return Future<>::MakeFinished(); }
+  };
+  std::vector<std::vector<std::string>> names_data = {{}, {"a", "b"}, {"a", "b", "c"}};
+  for (const auto& names : names_data) {  // {} keeps the source's names
+    auto consumer = std::make_shared<SchemaKeepingConsumer>();
+    ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+    auto basic_data = MakeBasicBatches();
+    ASSERT_OK(Declaration::Sequence(
+                  {{"source",
+                    SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
+                   {"consuming_sink", ConsumingSinkNodeOptions(consumer, names)}})
+                  .AddToPlan(plan.get()));
+    ASSERT_OK_AND_ASSIGN(  // NOTE(review): this second source + consuming_sink pair
+        auto source,  // repeats the Declaration above, and both sinks share
+        MakeExecNode("source", plan.get(), {},  // `consumer` (Init() called twice).
+                     SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
+    ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
+                           ConsumingSinkNodeOptions(consumer, names)));
+    if (names.size() != 0 &&  // non-empty names must match the column count
+        names.size() != static_cast<size_t>(basic_data.batches[0].num_values())) {
+      ASSERT_RAISES(Invalid, plan->StartProducing());
+    } else {  // empty or matching names: consumer sees the expected field names
+      auto expected_names = names.size() == 0 ? basic_data.schema->field_names() : names;
+      ASSERT_OK(plan->StartProducing());
+      ASSERT_FINISHES_OK(plan->finished());
+      ASSERT_EQ(expected_names, consumer->schema_->field_names());
+    }
+  }
+}
+
 TEST(ExecPlanExecution, ConsumingSinkError) {
   struct InitErrorConsumer : public SinkNodeConsumer {
     Status Init(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index bd6c3b79b8..eae12bf729 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -263,10 +263,12 @@ class SinkNode : public ExecNode {
 class ConsumingSinkNode : public ExecNode, public BackpressureControl {
  public:
   ConsumingSinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
-                    std::shared_ptr<SinkNodeConsumer> consumer)
+                    std::shared_ptr<SinkNodeConsumer> consumer,
+                    std::vector<std::string> names)  // empty: keep input field names
       : ExecNode(plan, std::move(inputs), {"to_consume"}, {},
                  /*num_outputs=*/0),
-        consumer_(std::move(consumer)) {}
+        consumer_(std::move(consumer)),
+        names_(std::move(names)) {}
 
   static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
                                 const ExecNodeOptions& options) {
@@ -274,7 +276,8 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
 
     const auto& sink_options = checked_cast<const ConsumingSinkNodeOptions&>(options);
     return plan->EmplaceNode<ConsumingSinkNode>(plan, std::move(inputs),
-                                                std::move(sink_options.consumer));
+                                                std::move(sink_options.consumer),
+                                                std::move(sink_options.names));
   }
 
   const char* kind_name() const override { return "ConsumingSinkNode"; }
@@ -285,7 +288,21 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
                         {"node.detail", ToString()},
                         {"node.kind", kind_name()}});
     DCHECK_GT(inputs_.size(), 0);
-    RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema(), this));
+    auto output_schema = inputs_[0]->output_schema();
+    if (names_.size() > 0) {
+      int num_fields = output_schema->num_fields();
+      if (names_.size() != static_cast<size_t>(num_fields)) {
+        return Status::Invalid("ConsumingSinkNode with mismatched number of names");
+      }
+      FieldVector fields(num_fields);
+      int i = 0;
+      for (const auto& output_field : output_schema->fields()) {
+        fields[i] = field(names_[i], output_field->type());
+        ++i;
+      }
+      output_schema = schema(std::move(fields));
+    }
+    RETURN_NOT_OK(consumer_->Init(output_schema, this));
     finished_ = Future<>::Make();
     END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
     return Status::OK();
@@ -373,6 +390,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
 
   AtomicCounter input_counter_;
   std::shared_ptr<SinkNodeConsumer> consumer_;
+  std::vector<std::string> names_;
   int32_t backpressure_counter_ = 0;
 };
 
diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc
index 759908b122..2012f1fc26 100644
--- a/cpp/src/arrow/engine/substrait/serde.cc
+++ b/cpp/src/arrow/engine/substrait/serde.cc
@@ -67,15 +67,20 @@ Result<std::vector<compute::Declaration>> DeserializePlans(
 
   std::vector<compute::Declaration> sink_decls;
   for (const substrait::PlanRel& plan_rel : plan.relations()) {
+    ARROW_ASSIGN_OR_RAISE(
+        auto decl,
+        FromProto(plan_rel.has_root() ? plan_rel.root().input() : plan_rel.rel(),
+                  ext_set));
+    std::vector<std::string> names;
     if (plan_rel.has_root()) {
-      return Status::NotImplemented("substrait::PlanRel with custom output field names");
+      names.assign(plan_rel.root().names().begin(), plan_rel.root().names().end());
     }
-    ARROW_ASSIGN_OR_RAISE(auto decl, FromProto(plan_rel.rel(), ext_set));
 
     // pipe each relation into a consuming_sink node
     auto sink_decl = compute::Declaration::Sequence({
         std::move(decl),
-        {"consuming_sink", compute::ConsumingSinkNodeOptions{consumer_factory()}},
+        {"consuming_sink",
+         compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)}},
     });
     sink_decls.push_back(std::move(sink_decl));
   }