You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/06/06 16:09:36 UTC
[arrow] branch master updated: ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 8f0ddc785d ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)
8f0ddc785d is described below
commit 8f0ddc785dd72e950b570f3bc380deb15c124c45
Author: rtpsw <rt...@hotmail.com>
AuthorDate: Mon Jun 6 19:09:28 2022 +0300
ARROW-15901: [C++] Support flat custom output field names in Substrait (#13069)
Replaces https://github.com/apache/arrow/pull/12601
Lead-authored-by: Yaron Gvili <rt...@hotmail.com>
Co-authored-by: rtpsw <rt...@hotmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/options.h | 10 +++++++--
cpp/src/arrow/compute/exec/plan_test.cc | 39 +++++++++++++++++++++++++++++++++
cpp/src/arrow/compute/exec/sink_node.cc | 26 ++++++++++++++++++----
cpp/src/arrow/engine/substrait/serde.cc | 11 +++++++---
4 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 90f1059d99..4691ad65a9 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -232,10 +232,16 @@ class ARROW_EXPORT SinkNodeConsumer {
/// \brief Add a sink node which consumes data within the exec plan run
class ARROW_EXPORT ConsumingSinkNodeOptions : public ExecNodeOptions {
public:
- explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer)
- : consumer(std::move(consumer)) {}
+ explicit ConsumingSinkNodeOptions(std::shared_ptr<SinkNodeConsumer> consumer,
+ std::vector<std::string> names = {})
+ : consumer(std::move(consumer)), names(std::move(names)) {}
std::shared_ptr<SinkNodeConsumer> consumer;
+ /// \brief Names to rename the sink's schema fields to
+ ///
+ /// If specified then names must be provided for all fields. Currently, only a flat
+ /// schema is supported (see ARROW-15901).
+ std::vector<std::string> names;
};
/// \brief Make a node which sorts rows passed through it
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 900e3afddc..ea9d8ebf49 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -589,6 +589,45 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
}
}
+TEST(ExecPlanExecution, ConsumingSinkNames) {
+ struct SchemaKeepingConsumer : public SinkNodeConsumer {
+ std::shared_ptr<Schema> schema_;
+ Status Init(const std::shared_ptr<Schema>& schema,
+ BackpressureControl* backpressure_control) override {
+ schema_ = schema;
+ return Status::OK();
+ }
+ Status Consume(ExecBatch batch) override { return Status::OK(); }
+ Future<> Finish() override { return Future<>::MakeFinished(); }
+ };
+ std::vector<std::vector<std::string>> names_data = {{}, {"a", "b"}, {"a", "b", "c"}};
+ for (const auto& names : names_data) {
+ auto consumer = std::make_shared<SchemaKeepingConsumer>();
+ ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+ auto basic_data = MakeBasicBatches();
+ ASSERT_OK(Declaration::Sequence(
+ {{"source",
+ SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
+ {"consuming_sink", ConsumingSinkNodeOptions(consumer, names)}})
+ .AddToPlan(plan.get()));
+ ASSERT_OK_AND_ASSIGN(
+ auto source,
+ MakeExecNode("source", plan.get(), {},
+ SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
+ ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
+ ConsumingSinkNodeOptions(consumer, names)));
+ if (names.size() != 0 &&
+ names.size() != static_cast<size_t>(basic_data.batches[0].num_values())) {
+ ASSERT_RAISES(Invalid, plan->StartProducing());
+ } else {
+ auto expected_names = names.size() == 0 ? basic_data.schema->field_names() : names;
+ ASSERT_OK(plan->StartProducing());
+ ASSERT_FINISHES_OK(plan->finished());
+ ASSERT_EQ(expected_names, consumer->schema_->field_names());
+ }
+ }
+}
+
TEST(ExecPlanExecution, ConsumingSinkError) {
struct InitErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema,
diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc
index bd6c3b79b8..eae12bf729 100644
--- a/cpp/src/arrow/compute/exec/sink_node.cc
+++ b/cpp/src/arrow/compute/exec/sink_node.cc
@@ -263,10 +263,12 @@ class SinkNode : public ExecNode {
class ConsumingSinkNode : public ExecNode, public BackpressureControl {
public:
ConsumingSinkNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
- std::shared_ptr<SinkNodeConsumer> consumer)
+ std::shared_ptr<SinkNodeConsumer> consumer,
+ std::vector<std::string> names)
: ExecNode(plan, std::move(inputs), {"to_consume"}, {},
/*num_outputs=*/0),
- consumer_(std::move(consumer)) {}
+ consumer_(std::move(consumer)),
+ names_(std::move(names)) {}
static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
@@ -274,7 +276,8 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
const auto& sink_options = checked_cast<const ConsumingSinkNodeOptions&>(options);
return plan->EmplaceNode<ConsumingSinkNode>(plan, std::move(inputs),
- std::move(sink_options.consumer));
+ std::move(sink_options.consumer),
+ std::move(sink_options.names));
}
const char* kind_name() const override { return "ConsumingSinkNode"; }
@@ -285,7 +288,21 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
{"node.detail", ToString()},
{"node.kind", kind_name()}});
DCHECK_GT(inputs_.size(), 0);
- RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema(), this));
+ auto output_schema = inputs_[0]->output_schema();
+ if (names_.size() > 0) {
+ int num_fields = output_schema->num_fields();
+ if (names_.size() != static_cast<size_t>(num_fields)) {
+ return Status::Invalid("ConsumingSinkNode with mismatched number of names");
+ }
+ FieldVector fields(num_fields);
+ int i = 0;
+ for (const auto& output_field : output_schema->fields()) {
+ fields[i] = field(names_[i], output_field->type());
+ ++i;
+ }
+ output_schema = schema(std::move(fields));
+ }
+ RETURN_NOT_OK(consumer_->Init(output_schema, this));
finished_ = Future<>::Make();
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
return Status::OK();
@@ -373,6 +390,7 @@ class ConsumingSinkNode : public ExecNode, public BackpressureControl {
AtomicCounter input_counter_;
std::shared_ptr<SinkNodeConsumer> consumer_;
+ std::vector<std::string> names_;
int32_t backpressure_counter_ = 0;
};
diff --git a/cpp/src/arrow/engine/substrait/serde.cc b/cpp/src/arrow/engine/substrait/serde.cc
index 759908b122..2012f1fc26 100644
--- a/cpp/src/arrow/engine/substrait/serde.cc
+++ b/cpp/src/arrow/engine/substrait/serde.cc
@@ -67,15 +67,20 @@ Result<std::vector<compute::Declaration>> DeserializePlans(
std::vector<compute::Declaration> sink_decls;
for (const substrait::PlanRel& plan_rel : plan.relations()) {
+ ARROW_ASSIGN_OR_RAISE(
+ auto decl,
+ FromProto(plan_rel.has_root() ? plan_rel.root().input() : plan_rel.rel(),
+ ext_set));
+ std::vector<std::string> names;
if (plan_rel.has_root()) {
- return Status::NotImplemented("substrait::PlanRel with custom output field names");
+ names.assign(plan_rel.root().names().begin(), plan_rel.root().names().end());
}
- ARROW_ASSIGN_OR_RAISE(auto decl, FromProto(plan_rel.rel(), ext_set));
// pipe each relation into a consuming_sink node
auto sink_decl = compute::Declaration::Sequence({
std::move(decl),
- {"consuming_sink", compute::ConsumingSinkNodeOptions{consumer_factory()}},
+ {"consuming_sink",
+ compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)}},
});
sink_decls.push_back(std::move(sink_decl));
}