You are viewing a plain-text version of this content; the hyperlink to its canonical (original) version was not preserved in this copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2022/05/26 01:18:32 UTC
[arrow] branch master updated: ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter
This is an automated email from the ASF dual-hosted git repository.
westonpace pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5c02620ab3 ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter
5c02620ab3 is described below
commit 5c02620ab34bdd94a51c2c3c7429018c0ea48359
Author: Sasha Krassovsky <kr...@gmail.com>
AuthorDate: Wed May 25 15:18:09 2022 -1000
ARROW-16646: [C++] Allow key columns to be scalars in Bloom filter
Closes #13236 from save-buffer/sasha_scalars
Authored-by: Sasha Krassovsky <kr...@gmail.com>
Signed-off-by: Weston Pace <we...@gmail.com>
---
cpp/src/arrow/compute/exec/hash_join.cc | 10 ++++++++++
cpp/src/arrow/compute/exec/hash_join_node.cc | 22 ++++++++++++----------
cpp/src/arrow/compute/exec/hash_join_node_test.cc | 18 ++++++++++++++++++
3 files changed, 40 insertions(+), 10 deletions(-)
diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc
index 15a006c81d..63d9522c44 100644
--- a/cpp/src/arrow/compute/exec/hash_join.cc
+++ b/cpp/src/arrow/compute/exec/hash_join.cc
@@ -671,6 +671,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
for (size_t i = 0; i < keys.size(); i++) {
int input_idx = bloom_filter_column_maps_[ifilter][i];
keys[i] = batch[input_idx];
+ if (keys[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ keys[i],
+ MakeArrayFromScalar(*keys[i].scalar(), batch.length, ctx_->memory_pool()));
+ }
}
ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(keys)));
RETURN_NOT_OK(Hashing32::HashBatch(
@@ -715,6 +720,11 @@ class HashJoinBasicImpl : public HashJoinImpl {
for (size_t i = 0; i < key_columns.size(); i++) {
int input_idx = key_to_in.get(static_cast<int>(i));
key_columns[i] = input_batch[input_idx];
+ if (key_columns[i].is_scalar()) {
+ ARROW_ASSIGN_OR_RAISE(
+ key_columns[i], MakeArrayFromScalar(*key_columns[i].scalar(),
+ input_batch.length, ctx_->memory_pool()));
+ }
}
ARROW_ASSIGN_OR_RAISE(ExecBatch key_batch, ExecBatch::Make(std::move(key_columns)));
diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc
index d8d729dd1a..e47d609554 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -600,16 +600,18 @@ class HashJoinNode : public ExecNode {
join_type_ == JoinType::FULL_OUTER;
disable_bloom_filter_ = disable_bloom_filter_ || bloom_filter_does_not_apply_to_join;
- SchemaProjectionMap build_keys_to_input =
- schema_mgr_->proj_maps[1].map(HashJoinProjection::KEY, HashJoinProjection::INPUT);
- // Bloom filter currently doesn't support dictionaries.
- for (int i = 0; i < build_keys_to_input.num_cols; i++) {
- int idx = build_keys_to_input.get(i);
- bool is_dict =
- inputs_[1]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
- if (is_dict) {
- disable_bloom_filter_ = true;
- break;
+ for (int side = 0; side <= 1 && !disable_bloom_filter_; side++) {
+ SchemaProjectionMap keys_to_input = schema_mgr_->proj_maps[side].map(
+ HashJoinProjection::KEY, HashJoinProjection::INPUT);
+ // Bloom filter currently doesn't support dictionaries.
+ for (int i = 0; i < keys_to_input.num_cols; i++) {
+ int idx = keys_to_input.get(i);
+ bool is_dict =
+ inputs_[side]->output_schema()->field(idx)->type()->id() == Type::DICTIONARY;
+ if (is_dict) {
+ disable_bloom_filter_ = true;
+ break;
+ }
}
}
diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
index f8da71c7b5..c4eccd68d3 100644
--- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc
+++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc
@@ -1667,6 +1667,24 @@ TEST(HashJoin, Scalars) {
ArrayFromJSON(utf8(), R"(["p", "q", "p", "q", "r"])"), 1, swap_sides);
}
}
+
+ // Scalars in key columns, Inner join to exercise Bloom filter
+ for (auto use_scalar_dict : {false, true}) {
+ for (auto swap_sides : {false, true}) {
+ TestHashJoinDictionaryHelper(
+ JoinType::INNER, JoinKeyCmp::EQ, false /*parallel*/,
+ // Input
+ use_scalar_dict ? DictScalarFromJSON(int8_utf8, "1", R"(["b", "a", "c"])")
+ : ScalarFromJSON(utf8(), "\"a\""),
+ ArrayFromJSON(utf8(), R"(["x", "y"])"),
+ ArrayFromJSON(utf8(), R"(["a", null, "b"])"),
+ ArrayFromJSON(utf8(), R"(["p", "q", "r"])"),
+ // Expected output
+ ArrayFromJSON(utf8(), R"(["a", "a"])"), ArrayFromJSON(utf8(), R"(["x", "y"])"),
+ ArrayFromJSON(utf8(), R"(["a", "a"])"), ArrayFromJSON(utf8(), R"(["p", "p"])"),
+ 2, swap_sides);
+ }
+ }
}
TEST(HashJoin, DictNegative) {