Posted to commits@arrow.apache.org by ks...@apache.org on 2022/07/27 16:30:06 UTC

[arrow] 01/02: ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in case of zero batches on build side (#13686)

This is an automated email from the ASF dual-hosted git repository.

kszucs pushed a commit to branch maint-9.0.0
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 9984bf3f4f2585af832d87353a89c7312dbbd716
Author: michalursa <76...@users.noreply.github.com>
AuthorDate: Wed Jul 27 09:14:21 2022 -0700

    ARROW-15938: [C++][Compute] Fixing HashJoinBasicImpl in case of zero batches on build side (#13686)
    
    The hash join implementation based on the HashJoinBasicImpl class was missing initialization in the case of zero batches on the build side.
    The initialization of a few data structures, mainly the two RowEncoder instances holding key and payload columns for build-side rows, was skipped inside BuildHashTable_exec_task, the method responsible for transforming the batches accumulated on the build side of the hash join into a hash table.
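    
    A condensed sketch of the control-flow change (boiled down from the diff
    below; payload encoding, cancellation checks, and hash-table insertion
    are omitted):
    
        // Before: encoder initialization lived in the non-empty branch, so a
        // build side with zero batches never initialized hash_table_keys_.
        if (batches.empty()) {
          hash_table_empty_ = true;
        } else {
          dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
          hash_table_empty_ = true;
          // ... encode each batch, clearing hash_table_empty_ on the first
          // non-empty one ...
        }
    
        // After: initialization always runs; iterating over zero batches
        // simply leaves hash_table_empty_ == true.
        dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
        hash_table_empty_ = true;
        for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
          // ... same per-batch encoding as before ...
        }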
    
    Initializing a RowEncoder inserts a single special row containing null values for all columns. This special row is accessed when outputting probe-side rows that have no matches in a left outer or full outer join (in that case these joins are expected to output nulls in place of all fields that would come from the build side).
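    
    A minimal, self-contained sketch of that mechanism (MiniEncoder and
    BuildSideOutput are hypothetical names for illustration only, not the
    actual RowEncoder API; without the fix, an empty build side meant this
    reserved row was never created, so the outer-join output path had
    nothing to read):
    
        #include <cstdint>
        #include <optional>
        #include <string>
        #include <vector>
    
        // Hypothetical, simplified encoder: initialization reserves row 0 as
        // the all-null row, mirroring what RowEncoder initialization sets up.
        struct MiniEncoder {
          using Row = std::vector<std::optional<std::string>>;
          std::vector<Row> rows;
          explicit MiniEncoder(size_t num_cols) { rows.emplace_back(num_cols); }
        };
    
        // An unmatched probe row in a LEFT OUTER / FULL OUTER join outputs
        // the reserved all-null row for the build-side columns.
        const MiniEncoder::Row& BuildSideOutput(const MiniEncoder& enc,
                                                std::optional<int32_t> match_row) {
          return enc.rows[match_row ? *match_row : 0];
        }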
    
    Interestingly, the initialization was present in the similar case where batches were present on the build side but all of them contained zero rows. I modified the code to use the same code path for both of these logically equivalent cases: a) zero build-side batches and b) a non-zero number of batches, each containing zero rows.
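    
    A schematic stand-in for the build phase showing why the two cases are
    equivalent (illustration only, not the actual implementation; the real
    regression test is in the hash_join_node_test.cc change below):
    
        #include <cassert>
        #include <cstddef>
        #include <vector>
    
        // Returns true when no non-empty batch was seen, i.e. the hash table
        // stays empty after the build phase.
        bool HashTableStaysEmpty(const std::vector<size_t>& batch_lengths) {
          bool empty = true;
          for (size_t length : batch_lengths) {
            if (length == 0) continue;  // skip empty batches
            empty = false;              // any rows populate the hash table
          }
          return empty;
        }
    
        int main() {
          assert(HashTableStaysEmpty({}));       // a) zero batches
          assert(HashTableStaysEmpty({0, 0}));   // b) batches of length zero
          assert(!HashTableStaysEmpty({0, 3}));  // any rows -> not empty
        }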
    
    Authored-by: michalursa <mi...@ursacomputing.com>
    Signed-off-by: Krisztián Szűcs <sz...@gmail.com>
---
 cpp/src/arrow/compute/exec/hash_join.cc           | 57 +++++++++++------------
 cpp/src/arrow/compute/exec/hash_join_node_test.cc |  9 ++++
 2 files changed, 36 insertions(+), 30 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 07a3083fb9..5cf66b3d09 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -561,38 +561,35 @@ class HashJoinBasicImpl : public HashJoinImpl {
 
   Status BuildHashTable_exec_task(size_t thread_index, int64_t /*task_id*/) {
     AccumulationQueue batches = std::move(build_batches_);
-    if (batches.empty()) {
-      hash_table_empty_ = true;
-    } else {
-      dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
-      bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) > 0);
-      if (has_payload) {
-        InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+    dict_build_.InitEncoder(*schema_[1], &hash_table_keys_, ctx_);
+    bool has_payload = (schema_[1]->num_cols(HashJoinProjection::PAYLOAD) > 0);
+    if (has_payload) {
+      InitEncoder(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_);
+    }
+    hash_table_empty_ = true;
+
+    for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
+      if (cancelled_) {
+        return Status::Cancelled("Hash join cancelled");
       }
-      hash_table_empty_ = true;
-      for (size_t ibatch = 0; ibatch < batches.batch_count(); ++ibatch) {
-        if (cancelled_) {
-          return Status::Cancelled("Hash join cancelled");
-        }
-        const ExecBatch& batch = batches[ibatch];
-        if (batch.length == 0) {
-          continue;
-        } else if (hash_table_empty_) {
-          hash_table_empty_ = false;
+      const ExecBatch& batch = batches[ibatch];
+      if (batch.length == 0) {
+        continue;
+      } else if (hash_table_empty_) {
+        hash_table_empty_ = false;
 
-          RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
-        }
-        int32_t num_rows_before = hash_table_keys_.num_rows();
-        RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
-                                              &hash_table_keys_, ctx_));
-        if (has_payload) {
-          RETURN_NOT_OK(
-              EncodeBatch(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_, batch));
-        }
-        int32_t num_rows_after = hash_table_keys_.num_rows();
-        for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
-          hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), irow));
-        }
+        RETURN_NOT_OK(dict_build_.Init(*schema_[1], &batch, ctx_));
+      }
+      int32_t num_rows_before = hash_table_keys_.num_rows();
+      RETURN_NOT_OK(dict_build_.EncodeBatch(thread_index, *schema_[1], batch,
+                                            &hash_table_keys_, ctx_));
+      if (has_payload) {
+        RETURN_NOT_OK(
+            EncodeBatch(1, HashJoinProjection::PAYLOAD, &hash_table_payloads_, batch));
+      }
+      int32_t num_rows_after = hash_table_keys_.num_rows();
+      for (int32_t irow = num_rows_before; irow < num_rows_after; ++irow) {
+        hash_table_.insert(std::make_pair(hash_table_keys_.encoded_row(irow), irow));
       }
     }
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index b4fd7ee643..d06b76159d 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1298,6 +1298,15 @@ void TestHashJoinDictionaryHelper(
     }
   }
 
+  // Instead of sending 2 batches of size 0 we should not send any batches
+  // at all to more accurately simulate real world use cases
+  if (l_length == 0) {
+    l_batches.batches.resize(0);
+  }
+  if (r_length == 0) {
+    r_batches.batches.resize(0);
+  }
+
   auto exec_ctx = arrow::internal::make_unique<ExecContext>(
       default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr);
   ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(exec_ctx.get()));