You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/26 01:18:32 UTC

[arrow] branch master updated: ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter

This is an automated email from the ASF dual-hosted git repository.

westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c02620ab3 ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter
5c02620ab3 is described below

commit 5c02620ab34bdd94a51c2c3c7429018c0ea48359
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Wed May 25 15:18:09 2022 -1000

    ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter
    
    Closes #13236 from save-buffer/sasha_scalars
    
    Authored-by: Sasha Krassovsky <kr...@gmail.com>
    Signed-off-by: Weston Pace <we...@gmail.com>
---
 cpp/src/arrow/compute/exec/hash_join.cc           | 10 ++++++++++
 cpp/src/arrow/compute/exec/hash_join_node.cc      | 22 ++++++++++++----------
 cpp/src/arrow/compute/exec/hash_join_node_test.cc | 18 ++++++++++++++++++
 3 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 15a006c81d..63d9522c44 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -671,6 +671,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
       for (size_t i = 0; i < keys.size(); i++) {
         int input_idx = bloom_filter_column_maps_[ifilter][i];
         keys[i] = batch[input_idx];
+        if (keys[i].is_scalar()) {
+          ARROW_ASSIGN_OR_RAISE(
+              keys[i],
+              MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool()));
+        }
       }
       ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
       RETURN_NOT_OK(Hashing32::HashBatch(
@@ -715,6 +720,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
     for (size_t i = 0; i < key_columns.size(); i++) {
       int input_idx = key_to_in.get(static_cast<int>(i));
       key_columns[i] = input_batch[input_idx];
+      if (key_columns[i].is_scalar()) {
+        ARROW_ASSIGN_OR_RAISE(
+            key_columns[i], MakeArrayFromScalar(*key_columns[i].scalar(),
+                                                input_batch.length, ctx_->memory_pool()));
+      }
     }
     ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index d8d729dd1a..e47d609554 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -600,16 +600,18 @@ class HashJoinNode : public ExecNode {
                                                join_type_ == JoinType::FULL_OUTER;
     disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join;
 
-    SchemaProjectionMap build_keys_to_input =
-        schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
-    // Bloom filter currently doesn't support dictionaries.
-    for (int i = 0; i < build_keys_to_input.num_cols; i++) {
-      int idx = build_keys_to_input.get(i);
-      bool is_dict =
-          inputs_[1]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
-      if (is_dict) {
-        disable_bloom_filter_ = true;
-        break;
+    for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
+      SchemaProjectionMap keys_to_input = schema_mgr_->proj_maps[side].map(
+          HashJoinProjection::KEY, HashJoinProjection::INPUT);
+      // Bloom filter currently doesn't support dictionaries.
+      for (int i = 0; i < keys_to_input.num_cols; i++) {
+        int idx = keys_to_input.get(i);
+        bool is_dict =
+            inputs_[side]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
+        if (is_dict) {
+          disable_bloom_filter_ = true;
+          break;
+        }
       }
     }
 
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index f8da71c7b5..c4eccd68d3 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1667,6 +1667,24 @@ TEST(HashJoin, Scalars) {
           ArrayFromJSON(utf8(), R"(["p", "q", "p", "q", "r"])"), 1, swap_sides);
     }
   }
+
+  // Scalars in key columns, Inner join to exercise Bloom filter
+  for (auto use_scalar_dict : {false, true}) {
+    for (auto swap_sides : {false, true}) {
+      TestHashJoinDictionaryHelper(
+          JoinType::INNER, JoinKeyCmp::EQ, false /*parallel*/,
+          // Input
+          use_scalar_dict ? DictScalarFromJSON(int8_utf8, "1", R"(["b", "a", "c"])")
+                          : ScalarFromJSON(utf8(), "\"a\""),
+          ArrayFromJSON(utf8(), R"(["x", "y"])"),
+          ArrayFromJSON(utf8(), R"(["a", null, "b"])"),
+          ArrayFromJSON(utf8(), R"(["p", "q", "r"])"),
+          // Expected output
+          ArrayFromJSON(utf8(), R"(["a", "a"])"), ArrayFromJSON(utf8(), R"(["x", "y"])"),
+          ArrayFromJSON(utf8(), R"(["a", "a"])"), ArrayFromJSON(utf8(), R"(["p", "p"])"),
+          2, swap_sides);
+    }
+  }
 }
 
 TEST(HashJoin, DictNegative) {