You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/02/10 01:27:45 UTC

[arrow] branch master updated: ARROW-15212: [C++] Handle suffix argument in joins

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 401746b  ARROW-15212: [C++] Handle suffix argument in joins
401746b is described below

commit 401746bede014cf3d06a88930c282755f151a93b
Author: Vibhatha Abeykoon <vi...@gmail.com>
AuthorDate: Wed Feb 9 15:25:35 2022 -1000

    ARROW-15212: [C++] Handle suffix argument in joins
    
    In this PR,
    
    - [x] Replaced the prefixes with suffixes
    - [x] Added a test case to check suffixes
    
    This change is made to enable the consistency with the join APIs in `dplyr` for R and discussed in detail in: https://issues.apache.org/jira/browse/ARROW-14679
    
    Closes #12110 from vibhatha/arrow-15212
    
    Authored-by: Vibhatha Abeykoon <vi...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/examples/arrow/CMakeLists.txt                 |   5 +-
 cpp/examples/arrow/join_example.cc                | 169 ++++++++++++++++++++++
 cpp/src/arrow/compute/exec/hash_join.h            |   4 +-
 cpp/src/arrow/compute/exec/hash_join_node.cc      |  82 +++++++----
 cpp/src/arrow/compute/exec/hash_join_node_test.cc |  67 +++++++++
 cpp/src/arrow/compute/exec/options.h              |  40 ++---
 cpp/src/arrow/compute/exec/util_test.cc           |  44 +++---
 testing                                           |   2 +-
 8 files changed, 341 insertions(+), 72 deletions(-)

diff --git a/cpp/examples/arrow/CMakeLists.txt b/cpp/examples/arrow/CMakeLists.txt
index 166970f..89e459f 100644
--- a/cpp/examples/arrow/CMakeLists.txt
+++ b/cpp/examples/arrow/CMakeLists.txt
@@ -103,5 +103,8 @@ if(ARROW_PARQUET AND ARROW_DATASET)
 
   add_arrow_example(execution_plan_documentation_examples EXTRA_LINK_LIBS
                     ${DATASET_EXAMPLES_LINK_LIBS})
-  add_dependencies(dataset_documentation_example parquet)
+  add_dependencies(execution_plan_documentation_examples parquet)
+
+  add_arrow_example(join_example EXTRA_LINK_LIBS ${DATASET_EXAMPLES_LINK_LIBS})
+  add_dependencies(join_example parquet)
 endif()
diff --git a/cpp/examples/arrow/join_example.cc b/cpp/examples/arrow/join_example.cc
new file mode 100644
index 0000000..05e97de
--- /dev/null
+++ b/cpp/examples/arrow/join_example.cc
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This example showcases various ways to work with Datasets. It's
+// intended to be paired with the documentation.
+
+#include <arrow/api.h>
+#include <arrow/compute/api.h>
+#include <arrow/compute/exec/exec_plan.h>
+#include <arrow/compute/exec/expression.h>
+#include <arrow/csv/api.h>
+
+#include <arrow/dataset/dataset.h>
+#include <arrow/dataset/plan.h>
+#include <arrow/dataset/scanner.h>
+
+#include <arrow/io/interfaces.h>
+#include <arrow/io/memory.h>
+#include <arrow/io/stdio.h>
+
+#include <arrow/filesystem/filesystem.h>
+
+#include <arrow/result.h>
+#include <arrow/status.h>
+
+#include <arrow/util/vector.h>
+
+#include <iostream>
+#include <vector>
+
+namespace ds = arrow::dataset;
+namespace cp = arrow::compute;
+
+#define ABORT_ON_FAILURE(expr)                     \
+  do {                                             \
+    arrow::Status status_ = (expr);                \
+    if (!status_.ok()) {                           \
+      std::cerr << status_.message() << std::endl; \
+      abort();                                     \
+    }                                              \
+  } while (0);
+
+char kLeftRelationCsvData[] = R"csv(lkey,shared,ldistinct
+1,4,7
+2,5,8
+11,20,21
+3,6,9)csv";
+
+char kRightRelationCsvData[] = R"csv(rkey,shared,rdistinct
+1,10,13
+124,10,11
+2,11,14
+3,12,15)csv";
+
+arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData(
+    bool is_left) {
+  const arrow::io::IOContext& io_context = arrow::io::default_io_context();
+  std::shared_ptr<arrow::io::InputStream> input;
+  std::string csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData;
+  std::cout << csv_data << std::endl;
+  arrow::util::string_view sv = csv_data;
+  input = std::make_shared<arrow::io::BufferReader>(sv);
+  auto read_options = arrow::csv::ReadOptions::Defaults();
+  auto parse_options = arrow::csv::ParseOptions::Defaults();
+  auto convert_options = arrow::csv::ConvertOptions::Defaults();
+
+  // Instantiate TableReader from input stream and options
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::csv::TableReader> table_reader,
+                        arrow::csv::TableReader::Make(io_context, input, read_options,
+                                                      parse_options, convert_options));
+
+  // Read table from CSV file
+  ARROW_ASSIGN_OR_RAISE(auto maybe_table, table_reader->Read());
+  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(maybe_table);
+  arrow::Result<std::shared_ptr<arrow::dataset::InMemoryDataset>> result(std::move(ds));
+  return result;
+}
+
+arrow::Status DoHashJoin() {
+  cp::ExecContext exec_context(arrow::default_memory_pool(),
+                               ::arrow::internal::GetCpuThreadPool());
+
+  arrow::dataset::internal::Initialize();
+
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<cp::ExecPlan> plan,
+                        cp::ExecPlan::Make(&exec_context));
+
+  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
+
+  cp::ExecNode* left_source;
+  cp::ExecNode* right_source;
+
+  ARROW_ASSIGN_OR_RAISE(auto l_dataset, CreateDataSetFromCSVData(true));
+  ARROW_ASSIGN_OR_RAISE(auto r_dataset, CreateDataSetFromCSVData(false));
+
+  auto l_options = std::make_shared<arrow::dataset::ScanOptions>();
+  // create empty projection: "default" projection where each field is mapped to a
+  // field_ref
+  l_options->projection = cp::project({}, {});
+
+  auto r_options = std::make_shared<arrow::dataset::ScanOptions>();
+  // create empty projection: "default" projection where each field is mapped to a
+  // field_ref
+  r_options->projection = cp::project({}, {});
+
+  // construct the scan node
+  auto l_scan_node_options = arrow::dataset::ScanNodeOptions{l_dataset, l_options};
+  auto r_scan_node_options = arrow::dataset::ScanNodeOptions{r_dataset, r_options};
+
+  ARROW_ASSIGN_OR_RAISE(left_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, l_scan_node_options));
+  ARROW_ASSIGN_OR_RAISE(right_source,
+                        cp::MakeExecNode("scan", plan.get(), {}, r_scan_node_options));
+
+  arrow::compute::HashJoinNodeOptions join_opts{arrow::compute::JoinType::INNER,
+                                                /*in_left_keys=*/{"lkey"},
+                                                /*in_right_keys=*/{"rkey"},
+                                                /*filter*/ arrow::compute::literal(true),
+                                                /*output_suffix_for_left*/ "_l",
+                                                /*output_suffix_for_right*/ "_r"};
+
+  ARROW_ASSIGN_OR_RAISE(
+      auto hashjoin,
+      cp::MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
+
+  ARROW_ASSIGN_OR_RAISE(std::ignore, cp::MakeExecNode("sink", plan.get(), {hashjoin},
+                                                      cp::SinkNodeOptions{&sink_gen}));
+  // expected columns l_a, l_b
+  std::shared_ptr<arrow::RecordBatchReader> sink_reader = cp::MakeGeneratorReader(
+      hashjoin->output_schema(), std::move(sink_gen), exec_context.memory_pool());
+
+  // validate the ExecPlan
+  ABORT_ON_FAILURE(plan->Validate());
+  // start the ExecPlan
+  ABORT_ON_FAILURE(plan->StartProducing());
+
+  // collect sink_reader into a Table
+  std::shared_ptr<arrow::Table> response_table;
+
+  ARROW_ASSIGN_OR_RAISE(response_table,
+                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
+
+  std::cout << "Results : " << response_table->ToString() << std::endl;
+
+  return arrow::Status::OK();
+}
+
+int main(int argc, char** argv) {
+  auto status = DoHashJoin();
+  if (!status.ok()) {
+    std::cerr << "Error occurred: " << status.message() << std::endl;
+    return EXIT_FAILURE;
+  }
+  return EXIT_SUCCESS;
+}
diff --git a/cpp/src/arrow/compute/exec/hash_join.h b/cpp/src/arrow/compute/exec/hash_join.h
index 83ad4cb..12455f0 100644
--- a/cpp/src/arrow/compute/exec/hash_join.h
+++ b/cpp/src/arrow/compute/exec/hash_join.h
@@ -59,8 +59,8 @@ class ARROW_EXPORT HashJoinSchema {
 
   Result<Expression> BindFilter(Expression filter, const Schema& left_schema,
                                 const Schema& right_schema);
-  std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_prefix,
-                                           const std::string& right_field_name_prefix);
+  std::shared_ptr<Schema> MakeOutputSchema(const std::string& left_field_name_suffix,
+                                           const std::string& right_field_name_suffix);
 
   bool LeftPayloadIsEmpty() { return PayloadIsEmpty(0); }
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index 9295b5a..93e54c6 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -86,8 +86,8 @@ Status HashJoinSchema::Init(JoinType join_type, const Schema& left_schema,
                             const Schema& right_schema,
                             const std::vector<FieldRef>& right_keys,
                             const Expression& filter,
-                            const std::string& left_field_name_prefix,
-                            const std::string& right_field_name_prefix) {
+                            const std::string& left_field_name_suffix,
+                            const std::string& right_field_name_suffix) {
   std::vector<FieldRef> left_output;
   if (join_type != JoinType::RIGHT_SEMI && join_type != JoinType::RIGHT_ANTI) {
     const FieldVector& left_fields = left_schema.fields();
@@ -106,18 +106,18 @@ Status HashJoinSchema::Init(JoinType join_type, const Schema& left_schema,
     }
   }
   return Init(join_type, left_schema, left_keys, left_output, right_schema, right_keys,
-              right_output, filter, left_field_name_prefix, right_field_name_prefix);
+              right_output, filter, left_field_name_suffix, right_field_name_suffix);
 }
 
 Status HashJoinSchema::Init(
     JoinType join_type, const Schema& left_schema, const std::vector<FieldRef>& left_keys,
     const std::vector<FieldRef>& left_output, const Schema& right_schema,
     const std::vector<FieldRef>& right_keys, const std::vector<FieldRef>& right_output,
-    const Expression& filter, const std::string& left_field_name_prefix,
-    const std::string& right_field_name_prefix) {
+    const Expression& filter, const std::string& left_field_name_suffix,
+    const std::string& right_field_name_suffix) {
   RETURN_NOT_OK(ValidateSchemas(join_type, left_schema, left_keys, left_output,
                                 right_schema, right_keys, right_output,
-                                left_field_name_prefix, right_field_name_prefix));
+                                left_field_name_suffix, right_field_name_suffix));
 
   std::vector<HashJoinProjection> handles;
   std::vector<const std::vector<FieldRef>*> field_refs;
@@ -172,8 +172,8 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
                                        const Schema& right_schema,
                                        const std::vector<FieldRef>& right_keys,
                                        const std::vector<FieldRef>& right_output,
-                                       const std::string& left_field_name_prefix,
-                                       const std::string& right_field_name_prefix) {
+                                       const std::string& left_field_name_suffix,
+                                       const std::string& right_field_name_suffix) {
   // Checks for key fields:
   // 1. Key field refs must match exactly one input field
   // 2. Same number of key fields on left and right
@@ -241,7 +241,7 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
   // 4. Left semi/anti join (right semi/anti join) must not output fields from right
   // (left)
   // 5. No name collisions in output fields after adding (potentially empty)
-  // prefixes to left and right output
+  // suffixes to left and right output
   //
   if (left_output.empty() && right_output.empty()) {
     return Status::Invalid("Join must output at least one field");
@@ -275,30 +275,60 @@ Status HashJoinSchema::ValidateSchemas(JoinType join_type, const Schema& left_sc
 }
 
 std::shared_ptr<Schema> HashJoinSchema::MakeOutputSchema(
-    const std::string& left_field_name_prefix,
-    const std::string& right_field_name_prefix) {
+    const std::string& left_field_name_suffix,
+    const std::string& right_field_name_suffix) {
   std::vector<std::shared_ptr<Field>> fields;
   int left_size = proj_maps[0].num_cols(HashJoinProjection::OUTPUT);
   int right_size = proj_maps[1].num_cols(HashJoinProjection::OUTPUT);
   fields.resize(left_size + right_size);
 
-  for (int i = 0; i < left_size + right_size; ++i) {
-    bool is_left = (i < left_size);
-    int side = (is_left ? 0 : 1);
-    int input_field_id = proj_maps[side]
-                             .map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT)
-                             .get(is_left ? i : i - left_size);
+  std::unordered_multimap<std::string, int> left_field_map;
+  left_field_map.reserve(left_size);
+  for (int i = 0; i < left_size; ++i) {
+    int side = 0;  // left
+    int input_field_id =
+        proj_maps[side].map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT).get(i);
     const std::string& input_field_name =
         proj_maps[side].field_name(HashJoinProjection::INPUT, input_field_id);
     const std::shared_ptr<DataType>& input_data_type =
         proj_maps[side].data_type(HashJoinProjection::INPUT, input_field_id);
+    left_field_map.insert({input_field_name, i});
+    // insert left table field
+    fields[i] =
+        std::make_shared<Field>(input_field_name, input_data_type, true /*nullable*/);
+  }
 
-    std::string output_field_name =
-        (is_left ? left_field_name_prefix : right_field_name_prefix) + input_field_name;
+  for (int i = 0; i < right_size; ++i) {
+    int side = 1;  // right
+    int input_field_id =
+        proj_maps[side].map(HashJoinProjection::OUTPUT, HashJoinProjection::INPUT).get(i);
+    const std::string& input_field_name =
+        proj_maps[side].field_name(HashJoinProjection::INPUT, input_field_id);
+    const std::shared_ptr<DataType>& input_data_type =
+        proj_maps[side].data_type(HashJoinProjection::INPUT, input_field_id);
+    // search the map and add suffix to the elements which
+    // are present both in left and right tables
+    auto search_it = left_field_map.equal_range(input_field_name);
+    bool match_found = false;
+    for (auto search = search_it.first; search != search_it.second; ++search) {
+      match_found = true;
+      auto left_val = search->first;
+      auto left_index = search->second;
+      auto left_field = fields[left_index];
+      // update left table field with suffix
+      fields[left_index] =
+          std::make_shared<Field>(input_field_name + left_field_name_suffix,
+                                  left_field->type(), true /*nullable*/);
+      // insert right table field with suffix
+      fields[left_size + i] = std::make_shared<Field>(
+          input_field_name + right_field_name_suffix, input_data_type, true /*nullable*/);
+    }
 
-    // All fields coming out of join are marked as nullable.
-    fields[i] =
-        std::make_shared<Field>(output_field_name, input_data_type, true /*nullable*/);
+    if (!match_found) {
+      // insert right table field without suffix
+      fields[left_size + i] =
+          std::make_shared<Field>(input_field_name, input_data_type, true /*nullable*/);
+    }
   }
   return std::make_shared<Schema>(std::move(fields));
 }
@@ -452,18 +482,19 @@ class HashJoinNode : public ExecNode {
 
     const auto& left_schema = *(inputs[0]->output_schema());
     const auto& right_schema = *(inputs[1]->output_schema());
+
     // This will also validate input schemas
     if (join_options.output_all) {
       RETURN_NOT_OK(schema_mgr->Init(
           join_options.join_type, left_schema, join_options.left_keys, right_schema,
           join_options.right_keys, join_options.filter,
-          join_options.output_prefix_for_left, join_options.output_prefix_for_right));
+          join_options.output_suffix_for_left, join_options.output_suffix_for_right));
     } else {
       RETURN_NOT_OK(schema_mgr->Init(
           join_options.join_type, left_schema, join_options.left_keys,
           join_options.left_output, right_schema, join_options.right_keys,
           join_options.right_output, join_options.filter,
-          join_options.output_prefix_for_left, join_options.output_prefix_for_right));
+          join_options.output_suffix_for_left, join_options.output_suffix_for_right));
     }
 
     ARROW_ASSIGN_OR_RAISE(
@@ -472,8 +503,7 @@ class HashJoinNode : public ExecNode {
 
     // Generate output schema
     std::shared_ptr<Schema> output_schema = schema_mgr->MakeOutputSchema(
-        join_options.output_prefix_for_left, join_options.output_prefix_for_right);
-
+        join_options.output_suffix_for_left, join_options.output_suffix_for_right);
     // Create hash join implementation object
     ARROW_ASSIGN_OR_RAISE(std::unique_ptr<HashJoinImpl> impl, HashJoinImpl::MakeBasic());
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index 4db8e9e..a0c75af 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -937,6 +937,73 @@ void HashJoinWithExecPlan(Random64Bit& rng, bool parallel,
   ASSERT_OK_AND_ASSIGN(*output, TableFromExecBatches(output_schema, res));
 }
 
+TEST(HashJoin, Suffix) {
+  BatchesWithSchema input_left;
+  input_left.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 4, 7],
+                   [2, 5, 8],
+                   [3, 6, 9]
+                 ])")};
+  input_left.schema = schema(
+      {field("lkey", int32()), field("shared", int32()), field("ldistinct", int32())});
+
+  BatchesWithSchema input_right;
+  input_right.batches = {ExecBatchFromJSON({int32(), int32(), int32()}, R"([
+                   [1, 10, 13],
+                   [2, 11, 14],
+                   [3, 12, 15]
+                 ])")};
+  input_right.schema = schema(
+      {field("rkey", int32()), field("shared", int32()), field("rdistinct", int32())});
+
+  BatchesWithSchema expected;
+  expected.batches = {
+      ExecBatchFromJSON({int32(), int32(), int32(), int32(), int32(), int32()}, R"([
+    [1, 4, 7, 1, 10, 13],
+    [2, 5, 8, 2, 11, 14],
+    [3, 6, 9, 3, 12, 15]
+  ])")};
+
+  expected.schema = schema({field("lkey", int32()), field("shared_l", int32()),
+                            field("ldistinct", int32()), field("rkey", int32()),
+                            field("shared_r", int32()), field("rdistinct", int32())});
+
+  ExecContext exec_ctx;
+
+  ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_ctx));
+  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+  ExecNode* left_source;
+  ExecNode* right_source;
+  ASSERT_OK_AND_ASSIGN(
+      left_source,
+      MakeExecNode("source", plan.get(), {},
+                   SourceNodeOptions{input_left.schema, input_left.gen(/*parallel=*/false,
+                                                                       /*slow=*/false)}));
+
+  ASSERT_OK_AND_ASSIGN(right_source,
+                       MakeExecNode("source", plan.get(), {},
+                                    SourceNodeOptions{input_right.schema,
+                                                      input_right.gen(/*parallel=*/false,
+                                                                      /*slow=*/false)}))
+
+  HashJoinNodeOptions join_opts{JoinType::INNER,
+                                /*left_keys=*/{"lkey"},
+                                /*right_keys=*/{"rkey"}, literal(true), "_l", "_r"};
+
+  ASSERT_OK_AND_ASSIGN(
+      auto hashjoin,
+      MakeExecNode("hashjoin", plan.get(), {left_source, right_source}, join_opts));
+
+  ASSERT_OK_AND_ASSIGN(std::ignore, MakeExecNode("sink", plan.get(), {hashjoin},
+                                                 SinkNodeOptions{&sink_gen}));
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto result, StartAndCollect(plan.get(), sink_gen));
+
+  AssertExecBatchesEqual(expected.schema, expected.batches, result);
+  AssertSchemaEqual(expected.schema, hashjoin->output_schema());
+}
+
 TEST(HashJoin, Random) {
   Random64Bit rng(42);
 #if defined(THREAD_SANITIZER)
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index d12b838..9544b9e 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -179,19 +179,19 @@ enum class JoinKeyCmp { EQ, IS };
 /// \brief Make a node which implements join operation using hash join strategy.
 class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
  public:
-  static constexpr const char* default_output_prefix_for_left = "";
-  static constexpr const char* default_output_prefix_for_right = "";
+  static constexpr const char* default_output_suffix_for_left = "";
+  static constexpr const char* default_output_suffix_for_right = "";
   HashJoinNodeOptions(
       JoinType in_join_type, std::vector<FieldRef> in_left_keys,
       std::vector<FieldRef> in_right_keys, Expression filter = literal(true),
-      std::string output_prefix_for_left = default_output_prefix_for_left,
-      std::string output_prefix_for_right = default_output_prefix_for_right)
+      std::string output_suffix_for_left = default_output_suffix_for_left,
+      std::string output_suffix_for_right = default_output_suffix_for_right)
       : join_type(in_join_type),
         left_keys(std::move(in_left_keys)),
         right_keys(std::move(in_right_keys)),
         output_all(true),
-        output_prefix_for_left(std::move(output_prefix_for_left)),
-        output_prefix_for_right(std::move(output_prefix_for_right)),
+        output_suffix_for_left(std::move(output_suffix_for_left)),
+        output_suffix_for_right(std::move(output_suffix_for_right)),
         filter(std::move(filter)) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
@@ -202,16 +202,16 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       JoinType join_type, std::vector<FieldRef> left_keys,
       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
       std::vector<FieldRef> right_output, Expression filter = literal(true),
-      std::string output_prefix_for_left = default_output_prefix_for_left,
-      std::string output_prefix_for_right = default_output_prefix_for_right)
+      std::string output_suffix_for_left = default_output_suffix_for_left,
+      std::string output_suffix_for_right = default_output_suffix_for_right)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
         output_all(false),
         left_output(std::move(left_output)),
         right_output(std::move(right_output)),
-        output_prefix_for_left(std::move(output_prefix_for_left)),
-        output_prefix_for_right(std::move(output_prefix_for_right)),
+        output_suffix_for_left(std::move(output_suffix_for_left)),
+        output_suffix_for_right(std::move(output_suffix_for_right)),
         filter(std::move(filter)) {
     this->key_cmp.resize(this->left_keys.size());
     for (size_t i = 0; i < this->left_keys.size(); ++i) {
@@ -223,8 +223,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
       std::vector<FieldRef> right_keys, std::vector<FieldRef> left_output,
       std::vector<FieldRef> right_output, std::vector<JoinKeyCmp> key_cmp,
       Expression filter = literal(true),
-      std::string output_prefix_for_left = default_output_prefix_for_left,
-      std::string output_prefix_for_right = default_output_prefix_for_right)
+      std::string output_suffix_for_left = default_output_suffix_for_left,
+      std::string output_suffix_for_right = default_output_suffix_for_right)
       : join_type(join_type),
         left_keys(std::move(left_keys)),
         right_keys(std::move(right_keys)),
@@ -232,8 +232,8 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
         left_output(std::move(left_output)),
         right_output(std::move(right_output)),
         key_cmp(std::move(key_cmp)),
-        output_prefix_for_left(std::move(output_prefix_for_left)),
-        output_prefix_for_right(std::move(output_prefix_for_right)),
+        output_suffix_for_left(std::move(output_suffix_for_left)),
+        output_suffix_for_right(std::move(output_suffix_for_right)),
         filter(std::move(filter)) {}
 
   // type of join (inner, left, semi...)
@@ -252,12 +252,12 @@ class ARROW_EXPORT HashJoinNodeOptions : public ExecNodeOptions {
   // key comparison function (determines whether a null key is equal another null
   // key or not)
   std::vector<JoinKeyCmp> key_cmp;
-  // prefix added to names of output fields coming from left input (used to
-  // distinguish, if necessary, between fields of the same name in left and right
-  // input and can be left empty if there are no name collisions)
-  std::string output_prefix_for_left;
-  // prefix added to names of output fields coming from right input
-  std::string output_prefix_for_right;
+  // suffix added to names of output fields coming from left input (used to distinguish,
+  // if necessary, between fields of the same name in left and right input and can be left
+  // empty if there are no name collisions)
+  std::string output_suffix_for_left;
+  // suffix added to names of output fields coming from right input
+  std::string output_suffix_for_right;
   // residual filter which is applied to matching rows.  Rows that do not match
   // the filter are not included.  The filter is applied against the
   // concatenated input schema (left fields then right fields) and can reference
diff --git a/cpp/src/arrow/compute/exec/util_test.cc b/cpp/src/arrow/compute/exec/util_test.cc
index 6f4b531..6d85991 100644
--- a/cpp/src/arrow/compute/exec/util_test.cc
+++ b/cpp/src/arrow/compute/exec/util_test.cc
@@ -25,8 +25,8 @@ using testing::Eq;
 namespace arrow {
 namespace compute {
 
-const char* kLeftPrefix = "left.";
-const char* kRightPrefix = "right.";
+const char* kLeftSuffix = ".left";
+const char* kRightSuffix = ".right";
 
 TEST(FieldMap, Trivial) {
   HashJoinSchema schema_mgr;
@@ -35,12 +35,12 @@ TEST(FieldMap, Trivial) {
   auto right = schema({field("i32", int32())});
 
   ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32"}, *right, {"i32"},
-                            literal(true), kLeftPrefix, kRightPrefix));
+                            literal(true), kLeftSuffix, kRightSuffix));
 
-  auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+  auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
   EXPECT_THAT(*output, Eq(Schema({
-                           field("left.i32", int32()),
-                           field("right.i32", int32()),
+                           field("i32.left", int32()),
+                           field("i32.right", int32()),
                        })));
 
   auto i =
@@ -75,7 +75,7 @@ TEST(FieldMap, SingleKeyField) {
   auto right = schema({field("f32", float32()), field("i32", int32())});
 
   ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32"}, *right, {"i32"},
-                            literal(true), kLeftPrefix, kRightPrefix));
+                            literal(true), kLeftSuffix, kRightSuffix));
 
   EXPECT_EQ(schema_mgr.proj_maps[0].num_cols(HashJoinProjection::INPUT), 2);
   EXPECT_EQ(schema_mgr.proj_maps[1].num_cols(HashJoinProjection::INPUT), 2);
@@ -84,12 +84,12 @@ TEST(FieldMap, SingleKeyField) {
   EXPECT_EQ(schema_mgr.proj_maps[0].num_cols(HashJoinProjection::OUTPUT), 2);
   EXPECT_EQ(schema_mgr.proj_maps[1].num_cols(HashJoinProjection::OUTPUT), 2);
 
-  auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+  auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
   EXPECT_THAT(*output, Eq(Schema({
-                           field("left.i32", int32()),
-                           field("left.str", utf8()),
-                           field("right.f32", float32()),
-                           field("right.i32", int32()),
+                           field("i32.left", int32()),
+                           field("str", utf8()),
+                           field("f32", float32()),
+                           field("i32.right", int32()),
                        })));
 
   auto i =
@@ -113,18 +113,18 @@ TEST(FieldMap, TwoKeyFields) {
   });
 
   ASSERT_OK(schema_mgr.Init(JoinType::INNER, *left, {"i32", "str"}, *right,
-                            {"i32", "str"}, literal(true), kLeftPrefix, kRightPrefix));
+                            {"i32", "str"}, literal(true), kLeftSuffix, kRightSuffix));
 
-  auto output = schema_mgr.MakeOutputSchema(kLeftPrefix, kRightPrefix);
+  auto output = schema_mgr.MakeOutputSchema(kLeftSuffix, kRightSuffix);
   EXPECT_THAT(*output, Eq(Schema({
-                           field("left.i32", int32()),
-                           field("left.str", utf8()),
-                           field("left.bool", boolean()),
-
-                           field("right.i32", int32()),
-                           field("right.str", utf8()),
-                           field("right.f32", float32()),
-                           field("right.f64", float64()),
+                           field("i32.left", int32()),
+                           field("str.left", utf8()),
+                           field("bool", boolean()),
+
+                           field("i32.right", int32()),
+                           field("str.right", utf8()),
+                           field("f32", float32()),
+                           field("f64", float64()),
                        })));
 }
 
diff --git a/testing b/testing
index d9e7237..634739c 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit d9e7237f4f71c91acb8daeda97354b073d28ac5b
+Subproject commit 634739c664433cec366b4b9a81d1e1044a8c5eda