You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/02/10 01:27:45 UTC
[arrow] branch master updated: ARROW-15212: [C++] Handle suffix argument in joins
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 401746b ARROW-15212: [C++] Handle suffix argument in joins
401746b is described below
commit 401746bede014cf3d06a88930c282755f151a93b
Author: Vibhatha Abeykoon <vi...@gmail.com>
AuthorDate: Wed Feb 9 15:25:35 2022 -1000
ARROW-15212: [C++] Handle suffix argument in joins
In this PR,
- [x] Replaced the prefixes with suffixes
- [x] Added a test case to check suffixes
This change makes the hash join API consistent with the join APIs in `dplyr` for R; it is discussed in detail in: https://issues.apache.org/jira/browse/ARROW-14679
Closes #12110 from vibhatha/arrow-15212
Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/examples/arrow/CMakeLists.txt | 5 +-
cpp/examples/arrow/join_example.cc | 169 ++++++++++++++++++++++
cpp/src/arrow/compute/exec/hash_join.h | 4 +-
cpp/src/arrow/compute/exec/hash_join_node.cc | 82 +++++++----
cpp/src/arrow/compute/exec/hash_join_node_test.cc | 67 +++++++++
cpp/src/arrow/compute/exec/options.h | 40 ++---
cpp/src/arrow/compute/exec/util_test.cc | 44 +++---
testing | 2 +-
8 files changed, 341 insertions(+), 72 deletions(-)
diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt
index 166970f..89e459f 100644
--- a/cpp/examples/arrow/CMakeLists.txt
+++ b/cpp/examples/arrow/CMakeLists.txt
@@ -103,5 +103,8 @@ if(ARROW_PARQUET AND ARROW_DATASET)
add_arrow_example(execution_plan_documentation_examples EXTRA_LINK_LIBS
${DATASET_EXAMPLES_LINK_LIBS})
- add_dependencies(dataset_documentation_example parquet)
+ add_dependencies(execution_plan_documentation_examples parquet)
+
+ add_arrow_example(join_example EXTRA_LINK_LIBS ${DATASET_EXAMPLES_LINK_LIBS})
+ add_dependencies(join_example parquet)
endif()
diff --git a/cpp/examples/arrow/join_example.cc b/cpp/examples/arrow/join_example.cc
new file mode 100644
index 0000000..05e97de
--- /dev/null
+++ b/cpp/examples/arrow/join_example.cc
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example shows how to run a hash join over two in-memory datasets with
+// an ExecPlan, including the suffixes applied to clashing output column names.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+// Evaluates `expr` (an expression yielding an arrow::Status). If the status is
+// not OK, prints its message to stderr and aborts the process.
+// There is deliberately no semicolon after `while (0)`: the caller supplies it,
+// so `ABORT_ON_FAILURE(x);` expands to exactly one statement and stays safe
+// inside an unbraced if/else.
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0)
+
+// CSV text of the left relation. `lkey` is the join key, `shared` is a column
+// name that also exists in the right relation, and `ldistinct` exists only
+// here. The row with lkey 11 has no counterpart in the right relation.
+// Declared const: the data is only ever read.
+const char kLeftRelationCsvData[] = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+
+// CSV text of the right relation. `rkey` is the join key, `shared` is a column
+// name that also exists in the left relation, and `rdistinct` exists only
+// here. The row with rkey 124 has no counterpart in the left relation.
+const char kRightRelationCsvData[] = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+
+// Parses one of the embedded CSV relations and wraps the resulting table in an
+// in-memory dataset. The selected CSV text is also echoed to stdout.
+//
+// \param is_left true selects kLeftRelationCsvData, false selects
+//        kRightRelationCsvData
+// \return the dataset, or the error Status raised while creating the CSV
+//         reader or while reading the table
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData(
+    bool is_left) {
+  // View the static CSV text directly; there is no need to copy it into a
+  // temporary std::string just to hand a string_view to the reader.
+  const char* csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv(csv_data);
+  std::shared_ptr<arrow::io::InputStream> input =
+      std::make_shared<arrow::io::BufferReader>(sv);
+
+  const arrow::io::IOContext& io_context = arrow::io::default_io_context();
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, read_options,
+                                                      parse_options, convert_options));
+
+  // Read the whole CSV input into a table
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, table_reader->Read());
+
+  // Hand the dataset back through the base-class pointer named in the
+  // signature; no intermediate Result object is needed.
+  std::shared_ptr<arrow::dataset::Dataset> dataset =
+      std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
+  return dataset;
+}
+
+// Builds and runs an ExecPlan that inner-joins the two embedded CSV relations
+// on lkey == rkey, then prints the joined table to stdout.
+//
+// \return Status::OK() on success; otherwise the first error hit while
+//         building, validating, starting or draining the plan. Failures are
+//         propagated to the caller instead of aborting the process.
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  // NOTE(review): presumably registers the dataset exec-node factories (such
+  // as "scan") used below -- confirm against arrow/dataset/plan.h.
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData(true));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData(false));
+
+  // create empty projections: the "default" projection where each field is
+  // mapped to a field_ref
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  l_options->projection = cp::project({}, {});
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  r_options->projection = cp::project({}, {});
+
+  // construct the scan nodes; the node pointers are declared where they are
+  // initialized so they are never left indeterminate
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, r_options};
+
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(cp::ExecNode * right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{arrow::compute::JoinType::INNER,
+                                                /*in_left_keys=*/{"lkey"},
+                                                /*in_right_keys=*/{"rkey"},
+                                                /*filter*/ arrow::compute::literal(true),
+                                                /*output_suffix_for_left*/ "_l",
+                                                /*output_suffix_for_right*/ "_r"};
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto hashjoin,
+      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), {hashjoin},
+                                                      cp::SinkNodeOptions{&sink_gen}));
+  // The output carries the columns of both inputs; only the name present on
+  // both sides ("shared") receives the suffixes, giving shared_l and shared_r.
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+      hashjoin->output_schema(), std::move(sink_gen), exec_context.memory_pool());
+
+  // validate the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->Validate());
+  // start the ExecPlan
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+
+  // collect sink_reader into a Table
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> response_table,
+                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  std::cout << "Results : " << response_table->ToString() << std::endl;
+
+  return arrow::Status::OK();
+}
+
+// Program entry point: runs the join example and maps the resulting Status
+// onto the process exit code, reporting any error on stderr.
+int main(int argc, char** argv) {
+  const arrow::Status outcome = DoHashJoin();
+  if (outcome.ok()) {
+    return EXIT_SUCCESS;
+  }
+  std::cerr << "Error occurred: " << outcome.message() << std::endl;
+  return EXIT_FAILURE;
+}
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 83ad4cb..12455f0 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -59,8 +59,8 @@ class ARROW_EXPORT HashJoinSchema {
Result<Expression> BindFilter(Expression filter, const Schema& left_schema,
const Schema& right_schema);
- std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_prefix,
- const std::string& right_field_name_prefix);
+ std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_suffix,
+ const std::string& right_field_name_suffix);
bool LeftPayloadIsEmpty() { return PayloadIsEmpty(0); }
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 9295b5a..93e54c6 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -86,8 +86,8 @@ Status HashJoinSchema::Init(JoinType join_type, const Schema& left_schema,
const Schema& right_schema,
const std::vector<FieldRef>& right_keys,
const Expression& filter,
- const std::string& left_field_name_prefix,
- const std::string& right_field_name_prefix) {
+ const std::string& left_field_name_suffix,
+ const std::string& right_field_name_suffix) {
std::vector<FieldRef> left_output;
if (join_type != JoinType::RIGHT_SEMI && join_type != JoinType::RIGHT_ANTI) {
const FieldVector& left_fields = left_schema.fields();
@@ -106,18 +106,18 @@ Status HashJoinSchema::Init(JoinType join_type, const Schema& left_schema,
}
}
return Init(join_type, left_schema, left_keys, left_output, right_schema, right_keys,
- right_output, filter, left_field_name_prefix, right_field_name_prefix);
+ right_output, filter, left_field_name_suffix, right_field_name_suffix);
}
Status HashJoinSchema::Init(
JoinType join_type, const Schema& left_schema, const std::vector<FieldRef>& left_keys,
const std::vector<FieldRef>& left_output, const Schema& right_schema,
const std::vector<FieldRef>& right_keys, const std::vector<FieldRef>& right_output,
- const Expression& filter, const std::string& left_field_name_prefix,
- const std::string& right_field_name_prefix) {
+ const Expression& filter, const std::string& left_field_name_suffix,
+ const std::string& right_field_name_suffix) {
RETURN_NOT_OK(ValidateSchemas(join_type, left_schema, left_keys, left_output,
right_schema, right_keys, right_output,
- left_field_name_prefix, right_field_name_prefix));
+ left_field_name_suffix, right_field_name_suffix));
std::vector<HashJoinProjection> handles;
std::vector<const std::vector<FieldRef>*> field_refs;
@@ -172,8 +172,8 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
const Schema& right_schema,
const std::vector<FieldRef>& right_keys,
const std::vector<FieldRef>& right_output,
- const std::string& left_field_name_prefix,
- const std::string& right_field_name_prefix) {
+ const std::string& left_field_name_suffix,
+ const std::string& right_field_name_suffix) {
// Checks for key fields:
// 1. Key field refs must match exactly one input field
// 2. Same number of key fields on left and right
@@ -241,7 +241,7 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
// 4. Left semi/anti join (right semi/anti join) must not output fields from right
// (left)
// 5. No name collisions in output fields after adding (potentially empty)
- // prefixes to left and right output
+ // suffixes to left and right output
//
if (left_output.empty() && right_output.empty()) {
return Status::Invalid("Join must output at least one field");
@@ -275,30 +275,60 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
}
+// Builds the schema of the join output: the left OUTPUT columns followed by the
+// right OUTPUT columns. A column name that occurs on both sides gets
+// left_field_name_suffix appended on the left and right_field_name_suffix
+// appended on the right; names unique to one side are kept unchanged.
+// All fields coming out of the join are marked as nullable.
std::shared_ptr<Schema> HashJoinSchema::MakeOutputSchema(
- const std::string& left_field_name_prefix,
- const std::string& right_field_name_prefix) {
+ const std::string& left_field_name_suffix,
+ const std::string& right_field_name_suffix) {
std::vector<std::shared_ptr<Field>> fields;
int left_size = proj_maps[0].num_cols(HashJoinProjection::OUTPUT);
int right_size = proj_maps[1].num_cols(HashJoinProjection::OUTPUT);
fields.resize(left_size + right_size);
- for (int i = 0; i < left_size + right_size; ++i) {
- bool is_left = (i < left_size);
- int side = (is_left ? 0 : 1);
- int input_field_id = proj_maps[side]
- .map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT)
- .get(is_left ? i : i - left_size);
+ // left field name -> position in `fields`; a multimap because the same name
+ // may be inserted more than once
+ std::unordered_multimap<std::string, int> left_field_map;
+ left_field_map.reserve(left_size);
+ for (int i = 0; i < left_size; ++i) {
+ int side = 0; // left
+ int input_field_id =
+ proj_maps[side].map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT).get(i);
const std::string& input_field_name =
proj_maps[side].field_name(HashJoinProjection::INPUT, input_field_id);
const std::shared_ptr<DataType>& input_data_type =
proj_maps[side].data_type(HashJoinProjection::INPUT, input_field_id);
+ left_field_map.insert({input_field_name, i});
+ // insert left table field (unsuffixed for now; it is renamed below if the
+ // right side turns out to carry the same name)
+ fields[i] =
+ std::make_shared<Field>(input_field_name, input_data_type, true /*nullable*/);
+ }
- std::string output_field_name =
- (is_left ? left_field_name_prefix : right_field_name_prefix) + input_field_name;
+ for (int i = 0; i < right_size; ++i) {
+ int side = 1; // right
+ int input_field_id =
+ proj_maps[side].map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT).get(i);
+ const std::string& input_field_name =
+ proj_maps[side].field_name(HashJoinProjection::INPUT, input_field_id);
+ const std::shared_ptr<DataType>& input_data_type =
+ proj_maps[side].data_type(HashJoinProjection::INPUT, input_field_id);
+ // search the map and add suffix to the elements which
+ // are present both in left and right tables
+ auto search_it = left_field_map.equal_range(input_field_name);
+ bool match_found = false;
+ for (auto search = search_it.first; search != search_it.second; ++search) {
+ match_found = true;
+ auto left_index = search->second;
+ // keep the old field alive while its type is read for the replacement
+ auto left_field = fields[left_index];
+ // update left table field with suffix
+ fields[left_index] =
+ std::make_shared<Field>(input_field_name + left_field_name_suffix,
+ left_field->type(), true /*nullable*/);
+ // insert right table field with suffix
+ fields[left_size + i] = std::make_shared<Field>(
+ input_field_name + right_field_name_suffix, input_data_type, true /*nullable*/);
+ }
- // All fields coming out of join are marked as nullable.
- fields[i] =
- std::make_shared<Field>(output_field_name, input_data_type, true /*nullable*/);
+ if (!match_found) {
+ // insert right table field without suffix
+ fields[left_size + i] =
+ std::make_shared<Field>(input_field_name, input_data_type, true /*nullable*/);
+ }
}
return std::make_shared<Schema>(std::move(fields));
}
@@ -452,18 +482,19 @@ class HashJoinNode : public ExecNode {
const auto& left_schema = *(inputs[0]->output_schema());
const auto& right_schema = *(inputs[1]->output_schema());
+
// This will also validate input schemas
if (join_options.output_all) {
RETURN_NOT_OK(schema_mgr->Init(
join_options.join_type, left_schema, join_options.left_keys, right_schema,
join_options.right_keys, join_options.filter,
- join_options.output_prefix_for_left, join_options.output_prefix_for_right));
+ join_options.output_suffix_for_left, join_options.output_suffix_for_right));
} else {
RETURN_NOT_OK(schema_mgr->Init(
join_options.join_type, left_schema, join_options.left_keys,
join_options.left_output, right_schema, join_options.right_keys,
join_options.right_output, join_options.filter,
- join_options.output_prefix_for_left, join_options.output_prefix_for_right));
+ join_options.output_suffix_for_left, join_options.output_suffix_for_right));
}
ARROW_ASSIGN_OR_RAISE(
@@ -472,8 +503,7 @@ class HashJoinNode : public ExecNode {
// Generate output schema
std::shared_ptr<Schema> output_schema = schema_mgr->MakeOutputSchema(
- join_options.output_prefix_for_left, join_options.output_prefix_for_right);
-
+ join_options.output_suffix_for_left, join_options.output_suffix_for_right);
// Create hash join implementation object
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<HashJoinImpl> impl, HashJoinImpl::MakeBasic());
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index 4db8e9e..a0c75af 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -937,6 +937,73 @@ void HashJoinWithExecPlan(Random64Bit& rng, bool parallel,
ASSERT_OK_AND_ASSIGN(*output, TableFromExecBatches(output_schema, res));
}
+// Checks that an inner hash join appends the configured suffixes ("_l"/"_r")
+// only to the column name present in both inputs ("shared") and leaves the
+// uniquely named columns untouched.
+TEST(HashJoin, Suffix) {
+  BatchesWithSchema input_left;
+  input_left.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 4, 7],
+                   [2, 5, 8],
+                   [3, 6, 9]
+                 ])")};
+  input_left.schema = schema(
+      {field("lkey", int32()), field("shared", int32()), field("ldistinct", int32())});
+
+  BatchesWithSchema input_right;
+  input_right.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 10, 13],
+                   [2, 11, 14],
+                   [3, 12, 15]
+                 ])")};
+  input_right.schema = schema(
+      {field("rkey", int32()), field("shared", int32()), field("rdistinct", int32())});
+
+  // Every left row matches exactly one right row on lkey == rkey.
+  BatchesWithSchema expected;
+  expected.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32(), int32(), int32(), int32()}, R"([
+    [1, 4, 7, 1, 10, 13],
+    [2, 5, 8, 2, 11, 14],
+    [3, 6, 9, 3, 12, 15]
+  ])")};
+
+  // Only "shared" collides, so only it is renamed to shared_l / shared_r.
+  expected.schema = schema({field("lkey", int32()), field("shared_l", int32()),
+                            field("ldistinct", int32()), field("rkey", int32()),
+                            field("shared_r", int32()), field("rdistinct", int32())});
+
+  ExecContext exec_ctx;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ExecNode* left_source;
+  ExecNode* right_source;
+  ASSERT_OK_AND_ASSIGN(
+      left_source,
+      MakeExecNode("source", plan.get(), {},
+                   SourceNodeOptions{input_left.schema, input_left.gen(/*parallel=*/false,
+                                                                       /*slow=*/false)}));
+
+  ASSERT_OK_AND_ASSIGN(right_source,
+                       MakeExecNode("source", plan.get(), {},
+                                    SourceNodeOptions{input_right.schema,
+                                                      input_right.gen(/*parallel=*/false,
+                                                                      /*slow=*/false)}));
+
+  HashJoinNodeOptions join_opts{JoinType::INNER,
+                                /*left_keys=*/{"lkey"},
+                                /*right_keys=*/{"rkey"}, literal(true), "_l", "_r"};
+
+  ASSERT_OK_AND_ASSIGN(
+      auto hashjoin,
+      MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
+
+  ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
+                                                 SinkNodeOptions{&sink_gen}));
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+
+  // Check both the produced rows and the schema advertised by the join node.
+  AssertExecBatchesEqual(expected.schema, expected.batches, result);
+  AssertSchemaEqual(expected.schema, hashjoin->output_schema());
+}
+
TEST(HashJoin, Random) {
Random64Bit rng(42);
#if defined(THREAD_SANITIZER)
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index d12b838..9544b9e 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -179,19 +179,19 @@ enum class JoinKeyCmp { EQ, IS };
/// \brief Make a node which implements join operation using hash join strategy.
class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
public:
- static constexpr const char* default_output_prefix_for_left = "";
- static constexpr const char* default_output_prefix_for_right = "";
+ static constexpr const char* default_output_suffix_for_left = "";
+ static constexpr const char* default_output_suffix_for_right = "";
HashJoinNodeOptions(
JoinType in_join_type, std::vector<FieldRef> in_left_keys,
std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
- std::string output_prefix_for_left = default_output_prefix_for_left,
- std::string output_prefix_for_right = default_output_prefix_for_right)
+ std::string output_suffix_for_left = default_output_suffix_for_left,
+ std::string output_suffix_for_right = default_output_suffix_for_right)
: join_type(in_join_type),
left_keys(std::move(in_left_keys)),
right_keys(std::move(in_right_keys)),
output_all(true),
- output_prefix_for_left(std::move(output_prefix_for_left)),
- output_prefix_for_right(std::move(output_prefix_for_right)),
+ output_suffix_for_left(std::move(output_suffix_for_left)),
+ output_suffix_for_right(std::move(output_suffix_for_right)),
filter(std::move(filter)) {
this->key_cmp.resize(this->left_keys.size());
for (size_t i = 0; i < this->left_keys.size(); ++i) {
@@ -202,16 +202,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
JoinType join_type, std::vector<FieldRef> left_keys,
std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
std::vector<FieldRef> right_output, Expression filter = literal(true),
- std::string output_prefix_for_left = default_output_prefix_for_left,
- std::string output_prefix_for_right = default_output_prefix_for_right)
+ std::string output_suffix_for_left = default_output_suffix_for_left,
+ std::string output_suffix_for_right = default_output_suffix_for_right)
: join_type(join_type),
left_keys(std::move(left_keys)),
right_keys(std::move(right_keys)),
output_all(false),
left_output(std::move(left_output)),
right_output(std::move(right_output)),
- output_prefix_for_left(std::move(output_prefix_for_left)),
- output_prefix_for_right(std::move(output_prefix_for_right)),
+ output_suffix_for_left(std::move(output_suffix_for_left)),
+ output_suffix_for_right(std::move(output_suffix_for_right)),
filter(std::move(filter)) {
this->key_cmp.resize(this->left_keys.size());
for (size_t i = 0; i < this->left_keys.size(); ++i) {
@@ -223,8 +223,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
Expression filter = literal(true),
- std::string output_prefix_for_left = default_output_prefix_for_left,
- std::string output_prefix_for_right = default_output_prefix_for_right)
+ std::string output_suffix_for_left = default_output_suffix_for_left,
+ std::string output_suffix_for_right = default_output_suffix_for_right)
: join_type(join_type),
left_keys(std::move(left_keys)),
right_keys(std::move(right_keys)),
@@ -232,8 +232,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
left_output(std::move(left_output)),
right_output(std::move(right_output)),
key_cmp(std::move(key_cmp)),
- output_prefix_for_left(std::move(output_prefix_for_left)),
- output_prefix_for_right(std::move(output_prefix_for_right)),
+ output_suffix_for_left(std::move(output_suffix_for_left)),
+ output_suffix_for_right(std::move(output_suffix_for_right)),
filter(std::move(filter)) {}
// type of join (inner, left, semi...)
@@ -252,12 +252,12 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
// key comparison function (determines whether a null key is equal another null
// key or not)
std::vector<JoinKeyCmp> key_cmp;
- // prefix added to names of output fields coming from left input (used to
- // distinguish, if necessary, between fields of the same name in left and right
- // input and can be left empty if there are no name collisions)
- std::string output_prefix_for_left;
- // prefix added to names of output fields coming from right input
- std::string output_prefix_for_right;
+ // suffix appended to the name of an output field coming from the left input when the
+ // right output contains a field of the same name; output fields whose names do not
+ // collide keep their names unchanged, so this can be left empty if nothing collides
+ std::string output_suffix_for_left;
+ // suffix appended to the name of an output field coming from the right input when the
+ // left output contains a field of the same name
+ std::string output_suffix_for_right;
// residual filter which is applied to matching rows. Rows that do not match
// the filter are not included. The filter is applied against the
// concatenated input schema (left fields then right fields) and can reference
diff --git a/cpp/src/arrow/compute/exec/util_test.cc b/cpp/src/arrow/compute/exec/util_test.cc
index 6f4b531..6d85991 100644
--- a/cpp/src/arrow/compute/exec/util_test.cc
+++ b/cpp/src/arrow/compute/exec/util_test.cc
@@ -25,8 +25,8 @@ using testing::Eq;
namespace arrow {
namespace compute {
-const char* kLeftPrefix = "left.";
-const char* kRightPrefix = "right.";
+const char* kLeftSuffix = ".left";
+const char* kRightSuffix = ".right";
TEST(FieldMap, Trivial) {
HashJoinSchema schema_mgr;
@@ -35,12 +35,12 @@ TEST(FieldMap, Trivial) {
auto right = schema({field("i32", int32())});
ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32"}, *right, {"i32"},
- literal(true), kLeftPrefix, kRightPrefix));
+ literal(true), kLeftSuffix, kRightSuffix));
- auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+ auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
EXPECT_THAT(*output, Eq(Schema({
- field("left.i32", int32()),
- field("right.i32", int32()),
+ field("i32.left", int32()),
+ field("i32.right", int32()),
})));
auto i =
@@ -75,7 +75,7 @@ TEST(FieldMap, SingleKeyField) {
auto right = schema({field("f32", float32()), field("i32", int32())});
ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32"}, *right, {"i32"},
- literal(true), kLeftPrefix, kRightPrefix));
+ literal(true), kLeftSuffix, kRightSuffix));
EXPECT_EQ(schema_mgr.proj_maps[0].num_cols(HashJoinProjection::INPUT), 2);
EXPECT_EQ(schema_mgr.proj_maps[1].num_cols(HashJoinProjection::INPUT), 2);
@@ -84,12 +84,12 @@ TEST(FieldMap, SingleKeyField) {
EXPECT_EQ(schema_mgr.proj_maps[0].num_cols(HashJoinProjection::OUTPUT), 2);
EXPECT_EQ(schema_mgr.proj_maps[1].num_cols(HashJoinProjection::OUTPUT), 2);
- auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+ auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
EXPECT_THAT(*output, Eq(Schema({
- field("left.i32", int32()),
- field("left.str", utf8()),
- field("right.f32", float32()),
- field("right.i32", int32()),
+ field("i32.left", int32()),
+ field("str", utf8()),
+ field("f32", float32()),
+ field("i32.right", int32()),
})));
auto i =
@@ -113,18 +113,18 @@ TEST(FieldMap, TwoKeyFields) {
});
ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32", "str"}, *right,
- {"i32", "str"}, literal(true), kLeftPrefix, kRightPrefix));
+ {"i32", "str"}, literal(true), kLeftSuffix, kRightSuffix));
- auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+ auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
EXPECT_THAT(*output, Eq(Schema({
- field("left.i32", int32()),
- field("left.str", utf8()),
- field("left.bool", boolean()),
-
- field("right.i32", int32()),
- field("right.str", utf8()),
- field("right.f32", float32()),
- field("right.f64", float64()),
+ field("i32.left", int32()),
+ field("str.left", utf8()),
+ field("bool", boolean()),
+
+ field("i32.right", int32()),
+ field("str.right", utf8()),
+ field("f32", float32()),
+ field("f64", float64()),
})));
}
diff --git a/testing b/testing
index d9e7237..634739c 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d9e7237f4f71c91acb8daeda97354b073d28ac5b
+Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda