You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/03/20 06:38:15 UTC
[incubator-doris] 01/01: Revert "[CodeStyle] Remove unused
PartitionedHashTable (#3156)"
This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch revert-3156-master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit cb1491b32d74950d4d4aeaa7fd732e303e9efb4e
Author: lichaoyong <li...@baidu.com>
AuthorDate: Fri Mar 20 14:38:07 2020 +0800
Revert "[CodeStyle] Remove unused PartitionedHashTable (#3156)"
This reverts commit d3fd44f0a2fe076d2c62851babc162fcebe4d63b.
---
be/src/exec/CMakeLists.txt | 2 +
be/src/exec/hash_join_node.cpp | 2 +-
be/src/exec/partitioned_hash_table.cc | 440 ++++++++++++++++++
be/src/exec/partitioned_hash_table.h | 673 ++++++++++++++++++++++++++++
be/src/exec/partitioned_hash_table.inline.h | 379 ++++++++++++++++
5 files changed, 1495 insertions(+), 1 deletion(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3c3fd71..20e00ef 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -83,6 +83,8 @@ set(EXEC_FILES
schema_scanner/schema_charsets_scanner.cpp
schema_scanner/schema_collations_scanner.cpp
schema_scanner/schema_helper.cpp
+ partitioned_hash_table.cc
+ partitioned_hash_table_ir.cc
new_partitioned_hash_table.cc
new_partitioned_hash_table_ir.cc
partitioned_aggregation_node.cc
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 72db1be..d03dd3e 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -46,7 +46,7 @@ HashJoinNode::HashJoinNode(
_is_push_down = tnode.hash_join_node.is_push_down;
_build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN|| _join_op == TJoinOp::RIGHT_ANTI_JOIN
|| _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN
- || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+ || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
}
HashJoinNode::~HashJoinNode() {
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
new file mode 100644
index 0000000..61d9d2e
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned_hash_table.inline.h"
+
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/slot_ref.h"
+#include "runtime/buffered_block_mgr.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.hpp"
+#include "util/doris_metrics.h"
+
+// DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
+
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::endl;
+
+namespace doris {
+
+// Multipliers used to derive the hash seed of each level from the previous one:
+// the PartitionedHashTableCtx constructor computes seed[i] = seed[i - 1] * SEED_PRIMES[i].
+// The array length also bounds max_levels (max_levels must be smaller than it).
+// The table is only ever read, so it is declared const.
+static const uint32_t SEED_PRIMES[] = {
+    1, // Level 0 placeholder: level 0 hashes with initial_seed, so this is never multiplied in.
+    1431655781,
+    1183186591,
+    622729787,
+    472882027,
+    338294347,
+    275604541,
+    41161739,
+    29999999,
+    27475109,
+    611603,
+    16313357,
+    11380003,
+    21261403,
+    33393119,
+    101,
+    71043403
+};
+
+// Sentinel written into an expr result slot in place of a NULL value (see eval_row()).
+// It is a non-zero constant so that (NULL, 1) does not hash the same as (0, 1).
+// Its bytes are copied directly into the slot, so it must be at least as large as the
+// biggest primitive slot; eval_row() DCHECKs this. 16 x int64_t = 128 bytes.
+// Left non-const because eval_row() hands out its address as a plain void*.
+// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes
+static int64_t NULL_VALUE[16] = {
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED
+};
+
+// The first NUM_SMALL_DATA_PAGES entries of _data_pages (the pages that hold duplicate
+// nodes) are capped below the block manager's max block size to reduce the memory
+// footprint of small queries: grow_node_array() limits the first page to 64KB and the
+// second to 512KB, and uses the max block size for every later page.
+static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
+// Derived from the element size of the array itself, so it stays correct if the
+// element type ever changes.
+static const int NUM_SMALL_DATA_PAGES =
+        sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(INITIAL_DATA_PAGE_SIZES[0]);
+
+// Creates a hash table context.
+// - build_expr_ctxs / probe_expr_ctxs: exprs evaluated over build / probe rows. Both
+//   vectors are kept by reference and must outlive this context; they must have the
+//   same length and must not be empty.
+// - stores_nulls / finds_nulls: NULL handling flags (see the class declaration).
+// - initial_seed: non-zero seed for level 0; level i uses seed[i - 1] * SEED_PRIMES[i].
+// - max_levels: highest level that will be hashed; must be smaller than the number of
+//   entries in SEED_PRIMES.
+// - num_build_tuples: number of Tuple* slots in the scratch row returned by row().
+// Every buffer allocated here is released by close().
+PartitionedHashTableCtx::PartitionedHashTableCtx(
+        const vector<ExprContext*>& build_expr_ctxs, const vector<ExprContext*>& probe_expr_ctxs,
+        bool stores_nulls, bool finds_nulls,
+        int32_t initial_seed, int max_levels, int num_build_tuples) :
+        _build_expr_ctxs(build_expr_ctxs),
+        _probe_expr_ctxs(probe_expr_ctxs),
+        _stores_nulls(stores_nulls),
+        _finds_nulls(finds_nulls),
+        _level(0),
+        _row(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
+    // Compute the layout and buffer size to store the evaluated expr results.
+    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
+    DCHECK(!_build_expr_ctxs.empty());
+    _results_buffer_size = Expr::compute_results_layout(_build_expr_ctxs,
+            &_expr_values_buffer_offsets, &_var_result_begin);
+    // The trailing "()" value-initializes (zeroes) both arrays. eval_row() stops at the
+    // first NULL when NULLs are not stored, so without this last_expr_value_null()
+    // could read indeterminate bytes for exprs that were never evaluated.
+    _expr_values_buffer = new uint8_t[_results_buffer_size]();
+    _expr_value_null_bits = new uint8_t[_build_expr_ctxs.size()]();
+
+    // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
+    DCHECK_GE(max_levels, 0);
+    DCHECK_LT(max_levels, static_cast<int>(sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0])));
+    DCHECK_NE(initial_seed, 0);
+    _seeds.resize(max_levels + 1);
+    _seeds[0] = initial_seed;
+    for (int i = 1; i <= max_levels; ++i) {
+        _seeds[i] = _seeds[i - 1] * SEED_PRIMES[i];
+    }
+}
+
+// Frees the buffers allocated by the constructor (expr values, per-expr NULL flags and
+// the scratch row) and nulls the pointers. A second call trips the DCHECKs below.
+void PartitionedHashTableCtx::close() {
+    // Expr result buffer: allocated with new[] in the constructor.
+    DCHECK(_expr_values_buffer != NULL);
+    delete[] _expr_values_buffer;
+    _expr_values_buffer = NULL;
+
+    // Per-expr NULL flags: allocated with new[] in the constructor.
+    DCHECK(_expr_value_null_bits != NULL);
+    delete[] _expr_value_null_bits;
+    _expr_value_null_bits = NULL;
+
+    // Scratch row: allocated with malloc() in the constructor.
+    free(_row);
+    _row = NULL;
+}
+
+// Evaluates each expr in 'ctxs' over 'row' and writes the result into its slot of
+// _expr_values_buffer, recording per-expr NULL flags in _expr_value_null_bits.
+// A NULL result is replaced by the NULL_VALUE sentinel bytes. The slot layout and the
+// write type always come from the build exprs.
+// Returns true if any expr evaluated to NULL. When the table does not store NULLs,
+// evaluation stops at the first NULL and the remaining slots are left untouched.
+bool PartitionedHashTableCtx::eval_row(TupleRow* row, const vector<ExprContext*>& ctxs) {
+    bool saw_null = false;
+    const int num_exprs = ctxs.size();
+    for (int expr_idx = 0; expr_idx < num_exprs; ++expr_idx) {
+        void* value = ctxs[expr_idx]->get_value(row);
+        const bool is_null = (value == NULL);
+        if (is_null && !_stores_nulls) {
+            // The row will be rejected anyway; no reason to keep evaluating.
+            return true;
+        }
+        _expr_value_null_bits[expr_idx] = is_null;
+        if (is_null) {
+            value = reinterpret_cast<void*>(&NULL_VALUE);
+            saw_null = true;
+        }
+        DCHECK_LE(_build_expr_ctxs[expr_idx]->root()->type().get_slot_size(),
+                sizeof(NULL_VALUE));
+        void* dst = _expr_values_buffer + _expr_values_buffer_offsets[expr_idx];
+        RawValue::write(value, dst, _build_expr_ctxs[expr_idx]->root()->type(), NULL);
+    }
+    return saw_null;
+}
+
+// Hashes _expr_values_buffer for layouts with variable-length (VARCHAR) slots.
+// The fixed-length prefix [0, _var_result_begin) is hashed in one pass. Then, for each
+// VARCHAR expr, either the raw slot bytes (when NULL) or the referenced string bytes
+// are folded in. All other slots are covered by the prefix hash.
+uint32_t PartitionedHashTableCtx::hash_variable_len_row() {
+    uint32_t h = _seeds[_level];
+    if (_var_result_begin != 0) {
+        // There is a non-empty fixed-length portion in front of the var-len results.
+        h = hash_help(_expr_values_buffer, _var_result_begin, h);
+    }
+
+    const int num_exprs = _build_expr_ctxs.size();
+    for (int i = 0; i < num_exprs; ++i) {
+        // if (_build_expr_ctxs[i]->root()->type().type != TYPE_STRING &&
+        if (_build_expr_ctxs[i]->root()->type().type == TYPE_VARCHAR) {
+            void* slot = _expr_values_buffer + _expr_values_buffer_offsets[i];
+            if (_expr_value_null_bits[i]) {
+                // The slot holds the bytes written for the NULL sentinel; hash them as-is.
+                h = hash_help(slot, sizeof(StringValue), h);
+            } else {
+                // TODO: when using CRC hash on empty string, this only swaps bytes.
+                StringValue* str = reinterpret_cast<StringValue*>(slot);
+                h = hash_help(str->ptr, str->len, h);
+            }
+        }
+    }
+    return h;
+}
+
+// Returns true if every build expr evaluated over 'build_row' equals the value cached
+// in _expr_values_buffer for the row evaluated last. A NULL on the build side matches
+// only a cached NULL, and only when the table stores NULLs. A non-NULL build value
+// never matches a cached NULL.
+bool PartitionedHashTableCtx::equals(TupleRow* build_row) {
+    const int num_exprs = _build_expr_ctxs.size();
+    for (int idx = 0; idx < num_exprs; ++idx) {
+        void* build_val = _build_expr_ctxs[idx]->get_value(build_row);
+        if (build_val == NULL) {
+            if (!_stores_nulls || !_expr_value_null_bits[idx]) {
+                return false;
+            }
+            // NULL == NULL: nothing further to compare for this expr.
+            continue;
+        }
+        if (_expr_value_null_bits[idx]) {
+            return false;
+        }
+        void* cached_val = _expr_values_buffer + _expr_values_buffer_offsets[idx];
+        if (!RawValue::eq(cached_val, build_val, _build_expr_ctxs[idx]->root()->type())) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Ratio of filled buckets to buckets that check_and_resize() keeps the table under.
+const double PartitionedHashTable::MAX_FILL_FACTOR = 0.75;
+
+// Heap-allocates a hash table; the caller owns the result and must call init() and,
+// eventually, close(). The probing strategy comes from config::enable_quadratic_probing.
+PartitionedHashTable* PartitionedHashTable::create(RuntimeState* state,
+        BufferedBlockMgr2::Client* client, int num_build_tuples,
+        BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+        int64_t initial_num_buckets) {
+    const bool quadratic_probing = config::enable_quadratic_probing;
+    return new PartitionedHashTable(quadratic_probing, state, client, num_build_tuples,
+            tuple_stream, max_num_buckets, initial_num_buckets);
+}
+
+// Constructs an empty table; no buckets are allocated until init() is called.
+// - quadratic_probing: selects quadratic instead of linear probing.
+// - client: block manager client that bucket memory and data pages are charged to.
+// - num_build_tuples: tuples per build row; exactly 1 means rows are stored as Tuple*.
+// - stream: tuple stream the stored row indices refer to; may be NULL only when
+//   num_build_tuples == 1.
+// - max_num_buckets: upper bound for resizes, or -1 for unlimited.
+// - num_buckets: initial bucket count; must be a positive power of 2.
+PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
+ BufferedBlockMgr2::Client* client, int num_build_tuples, BufferedTupleStream2* stream,
+ int64_t max_num_buckets, int64_t num_buckets) :
+ _state(state),
+ _block_mgr_client(client),
+ _tuple_stream(stream),
+ // Single-tuple rows are kept as HtData::tuple, multi-tuple rows as HtData::idx.
+ _stores_tuples(num_build_tuples == 1),
+ _quadratic_probing(quadratic_probing),
+ _total_data_page_size(0),
+ _next_node(NULL),
+ _node_remaining_current_page(0),
+ _num_duplicate_nodes(0),
+ _max_num_buckets(max_num_buckets),
+ _buckets(NULL),
+ _num_buckets(num_buckets),
+ _num_filled_buckets(0),
+ _num_buckets_with_duplicates(0),
+ _num_build_tuples(num_build_tuples),
+ _has_matches(false),
+ _num_probes(0),
+ _num_failed_probes(0),
+ _travel_length(0),
+ _num_hash_collisions(0),
+ _num_resizes(0) {
+ // Bucket positions are derived with a bitmask, which requires a power-of-2 count.
+ DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2";
+ DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
+ // Multi-tuple rows are referenced by index into 'stream', so it must be provided.
+ DCHECK(_stores_tuples || stream != NULL);
+ DCHECK(client != NULL);
+}
+
+// Allocates the zeroed initial bucket array, charging its size to the block manager
+// client first. Returns false, with _num_buckets reset to 0, if either the memory
+// reservation or the allocation fails. In both failure cases no memory stays charged,
+// so the release done by close() (_num_buckets * sizeof(Bucket)) remains balanced.
+bool PartitionedHashTable::init() {
+    int64_t buckets_byte_size = _num_buckets * sizeof(Bucket);
+    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, buckets_byte_size)) {
+        _num_buckets = 0;
+        return false;
+    }
+    // calloc() returns zeroed memory, so every bucket starts with filled == false.
+    _buckets = reinterpret_cast<Bucket*>(calloc(_num_buckets, sizeof(Bucket)));
+    if (_buckets == NULL) {
+        // Previously a failed malloc() fell through into memset(NULL, ...).
+        _state->block_mgr2()->release_memory(_block_mgr_client, buckets_byte_size);
+        _num_buckets = 0;
+        return false;
+    }
+    return true;
+}
+
+// Releases all memory held by the table: the duplicate-node data pages, the bucket
+// array, and the bucket memory charged to the block manager client. Large or heavily
+// probed tables log their statistics first. Must be called exactly once.
+void PartitionedHashTable::close() {
+    // Print statistics only for the large or heavily used hash tables.
+    // TODO: Tweak these numbers/conditions, or print them always?
+    const int64_t LARGE_HT = 128 * 1024;
+    const int64_t HEAVILY_USED = 1024 * 1024;
+    // TODO: These statistics should go to the runtime profile as well.
+    if ((_num_buckets > LARGE_HT) || (_num_probes > HEAVILY_USED)) {
+        VLOG(2) << print_stats();
+    }
+    for (size_t i = 0; i < _data_pages.size(); ++i) {
+        _data_pages[i]->del();
+    }
+#if 0
+    if (DorisMetrics::hash_table_total_bytes() != NULL) {
+        DorisMetrics::hash_table_total_bytes()->increment(-_total_data_page_size);
+    }
+#endif
+    _data_pages.clear();
+    // free(NULL) is a no-op, so this is safe even if init() never allocated buckets.
+    free(_buckets);
+    // Do not leave a dangling pointer behind: a stray access after close() now hits
+    // NULL instead of silently reading freed memory.
+    _buckets = NULL;
+    _state->block_mgr2()->release_memory(_block_mgr_client, _num_buckets * sizeof(Bucket));
+}
+
+// Bytes accounted to this table: the bucket array plus _num_duplicate_nodes
+// DuplicateNode entries.
+int64_t PartitionedHashTable::current_mem_size() const {
+    const int64_t bucket_bytes = _num_buckets * sizeof(Bucket);
+    const int64_t duplicate_bytes = _num_duplicate_nodes * sizeof(DuplicateNode);
+    return bucket_bytes + duplicate_bytes;
+}
+
+// Ensures that 'buckets_to_fill' more buckets can be filled without exceeding
+// MAX_FILL_FACTOR. If not, the bucket count is doubled as many times as needed and the
+// table is resized once. Returns false if resize_buckets() fails, or if the table has
+// no buckets at all (e.g. init() failed) and would need to grow.
+bool PartitionedHashTable::check_and_resize(
+        uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx) {
+    if (_num_buckets <= 0) {
+        // Doubling zero buckets never creates room, so the search below would never
+        // terminate. Succeed only when nothing needs to be stored.
+        return _num_filled_buckets + buckets_to_fill == 0;
+    }
+    uint64_t shift = 0;
+    while (_num_filled_buckets + buckets_to_fill >
+            (_num_buckets << shift) * MAX_FILL_FACTOR) {
+        // TODO: next prime instead of double?
+        ++shift;
+    }
+    if (shift > 0) {
+        return resize_buckets(_num_buckets << shift, ht_ctx);
+    }
+    return true;
+}
+
+// Rehashes every filled bucket into a new array of 'num_buckets' buckets (a power of 2
+// larger than the number of filled buckets). Duplicate-node chains move with their
+// bucket untouched. Returns false, leaving the current table intact, if 'num_buckets'
+// exceeds _max_num_buckets or the new array cannot be reserved or allocated.
+bool PartitionedHashTable::resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx) {
+    DCHECK_EQ((num_buckets & (num_buckets - 1)), 0)
+            << "num_buckets=" << num_buckets << " must be a power of 2";
+    DCHECK_GT(num_buckets, _num_filled_buckets) << "Cannot shrink the hash table to "
+            "smaller number of buckets than the number of filled buckets.";
+    VLOG(2) << "Resizing hash table from "
+            << _num_buckets << " to " << num_buckets << " buckets.";
+    if (_max_num_buckets != -1 && num_buckets > _max_num_buckets) {
+        return false;
+    }
+    ++_num_resizes;
+
+    // All memory that can grow proportional to the input should come from the block mgrs
+    // mem tracker.
+    // Note that while we copying over the contents of the old hash table, we need to have
+    // allocated both the old and the new hash table. Once we finish, we return the memory
+    // of the old hash table.
+    int64_t old_size = _num_buckets * sizeof(Bucket);
+    int64_t new_size = num_buckets * sizeof(Bucket);
+    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, new_size)) {
+        return false;
+    }
+    // calloc() zeroes the array, so every new bucket starts with filled == false.
+    Bucket* new_buckets = reinterpret_cast<Bucket*>(calloc(num_buckets, sizeof(Bucket)));
+    if (new_buckets == NULL) {
+        // Only a DCHECK guarded this before, so release builds crashed in memset().
+        // Give the reservation back; the old table is still valid.
+        _state->block_mgr2()->release_memory(_block_mgr_client, new_size);
+        return false;
+    }
+
+    // Walk the old table and copy all the filled buckets to the new (resized) table.
+    // We do not have to do anything with the duplicate nodes. This operation is expected
+    // to succeed.
+    for (PartitionedHashTable::Iterator iter = begin(ht_ctx); !iter.at_end();
+            next_filled_bucket(&iter._bucket_idx, &iter._node)) {
+        Bucket* bucket_to_copy = &_buckets[iter._bucket_idx];
+        bool found = false;
+        int64_t bucket_idx = probe(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
+        DCHECK(!found);
+        DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
+                " there are free buckets. " << num_buckets << " " << _num_filled_buckets;
+        new_buckets[bucket_idx] = *bucket_to_copy;
+    }
+
+    _num_buckets = num_buckets;
+    free(_buckets);
+    _buckets = new_buckets;
+    _state->block_mgr2()->release_memory(_block_mgr_client, old_size);
+    return true;
+}
+
+// Obtains another data page from the block manager and makes it the current page for
+// DuplicateNode allocation. Returns false if no block could be obtained; the Status
+// from the block manager is not propagated.
+bool PartitionedHashTable::grow_node_array() {
+    // The first pages are capped by INITIAL_DATA_PAGE_SIZES; later pages use the block
+    // manager's max block size.
+    int64_t page_size = _state->block_mgr2()->max_block_size();
+    if (_data_pages.size() < NUM_SMALL_DATA_PAGES) {
+        page_size = std::min(page_size, INITIAL_DATA_PAGE_SIZES[_data_pages.size()]);
+    }
+
+    BufferedBlockMgr2::Block* page = NULL;
+    Status status = _state->block_mgr2()->get_new_block(
+            _block_mgr_client, NULL, &page, page_size);
+    // A failed request is expected to leave 'page' NULL; only 'page' is acted on.
+    DCHECK(status.ok() || page == NULL);
+    if (page == NULL) {
+        return false;
+    }
+
+    _data_pages.push_back(page);
+    _next_node = page->allocate<DuplicateNode>(page_size);
+#if 0
+    if (DorisMetrics::hash_table_total_bytes() != NULL) {
+        DorisMetrics::hash_table_total_bytes()->increment(page_size);
+    }
+#endif
+    _node_remaining_current_page = page_size / sizeof(DuplicateNode);
+    _total_data_page_size += page_size;
+    return true;
+}
+
+// Appends a description of 'htdata' to 'ss'. This is the Tuple* when single tuples are
+// stored, otherwise the (block, idx, offset) of the stream row index. If 'desc' is
+// non-NULL, the row is also materialized with get_row() and printed using 'desc'.
+void PartitionedHashTable::debug_string_tuple(
+        stringstream& ss, HtData& htdata, const RowDescriptor* desc) {
+    if (_stores_tuples) {
+        ss << "(" << htdata.tuple << ")";
+    } else {
+        ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx()
+           << ", " << htdata.idx.offset() << ")";
+    }
+    if (desc != NULL) {
+        // Scratch row with one Tuple* slot per build tuple. A std::vector is used
+        // because a variable-length array is a compiler extension, not standard C++.
+        vector<Tuple*> row(_num_build_tuples);
+        ss << " " << get_row(htdata, reinterpret_cast<TupleRow*>(row.data()))->to_string(*desc);
+    }
+}
+
+// Returns a multi-line dump of the bucket array with one line per bucket:
+// "<index>: " then " [M]"/" [U]" when 'show_match' is set, then either " [D] " plus a
+// comma-separated list of duplicates or " [B] " plus the single entry (" - " if empty).
+// Unfilled buckets are skipped when 'skip_empty' is set; 'desc' is forwarded to
+// debug_string_tuple() to print row contents.
+string PartitionedHashTable::debug_string(
+        bool skip_empty, bool show_match, const RowDescriptor* desc) {
+    stringstream ss;
+    ss << endl;
+    // int64_t matches the type of _num_buckets; an int index overflows on tables
+    // with more than INT_MAX buckets.
+    for (int64_t i = 0; i < _num_buckets; ++i) {
+        Bucket& bucket = _buckets[i];
+        if (skip_empty && !bucket.filled) {
+            continue;
+        }
+        ss << i << ": ";
+        if (show_match) {
+            ss << (bucket.matched ? " [M]" : " [U]");
+        }
+        if (bucket.hasDuplicates) {
+            ss << " [D] ";
+            bool first = true;
+            for (DuplicateNode* node = bucket.bucketData.duplicates; node != NULL;
+                    node = node->next) {
+                if (!first) {
+                    ss << ",";
+                }
+                debug_string_tuple(ss, node->htdata, desc);
+                first = false;
+            }
+        } else {
+            ss << " [B] ";
+            if (bucket.filled) {
+                debug_string_tuple(ss, bucket.bucketData.htdata, desc);
+            } else {
+                ss << " - ";
+            }
+        }
+        ss << endl;
+    }
+    return ss.str();
+}
+
+// Returns num / den, or 0 when den is 0. Without this guard, statistics of an empty or
+// never-probed table print as nan/inf.
+static double ratio_or_zero(double num, double den) {
+    return den == 0 ? 0 : num / den;
+}
+
+// Returns a human-readable summary of the bucket, duplicate, probe, collision and
+// resize counters. close() logs it for large or heavily probed tables.
+string PartitionedHashTable::print_stats() const {
+    double curr_fill_factor = ratio_or_zero(_num_filled_buckets, _num_buckets);
+    double avg_travel = ratio_or_zero(_travel_length, _num_probes);
+    double avg_collisions = ratio_or_zero(_num_hash_collisions, _num_filled_buckets);
+    stringstream ss;
+    ss << "Buckets: " << _num_buckets << " " << _num_filled_buckets << " "
+       << curr_fill_factor << endl;
+    ss << "Duplicates: " << _num_buckets_with_duplicates << " buckets "
+       << _num_duplicate_nodes << " nodes" << endl;
+    ss << "Probes: " << _num_probes << endl;
+    ss << "FailedProbes: " << _num_failed_probes << endl;
+    ss << "Travel: " << _travel_length << " " << avg_travel << endl;
+    ss << "HashCollisions: " << _num_hash_collisions << " " << avg_collisions << endl;
+    ss << "Resizes: " << _num_resizes << endl;
+    return ss.str();
+}
+
+} // namespace doris
+
diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h
new file mode 100644
index 0000000..94f6c98
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.h
@@ -0,0 +1,673 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+
+#include <vector>
+#include <boost/cstdint.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include "codegen/doris_ir.h"
+#include "util/logging.h"
+#include "runtime/buffered_block_mgr2.h"
+#include "runtime/buffered_tuple_stream2.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple_row.h"
+#include "util/hash_util.hpp"
+#include "util/bit_util.h"
+
+namespace doris {
+
+// Forward declarations for types used only by pointer/reference in this header.
+// (MemTracker was previously declared twice.)
+class Expr;
+class ExprContext;
+class MemTracker;
+class RowDescriptor;
+class RuntimeState;
+class Tuple;
+class TupleRow;
+class PartitionedHashTable;
+
+// Linear or quadratic probing hash table implementation tailored to the usage pattern
+// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
+// allows for different exprs for insertions and finds. This is the pattern we use for
+// joins and aggregation where the input/build tuple row descriptor is different from the
+// find/probe descriptor. The implementation is designed to allow codegen for some paths.
+//
+// In addition to the hash table there is also an accompanying hash table context that
+// is used for insertions and probes. For example, the hash table context stores
+// evaluated expr results for the current row being processed when possible into a
+// contiguous memory buffer. This allows for efficient hash computation.
+//
+// The hash table does not support removes. The hash table is not thread safe.
+// The table is optimized for the partition hash aggregation and hash joins and is not
+// intended to be a generic hash table implementation. The API loosely mimics the
+// std::unordered_set API.
+//
+// The data (rows) are stored in a BufferedTupleStream2. The basic data structure of this
+// hash table is a vector of buckets. The buckets (indexed by the mod of the hash)
+// contain a pointer to either the slot in the tuple-stream or in case of duplicate
+// values, to the head of a linked list of nodes that in turn contain a pointer to
+// tuple-stream slots. When inserting an entry we start at the bucket at position
+// (hash % size) and search for either a bucket with the same hash or for an empty
+// bucket. If a bucket with the same hash is found, we then compare for row equality and
+// either insert a duplicate node if the equality is true, or continue the search if the
+// row equality is false. Similarly, when probing we start from the bucket at position
+// (hash % size) and search for an entry with the same hash or for an empty bucket.
+// In the former case, we then check for row equality and continue the search if the row
+// equality is false. In the latter case, the probe is not successful. When growing the
+// hash table, the number of buckets is doubled. We trigger a resize when the fill
+// factor is approx 75%. Due to the doubling nature of the buckets, we require that the
+// number of buckets is a power of 2. This allows us to perform a modulo of the hash
+// using a bitmask.
+//
+// We choose to use linear or quadratic probing because they exhibit good (predictable)
+// cache behavior.
+//
+// The first NUM_SMALL_DATA_PAGES data pages (which hold the duplicate nodes) are smaller
+// than the IO size (of 8MB) to reduce the memory footprint of small queries.
+//
+// TODO: Compare linear and quadratic probing and remove the loser.
+// TODO: We currently use 32-bit hashes. There is room in the bucket structure for at
+// least 48-bits. We should exploit this space.
+// TODO: Consider capping the probes with a threshold value. If an insert reaches
+// that threshold it is inserted to another linked list of overflow entries.
+// TODO: Smarter resizes, and perhaps avoid using powers of 2 as the hash table size.
+// TODO: this is not a fancy hash table in terms of memory access patterns
+// (cuckoo-hashing or something that spills to disk). We will likely want to invest
+// more time into this.
+// TODO: hash-join and aggregation have very different access patterns. Joins insert
+// all the rows and then call scan to find them. Aggregation interleaves find() and
+// insert(). We may want to optimize joins more heavily for insert() (in particular
+// growing).
+// TODO: Batched interface for inserts and finds.
+// TODO: Do we need to check mem limit exceeded so often. Check once per batch?
+
+// Control block for a hash table. This class contains the logic as well as the variables
+// needed by a thread to operate on a hash table.
+// The context owns the expr value buffer, the per-expr NULL flags and the scratch row.
+// They are allocated by the constructor and released only by close(); no destructor
+// is declared, so close() must be called explicitly.
+class PartitionedHashTableCtx {
+public:
+ // Create a hash table context.
+ // - build_exprs are the exprs that should be used to evaluate rows during insert().
+ // - probe_exprs are used during find()
+ // Both expr vectors are stored by reference and must outlive this context.
+ // - stores_nulls: if false, TupleRows with nulls are ignored during insert
+ // - finds_nulls: if false, find() returns End() for TupleRows with nulls
+ // even if stores_nulls is true
+ // - initial_seed: Initial seed value to use when computing hashes for rows with
+ // level 0. Other levels have their seeds derived from this seed.
+ // - max_levels: the max level we will hash with; must be smaller than the number of
+ // entries in SEED_PRIMES (checked in the constructor).
+ // - num_build_tuples: number of Tuple* slots in the scratch row returned by row().
+ PartitionedHashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
+ const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+ bool finds_nulls, int32_t initial_seed, int max_levels,
+ int num_build_tuples);
+
+ // Call to cleanup any resources.
+ void close();
+
+ void set_level(int level);
+ int level() const { return _level; }
+ // Seed used for hashing at 'level' (bounds-checked via std::vector::at()).
+ uint32_t seed(int level) { return _seeds.at(level); }
+
+ // Scratch row with num_build_tuples Tuple* slots, owned by this context.
+ TupleRow* row() const { return _row; }
+
+ // Returns the results of the exprs at 'expr_idx' evaluated over the last row
+ // processed.
+ // This value is invalid if the expr evaluated to NULL.
+ // TODO: this is an awkward abstraction but aggregation node can take advantage of
+ // it and save some expr evaluation calls.
+ void* last_expr_value(int expr_idx) const {
+ return _expr_values_buffer + _expr_values_buffer_offsets[expr_idx];
+ }
+
+ // Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
+ bool last_expr_value_null(int expr_idx) const {
+ return _expr_value_null_bits[expr_idx];
+ }
+
+ // Evaluate and hash the build/probe row, returning in *hash. Returns false if this
+ // row should be rejected (doesn't need to be processed further) because it
+ // contains NULL.
+ // These need to be inlined in the IR module so we can find and replace the calls to
+ // EvalBuildRow()/EvalProbeRow().
+ bool IR_ALWAYS_INLINE eval_and_hash_build(TupleRow* row, uint32_t* hash);
+ bool IR_ALWAYS_INLINE eval_and_hash_probe(TupleRow* row, uint32_t* hash);
+
+ int results_buffer_size() const { return _results_buffer_size; }
+
+private:
+ friend class PartitionedHashTable;
+ friend class PartitionedHashTableTest_HashEmpty_Test;
+
+ // Compute the hash of the values in _expr_values_buffer.
+ // Without variable-length slots (_var_result_begin == -1) the whole buffer is hashed
+ // in one pass; otherwise hash_variable_len_row() is used.
+ // This will be replaced by codegen. We don't want this inlined for replacing
+ // with codegen'd functions so the function name does not change.
+ uint32_t IR_NO_INLINE HashCurrentRow() {
+ DCHECK_LT(_level, _seeds.size());
+ if (_var_result_begin == -1) {
+ // This handles NULLs implicitly since a constant seed value was put
+ // into results buffer for nulls.
+ // TODO: figure out which hash function to use. We need to generate uncorrelated
+ // hashes by changing just the seed. CRC does not have this property and FNV is
+ // okay. We should switch to something else.
+ return hash_help(_expr_values_buffer, _results_buffer_size, _seeds[_level]);
+ } else {
+ return PartitionedHashTableCtx::hash_variable_len_row();
+ }
+ }
+
+ // Wrapper function for calling correct HashUtil function in non-codegen'd case.
+ uint32_t inline hash_help(const void* input, int len, int32_t hash) {
+ // Use CRC hash at first level for better performance. Switch to murmur hash at
+ // subsequent levels since CRC doesn't randomize well with different seed inputs.
+ if (_level == 0) {
+ return HashUtil::hash(input, len, hash);
+ }
+ return HashUtil::murmur_hash2_64(input, len, hash);
+ }
+
+ // Evaluate 'row' over build exprs caching the results in '_expr_values_buffer'. This
+ // will be replaced by codegen. We do not want this function inlined when cross
+ // compiled because we need to be able to differentiate between EvalBuildRow and
+ // EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
+ // function.
+ bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
+ return eval_row(row, _build_expr_ctxs);
+ }
+
+ // Evaluate 'row' over probe exprs caching the results in '_expr_values_buffer'
+ // This will be replaced by codegen.
+ bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
+ return eval_row(row, _probe_expr_ctxs);
+ }
+
+ // Compute the hash of the values in _expr_values_buffer for rows with variable length
+ // fields (e.g. strings).
+ uint32_t hash_variable_len_row();
+
+ // Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
+ // Returns whether any expr evaluated to NULL. If the table does not store NULLs,
+ // evaluation stops at the first NULL (later exprs are not evaluated).
+ // This will be replaced by codegen.
+ bool eval_row(TupleRow* row, const std::vector<ExprContext*>& ctxs);
+
+ // Returns true if the values of build_exprs evaluated over 'build_row' equal
+ // the values cached in _expr_values_buffer. A NULL matches only a NULL, and only
+ // when NULLs are stored.
+ // This will be replaced by codegen.
+ bool IR_NO_INLINE equals(TupleRow* build_row);
+
+ // Borrowed from the caller; see the constructor comment about lifetime.
+ const std::vector<ExprContext*>& _build_expr_ctxs;
+ const std::vector<ExprContext*>& _probe_expr_ctxs;
+
+ // Constants on how the hash table should behave. Joins and aggs have slightly
+ // different behavior.
+ // TODO: these constants are an ideal candidate to be removed with codegen.
+ // TODO: ..or with template-ization
+ const bool _stores_nulls;
+ const bool _finds_nulls;
+
+ // The current level this context is working on. Each level needs to use a
+ // different seed.
+ int _level;
+
+ // The seeds to use for hashing. Indexed by the level.
+ std::vector<uint32_t> _seeds;
+
+ // Byte offset of each expr's result within '_expr_values_buffer', indexed by expr
+ // position (filled in by Expr::compute_results_layout()).
+ std::vector<int> _expr_values_buffer_offsets;
+
+ // Byte offset into '_expr_values_buffer' that begins the variable length results.
+ // If -1, there are no variable length slots. Never changes once set, can be removed
+ // with codegen.
+ int _var_result_begin;
+
+ // Byte size of '_expr_values_buffer'. Never changes once set, can be removed with
+ // codegen.
+ int _results_buffer_size;
+
+ // Buffer to store evaluated expr results for the current row being evaluated: a
+ // build row (during insert()) or a probe row (during find()). This address must not
+ // change once allocated since the address is baked into the codegen.
+ uint8_t* _expr_values_buffer;
+
+ // Use bytes instead of bools to be compatible with llvm. This address must
+ // not change once allocated.
+ uint8_t* _expr_value_null_bits;
+
+ // Scratch buffer to generate rows on the fly.
+ TupleRow* _row;
+
+ // Accessor declared for cross-compiled (IR) code; its definition is not in this
+ // file. NOTE(review): the original comment named codegen_hash_current_row(), which
+ // does not exist here - presumably HashCurrentRow() was meant; confirm.
+ uint32_t get_hash_seed() const;
+};
+
+// The hash table consists of a contiguous array of buckets that contain a pointer to the
+// data, the hash value and three flags: whether this bucket is filled, whether this
+// entry has been matched (used in right and full joins) and whether this entry has
+// duplicates. If there are duplicates, then the data is pointing to the head of a
+// linked list of duplicate nodes that point to the actual data. Note that the duplicate
+// nodes do not contain the hash value, because all the linked nodes have the same hash
+// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
+// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
+// data allocated by the hash table comes from the BufferedBlockMgr2.
+class PartitionedHashTable {
+private:
+
+ // Either the row in the tuple stream or a pointer to the single tuple of this row.
+ union HtData {
+ BufferedTupleStream2::RowIdx idx;
+ Tuple* tuple;
+ };
+
+ // Linked list of entries used for duplicates.
+ struct DuplicateNode {
+ // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+ // row in the DuplicateNode has been matched.
+ // From an abstraction point of view, this is an awkward place to store this
+ // information.
+ // TODO: Fold this flag in the next pointer below.
+ bool matched;
+
+ // Chain to next duplicate node, NULL when end of list.
+ DuplicateNode* next;
+ HtData htdata;
+ };
+
+ struct Bucket {
+ // Whether this bucket contains a valid entry, or it is empty.
+ bool filled;
+
+ // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+ // row in the bucket has been matched.
+ // From an abstraction point of view, this is an awkward place to store this
+ // information but it is efficient. This space is otherwise unused.
+ bool matched;
+
+ // Used in case of duplicates. If true, then the bucketData union should be used as
+ // 'duplicates'.
+ bool hasDuplicates;
+
+ // Cache of the hash for data.
+ // TODO: Do we even have to cache the hash value?
+ uint32_t hash;
+
+ // Either the data for this bucket or the linked list of duplicates.
+ union {
+ HtData htdata;
+ DuplicateNode* duplicates;
+ } bucketData;
+ };
+
+public:
+ class Iterator;
+
+ // Returns a newly allocated PartitionedHashTable. The probing algorithm is set by the
+ // config::enable_quadratic_probing option.
+ // - client: block mgr client to allocate data pages from.
+ // - num_build_tuples: number of Tuples in the build tuple row.
+ // - tuple_stream: the tuple stream which contains the tuple rows index by the
+ // hash table. Can be NULL if the rows contain only a single tuple, in which
+ // case the 'tuple_stream' is unused.
+ // - max_num_buckets: the maximum number of buckets that can be stored. If we
+ // try to grow the number of buckets to a larger number, the inserts will fail.
+ // -1, if it unlimited.
+ // - initial_num_buckets: number of buckets that the hash table should be initialized
+ // with.
+ static PartitionedHashTable* create(RuntimeState* state, BufferedBlockMgr2::Client* client,
+ int num_build_tuples, BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+ int64_t initial_num_buckets);
+
+ // Allocates the initial bucket structure. Returns false if OOM.
+ bool init();
+
+ // Call to cleanup any resources. Must be called once.
+ void close();
+
+ // Inserts the row to the hash table. Returns true if the insertion was successful.
+ // Always returns true if the table has free buckets and the key is not a duplicate.
+ // The caller is responsible for ensuring that the table has free buckets
+ // 'idx' is the index into _tuple_stream for this row. If the row contains more than
+ // one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
+ // hash table and the caller must guarantee it stays in memory. This will not grow the
+ // hash table. In the case that there is a need to insert a duplicate node, instead of
+ // filling a new bucket, and there is not enough memory to insert a duplicate node,
+ // the insert fails and this function returns false.
+ // Used during the build phase of hash joins.
+ bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx,
+ const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash);
+
+ // Same as insert() but for inserting a single Tuple. The 'tuple' is not copied by
+ // the hash table and the caller must guarantee it stays in memory.
+ bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash);
+
+ // Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
+ // Returns PartitionedHashTable::End() if no match is found. The iterator can be iterated until
+ // PartitionedHashTable::End() to find all the matching rows. Advancing the returned iterator will
+ // go to the next matching row. The matching rows do not need to be evaluated since all
+ // the nodes of a bucket are duplicates. One scan can be in progress for each 'ht_ctx'.
+ // Used during the probe phase of hash joins.
+ Iterator IR_ALWAYS_INLINE find(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+ // If a match is found in the table, return an iterator as in find(). If a match was
+ // not present, return an iterator pointing to the empty bucket where the key should
+ // be inserted. Returns End() if the table is full. The caller can set the data in
+ // the bucket using a Set*() method on the iterator.
+ Iterator IR_ALWAYS_INLINE find_bucket(PartitionedHashTableCtx* ht_ctx, uint32_t hash,
+ bool* found);
+
+ // Returns number of elements inserted in the hash table
+ int64_t size() const {
+ return _num_filled_buckets - _num_buckets_with_duplicates + _num_duplicate_nodes;
+ }
+
+ // Returns the number of empty buckets.
+ int64_t empty_buckets() const { return _num_buckets - _num_filled_buckets; }
+
+ // Returns the number of buckets
+ int64_t num_buckets() const { return _num_buckets; }
+
+ // Returns the load factor (the number of non-empty buckets)
+ double load_factor() const {
+ return static_cast<double>(_num_filled_buckets) / _num_buckets;
+ }
+
+ // Returns an estimate of the number of bytes needed to build the hash table
+ // structure for 'num_rows'. To do that, it estimates the number of buckets,
+ // rounded up to a power of two, and also assumes that there are no duplicates.
+ static int64_t EstimateNumBuckets(int64_t num_rows) {
+ // Assume max 66% fill factor and no duplicates.
+ return BitUtil::next_power_of_two(3 * num_rows / 2);
+ }
+ static int64_t EstimateSize(int64_t num_rows) {
+ int64_t num_buckets = EstimateNumBuckets(num_rows);
+ return num_buckets * sizeof(Bucket);
+ }
+
+ // Returns the memory occupied by the hash table, takes into account the number of
+ // duplicates.
+ int64_t current_mem_size() const;
+
+ // Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
+ // filled and resizes the hash table so that the projected fill factor is below the
+ // max fill factor.
+ // If it returns true, then it is guaranteed at least 'rows_to_add' rows can be
+ // inserted without need to resize.
+ bool check_and_resize(uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx);
+
+ // Returns the number of bytes allocated to the hash table
+ int64_t byte_size() const { return _total_data_page_size; }
+
+ // Returns an iterator at the beginning of the hash table. Advancing this iterator
+ // will traverse all elements.
+ Iterator begin(PartitionedHashTableCtx* ht_ctx);
+
+ // Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
+ // bucket has duplicates) in the hash table that does not have its matched flag set.
+ // Used in right joins and full-outer joins.
+ Iterator first_unmatched(PartitionedHashTableCtx* ctx);
+
+ // Return true if there was a least one match.
+ bool HasMatches() const { return _has_matches; }
+
+ // Return end marker.
+ Iterator End() { return Iterator(); }
+
+ // Dump out the entire hash table to string. If 'skip_empty', empty buckets are
+ // skipped. If 'show_match', it also prints the matched flag of each node. If
+ // 'build_desc' is non-null, the build rows will be printed. Otherwise, only the
+ // the addresses of the build rows will be printed.
+ std::string debug_string(bool skip_empty, bool show_match,
+ const RowDescriptor* build_desc);
+
+ // Print the content of a bucket or node.
+ void debug_string_tuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc);
+
+ // Update and print some statistics that can be used for performance debugging.
+ std::string print_stats() const;
+
+ // stl-like iterator interface.
+ class Iterator {
+ private:
+ // Bucket index value when probe is not successful.
+ static const int64_t BUCKET_NOT_FOUND = -1;
+
+ public:
+
+ Iterator() : _table(NULL), _row(NULL), _bucket_idx(BUCKET_NOT_FOUND), _node(NULL) { }
+
+ // Iterates to the next element. It should be called only if !AtEnd().
+ void IR_ALWAYS_INLINE next();
+
+ // Iterates to the next duplicate node. If the bucket does not have duplicates or
+ // when it reaches the last duplicate node, then it moves the Iterator to AtEnd().
+ // Used when we want to iterate over all the duplicate nodes bypassing the next()
+ // interface (e.g. in semi/outer joins without other_join_conjuncts, in order to
+ // iterate over all nodes of an unmatched bucket).
+ void IR_ALWAYS_INLINE next_duplicate();
+
+ // Iterates to the next element that does not have its matched flag set. Used in
+ // right-outer and full-outer joins.
+ void next_unmatched();
+
+ // Return the current row or tuple. Callers must check the iterator is not AtEnd()
+ // before calling them. The returned row is owned by the iterator and valid until
+ // the next call to get_row(). It is safe to advance the iterator.
+ TupleRow* get_row() const;
+ Tuple* get_tuple() const;
+
+ // Set the current tuple for an empty bucket. Designed to be used with the
+ // iterator returned from find_bucket() in the case when the value is not found.
+ // It is not valid to call this function if the bucket already has an entry.
+ void set_tuple(Tuple* tuple, uint32_t hash);
+
+ // Sets as matched the Bucket or DuplicateNode currently pointed by the iterator,
+ // depending on whether the bucket has duplicates or not. The iterator cannot be
+ // AtEnd().
+ void set_matched();
+
+ // Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on
+ // whether the bucket has duplicates or not. It should be called only if !AtEnd().
+ bool is_matched() const;
+
+ // Resets everything but the pointer to the hash table.
+ void set_at_end();
+
+ // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
+ bool at_end() const { return _bucket_idx == BUCKET_NOT_FOUND; }
+
+ private:
+ friend class PartitionedHashTable;
+
+ Iterator(PartitionedHashTable* table, TupleRow* row, int bucket_idx, DuplicateNode* node)
+ : _table(table),
+ _row(row),
+ _bucket_idx(bucket_idx),
+ _node(node) {
+ }
+
+ PartitionedHashTable* _table;
+ TupleRow* _row;
+
+ // Current bucket idx.
+ // TODO: Use uint32_t?
+ int64_t _bucket_idx;
+
+ // Pointer to the current duplicate node.
+ DuplicateNode* _node;
+ };
+
+private:
+ friend class Iterator;
+ friend class PartitionedHashTableTest;
+
+ // Hash table constructor. Private because Create() should be used, instead
+ // of calling this constructor directly.
+ // - quadratic_probing: set to true when the probing algorithm is quadratic, as
+ // opposed to linear.
+ PartitionedHashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr2::Client* client,
+ int num_build_tuples, BufferedTupleStream2* tuple_stream,
+ int64_t max_num_buckets, int64_t initial_num_buckets);
+
+ // Performs the probing operation according to the probing algorithm (linear or
+ // quadratic. Returns one of the following:
+ // (a) the index of the bucket that contains the entry that matches with the last row
+ // evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row
+ // equality and returns the index of the first empty bucket.
+ // (b) the index of the first empty bucket according to the probing algorithm (linear
+ // or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL.
+ // (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum
+ // distance was traveled without finding either an empty or a matching bucket.
+ // Using the returned index value, the caller can create an iterator that can be
+ // iterated until End() to find all the matching rows.
+ // EvalAndHashBuild() or EvalAndHashProb(e) must have been called before calling this.
+ // 'hash' must be the hash returned by these functions.
+ // 'found' indicates that a bucket that contains an equal row is found.
+ //
+ // There are wrappers of this function that perform the find and insert logic.
+ int64_t IR_ALWAYS_INLINE probe(Bucket* buckets, int64_t num_buckets,
+ PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found);
+
+ // Performs the insert logic. Returns the HtData* of the bucket or duplicate node
+ // where the data should be inserted. Returns NULL if the insert was not successful.
+ HtData* IR_ALWAYS_INLINE insert_internal(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+ // Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
+ // duplicates, 'node' will be pointing to the head of the linked list of duplicates.
+ // Otherwise, 'node' should not be used. If there are no more buckets, sets
+ // 'bucket_idx' to BUCKET_NOT_FOUND.
+ void next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node);
+
+ // Resize the hash table to 'num_buckets'. Returns false on OOM.
+ bool resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx);
+
+ // Appends the DuplicateNode pointed by _next_node to 'bucket' and moves the _next_node
+ // pointer to the next DuplicateNode in the page, updating the remaining node counter.
+ DuplicateNode* IR_ALWAYS_INLINE append_next_node(Bucket* bucket);
+
+ // Creates a new DuplicateNode for a entry and chains it to the bucket with index
+ // 'bucket_idx'. The duplicate nodes of a bucket are chained as a linked list.
+ // This places the new duplicate node at the beginning of the list. If this is the
+ // first duplicate entry inserted in this bucket, then the entry already contained by
+ // the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
+ // bucket are copied to a DuplicateNode and 'data' is updated to pointing to a
+ // DuplicateNode.
+ // Returns NULL if the node array could not grow, i.e. there was not enough memory to
+ // allocate a new DuplicateNode.
+ DuplicateNode* IR_ALWAYS_INLINE insert_duplicate_node(int64_t bucket_idx);
+
+ // Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
+ // an insert. Sets all the fields of the bucket other than 'data'.
+ void IR_ALWAYS_INLINE prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash);
+
+ // Return the TupleRow pointed by 'htdata'.
+ TupleRow* get_row(HtData& htdata, TupleRow* row) const;
+
+ // Returns the TupleRow of the pointed 'bucket'. In case of duplicates, it
+ // returns the content of the first chained duplicate node of the bucket.
+ TupleRow* get_row(Bucket* bucket, TupleRow* row) const;
+
+ // Grow the node array. Returns false on OOM.
+ bool grow_node_array();
+
+ // Load factor that will trigger growing the hash table on insert. This is
+ // defined as the number of non-empty buckets / total_buckets
+ static const double MAX_FILL_FACTOR;
+
+ RuntimeState* _state;
+
+ // Client to allocate data pages with.
+ BufferedBlockMgr2::Client* _block_mgr_client;
+
+ // Stream contains the rows referenced by the hash table. Can be NULL if the
+ // row only contains a single tuple, in which case the TupleRow indirection
+ // is removed by the hash table.
+ BufferedTupleStream2* _tuple_stream;
+
+ // Constants on how the hash table should behave. Joins and aggs have slightly
+ // different behavior.
+ // TODO: these constants are an ideal candidate to be removed with codegen.
+ // TODO: ..or with template-ization
+ const bool _stores_tuples;
+
+ // Quadratic probing enabled (as opposed to linear).
+ const bool _quadratic_probing;
+
+ // Data pages for all nodes. These are always pinned.
+ std::vector<BufferedBlockMgr2::Block*> _data_pages;
+
+ // Byte size of all buffers in _data_pages.
+ int64_t _total_data_page_size;
+
+ // Next duplicate node to insert. Vaild when _node_remaining_current_page > 0.
+ DuplicateNode* _next_node;
+
+ // Number of nodes left in the current page.
+ int _node_remaining_current_page;
+
+ // Number of duplicate nodes.
+ int64_t _num_duplicate_nodes;
+
+ const int64_t _max_num_buckets;
+
+ // Array of all buckets. Owned by this node. Using c-style array to control
+ // control memory footprint.
+ Bucket* _buckets;
+
+ // Total number of buckets (filled and empty).
+ int64_t _num_buckets;
+
+ // Number of non-empty buckets. Used to determine when to resize.
+ int64_t _num_filled_buckets;
+
+ // Number of (non-empty) buckets with duplicates. These buckets do not point to slots
+ // in the tuple stream, rather than to a linked list of Nodes.
+ int64_t _num_buckets_with_duplicates;
+
+ // Number of build tuples, used for constructing temp row* for probes.
+ // TODO: We should remove it.
+ const int _num_build_tuples;
+
+ // Flag used to disable spilling hash tables that already had matches in case of
+ // right joins (IMPALA-1488).
+ // TODO: Not fail when spilling hash tables with matches in right joins
+ bool _has_matches;
+
+ // The stats below can be used for debugging perf.
+ // TODO: Should we make these statistics atomic?
+ // Number of find(), insert(), or find_bucket() calls that probe the hash table.
+ int64_t _num_probes;
+
+ // Number of probes that failed and had to fall back to linear probing without cap.
+ int64_t _num_failed_probes;
+
+ // Total distance traveled for each probe. That is the sum of the diff between the end
+ // position of a probe (find/insert) and its start position
+ // (hash & (_num_buckets - 1)).
+ int64_t _travel_length;
+
+ // The number of cases where we had to compare buckets with the same hash value, but
+ // the row equality failed.
+ int64_t _num_hash_collisions;
+
+ // How many times this table has resized so far.
+ int64_t _num_resizes;
+};
+
+} // end namespace doris
+
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
diff --git a/be/src/exec/partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h
new file mode 100644
index 0000000..09abc3d
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.inline.h
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
+
+#include "exec/partitioned_hash_table.h"
+
+namespace doris {
+
+// Evaluates the build-side expressions over 'row'. If the evaluation reports a NULL
+// and this context does not store NULLs, the row is rejected: returns false and
+// leaves '*hash' untouched. Otherwise stores the hash of the evaluated row in '*hash'
+// and returns true.
+inline bool PartitionedHashTableCtx::eval_and_hash_build(TupleRow* row, uint32_t* hash) {
+    const bool row_has_null = EvalBuildRow(row);
+    const bool reject_row = row_has_null && !_stores_nulls;
+    if (reject_row) {
+        return false;
+    }
+    *hash = HashCurrentRow();
+    return true;
+}
+
+// Evaluates the probe-side expressions over 'row'. A row whose evaluation reports a
+// NULL is rejected (returns false, '*hash' untouched) unless the context both stores
+// and finds NULLs. Otherwise writes the row's hash to '*hash' and returns true.
+inline bool PartitionedHashTableCtx::eval_and_hash_probe(TupleRow* row, uint32_t* hash) {
+    const bool row_has_null = EvalProbeRow(row);
+    const bool nulls_can_match = _stores_nulls && _finds_nulls;
+    if (row_has_null && !nulls_can_match) {
+        return false;
+    }
+    *hash = HashCurrentRow();
+    return true;
+}
+
+// Walks the bucket array starting at 'hash & (num_buckets - 1)' until it finds either
+// an empty bucket (returned with *found == false) or a filled bucket whose cached hash
+// equals 'hash' and whose row compares equal via 'ht_ctx' (returned with
+// *found == true). When 'ht_ctx' is NULL no equality check is made, so only empty
+// buckets terminate the walk. Each step advances by 1 (linear) or by the step count
+// (quadratic, i.e. triangular offsets). After 'num_buckets' steps without success
+// returns Iterator::BUCKET_NOT_FOUND. Updates _num_hash_collisions/_travel_length.
+inline int64_t PartitionedHashTable::probe(Bucket* buckets, int64_t num_buckets,
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+    DCHECK(buckets != NULL);
+    DCHECK_GT(num_buckets, 0);
+    *found = false;
+    // num_buckets is expected to be a power of two, so masking == modulo.
+    const int64_t mask = num_buckets - 1;
+    int64_t idx = hash & mask;
+    int64_t steps_taken = 0;
+    do {
+        Bucket* const candidate = buckets + idx;
+        if (!candidate->filled) {
+            // Empty slot: the key is not present and this is where it would go.
+            return idx;
+        }
+        if (candidate->hash == hash) {
+            const bool rows_equal =
+                    ht_ctx != NULL && ht_ctx->equals(get_row(candidate, ht_ctx->_row));
+            if (rows_equal) {
+                *found = true;
+                return idx;
+            }
+            // Same hash, different (or unchecked) row: count it and keep looking.
+            ++_num_hash_collisions;
+        }
+        ++steps_taken;
+        ++_travel_length;
+        // Quadratic probing jumps by the step count, so the i-th probe lands at
+        // (hash + i * (i + 1) / 2) mod num_buckets; linear probing moves one slot.
+        const int64_t jump = _quadratic_probing ? steps_taken : 1;
+        idx = (idx + jump) & mask;
+    } while (LIKELY(steps_taken < num_buckets));
+    DCHECK_EQ(_num_filled_buckets, num_buckets) << "Probing of a non-full table "
+            << "failed: " << _quadratic_probing << " " << hash;
+    return Iterator::BUCKET_NOT_FOUND;
+}
+
+// Finds the slot for a new entry with 'hash'. If probe() reports an equal row, a
+// duplicate node is chained to that bucket and its payload slot is returned (NULL if
+// the node could not be allocated). Otherwise the empty bucket is claimed and its
+// payload slot is returned. The caller fills in the returned HtData.
+inline PartitionedHashTable::HtData* PartitionedHashTable::insert_internal(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
+    ++_num_probes;
+    bool is_duplicate = false;
+    const int64_t target = probe(_buckets, _num_buckets, ht_ctx, hash, &is_duplicate);
+    DCHECK_NE(target, Iterator::BUCKET_NOT_FOUND);
+    if (!is_duplicate) {
+        prepare_bucket_for_insert(target, hash);
+        return &_buckets[target].bucketData.htdata;
+    }
+    // Equal key already present; this allocation may fail.
+    DuplicateNode* const node = insert_duplicate_node(target);
+    if (UNLIKELY(node == NULL)) {
+        return NULL;
+    }
+    return &node->htdata;
+}
+
+// Inserts a row identified by its stream index 'idx'. Tables that store tuples
+// directly forward the row's first tuple to the Tuple* overload instead. Returns
+// false when insert_internal() could not provide a slot.
+inline bool PartitionedHashTable::insert(PartitionedHashTableCtx* ht_ctx,
+        const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash) {
+    if (_stores_tuples) {
+        return insert(ht_ctx, row->get_tuple(0), hash);
+    }
+    HtData* const slot = insert_internal(ht_ctx, hash);
+    if (UNLIKELY(slot == NULL)) {
+        return false;
+    }
+    slot->idx = idx;
+    return true;
+}
+
+// Inserts a single tuple pointer (only valid when the table stores tuples). Returns
+// false when insert_internal() could not provide a slot.
+inline bool PartitionedHashTable::insert(
+        PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash) {
+    DCHECK(_stores_tuples);
+    HtData* const slot = insert_internal(ht_ctx, hash);
+    if (UNLIKELY(slot == NULL)) {
+        return false;
+    }
+    slot->tuple = tuple;
+    return true;
+}
+
+// Looks up the row last evaluated in 'ht_ctx'. Returns End() when probe() finds no
+// equal row; otherwise an iterator positioned on the matching bucket (and on the head
+// of its duplicate list, if any).
+inline PartitionedHashTable::Iterator PartitionedHashTable::find(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
+    ++_num_probes;
+    bool hit = false;
+    const int64_t idx = probe(_buckets, _num_buckets, ht_ctx, hash, &hit);
+    if (!hit) {
+        return End();
+    }
+    return Iterator(this, ht_ctx->row(), idx, _buckets[idx].bucketData.duplicates);
+}
+
+// Like find(), but also returns an iterator on the empty bucket where the key would be
+// inserted when it is absent ('*found' tells which case applies). The iterator is at
+// end only when probe() returned BUCKET_NOT_FOUND.
+inline PartitionedHashTable::Iterator PartitionedHashTable::find_bucket(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash,
+        bool* found) {
+    ++_num_probes;
+    const int64_t idx = probe(_buckets, _num_buckets, ht_ctx, hash, found);
+    DuplicateNode* head = NULL;
+    if (LIKELY(idx != Iterator::BUCKET_NOT_FOUND)) {
+        head = _buckets[idx].bucketData.duplicates;
+    }
+    return Iterator(this, ht_ctx->row(), idx, head);
+}
+
+// Returns an iterator on the first filled bucket, or an at-end iterator if none.
+inline PartitionedHashTable::Iterator PartitionedHashTable::begin(PartitionedHashTableCtx* ctx) {
+    // Start one before index 0; next_filled_bucket() pre-increments.
+    int64_t first_idx = Iterator::BUCKET_NOT_FOUND;
+    DuplicateNode* first_node = NULL;
+    next_filled_bucket(&first_idx, &first_node);
+    return Iterator(this, ctx->row(), first_idx, first_node);
+}
+
+// Returns an iterator on the first entry (bucket, or duplicate node when the bucket has
+// duplicates) whose matched flag is not set. When the table has no filled bucket the
+// returned iterator is already at end and the bucket array is not touched.
+inline PartitionedHashTable::Iterator PartitionedHashTable::first_unmatched(
+        PartitionedHashTableCtx* ctx) {
+    int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
+    DuplicateNode* node = NULL;
+    next_filled_bucket(&bucket_idx, &node);
+    Iterator it(this, ctx->row(), bucket_idx, node);
+    // No filled bucket: 'bucket_idx' is BUCKET_NOT_FOUND (-1) and 'node' is NULL, so
+    // indexing '_buckets' or dereferencing 'node' below would be out of bounds.
+    if (it.at_end()) {
+        return it;
+    }
+    // Check whether the bucket, or its first duplicate node, is matched. If it is not
+    // matched, then return. Otherwise, move to the first unmatched entry (node or bucket).
+    Bucket* bucket = &_buckets[bucket_idx];
+    const bool first_is_matched = bucket->hasDuplicates ? node->matched : bucket->matched;
+    if (first_is_matched) {
+        it.next_unmatched();
+    }
+    return it;
+}
+
+// Advances '*bucket_idx' to the next filled bucket after its current value and loads
+// that bucket's 'duplicates' union member into '*node' (meaningful only when the bucket
+// has duplicates). If no filled bucket remains, sets '*bucket_idx' to
+// BUCKET_NOT_FOUND and '*node' to NULL.
+inline void PartitionedHashTable::next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node) {
+    int64_t idx = *bucket_idx + 1;
+    while (idx < _num_buckets && !_buckets[idx].filled) {
+        ++idx;
+    }
+    if (idx < _num_buckets) {
+        *bucket_idx = idx;
+        *node = _buckets[idx].bucketData.duplicates;
+    } else {
+        *bucket_idx = Iterator::BUCKET_NOT_FOUND;
+        *node = NULL;
+    }
+}
+
+// Marks the empty bucket 'bucket_idx' as filled with no duplicates and not matched,
+// caches 'hash' in it, and bumps the filled-bucket counter. The payload
+// ('bucketData') is left for the caller to set.
+inline void PartitionedHashTable::prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket& target = _buckets[bucket_idx];
+    DCHECK(!target.filled);
+    target.filled = true;
+    target.matched = false;
+    target.hasDuplicates = false;
+    target.hash = hash;
+    ++_num_filled_buckets;
+}
+
+// Makes the next pre-allocated node the head of 'bucket's duplicate list, consumes it
+// from the current page, and returns it. The caller sets the node's fields.
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::append_next_node(
+        Bucket* bucket) {
+    DCHECK_GT(_node_remaining_current_page, 0);
+    DuplicateNode* const appended = _next_node;
+    bucket->bucketData.duplicates = appended;
+    ++_next_node;
+    --_node_remaining_current_page;
+    ++_num_duplicate_nodes;
+    return appended;
+}
+
+// Chains a new duplicate node to the filled bucket 'bucket_idx' and returns it (its
+// 'htdata' is left for the caller). If the bucket had no duplicates yet, its current
+// entry is first moved into a node of its own. Returns NULL if grow_node_array() fails
+// to provide enough nodes.
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::insert_duplicate_node(
+        int64_t bucket_idx) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket* bucket = &_buckets[bucket_idx];
+    DCHECK(bucket->filled);
+    // Allocate one duplicate node for the new data and one for the preexisting data,
+    // if needed.
+    const int nodes_needed = bucket->hasDuplicates ? 1 : 2;
+    while (_node_remaining_current_page < nodes_needed) {
+        if (UNLIKELY(!grow_node_array())) {
+            return NULL;
+        }
+    }
+    if (!bucket->hasDuplicates) {
+        // This is the first duplicate in this bucket. It means that we need to convert
+        // the current entry in the bucket to a node and link it from the bucket.
+        // Copy the whole HtData union: the bucket may hold either a RowIdx or a Tuple*
+        // (depending on '_stores_tuples'), and copying only '.idx' would read an
+        // inactive member when tuples are stored.
+        _next_node->htdata = bucket->bucketData.htdata;
+        DCHECK(!bucket->matched);
+        _next_node->matched = false;
+        _next_node->next = NULL;
+        append_next_node(bucket);
+        bucket->hasDuplicates = true;
+        ++_num_buckets_with_duplicates;
+    }
+    // Link a new node at the head of the list.
+    _next_node->next = bucket->bucketData.duplicates;
+    _next_node->matched = false;
+    return append_next_node(bucket);
+}
+
+// Materializes the row referenced by 'htdata'. For stream-backed tables the row is
+// read into the caller-provided 'row', which is returned. For tuple-storing tables the
+// address of the stored Tuple* is reinterpreted as the row (this presumes a TupleRow is
+// laid out as an array of Tuple* — assumption not visible here).
+inline TupleRow* PartitionedHashTable::get_row(HtData& htdata, TupleRow* row) const {
+    if (!_stores_tuples) {
+        _tuple_stream->get_tuple_row(htdata.idx, row);
+        return row;
+    }
+    return reinterpret_cast<TupleRow*>(&htdata.tuple);
+}
+
+// Returns the row stored in 'bucket', or in the head of its duplicate list when the
+// bucket has duplicates. 'row' is scratch space used by stream-backed tables.
+inline TupleRow* PartitionedHashTable::get_row(Bucket* bucket, TupleRow* row) const {
+    DCHECK(bucket != NULL);
+    if (LIKELY(!bucket->hasDuplicates)) {
+        return get_row(bucket->bucketData.htdata, row);
+    }
+    DuplicateNode* const head = bucket->bucketData.duplicates;
+    DCHECK(head != NULL);
+    return get_row(head->htdata, row);
+}
+
+// Returns the row at the iterator's position: the current duplicate node's row if the
+// bucket has duplicates, otherwise the bucket's own row. Uses '_row' as scratch space.
+inline TupleRow* PartitionedHashTable::Iterator::get_row() const {
+    DCHECK(!at_end());
+    DCHECK(_table != NULL);
+    DCHECK(_row != NULL);
+    Bucket& current = _table->_buckets[_bucket_idx];
+    if (LIKELY(!current.hasDuplicates)) {
+        return _table->get_row(current.bucketData.htdata, _row);
+    }
+    DCHECK(_node != NULL);
+    return _table->get_row(_node->htdata, _row);
+}
+
+// Returns the Tuple* at the iterator's position (tuple-storing tables only): from the
+// current duplicate node if the bucket has duplicates, else from the bucket itself.
+inline Tuple* PartitionedHashTable::Iterator::get_tuple() const {
+    DCHECK(!at_end());
+    DCHECK(_table->_stores_tuples);
+    const Bucket& current = _table->_buckets[_bucket_idx];
+    // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator.
+    if (LIKELY(!current.hasDuplicates)) {
+        return current.bucketData.htdata.tuple;
+    }
+    DCHECK(_node != NULL);
+    return _node->htdata.tuple;
+}
+
+// Claims the (empty) bucket under the iterator for 'hash' and stores 'tuple' in it.
+// Intended for iterators returned by find_bucket() when the key was absent.
+inline void PartitionedHashTable::Iterator::set_tuple(Tuple* tuple, uint32_t hash) {
+    DCHECK(!at_end());
+    DCHECK(_table->_stores_tuples);
+    PartitionedHashTable* const owner = _table;
+    owner->prepare_bucket_for_insert(_bucket_idx, hash);
+    owner->_buckets[_bucket_idx].bucketData.htdata.tuple = tuple;
+}
+
+// Sets the matched flag of the current entry (duplicate node if the bucket has
+// duplicates, otherwise the bucket) and records that the table has had a match.
+inline void PartitionedHashTable::Iterator::set_matched() {
+    DCHECK(!at_end());
+    Bucket& current = _table->_buckets[_bucket_idx];
+    bool& flag = current.hasDuplicates ? _node->matched : current.matched;
+    flag = true;
+    // Used for disabling spilling of hash tables in right and full-outer joins with
+    // matches. See IMPALA-1488.
+    _table->_has_matches = true;
+}
+
+// Reads the matched flag of the current entry (duplicate node or bucket).
+inline bool PartitionedHashTable::Iterator::is_matched() const {
+    DCHECK(!at_end());
+    const Bucket& current = _table->_buckets[_bucket_idx];
+    return current.hasDuplicates ? _node->matched : current.matched;
+}
+
+// Moves the iterator to the end position; '_table' and '_row' are kept.
+inline void PartitionedHashTable::Iterator::set_at_end() {
+    _node = NULL;
+    _bucket_idx = BUCKET_NOT_FOUND;
+}
+
+// Advances to the next duplicate node of the current bucket if one exists, otherwise
+// to the next filled bucket (or to the end).
+inline void PartitionedHashTable::Iterator::next() {
+    DCHECK(!at_end());
+    const bool more_in_bucket =
+            _table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL;
+    if (!more_in_bucket) {
+        _table->next_filled_bucket(&_bucket_idx, &_node);
+        return;
+    }
+    _node = _node->next;
+}
+
+// Advances to the next duplicate node within the current bucket only; when there is
+// none (or the bucket has no duplicates) the iterator goes to the end.
+inline void PartitionedHashTable::Iterator::next_duplicate() {
+    DCHECK(!at_end());
+    if (!_table->_buckets[_bucket_idx].hasDuplicates || _node->next == NULL) {
+        set_at_end();
+        return;
+    }
+    _node = _node->next;
+}
+
+// Advances to the next entry whose matched flag is clear: first among the remaining
+// duplicate nodes of the current bucket, then across the following filled buckets.
+// Ends at the end position if every remaining entry is matched.
+inline void PartitionedHashTable::Iterator::next_unmatched() {
+    DCHECK(!at_end());
+    // Remaining duplicate nodes in the current bucket.
+    if (_table->_buckets[_bucket_idx].hasDuplicates) {
+        for (DuplicateNode* cursor = _node->next; cursor != NULL; cursor = cursor->next) {
+            _node = cursor;
+            if (!cursor->matched) {
+                return;
+            }
+        }
+    }
+    // Following filled buckets: stop at an unmatched bucket, or at the first unmatched
+    // node of a bucket with duplicates.
+    for (_table->next_filled_bucket(&_bucket_idx, &_node);
+            _bucket_idx != Iterator::BUCKET_NOT_FOUND;
+            _table->next_filled_bucket(&_bucket_idx, &_node)) {
+        const Bucket& current = _table->_buckets[_bucket_idx];
+        if (current.hasDuplicates) {
+            while (_node->matched && _node->next != NULL) {
+                _node = _node->next;
+            }
+            if (!_node->matched) {
+                return;
+            }
+        } else if (!current.matched) {
+            return;
+        }
+    }
+}
+
+// Selects the hashing level; 'level' must be a valid index into '_seeds'.
+inline void PartitionedHashTableCtx::set_level(int level) {
+    DCHECK_GE(level, 0);
+    // Non-negative after the check above, so the unsigned cast is value-preserving and
+    // only makes the signed/unsigned comparison explicit.
+    DCHECK_LT(static_cast<size_t>(level), _seeds.size());
+    _level = level;
+}
+
+} // end namespace doris
+
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org