You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/03/20 06:38:15 UTC

[incubator-doris] 01/01: Revert "[CodeStyle] Remove unused PartitionedHashTable (#3156)"

This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch revert-3156-master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit cb1491b32d74950d4d4aeaa7fd732e303e9efb4e
Author: lichaoyong <li...@baidu.com>
AuthorDate: Fri Mar 20 14:38:07 2020 +0800

    Revert "[CodeStyle] Remove unused PartitionedHashTable (#3156)"
    
    This reverts commit d3fd44f0a2fe076d2c62851babc162fcebe4d63b.
---
 be/src/exec/CMakeLists.txt                  |   2 +
 be/src/exec/hash_join_node.cpp              |   2 +-
 be/src/exec/partitioned_hash_table.cc       | 440 ++++++++++++++++++
 be/src/exec/partitioned_hash_table.h        | 673 ++++++++++++++++++++++++++++
 be/src/exec/partitioned_hash_table.inline.h | 379 ++++++++++++++++
 5 files changed, 1495 insertions(+), 1 deletion(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 3c3fd71..20e00ef 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -83,6 +83,8 @@ set(EXEC_FILES
     schema_scanner/schema_charsets_scanner.cpp
     schema_scanner/schema_collations_scanner.cpp
     schema_scanner/schema_helper.cpp
+    partitioned_hash_table.cc
+    partitioned_hash_table_ir.cc
     new_partitioned_hash_table.cc
     new_partitioned_hash_table_ir.cc
     partitioned_aggregation_node.cc
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 72db1be..d03dd3e 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -46,7 +46,7 @@ HashJoinNode::HashJoinNode(
     _is_push_down = tnode.hash_join_node.is_push_down;
     _build_unique = _join_op == TJoinOp::LEFT_ANTI_JOIN|| _join_op == TJoinOp::RIGHT_ANTI_JOIN
         || _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN 
-        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+        || _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
 }
 
 HashJoinNode::~HashJoinNode() {
diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc
new file mode 100644
index 0000000..61d9d2e
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/partitioned_hash_table.inline.h"
+
+#include "exprs/expr.h"
+#include "exprs/expr_context.h"
+#include "exprs/slot_ref.h"
+#include "runtime/buffered_block_mgr.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/raw_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.hpp"
+#include "util/doris_metrics.h"
+
+// DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
+
+using std::string;
+using std::stringstream;
+using std::vector;
+using std::endl;
+
+namespace doris {
+
+// Random primes to multiply the seed with. The PartitionedHashTableCtx constructor
+// derives the seed of level i as seed(i - 1) * SEED_PRIMES[i], so the number of entries
+// bounds the number of levels that can be configured.
+// The table is read-only lookup data and is therefore const.
+static const uint32_t SEED_PRIMES[] = {
+    1, // First seed must be 1, level 0 is used by other operators in the fragment.
+    1431655781,
+    1183186591,
+    622729787,
+    472882027,
+    338294347,
+    275604541,
+    41161739,
+    29999999,
+    27475109,
+    611603,
+    16313357,
+    11380003,
+    21261403,
+    33393119,
+    101,
+    71043403
+};
+
+// Filler that eval_row() writes into the results buffer in place of a NULL expr value.
+// It is deliberately non-zero: we don't want (NULL, 1) to hash to the same as (0, 1).
+// The bytes get copied directly, so this needs to be as big as the biggest primitive
+// type (16 x int64_t here).
+// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes
+static int64_t NULL_VALUE[] = {
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED,
+    HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED, HashUtil::FNV_SEED
+};
+
+// Sizes of the first NUM_SMALL_DATA_PAGES data pages requested by grow_node_array().
+// They are smaller than the IO size (of 8MB) to reduce the memory footprint of small
+// queries. In particular, we always first use a 64KB and a 512KB page before starting
+// to use pages of the block manager's max block size.
+static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
+static const int NUM_SMALL_DATA_PAGES =
+        sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(INITIAL_DATA_PAGE_SIZES[0]);
+
+// Builds a hash table context: allocates the scratch row (room for 'num_build_tuples'
+// Tuple pointers), the zeroed buffer that caches evaluated expr results, and one null
+// indicator byte per build expr, then derives one hash seed per level from
+// 'initial_seed'.
+PartitionedHashTableCtx::PartitionedHashTableCtx(
+        const vector<ExprContext*>& build_expr_ctxs, const vector<ExprContext*>& probe_expr_ctxs,
+        bool stores_nulls, bool finds_nulls,
+        int32_t initial_seed, int max_levels, int num_build_tuples) :
+            _build_expr_ctxs(build_expr_ctxs),
+            _probe_expr_ctxs(probe_expr_ctxs),
+            _stores_nulls(stores_nulls),
+            _finds_nulls(finds_nulls),
+            _level(0),
+            _row(reinterpret_cast<TupleRow*>(malloc(sizeof(Tuple*) * num_build_tuples))) {
+    // Build and probe exprs are matched up pairwise, and there must be at least one.
+    DCHECK_EQ(_build_expr_ctxs.size(), _probe_expr_ctxs.size());
+    DCHECK(!_build_expr_ctxs.empty());
+
+    // Compute the layout and buffer size to store the evaluated expr results.
+    _results_buffer_size = Expr::compute_results_layout(
+            _build_expr_ctxs, &_expr_values_buffer_offsets, &_var_result_begin);
+    _expr_values_buffer = new uint8_t[_results_buffer_size];
+    memset(_expr_values_buffer, 0, _results_buffer_size);
+    _expr_value_null_bits = new uint8_t[build_expr_ctxs.size()];
+
+    // Populate the seeds to use for all the levels. TODO: revisit how we generate these.
+    DCHECK_GE(max_levels, 0);
+    DCHECK_LT(max_levels, sizeof(SEED_PRIMES) / sizeof(SEED_PRIMES[0]));
+    DCHECK_NE(initial_seed, 0);
+    _seeds.resize(max_levels + 1);
+    uint32_t seed = initial_seed;
+    _seeds[0] = seed;
+    for (int level = 1; level <= max_levels; ++level) {
+        // Each level's seed is the previous level's seed scaled by that level's prime.
+        seed *= SEED_PRIMES[level];
+        _seeds[level] = seed;
+    }
+}
+
+// Frees the scratch row, the expr results buffer and the null-indicator array, and
+// resets the three pointers to NULL.
+void PartitionedHashTableCtx::close() {
+    // The scratch row was obtained with malloc() in the constructor.
+    free(_row);
+    _row = NULL;
+
+    // TODO: use tr1::array?
+    DCHECK(_expr_values_buffer != NULL);
+    delete[] _expr_values_buffer;
+    _expr_values_buffer = NULL;
+
+    DCHECK(_expr_value_null_bits != NULL);
+    delete[] _expr_value_null_bits;
+    _expr_value_null_bits = NULL;
+}
+
+// Evaluates every expr in 'ctxs' over 'row' and caches the results in
+// _expr_values_buffer, recording per expr whether it evaluated to NULL.
+// Returns true if any expr evaluated to NULL. When the table does not store nulls the
+// function returns at the first NULL, leaving the remaining exprs unevaluated.
+bool PartitionedHashTableCtx::eval_row(TupleRow* row, const vector<ExprContext*>& ctxs) {
+    bool saw_null = false;
+    for (int idx = 0; idx < ctxs.size(); ++idx) {
+        void* dst = _expr_values_buffer + _expr_values_buffer_offsets[idx];
+        void* value = ctxs[idx]->get_value(row);
+        const bool is_null = (value == NULL);
+
+        // If the table doesn't store nulls, no reason to keep evaluating
+        if (is_null && !_stores_nulls) {
+            return true;
+        }
+
+        _expr_value_null_bits[idx] = is_null;
+        if (is_null) {
+            // Substitute the non-zero filler so NULL does not hash like a zero value.
+            value = reinterpret_cast<void*>(&NULL_VALUE);
+            saw_null = true;
+        }
+
+        // The slot layout always comes from the build exprs, also for probe rows.
+        DCHECK_LE(_build_expr_ctxs[idx]->root()->type().get_slot_size(),
+                sizeof(NULL_VALUE));
+        RawValue::write(value, dst, _build_expr_ctxs[idx]->root()->type(), NULL);
+    }
+    return saw_null;
+}
+
+// Hashes the cached expr values of a row that contains variable length slots. The hash
+// starts from the seed of the current level, folds in the fixed length prefix of the
+// results buffer (if any) and then each TYPE_VARCHAR slot in expr order.
+uint32_t PartitionedHashTableCtx::hash_variable_len_row() {
+    uint32_t result = _seeds[_level];
+
+    // Hash the non-var length portions (if there are any)
+    if (_var_result_begin != 0) {
+        result = hash_help(_expr_values_buffer, _var_result_begin, result);
+    }
+
+    for (int idx = 0; idx < _build_expr_ctxs.size(); ++idx) {
+        // non-string and null slots are already part of expr_values_buffer, so only
+        // TYPE_VARCHAR slots need extra work here.
+        if (_build_expr_ctxs[idx]->root()->type().type == TYPE_VARCHAR) {
+            void* slot = _expr_values_buffer + _expr_values_buffer_offsets[idx];
+            if (!_expr_value_null_bits[idx]) {
+                // Hash the string
+                // TODO: when using CRC hash on empty string, this only swaps bytes.
+                StringValue* str = reinterpret_cast<StringValue*>(slot);
+                result = hash_help(str->ptr, str->len, result);
+            } else {
+                // Hash the null random seed values at 'slot'
+                result = hash_help(slot, sizeof(StringValue), result);
+            }
+        }
+    }
+    return result;
+}
+
+// Compares the build exprs evaluated over 'build_row' against the values cached in
+// _expr_values_buffer. Returns true only if every expr matches; a NULL build value
+// matches only a cached NULL, and only when the table stores nulls.
+bool PartitionedHashTableCtx::equals(TupleRow* build_row) {
+    for (int idx = 0; idx < _build_expr_ctxs.size(); ++idx) {
+        void* build_val = _build_expr_ctxs[idx]->get_value(build_row);
+
+        if (build_val == NULL) {
+            // NULL on the build side: equal only to a cached NULL in a null-storing table.
+            if (!_stores_nulls || !_expr_value_null_bits[idx]) {
+                return false;
+            }
+            continue;
+        }
+
+        // Non-NULL on the build side can never equal a cached NULL.
+        if (_expr_value_null_bits[idx]) {
+            return false;
+        }
+
+        void* cached_val = _expr_values_buffer + _expr_values_buffer_offsets[idx];
+        if (!RawValue::eq(cached_val, build_val, _build_expr_ctxs[idx]->root()->type())) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Fraction of filled buckets above which check_and_resize() grows the bucket array.
+const double PartitionedHashTable::MAX_FILL_FACTOR = 0.75;
+
+// Factory for hash tables. Forwards all arguments to the constructor and passes
+// config::enable_quadratic_probing as its quadratic_probing flag. The table is
+// allocated with new; nothing in this function releases it.
+PartitionedHashTable* PartitionedHashTable::create(RuntimeState* state,
+        BufferedBlockMgr2::Client* client, int num_build_tuples,
+        BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+        int64_t initial_num_buckets) {
+    const bool quadratic_probing = config::enable_quadratic_probing;
+    PartitionedHashTable* table = new PartitionedHashTable(
+            quadratic_probing, state, client, num_build_tuples, tuple_stream,
+            max_num_buckets, initial_num_buckets);
+    return table;
+}
+
+// Records the configuration and zeroes all counters. No bucket memory is allocated
+// here; that happens in init(). 'num_buckets' must be a positive power of 2, 'client'
+// must be non-NULL, and 'stream' may only be NULL when the table stores tuples directly
+// (num_build_tuples == 1).
+PartitionedHashTable::PartitionedHashTable(bool quadratic_probing, RuntimeState* state,
+        BufferedBlockMgr2::Client* client, int num_build_tuples, BufferedTupleStream2* stream,
+        int64_t max_num_buckets, int64_t num_buckets) :
+            // Collaborators and configuration.
+            _state(state),
+            _block_mgr_client(client),
+            _tuple_stream(stream),
+            _stores_tuples(num_build_tuples == 1),
+            _quadratic_probing(quadratic_probing),
+            // Duplicate-node storage.
+            _total_data_page_size(0),
+            _next_node(NULL),
+            _node_remaining_current_page(0),
+            _num_duplicate_nodes(0),
+            // Bucket array.
+            _max_num_buckets(max_num_buckets),
+            _buckets(NULL),
+            _num_buckets(num_buckets),
+            _num_filled_buckets(0),
+            _num_buckets_with_duplicates(0),
+            _num_build_tuples(num_build_tuples),
+            _has_matches(false),
+            // Statistics.
+            _num_probes(0),
+            _num_failed_probes(0),
+            _travel_length(0),
+            _num_hash_collisions(0),
+            _num_resizes(0) {
+    DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2";
+    DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
+    DCHECK(_stores_tuples || stream != NULL);
+    DCHECK(client != NULL);
+}
+
+// Reserves memory for the initial bucket array with the block manager and allocates
+// the zero-filled array.
+// Returns true on success. Returns false if either the reservation or the allocation
+// fails; in both cases _num_buckets is reset to 0 and no reservation is left
+// outstanding, so the release performed by close() is a no-op for the bucket array.
+bool PartitionedHashTable::init() {
+    int64_t buckets_byte_size = _num_buckets * sizeof(Bucket);
+    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, buckets_byte_size)) {
+        _num_buckets = 0;
+        return false;
+    }
+    _buckets = reinterpret_cast<Bucket*>(malloc(buckets_byte_size));
+    if (_buckets == NULL) {
+        // malloc() can fail even though the reservation succeeded. Hand the reservation
+        // back instead of passing NULL to memset().
+        _state->block_mgr2()->release_memory(_block_mgr_client, buckets_byte_size);
+        _num_buckets = 0;
+        return false;
+    }
+    memset(_buckets, 0, buckets_byte_size);
+    return true;
+}
+
+// Tears the table down: optionally logs statistics, calls del() on every data page,
+// frees the bucket array and returns the bucket reservation to the block manager.
+void PartitionedHashTable::close() {
+    // Print statistics only for the large or heavily used hash tables.
+    // TODO: Tweak these numbers/conditions, or print them always?
+    const int64_t LARGE_HT = 128 * 1024;
+    const int64_t HEAVILY_USED = 1024 * 1024;
+    // TODO: These statistics should go to the runtime profile as well.
+    if ((_num_buckets > LARGE_HT) || (_num_probes > HEAVILY_USED)) {
+        VLOG(2) << print_stats();
+    }
+    for (int i = 0; i < _data_pages.size(); ++i) {
+        _data_pages[i]->del();
+    }
+#if 0
+    if (DorisMetrics::hash_table_total_bytes() != NULL) {
+        DorisMetrics::hash_table_total_bytes()->increment(-_total_data_page_size);
+    }
+#endif
+    _data_pages.clear();
+    if (_buckets != NULL) {
+        free(_buckets);
+        // Do not leave a dangling pointer behind: a repeated close() must not free the
+        // same array twice.
+        _buckets = NULL;
+    }
+    // NOTE(review): the reservation is released unconditionally, so calling close()
+    // twice would still release it twice - confirm callers close exactly once.
+    _state->block_mgr2()->release_memory(_block_mgr_client, _num_buckets * sizeof(Bucket));
+}
+
+// Bytes currently used by the bucket array plus all duplicate nodes.
+int64_t PartitionedHashTable::current_mem_size() const {
+    const int64_t bucket_bytes = _num_buckets * sizeof(Bucket);
+    const int64_t duplicate_bytes = _num_duplicate_nodes * sizeof(DuplicateNode);
+    return bucket_bytes + duplicate_bytes;
+}
+
+// Ensures 'buckets_to_fill' more buckets can be filled without exceeding
+// MAX_FILL_FACTOR, doubling the bucket count as many times as needed.
+// Returns true if no resize was necessary, otherwise the result of resize_buckets().
+bool PartitionedHashTable::check_and_resize(
+        uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx) {
+    uint64_t doublings = 0;
+    for (;;) {
+        const bool over_limit = _num_filled_buckets + buckets_to_fill >
+                (_num_buckets << doublings) * MAX_FILL_FACTOR;
+        if (!over_limit) {
+            break;
+        }
+        // TODO: next prime instead of double?
+        ++doublings;
+    }
+    if (doublings == 0) {
+        return true;
+    }
+    return resize_buckets(_num_buckets << doublings, ht_ctx);
+}
+
+// Replaces the bucket array with a new, zero-filled one of 'num_buckets' entries and
+// re-inserts every filled bucket by its stored hash. Duplicate nodes are not touched.
+// 'num_buckets' must be a power of 2 and larger than the number of filled buckets.
+// Returns false, leaving the existing table unchanged, if 'num_buckets' exceeds
+// _max_num_buckets (when that limit is not -1), if the block manager refuses the
+// reservation, or if the allocation itself fails.
+bool PartitionedHashTable::resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx) {
+    DCHECK_EQ((num_buckets & (num_buckets-1)), 0)
+        << "num_buckets=" << num_buckets << " must be a power of 2";
+    DCHECK_GT(num_buckets, _num_filled_buckets) << "Cannot shrink the hash table to "
+        "smaller number of buckets than the number of filled buckets.";
+    VLOG(2) << "Resizing hash table from "
+        << _num_buckets << " to " << num_buckets << " buckets.";
+    if (_max_num_buckets != -1 && num_buckets > _max_num_buckets) {
+        return false;
+    }
+    ++_num_resizes;
+
+    // All memory that can grow proportional to the input should come from the block mgrs
+    // mem tracker.
+    // Note that while we copying over the contents of the old hash table, we need to have
+    // allocated both the old and the new hash table. Once we finish, we return the memory
+    // of the old hash table.
+    int64_t old_size = _num_buckets * sizeof(Bucket);
+    int64_t new_size = num_buckets * sizeof(Bucket);
+    if (!_state->block_mgr2()->consume_memory(_block_mgr_client, new_size)) {
+        return false;
+    }
+    Bucket* new_buckets = reinterpret_cast<Bucket*>(malloc(new_size));
+    if (new_buckets == NULL) {
+        // malloc() can fail even though the reservation succeeded. A DCHECK alone is
+        // compiled out of release builds, so fail the resize explicitly and return the
+        // reservation; the old bucket array is still intact.
+        _state->block_mgr2()->release_memory(_block_mgr_client, new_size);
+        return false;
+    }
+    memset(new_buckets, 0, new_size);
+
+    // Walk the old table and copy all the filled buckets to the new (resized) table.
+    // We do not have to do anything with the duplicate nodes. This operation is expected
+    // to succeed.
+    for (PartitionedHashTable::Iterator iter = begin(ht_ctx); !iter.at_end();
+            next_filled_bucket(&iter._bucket_idx, &iter._node)) {
+        Bucket* bucket_to_copy = &_buckets[iter._bucket_idx];
+        bool found = false;
+        int64_t bucket_idx = probe(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
+        DCHECK(!found);
+        DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
+            " there are free buckets. " << num_buckets << " " << _num_filled_buckets;
+        Bucket* dst_bucket = &new_buckets[bucket_idx];
+        *dst_bucket = *bucket_to_copy;
+    }
+
+    // Swap in the new array and give back the old array's reservation.
+    _num_buckets = num_buckets;
+    free(_buckets);
+    _buckets = new_buckets;
+    _state->block_mgr2()->release_memory(_block_mgr_client, old_size);
+    return true;
+}
+
+// Requests one more data page for duplicate nodes from the block manager and makes it
+// the current page. The first NUM_SMALL_DATA_PAGES pages are capped at the sizes in
+// INITIAL_DATA_PAGE_SIZES; later pages use the block manager's max block size.
+// Returns false if no block could be obtained.
+bool PartitionedHashTable::grow_node_array() {
+    int64_t page_size = _state->block_mgr2()->max_block_size();
+    if (_data_pages.size() < NUM_SMALL_DATA_PAGES) {
+        page_size = std::min(page_size, INITIAL_DATA_PAGE_SIZES[_data_pages.size()]);
+    }
+
+    BufferedBlockMgr2::Block* new_page = NULL;
+    Status status = _state->block_mgr2()->get_new_block(
+            _block_mgr_client, NULL, &new_page, page_size);
+    DCHECK(status.ok() || new_page == NULL);
+    if (new_page == NULL) {
+        return false;
+    }
+
+    _data_pages.push_back(new_page);
+    _total_data_page_size += page_size;
+    // Duplicate nodes are carved out of the new page, starting at its beginning.
+    _next_node = new_page->allocate<DuplicateNode>(page_size);
+    _node_remaining_current_page = page_size / sizeof(DuplicateNode);
+#if 0
+    if (DorisMetrics::hash_table_total_bytes() != NULL) {
+        DorisMetrics::hash_table_total_bytes()->increment(page_size);
+    }
+#endif
+    return true;
+}
+
+// Appends a printable form of 'htdata' to 'ss': the Tuple pointer when the table stores
+// tuples, otherwise the (block, idx, offset) triple of the tuple stream index. If
+// 'desc' is non-NULL the row itself is materialized and appended as well.
+void PartitionedHashTable::debug_string_tuple(
+        stringstream& ss, HtData& htdata, const RowDescriptor* desc) {
+    ss << "(";
+    if (_stores_tuples) {
+        ss << htdata.tuple;
+    } else {
+        ss << htdata.idx.block() << ", " << htdata.idx.idx()
+            << ", " << htdata.idx.offset();
+    }
+    ss << ")";
+
+    if (desc == NULL) {
+        return;
+    }
+    // Scratch space for get_row(), one Tuple* per build tuple.
+    Tuple* scratch[_num_build_tuples];
+    TupleRow* row = get_row(htdata, reinterpret_cast<TupleRow*>(scratch));
+    ss << " " << row->to_string(*desc);
+}
+
+// Dumps the bucket array, one line per bucket: "<index>: ", then (if 'show_match')
+// " [M]" or " [U]", then " [D] " followed by the comma separated duplicate chain or
+// " [B] " followed by the single entry (" - " for an unfilled bucket).
+// Unfilled buckets are omitted when 'skip_empty' is set. 'desc' is forwarded to
+// debug_string_tuple().
+string PartitionedHashTable::debug_string(
+        bool skip_empty, bool show_match, const RowDescriptor* desc) {
+    stringstream out;
+    out << endl;
+    for (int i = 0; i < _num_buckets; ++i) {
+        Bucket& bucket = _buckets[i];
+        if (skip_empty && !bucket.filled) {
+            continue;
+        }
+
+        out << i << ": ";
+        if (show_match) {
+            out << (bucket.matched ? " [M]" : " [U]");
+        }
+
+        if (!bucket.hasDuplicates) {
+            out << " [B] ";
+            if (bucket.filled) {
+                debug_string_tuple(out, bucket.bucketData.htdata, desc);
+            } else {
+                out << " - ";
+            }
+        } else {
+            out << " [D] ";
+            // Empty before the first node, "," before every following one.
+            const char* separator = "";
+            for (DuplicateNode* node = bucket.bucketData.duplicates; node != NULL;
+                    node = node->next) {
+                out << separator;
+                debug_string_tuple(out, node->htdata, desc);
+                separator = ",";
+            }
+        }
+        out << endl;
+    }
+    return out.str();
+}
+
+// Formats the table's counters (buckets, duplicates, probes, travel length, hash
+// collisions, resizes) as a multi-line string.
+// The derived averages are reported as 0 when their denominator is 0 (for example a
+// large table that was never probed) instead of dividing by zero and printing nan/inf.
+string PartitionedHashTable::print_stats() const {
+    const double curr_fill_factor = (_num_buckets == 0) ? 0.0 :
+            static_cast<double>(_num_filled_buckets) / static_cast<double>(_num_buckets);
+    const double avg_travel = (_num_probes == 0) ? 0.0 :
+            static_cast<double>(_travel_length) / static_cast<double>(_num_probes);
+    const double avg_collisions = (_num_filled_buckets == 0) ? 0.0 :
+            static_cast<double>(_num_hash_collisions) / static_cast<double>(_num_filled_buckets);
+    stringstream ss;
+    ss << "Buckets: " << _num_buckets << " " << _num_filled_buckets << " "
+        << curr_fill_factor << endl;
+    ss << "Duplicates: " << _num_buckets_with_duplicates << " buckets "
+        << _num_duplicate_nodes << " nodes" << endl;
+    ss << "Probes: " << _num_probes << endl;
+    ss << "FailedProbes: " << _num_failed_probes << endl;
+    ss << "Travel: " << _travel_length << " " << avg_travel << endl;
+    ss << "HashCollisions: " << _num_hash_collisions << " " << avg_collisions << endl;
+    ss << "Resizes: " << _num_resizes << endl;
+    return ss.str();
+}
+
+} // namespace doris
+
diff --git a/be/src/exec/partitioned_hash_table.h b/be/src/exec/partitioned_hash_table.h
new file mode 100644
index 0000000..94f6c98
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.h
@@ -0,0 +1,673 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
+
+#include <vector>
+#include <boost/cstdint.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include "codegen/doris_ir.h"
+#include "util/logging.h"
+#include "runtime/buffered_block_mgr2.h"
+#include "runtime/buffered_tuple_stream2.h"
+#include "runtime/buffered_tuple_stream2.inline.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/tuple_row.h"
+#include "util/hash_util.hpp"
+#include "util/bit_util.h"
+
+namespace doris {
+
+class Expr;
+class ExprContext;
+class MemTracker;
+class MemTracker;
+class RowDescriptor;
+class RuntimeState;
+class Tuple;
+class TupleRow;
+class PartitionedHashTable;
+
+// Linear or quadratic probing hash table implementation tailored to the usage pattern
+// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
+// allows for different exprs for insertions and finds. This is the pattern we use for
+// joins and aggregation where the input/build tuple row descriptor is different from the
+// find/probe descriptor. The implementation is designed to allow codegen for some paths.
+//
+// In addition to the hash table there is also an accompanying hash table context that
+// is used for insertions and probes. For example, the hash table context stores
+// evaluated expr results for the current row being processed when possible into a
+// contiguous memory buffer. This allows for efficient hash computation.
+//
+// The hash table does not support removes. The hash table is not thread safe.
+// The table is optimized for the partition hash aggregation and hash joins and is not
+// intended to be a generic hash table implementation. The API loosely mimics the
+// std::hashset API.
+//
+// The data (rows) are stored in a BufferedTupleStream2. The basic data structure of this
+// hash table is a vector of buckets. The buckets (indexed by the mod of the hash)
+// contain a pointer to either the slot in the tuple-stream or in case of duplicate
+// values, to the head of a linked list of nodes that in turn contain a pointer to
+// tuple-stream slots. When inserting an entry we start at the bucket at position
+// (hash % size) and search for either a bucket with the same hash or for an empty
+// bucket. If a bucket with the same hash is found, we then compare for row equality and
+// either insert a duplicate node if the equality is true, or continue the search if the
+// row equality is false. Similarly, when probing we start from the bucket at position
+// (hash % size) and search for an entry with the same hash or for an empty bucket.
+// In the former case, we then check for row equality and continue the search if the row
+// equality is false. In the latter case, the probe is not successful. When growing the
+// hash table, the number of buckets is doubled. We trigger a resize when the fill
+// factor is approx 75%. Due to the doubling nature of the buckets, we require that the
+// number of buckets is a power of 2. This allows us to perform a modulo of the hash
+// using a bitmask.
+//
+// We choose to use linear or quadratic probing because they exhibit good (predictable)
+// cache behavior.
+//
+// The first NUM_SMALL_DATA_PAGES data pages (which hold the duplicate nodes) are smaller
+// than the IO size (of 8MB) to reduce the memory footprint of small queries.
+//
+// TODO: Compare linear and quadratic probing and remove the loser.
+// TODO: We currently use 32-bit hashes. There is room in the bucket structure for at
+// least 48-bits. We should exploit this space.
+// TODO: Consider capping the probes with a threshold value. If an insert reaches
+// that threshold it is inserted to another linked list of overflow entries.
+// TODO: Smarter resizes, and perhaps avoid using powers of 2 as the hash table size.
+// TODO: this is not a fancy hash table in terms of memory access patterns
+// (cuckoo-hashing or something that spills to disk). We will likely want to invest
+// more time into this.
+// TODO: hash-join and aggregation have very different access patterns.  Joins insert
+// all the rows and then call scan to find them.  Aggregation interleaves find() and
+// insert().  We may want to optimize joins more heavily for insert() (in particular
+// growing).
+// TODO: Batched interface for inserts and finds.
+// TODO: Do we need to check mem limit exceeded so often. Check once per batch?
+
+// Control block for a hash table. This class contains the logic as well as the variables
+// needed by a thread to operate on a hash table.
+class PartitionedHashTableCtx {
+public:
+    // Create a hash table context.
+    //  - build_expr_ctxs: the exprs that should be used to evaluate rows during insert().
+    //  - probe_expr_ctxs: the exprs used during find(). Both vectors are held by
+    //    reference (see the members below), so they must outlive this context.
+    //  - stores_nulls: if false, TupleRows with nulls are ignored during insert
+    //  - finds_nulls: if false, find() returns End() for TupleRows with nulls
+    //      even if stores_nulls is true
+    //  - initial_seed: Initial seed value to use when computing hashes for rows with
+    //    level 0. Other levels have their seeds derived from this seed.
+    //  - max_levels: The max levels we will hash with. Seeds are generated for levels
+    //    0 through max_levels.
+    //  - num_build_tuples: number of Tuple pointers the scratch row returned by row()
+    //    has room for.
+    PartitionedHashTableCtx(const std::vector<ExprContext*>& build_expr_ctxs,
+            const std::vector<ExprContext*>& probe_expr_ctxs, bool stores_nulls,
+            bool finds_nulls, int32_t initial_seed, int max_levels,
+            int num_build_tuples);
+
+    // Call to cleanup any resources: frees the expr results buffer, the null indicator
+    // array and the scratch row. The class declares no destructor, so this must be
+    // called explicitly.
+    void close();
+
+    // Selects the level (and thereby the seed) used for subsequent hashing.
+    void set_level(int level);
+    // Returns the level currently in use.
+    int level() const { return _level; }
+    // Returns the seed of 'level'. The lookup is bounds checked (vector::at()).
+    uint32_t seed(int level) { return _seeds.at(level); }
+
+    // Returns the scratch row allocated by the constructor.
+    TupleRow* row() const { return _row; }
+
+    // Returns the results of the exprs at 'expr_idx' evaluated over the last row
+    // processed.
+    // This value is invalid if the expr evaluated to NULL.
+    // TODO: this is an awkward abstraction but aggregation node can take advantage of
+    // it and save some expr evaluation calls.
+    void* last_expr_value(int expr_idx) const {
+        return _expr_values_buffer + _expr_values_buffer_offsets[expr_idx];
+    }
+
+    // Returns if the expr at 'expr_idx' evaluated to NULL for the last row.
+    bool last_expr_value_null(int expr_idx) const {
+        return _expr_value_null_bits[expr_idx];
+    }
+
+    // Evaluate and hash the build/probe row, returning in *hash. Returns false if this
+    // row should be rejected (doesn't need to be processed further) because it
+    // contains NULL.
+    // These need to be inlined in the IR module so we can find and replace the calls to
+    // EvalBuildRow()/EvalProbeRow().
+    bool IR_ALWAYS_INLINE eval_and_hash_build(TupleRow* row, uint32_t* hash);
+    bool IR_ALWAYS_INLINE eval_and_hash_probe(TupleRow* row, uint32_t* hash);
+
+    // Returns the byte size of the buffer that caches the evaluated expr results.
+    int results_buffer_size() const { return _results_buffer_size; }
+
+private:
+    friend class PartitionedHashTable;
+    friend class PartitionedHashTableTest_HashEmpty_Test;
+
+    // Compute the hash of the values in _expr_values_buffer.
+    // This will be replaced by codegen.  We don't want this inlined for replacing
+    // with codegen'd functions so the function name does not change.
+    uint32_t IR_NO_INLINE HashCurrentRow() {
+        DCHECK_LT(_level, _seeds.size());
+        // -1 means there are no variable length slots, so the whole results buffer can
+        // be hashed in one go.
+        if (_var_result_begin == -1) {
+            // This handles NULLs implicitly since a constant seed value was put
+            // into results buffer for nulls.
+            // TODO: figure out which hash function to use. We need to generate uncorrelated
+            // hashes by changing just the seed. CRC does not have this property and FNV is
+            // okay. We should switch to something else.
+            return hash_help(_expr_values_buffer, _results_buffer_size, _seeds[_level]);
+        } else {
+            return PartitionedHashTableCtx::hash_variable_len_row();
+        }
+    }
+
+    // Wrapper function for calling correct HashUtil function in non-codegen'd case.
+    // Hashes 'len' bytes starting at 'input', using 'hash' as the seed.
+    uint32_t inline hash_help(const void* input, int len, int32_t hash) {
+        // Use CRC hash at first level for better performance. Switch to murmur hash at
+        // subsequent levels since CRC doesn't randomize well with different seed inputs.
+        if (_level == 0) {
+            return HashUtil::hash(input, len, hash);
+        }
+        return HashUtil::murmur_hash2_64(input, len, hash);
+    }
+
+    // Evaluate 'row' over build exprs caching the results in '_expr_values_buffer' This
+    // will be replaced by codegen.  We do not want this function inlined when cross
+    // compiled because we need to be able to differentiate between EvalBuildRow and
+    // EvalProbeRow by name and the build/probe exprs are baked into the codegen'd
+    // function.
+    bool IR_NO_INLINE EvalBuildRow(TupleRow* row) {
+        return eval_row(row, _build_expr_ctxs);
+    }
+
+    // Evaluate 'row' over probe exprs caching the results in '_expr_values_buffer'
+    // This will be replaced by codegen.
+    bool IR_NO_INLINE EvalProbeRow(TupleRow* row) {
+        return eval_row(row, _probe_expr_ctxs);
+    }
+
+    // Compute the hash of the values in _expr_values_buffer for rows with variable length
+    // fields (e.g. strings).
+    uint32_t hash_variable_len_row();
+
+    // Evaluate the exprs over row and cache the results in '_expr_values_buffer'.
+    // Returns whether any expr evaluated to NULL.
+    // This will be replaced by codegen.
+    bool eval_row(TupleRow* row, const std::vector<ExprContext*>& ctxs);
+
+    // Returns true if the values of build_exprs evaluated over 'build_row' equal
+    // the values cached in _expr_values_buffer.
+    // This will be replaced by codegen.
+    bool IR_NO_INLINE equals(TupleRow* build_row);
+
+    // References to the caller's expr vectors; they are not copied.
+    const std::vector<ExprContext*>& _build_expr_ctxs;
+    const std::vector<ExprContext*>& _probe_expr_ctxs;
+
+    // Constants on how the hash table should behave. Joins and aggs have slightly
+    // different behavior.
+    // TODO: these constants are an ideal candidate to be removed with codegen.
+    // TODO: ..or with template-ization
+    const bool _stores_nulls;
+    const bool _finds_nulls;
+
+    // The current level this context is working on. Each level needs to use a
+    // different seed.
+    int _level;
+
+    // The seeds to use for hashing. Indexed by the level.
+    std::vector<uint32_t> _seeds;
+
+    // Byte offset of each expr's result within '_expr_values_buffer', the cache of
+    // exprs values for the current row being evaluated.  This can either
+    // be a build row (during insert()) or probe row (during find()).
+    std::vector<int> _expr_values_buffer_offsets;
+
+    // Byte offset into '_expr_values_buffer' that begins the variable length results.
+    // If -1, there are no variable length slots. Never changes once set, can be removed
+    // with codegen.
+    int _var_result_begin;
+
+    // Byte size of '_expr_values_buffer'. Never changes once set, can be removed with
+    // codegen.
+    int _results_buffer_size;
+
+    // Buffer to store evaluated expr results.  This address must not change once
+    // allocated since the address is baked into the codegen.
+    uint8_t* _expr_values_buffer;
+
+    // One null indicator per build expr for the last row evaluated.
+    // Use bytes instead of bools to be compatible with llvm.  This address must
+    // not change once allocated.
+    uint8_t* _expr_value_null_bits;
+
+    // Scratch buffer to generate rows on the fly. Allocated with malloc() in the
+    // constructor and released in close().
+    TupleRow* _row;
+
+    // Cross-compiled functions to access member variables used in codegen_hash_current_row().
+    uint32_t get_hash_seed() const;
+};
+
+// The hash table consists of a contiguous array of buckets that contain a pointer to the
+// data, the hash value and three flags: whether this bucket is filled, whether this
+// entry has been matched (used in right and full joins) and whether this entry has
+// duplicates. If there are duplicates, then the data is pointing to the head of a
+// linked list of duplicate nodes that point to the actual data. Note that the duplicate
+// nodes do not contain the hash value, because all the linked nodes have the same hash
+// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
+// This array of buckets is sparse, we are shooting for up to 3/4 fill factor (75%). The
+// data allocated by the hash table comes from the BufferedBlockMgr2.
+class PartitionedHashTable {
+private:
+
+    // Payload of a hash table entry: either the row in the tuple stream or a pointer to
+    // the single tuple of this row. get_row() reads 'tuple' when the table stores tuples
+    // and 'idx' otherwise.
+    union HtData {
+        // Index of the row in the tuple stream.
+        BufferedTupleStream2::RowIdx idx;
+        // Pointer to the single tuple of the row.
+        Tuple* tuple;
+    };
+
+    // Node of the singly linked list that holds the entries of a bucket with duplicates.
+    // Each node carries the data of exactly one entry.
+    struct DuplicateNode {
+        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+        // row in the DuplicateNode has been matched.
+        // From an abstraction point of view, this is an awkward place to store this
+        // information.
+        // TODO: Fold this flag in the next pointer below.
+        bool matched;
+
+        // Chain to next duplicate node, NULL when end of list.
+        DuplicateNode* next;
+        // The entry stored in this node (tuple stream index or Tuple*).
+        HtData htdata;
+    };
+
+    // One slot of the bucket array. Holds the flags, the cached hash and either the
+    // entry itself or the head of the list of duplicate entries.
+    struct Bucket {
+        // Whether this bucket contains a valid entry, or it is empty.
+        bool filled;
+
+        // Used for full outer and right {outer, anti, semi} joins. Indicates whether the
+        // row in the bucket has been matched.
+        // From an abstraction point of view, this is an awkward place to store this
+        // information but it is efficient. This space is otherwise unused.
+        bool matched;
+
+        // Used in case of duplicates. If true, then the bucketData union should be used as
+        // 'duplicates'.
+        bool hasDuplicates;
+
+        // Cache of the hash for data.
+        // TODO: Do we even have to cache the hash value?
+        uint32_t hash;
+
+        // Either the data for this bucket or the linked list of duplicates.
+        // 'hasDuplicates' tells which member is the active one.
+        union {
+            HtData htdata;
+            DuplicateNode* duplicates;
+        } bucketData;
+    };
+
+public:
+    class Iterator;
+
+    // Returns a newly allocated PartitionedHashTable. The probing algorithm is set by the
+    // FLAG_enable_quadratic_probing.
+    //  - client: block mgr client to allocate data pages from.
+    //  - num_build_tuples: number of Tuples in the build tuple row.
+    //  - tuple_stream: the tuple stream which contains the tuple rows index by the
+    //    hash table. Can be NULL if the rows contain only a single tuple, in which
+    //    case the 'tuple_stream' is unused.
+    //  - max_num_buckets: the maximum number of buckets that can be stored. If we
+    //    try to grow the number of buckets to a larger number, the inserts will fail.
+    //    -1 if it is unlimited.
+    //  - initial_num_buckets: number of buckets that the hash table should be initialized
+    //    with.
+    static PartitionedHashTable* create(RuntimeState* state, BufferedBlockMgr2::Client* client,
+            int num_build_tuples, BufferedTupleStream2* tuple_stream, int64_t max_num_buckets,
+            int64_t initial_num_buckets);
+
+    // Allocates the initial bucket structure. Returns false if OOM.
+    bool init();
+
+    // Call to cleanup any resources. Must be called once.
+    void close();
+
+    // Inserts the row to the hash table. Returns true if the insertion was successful.
+    // Always returns true if the table has free buckets and the key is not a duplicate.
+    // The caller is responsible for ensuring that the table has free buckets.
+    // 'idx' is the index into _tuple_stream for this row. If the row contains more than
+    // one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
+    // hash table and the caller must guarantee it stays in memory. This will not grow the
+    // hash table. In the case that there is a need to insert a duplicate node, instead of
+    // filling a new bucket, and there is not enough memory to insert a duplicate node,
+    // the insert fails and this function returns false.
+    // Used during the build phase of hash joins.
+    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx,
+            const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash);
+
+    // Same as insert() but for inserting a single Tuple. The 'tuple' is not copied by
+    // the hash table and the caller must guarantee it stays in memory.
+    bool IR_ALWAYS_INLINE insert(PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash);
+
+    // Returns an iterator to the bucket matching the last row evaluated in 'ht_ctx'.
+    // Returns PartitionedHashTable::End() if no match is found. The iterator can be iterated until
+    // PartitionedHashTable::End() to find all the matching rows. Advancing the returned iterator will
+    // go to the next matching row. The matching rows do not need to be evaluated since all
+    // the nodes of a bucket are duplicates. One scan can be in progress for each 'ht_ctx'.
+    // Used during the probe phase of hash joins.
+    Iterator IR_ALWAYS_INLINE find(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+    // If a match is found in the table, return an iterator as in find(). If a match was
+    // not present, return an iterator pointing to the empty bucket where the key should
+    // be inserted. Returns End() if the table is full. The caller can set the data in
+    // the bucket using a Set*() method on the iterator.
+    Iterator IR_ALWAYS_INLINE find_bucket(PartitionedHashTableCtx* ht_ctx, uint32_t hash,
+            bool* found);
+
+    // Returns the number of entries stored in the hash table. A filled bucket without
+    // duplicates contributes one entry; a bucket with duplicates contributes one entry
+    // per duplicate node.
+    int64_t size() const {
+        const int64_t buckets_without_duplicates =
+                _num_filled_buckets - _num_buckets_with_duplicates;
+        return buckets_without_duplicates + _num_duplicate_nodes;
+    }
+
+    // Returns how many buckets are currently not filled.
+    int64_t empty_buckets() const {
+        return _num_buckets - _num_filled_buckets;
+    }
+
+    // Returns the total number of buckets (filled and empty).
+    int64_t num_buckets() const {
+        return _num_buckets;
+    }
+
+    // Returns the load factor, i.e. the number of non-empty buckets divided by the
+    // total number of buckets.
+    double load_factor() const {
+        const double filled = static_cast<double>(_num_filled_buckets);
+        return filled / _num_buckets;
+    }
+
+    // Returns an estimate of the number of buckets needed to build the hash table
+    // structure for 'num_rows'. The estimate is rounded up to a power of two and
+    // assumes that there are no duplicates.
+    static int64_t EstimateNumBuckets(int64_t num_rows) {
+        // Assume max 66% fill factor and no duplicates.
+        const int64_t min_buckets = 3 * num_rows / 2;
+        return BitUtil::next_power_of_two(min_buckets);
+    }
+    // Returns an estimate of the number of bytes needed for the bucket array of a
+    // hash table built for 'num_rows': the estimated bucket count times the size of
+    // one Bucket.
+    static int64_t EstimateSize(int64_t num_rows) {
+        const int64_t estimated_buckets = EstimateNumBuckets(num_rows);
+        return estimated_buckets * sizeof(Bucket);
+    }
+
+    // Returns the memory occupied by the hash table, takes into account the number of
+    // duplicates.
+    int64_t current_mem_size() const;
+
+    // Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
+    // filled and resizes the hash table so that the projected fill factor is below the
+    // max fill factor.
+    // If it returns true, then it is guaranteed that at least 'buckets_to_fill' more
+    // buckets can be filled without the need to resize.
+    bool check_and_resize(uint64_t buckets_to_fill, PartitionedHashTableCtx* ht_ctx);
+
+    // Returns the number of bytes allocated to the hash table, as tracked in
+    // '_total_data_page_size'.
+    int64_t byte_size() const {
+        return _total_data_page_size;
+    }
+
+    // Returns an iterator at the beginning of the hash table.  Advancing this iterator
+    // will traverse all elements.
+    Iterator begin(PartitionedHashTableCtx* ht_ctx);
+
+    // Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
+    // bucket has duplicates) in the hash table that does not have its matched flag set.
+    // Used in right joins and full-outer joins.
+    Iterator first_unmatched(PartitionedHashTableCtx* ctx);
+
+    // Returns true if there was at least one match, i.e. set_matched() has been called
+    // on an iterator of this table.
+    bool HasMatches() const {
+        return _has_matches;
+    }
+
+    // Returns the end marker: a default-constructed iterator, for which at_end() is true.
+    Iterator End() {
+        Iterator end_marker;
+        return end_marker;
+    }
+
+    // Dump out the entire hash table to string.  If 'skip_empty', empty buckets are
+    // skipped.  If 'show_match', it also prints the matched flag of each node. If
+    // 'build_desc' is non-null, the build rows will be printed. Otherwise, only the
+    // addresses of the build rows will be printed.
+    std::string debug_string(bool skip_empty, bool show_match,
+            const RowDescriptor* build_desc);
+
+    // Print the content of a bucket or node.
+    void debug_string_tuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc);
+
+    // Update and print some statistics that can be used for performance debugging.
+    std::string print_stats() const;
+
+    // stl-like iterator interface.
+    // An iterator refers to one entry of the table: either a bucket without duplicates,
+    // or one DuplicateNode of a bucket with duplicates. A default-constructed iterator
+    // is the end marker.
+    class Iterator {
+    private:
+        // Bucket index value when probe is not successful.
+        static const int64_t BUCKET_NOT_FOUND = -1;
+
+    public:
+
+        // Constructs an end iterator that is not attached to any table.
+        Iterator() : _table(NULL), _row(NULL), _bucket_idx(BUCKET_NOT_FOUND), _node(NULL) { }
+
+        // Iterates to the next element. It should be called only if !at_end().
+        void IR_ALWAYS_INLINE next();
+
+        // Iterates to the next duplicate node. If the bucket does not have duplicates or
+        // when it reaches the last duplicate node, then it moves the Iterator to at_end().
+        // Used when we want to iterate over all the duplicate nodes bypassing the next()
+        // interface (e.g. in semi/outer joins without other_join_conjuncts, in order to
+        // iterate over all nodes of an unmatched bucket).
+        void IR_ALWAYS_INLINE next_duplicate();
+
+        // Iterates to the next element that does not have its matched flag set. Used in
+        // right-outer and full-outer joins.
+        void next_unmatched();
+
+        // Return the current row or tuple. Callers must check the iterator is not at_end()
+        // before calling them.  The returned row is owned by the iterator and valid until
+        // the next call to get_row(). It is safe to advance the iterator.
+        TupleRow* get_row() const;
+        Tuple* get_tuple() const;
+
+        // Set the current tuple for an empty bucket. Designed to be used with the
+        // iterator returned from find_bucket() in the case when the value is not found.
+        // It is not valid to call this function if the bucket already has an entry.
+        void set_tuple(Tuple* tuple, uint32_t hash);
+
+        // Sets as matched the Bucket or DuplicateNode currently pointed by the iterator,
+        // depending on whether the bucket has duplicates or not. The iterator cannot be
+        // at_end().
+        void set_matched();
+
+        // Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on
+        // whether the bucket has duplicates or not. It should be called only if !at_end().
+        bool is_matched() const;
+
+        // Resets everything but the pointer to the hash table.
+        void set_at_end();
+
+        // Returns true if this iterator is at the end, i.e. get_row() cannot be called.
+        bool at_end() const { return _bucket_idx == BUCKET_NOT_FOUND; }
+
+    private:
+        friend class PartitionedHashTable;
+
+        // Constructs an iterator over 'table' positioned at 'bucket_idx' (or at the end if
+        // 'bucket_idx' is BUCKET_NOT_FOUND). 'row' is the scratch row used by get_row() and
+        // 'node' is the current duplicate node, if the bucket has duplicates.
+        // 'bucket_idx' is taken as int64_t, the same type as '_bucket_idx' and as the
+        // indices produced by the table, so that large indices are not truncated.
+        Iterator(PartitionedHashTable* table, TupleRow* row, int64_t bucket_idx,
+                DuplicateNode* node)
+            : _table(table),
+              _row(row),
+              _bucket_idx(bucket_idx),
+              _node(node) {
+        }
+
+        // The table this iterator traverses. NULL for a default-constructed iterator.
+        PartitionedHashTable* _table;
+
+        // Scratch row handed to the table's get_row() to materialize the current row.
+        TupleRow* _row;
+
+        // Current bucket idx.
+        // TODO: Use uint32_t?
+        int64_t _bucket_idx;
+
+        // Pointer to the current duplicate node.
+        DuplicateNode* _node;
+    };
+
+private:
+    friend class Iterator;
+    friend class PartitionedHashTableTest;
+
+    // Hash table constructor. Private because create() should be used, instead
+    // of calling this constructor directly.
+    //  - quadratic_probing: set to true when the probing algorithm is quadratic, as
+    //    opposed to linear.
+    PartitionedHashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr2::Client* client,
+            int num_build_tuples, BufferedTupleStream2* tuple_stream,
+            int64_t max_num_buckets, int64_t initial_num_buckets);
+
+    // Performs the probing operation according to the probing algorithm (linear or
+    // quadratic). Returns one of the following:
+    // (a) the index of the bucket that contains the entry that matches with the last row
+    //     evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row
+    //     equality and returns the index of the first empty bucket.
+    // (b) the index of the first empty bucket according to the probing algorithm (linear
+    //     or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL.
+    // (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum
+    //     distance was traveled without finding either an empty or a matching bucket.
+    // Using the returned index value, the caller can create an iterator that can be
+    // iterated until End() to find all the matching rows.
+    // eval_and_hash_build() or eval_and_hash_probe() must have been called before calling this.
+    // 'hash' must be the hash returned by these functions.
+    // 'found' indicates that a bucket that contains an equal row is found.
+    //
+    // There are wrappers of this function that perform the find and insert logic.
+    int64_t IR_ALWAYS_INLINE probe(Bucket* buckets, int64_t num_buckets,
+            PartitionedHashTableCtx* ht_ctx, uint32_t hash,  bool* found);
+
+    // Performs the insert logic. Returns the HtData* of the bucket or duplicate node
+    // where the data should be inserted. Returns NULL if the insert was not successful.
+    HtData* IR_ALWAYS_INLINE insert_internal(PartitionedHashTableCtx* ht_ctx, uint32_t hash);
+
+    // Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
+    // duplicates, 'node' will be pointing to the head of the linked list of duplicates.
+    // Otherwise, 'node' should not be used. If there are no more buckets, sets
+    // 'bucket_idx' to BUCKET_NOT_FOUND.
+    void next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node);
+
+    // Resize the hash table to 'num_buckets'. Returns false on OOM.
+    bool resize_buckets(int64_t num_buckets, PartitionedHashTableCtx* ht_ctx);
+
+    // Appends the DuplicateNode pointed by _next_node to 'bucket' and moves the _next_node
+    // pointer to the next DuplicateNode in the page, updating the remaining node counter.
+    DuplicateNode* IR_ALWAYS_INLINE append_next_node(Bucket* bucket);
+
+    // Creates a new DuplicateNode for an entry and chains it to the bucket with index
+    // 'bucket_idx'. The duplicate nodes of a bucket are chained as a linked list.
+    // This places the new duplicate node at the beginning of the list. If this is the
+    // first duplicate entry inserted in this bucket, then the entry already contained by
+    // the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
+    // bucket are copied to a DuplicateNode and 'data' is updated to pointing to a
+    // DuplicateNode.
+    // Returns NULL if the node array could not grow, i.e. there was not enough memory to
+    // allocate a new DuplicateNode.
+    DuplicateNode* IR_ALWAYS_INLINE insert_duplicate_node(int64_t bucket_idx);
+
+    // Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
+    // an insert. Sets all the fields of the bucket other than 'data'.
+    void IR_ALWAYS_INLINE prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash);
+
+    // Return the TupleRow pointed by 'htdata'.
+    TupleRow* get_row(HtData& htdata, TupleRow* row) const;
+
+    // Returns the TupleRow of the pointed 'bucket'. In case of duplicates, it
+    // returns the content of the first chained duplicate node of the bucket.
+    TupleRow* get_row(Bucket* bucket, TupleRow* row) const;
+
+    // Grow the node array. Returns false on OOM.
+    bool grow_node_array();
+
+    // Load factor that will trigger growing the hash table on insert.  This is
+    // defined as the number of non-empty buckets / total_buckets
+    static const double MAX_FILL_FACTOR;
+
+    RuntimeState* _state;
+
+    // Client to allocate data pages with.
+    BufferedBlockMgr2::Client* _block_mgr_client;
+
+    // Stream contains the rows referenced by the hash table. Can be NULL if the
+    // row only contains a single tuple, in which case the TupleRow indirection
+    // is removed by the hash table.
+    BufferedTupleStream2* _tuple_stream;
+
+    // Constants on how the hash table should behave. Joins and aggs have slightly
+    // different behavior.
+    // TODO: these constants are an ideal candidate to be removed with codegen.
+    // TODO: ..or with template-ization
+    const bool _stores_tuples;
+
+    // Quadratic probing enabled (as opposed to linear).
+    const bool _quadratic_probing;
+
+    // Data pages for all nodes. These are always pinned.
+    std::vector<BufferedBlockMgr2::Block*> _data_pages;
+
+    // Byte size of all buffers in _data_pages.
+    int64_t _total_data_page_size;
+
+    // Next duplicate node to insert. Valid when _node_remaining_current_page > 0.
+    DuplicateNode* _next_node;
+
+    // Number of nodes left in the current page.
+    int _node_remaining_current_page;
+
+    // Number of duplicate nodes.
+    int64_t _num_duplicate_nodes;
+
+    const int64_t _max_num_buckets;
+
+    // Array of all buckets. Owned by this node. Using c-style array to control
+    // memory footprint.
+    Bucket* _buckets;
+
+    // Total number of buckets (filled and empty).
+    int64_t _num_buckets;
+
+    // Number of non-empty buckets.  Used to determine when to resize.
+    int64_t _num_filled_buckets;
+
+    // Number of (non-empty) buckets with duplicates. These buckets do not point to slots
+    // in the tuple stream, but rather to a linked list of DuplicateNodes.
+    int64_t _num_buckets_with_duplicates;
+
+    // Number of build tuples, used for constructing temp row* for probes.
+    // TODO: We should remove it.
+    const int _num_build_tuples;
+
+    // Flag used to disable spilling hash tables that already had matches in case of
+    // right joins (IMPALA-1488).
+    // TODO: Not fail when spilling hash tables with matches in right joins
+    bool _has_matches;
+
+    // The stats below can be used for debugging perf.
+    // TODO: Should we make these statistics atomic?
+    // Number of find(), insert(), or find_bucket() calls that probe the hash table.
+    int64_t _num_probes;
+
+    // Number of probes that failed and had to fall back to linear probing without cap.
+    int64_t _num_failed_probes;
+
+    // Total distance traveled for each probe. That is the sum of the diff between the end
+    // position of a probe (find/insert) and its start position
+    // (hash & (_num_buckets - 1)).
+    int64_t _travel_length;
+
+    // The number of cases where we had to compare buckets with the same hash value, but
+    // the row equality failed.
+    int64_t _num_hash_collisions;
+
+    // How many times this table has resized so far.
+    int64_t _num_resizes;
+};
+
+} // end namespace doris
+
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_H
diff --git a/be/src/exec/partitioned_hash_table.inline.h b/be/src/exec/partitioned_hash_table.inline.h
new file mode 100644
index 0000000..09abc3d
--- /dev/null
+++ b/be/src/exec/partitioned_hash_table.inline.h
@@ -0,0 +1,379 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
+#define DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H
+
+#include "exec/partitioned_hash_table.h"
+
+namespace doris {
+
+// Evaluates 'row' through EvalBuildRow() and, if the row is accepted, stores the hash
+// of the evaluated values in '*hash'. A row for which a NULL was reported is rejected
+// when this context does not store NULLs; in that case '*hash' is left untouched and
+// false is returned. Returns true otherwise.
+inline bool PartitionedHashTableCtx::eval_and_hash_build(TupleRow* row, uint32_t* hash) {
+    const bool row_has_null = EvalBuildRow(row);
+    const bool reject_row = row_has_null && !_stores_nulls;
+    if (reject_row) {
+        return false;
+    }
+    *hash = HashCurrentRow();
+    return true;
+}
+
+// Evaluates 'row' through EvalProbeRow() and, if the row is accepted, stores the hash
+// of the evaluated values in '*hash'. A row for which a NULL was reported is accepted
+// only when this context both stores and finds NULLs; otherwise '*hash' is left
+// untouched and false is returned. Returns true for an accepted row.
+inline bool PartitionedHashTableCtx::eval_and_hash_probe(TupleRow* row, uint32_t* hash) {
+    const bool row_has_null = EvalProbeRow(row);
+    const bool nulls_can_match = _stores_nulls && _finds_nulls;
+    if (row_has_null && !nulls_can_match) {
+        return false;
+    }
+    *hash = HashCurrentRow();
+    return true;
+}
+
+// Probes 'buckets' (an array of 'num_buckets' buckets) starting at the bucket selected
+// by 'hash', following the linear or quadratic probing sequence.
+// Returns:
+//  - the index of the first empty bucket reached, with '*found' == false, or
+//  - the index of a filled bucket whose cached hash equals 'hash' and whose row is
+//    equal according to 'ht_ctx', with '*found' == true, or
+//  - Iterator::BUCKET_NOT_FOUND after 'num_buckets' steps without either outcome.
+// If 'ht_ctx' is NULL no row equality is evaluated, so '*found' is never set to true.
+// Side effects: updates the '_travel_length' and '_num_hash_collisions' statistics.
+// The index arithmetic masks with (num_buckets - 1), which relies on 'num_buckets'
+// being a power of two.
+inline int64_t PartitionedHashTable::probe(Bucket* buckets, int64_t num_buckets,
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash, bool* found) {
+    DCHECK(buckets != NULL);
+    DCHECK_GT(num_buckets, 0);
+    *found = false;
+    // Start position of the probe sequence.
+    int64_t bucket_idx = hash & (num_buckets - 1);
+
+    // In case of linear probing it counts the total number of steps for statistics and
+    // for knowing when to exit the loop (e.g. by capping the total travel length). In case
+    // of quadratic probing it is also used for calculating the length of the next jump.
+    int64_t step = 0;
+    do {
+        Bucket* bucket = &buckets[bucket_idx];
+        if (!bucket->filled) {
+            // An empty bucket ends the probe sequence: the key is not in the table.
+            return bucket_idx;
+        }
+        // Compare the cached hashes first; rows are only compared on a hash match.
+        if (hash == bucket->hash) {
+            if (ht_ctx != NULL && ht_ctx->equals(get_row(bucket, ht_ctx->_row))) {
+                *found = true;
+                return bucket_idx;
+            }
+            // Row equality failed, or not performed. This is a hash collision. Continue
+            // searching.
+            ++_num_hash_collisions;
+        }
+        // Move to the next bucket.
+        ++step;
+        ++_travel_length;
+        if (_quadratic_probing) {
+            // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets.
+            // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power
+            // of 2.
+            bucket_idx = (bucket_idx + step) & (num_buckets - 1);
+        } else {
+            bucket_idx = (bucket_idx + 1) & (num_buckets - 1);
+        }
+    } while (LIKELY(step < num_buckets));
+    // Every probe position was visited without finding an empty or a matching bucket.
+    DCHECK_EQ(_num_filled_buckets, num_buckets) << "Probing of a non-full table "
+        << "failed: " << _quadratic_probing << " " << hash;
+    return Iterator::BUCKET_NOT_FOUND;
+}
+
+// Probes for the row last evaluated in 'ht_ctx' and returns the HtData slot the caller
+// should fill in. If no equal row is present, the empty bucket returned by the probe is
+// prepared and its slot is returned. If an equal row is present, a new duplicate node
+// is chained to that bucket and its slot is returned; NULL is returned when that node
+// could not be allocated.
+inline PartitionedHashTable::HtData* PartitionedHashTable::insert_internal(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
+    ++_num_probes;
+    bool key_exists = false;
+    const int64_t target_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &key_exists);
+    DCHECK_NE(target_idx, Iterator::BUCKET_NOT_FOUND);
+    if (!key_exists) {
+        // First entry for this key: claim the empty bucket.
+        prepare_bucket_for_insert(target_idx, hash);
+        return &_buckets[target_idx].bucketData.htdata;
+    }
+    // We need to insert a duplicate node, note that this may fail to allocate memory.
+    DuplicateNode* duplicate = insert_duplicate_node(target_idx);
+    if (UNLIKELY(duplicate == NULL)) {
+        return NULL;
+    }
+    return &duplicate->htdata;
+}
+
+// Inserts the row identified by 'idx'. When the table stores tuples, the first tuple of
+// 'row' is inserted through the Tuple* overload instead. Returns false if no slot could
+// be obtained for the entry, true otherwise.
+inline bool PartitionedHashTable::insert(PartitionedHashTableCtx* ht_ctx,
+        const BufferedTupleStream2::RowIdx& idx, TupleRow* row, uint32_t hash) {
+    if (_stores_tuples) {
+        return insert(ht_ctx, row->get_tuple(0), hash);
+    }
+    HtData* slot = insert_internal(ht_ctx, hash);
+    if (UNLIKELY(slot == NULL)) {
+        return false;
+    }
+    // Successful insert: record 'idx' in the newly obtained entry.
+    slot->idx = idx;
+    return true;
+}
+
+// Inserts 'tuple' into a table that stores tuples. Returns false if no slot could be
+// obtained for the entry, true otherwise.
+inline bool PartitionedHashTable::insert(
+        PartitionedHashTableCtx* ht_ctx, Tuple* tuple, uint32_t hash) {
+    DCHECK(_stores_tuples);
+    HtData* slot = insert_internal(ht_ctx, hash);
+    if (UNLIKELY(slot == NULL)) {
+        return false;
+    }
+    // Successful insert: record 'tuple' in the newly obtained entry.
+    slot->tuple = tuple;
+    return true;
+}
+
+// Probes for the row last evaluated in 'ht_ctx'. Returns an iterator positioned at the
+// matching bucket (and at the bucket's 'duplicates' pointer), or End() if the probe did
+// not find an equal row.
+inline PartitionedHashTable::Iterator PartitionedHashTable::find(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash) {
+    ++_num_probes;
+    bool matched = false;
+    const int64_t match_idx = probe(_buckets, _num_buckets, ht_ctx, hash, &matched);
+    if (!matched) {
+        return End();
+    }
+    DuplicateNode* head = _buckets[match_idx].bucketData.duplicates;
+    return Iterator(this, ht_ctx->row(), match_idx, head);
+}
+
+// Probes for the row last evaluated in 'ht_ctx' and returns an iterator positioned at
+// the bucket index returned by the probe; '*found' tells whether that bucket holds an
+// equal row. If the probe returned BUCKET_NOT_FOUND the iterator is at its end and its
+// node pointer is NULL.
+inline PartitionedHashTable::Iterator PartitionedHashTable::find_bucket(
+        PartitionedHashTableCtx* ht_ctx, uint32_t hash,
+        bool* found) {
+    ++_num_probes;
+    const int64_t probed_idx = probe(_buckets, _num_buckets, ht_ctx, hash, found);
+    DuplicateNode* head = NULL;
+    if (LIKELY(probed_idx != Iterator::BUCKET_NOT_FOUND)) {
+        head = _buckets[probed_idx].bucketData.duplicates;
+    }
+    return Iterator(this, ht_ctx->row(), probed_idx, head);
+}
+
+// Returns an iterator positioned at the first filled bucket of the table. If no bucket
+// is filled, the returned iterator is at its end.
+inline PartitionedHashTable::Iterator PartitionedHashTable::begin(PartitionedHashTableCtx* ctx) {
+    // Starting from BUCKET_NOT_FOUND (-1) makes the search begin at bucket 0.
+    int64_t first_idx = Iterator::BUCKET_NOT_FOUND;
+    DuplicateNode* first_node = NULL;
+    next_filled_bucket(&first_idx, &first_node);
+    return Iterator(this, ctx->row(), first_idx, first_node);
+}
+
+// Returns an iterator positioned at the first entry (bucket, or duplicate node of a
+// bucket with duplicates) whose matched flag is not set. If the table has no filled
+// bucket, or every entry is matched, the returned iterator is at its end.
+inline PartitionedHashTable::Iterator PartitionedHashTable::first_unmatched(
+        PartitionedHashTableCtx* ctx) {
+    int64_t bucket_idx = Iterator::BUCKET_NOT_FOUND;
+    DuplicateNode* node = NULL;
+    next_filled_bucket(&bucket_idx, &node);
+    Iterator it(this, ctx->row(), bucket_idx, node);
+    // No filled bucket at all: 'bucket_idx' is BUCKET_NOT_FOUND, so there is no bucket to
+    // inspect. Return the end iterator instead of indexing '_buckets' with -1.
+    if (it.at_end()) {
+        return it;
+    }
+    // Check whether the bucket, or its first duplicate node, is matched. If it is not
+    // matched, then return. Otherwise, move to the first unmatched entry (node or bucket).
+    Bucket* bucket = &_buckets[bucket_idx];
+    if ((!bucket->hasDuplicates && bucket->matched) ||
+            (bucket->hasDuplicates && node->matched)) {
+        it.next_unmatched();
+    }
+    return it;
+}
+
+// Advances '*bucket_idx' to the next filled bucket after its current value and sets
+// '*node' to that bucket's 'duplicates' pointer. If there is no further filled bucket,
+// sets '*bucket_idx' to BUCKET_NOT_FOUND and '*node' to NULL.
+inline void PartitionedHashTable::next_filled_bucket(int64_t* bucket_idx, DuplicateNode** node) {
+    for (int64_t idx = *bucket_idx + 1; idx < _num_buckets; ++idx) {
+        const Bucket& candidate = _buckets[idx];
+        if (candidate.filled) {
+            *bucket_idx = idx;
+            *node = candidate.bucketData.duplicates;
+            return;
+        }
+    }
+    // Reached the end of the hash table.
+    *bucket_idx = Iterator::BUCKET_NOT_FOUND;
+    *node = NULL;
+}
+
+// Marks the empty bucket at 'bucket_idx' as filled, clears its 'matched' and
+// 'hasDuplicates' flags, caches 'hash' in it and counts it as a filled bucket. The
+// bucket's data is left for the caller to set.
+inline void PartitionedHashTable::prepare_bucket_for_insert(int64_t bucket_idx, uint32_t hash) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket& target = _buckets[bucket_idx];
+    DCHECK(!target.filled);
+    target.filled = true;
+    target.matched = false;
+    target.hasDuplicates = false;
+    target.hash = hash;
+    ++_num_filled_buckets;
+}
+
+// Makes the node at '_next_node' the head of 'bucket's duplicate list and returns it.
+// '_next_node' is advanced to the following node, the remaining-node counter of the
+// current page is decremented and the duplicate-node count is incremented.
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::append_next_node(
+        Bucket* bucket) {
+    DCHECK_GT(_node_remaining_current_page, 0);
+    DuplicateNode* const appended = _next_node;
+    bucket->bucketData.duplicates = appended;
+    ++_next_node;
+    --_node_remaining_current_page;
+    ++_num_duplicate_nodes;
+    return appended;
+}
+
+// Chains a new DuplicateNode at the head of the duplicate list of the filled bucket at
+// 'bucket_idx' and returns it; the caller fills in the node's 'htdata'. If the bucket
+// had no duplicates yet, its existing entry is first moved into a DuplicateNode of its
+// own. Returns NULL, without modifying the bucket, if grow_node_array() fails.
+inline PartitionedHashTable::DuplicateNode* PartitionedHashTable::insert_duplicate_node(
+        int64_t bucket_idx) {
+    DCHECK_GE(bucket_idx, 0);
+    DCHECK_LT(bucket_idx, _num_buckets);
+    Bucket* bucket = &_buckets[bucket_idx];
+    DCHECK(bucket->filled);
+    // Allocate one duplicate node for the new data and one for the preexisting data,
+    // if needed.
+    // All required nodes are secured up front, so the bucket is never left half-converted.
+    while (_node_remaining_current_page < 1 + !bucket->hasDuplicates) {
+        if (UNLIKELY(!grow_node_array())) {
+            return NULL;
+        }
+    }
+    if (!bucket->hasDuplicates) {
+        // This is the first duplicate in this bucket. It means that we need to convert
+        // the current entry in the bucket to a node and link it from the bucket.
+        // NOTE(review): the payload is copied through the 'idx' member even when the table
+        // stores tuples; presumably 'idx' is at least as wide as 'tuple' -- confirm.
+        _next_node->htdata.idx = bucket->bucketData.htdata.idx;
+        DCHECK(!bucket->matched);
+        _next_node->matched = false;
+        _next_node->next = NULL;
+        // Must come after the copy above: this overwrites the bucketData union.
+        append_next_node(bucket);
+        bucket->hasDuplicates = true;
+        ++_num_buckets_with_duplicates;
+    }
+    // Link a new node.
+    // The new node points at the current head and then becomes the head itself.
+    _next_node->next = bucket->bucketData.duplicates;
+    _next_node->matched = false;
+    return append_next_node(bucket);
+}
+
+// Returns the TupleRow for 'htdata'. When the table stores tuples, the address of the
+// stored Tuple* is reinterpreted as a TupleRow and 'row' is not used. Otherwise the
+// row at 'htdata.idx' is fetched from the tuple stream into 'row', which is returned.
+inline TupleRow* PartitionedHashTable::get_row(HtData& htdata, TupleRow* row) const {
+    if (!_stores_tuples) {
+        _tuple_stream->get_tuple_row(htdata.idx, row);
+        return row;
+    }
+    return reinterpret_cast<TupleRow*>(&htdata.tuple);
+}
+
+// Returns the TupleRow of 'bucket'. For a bucket with duplicates this is the row of the
+// head of its duplicate list; otherwise it is the row stored in the bucket itself.
+inline TupleRow* PartitionedHashTable::get_row(Bucket* bucket, TupleRow* row) const {
+    DCHECK(bucket != NULL);
+    if (LIKELY(!bucket->hasDuplicates)) {
+        return get_row(bucket->bucketData.htdata, row);
+    }
+    DuplicateNode* head = bucket->bucketData.duplicates;
+    DCHECK(head != NULL);
+    return get_row(head->htdata, row);
+}
+
+// Returns the row of the entry the iterator points at: the current duplicate node if
+// the bucket has duplicates, the bucket's own entry otherwise. The iterator must not be
+// at its end.
+inline TupleRow* PartitionedHashTable::Iterator::get_row() const {
+    DCHECK(!at_end());
+    DCHECK(_table != NULL);
+    DCHECK(_row != NULL);
+    Bucket* current = &_table->_buckets[_bucket_idx];
+    if (LIKELY(!current->hasDuplicates)) {
+        return _table->get_row(current->bucketData.htdata, _row);
+    }
+    DCHECK(_node != NULL);
+    return _table->get_row(_node->htdata, _row);
+}
+
+// Returns the Tuple* of the entry the iterator points at: taken from the current
+// duplicate node if the bucket has duplicates, from the bucket itself otherwise. Only
+// valid for tables that store tuples, and only when the iterator is not at its end.
+inline Tuple* PartitionedHashTable::Iterator::get_tuple() const {
+    DCHECK(!at_end());
+    DCHECK(_table->_stores_tuples);
+    Bucket* current = &_table->_buckets[_bucket_idx];
+    // TODO: To avoid the hasDuplicates check, store the HtData* in the Iterator.
+    if (LIKELY(!current->hasDuplicates)) {
+        return current->bucketData.htdata.tuple;
+    }
+    DCHECK(_node != NULL);
+    return _node->htdata.tuple;
+}
+
+// Fills the (empty) bucket the iterator points at: prepares it for insert with 'hash'
+// and stores 'tuple' as its data. Only valid for tables that store tuples.
+inline void PartitionedHashTable::Iterator::set_tuple(Tuple* tuple, uint32_t hash) {
+    DCHECK(!at_end());
+    DCHECK(_table->_stores_tuples);
+    _table->prepare_bucket_for_insert(_bucket_idx, hash);
+    Bucket& target = _table->_buckets[_bucket_idx];
+    target.bucketData.htdata.tuple = tuple;
+}
+
+// Sets the 'matched' flag of the entry the iterator points at (the current duplicate
+// node if the bucket has duplicates, the bucket otherwise) and records on the table
+// that it has had a match.
+inline void PartitionedHashTable::Iterator::set_matched() {
+    DCHECK(!at_end());
+    Bucket* current = &_table->_buckets[_bucket_idx];
+    bool& matched_flag = current->hasDuplicates ? _node->matched : current->matched;
+    matched_flag = true;
+    // Used for disabling spilling of hash tables in right and full-outer joins with
+    // matches. See IMPALA-1488.
+    _table->_has_matches = true;
+}
+
+// Reports whether the entry at the iterator's current position is flagged as
+// matched. Reads the flag from the current duplicate node when the bucket has
+// duplicates, and from the bucket itself otherwise.
+inline bool PartitionedHashTable::Iterator::is_matched() const {
+    DCHECK(!at_end());
+    Bucket* current = &_table->_buckets[_bucket_idx];
+    return current->hasDuplicates ? _node->matched : current->matched;
+}
+
+// Puts the iterator into its end state: no current duplicate node and the
+// bucket index set to BUCKET_NOT_FOUND.
+inline void PartitionedHashTable::Iterator::set_at_end() {
+    _node = NULL;
+    _bucket_idx = BUCKET_NOT_FOUND;
+}
+
+// Advances the iterator by one entry. If the current bucket has duplicates and
+// the current node has a successor, steps to that successor; otherwise asks the
+// table for the next filled bucket.
+inline void PartitionedHashTable::Iterator::next() {
+    DCHECK(!at_end());
+    const bool chain_continues =
+            _table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL;
+    if (!chain_continues) {
+        _table->next_filled_bucket(&_bucket_idx, &_node);
+        return;
+    }
+    _node = _node->next;
+}
+
+// Advances only within the current bucket's duplicate chain. If the bucket has
+// duplicates and the current node has a successor, steps to it; otherwise the
+// iterator is put into its end state (BUCKET_NOT_FOUND, no node).
+inline void PartitionedHashTable::Iterator::next_duplicate() {
+    DCHECK(!at_end());
+    const bool chain_continues =
+            _table->_buckets[_bucket_idx].hasDuplicates && _node->next != NULL;
+    if (chain_continues) {
+        _node = _node->next;
+        return;
+    }
+    // Nothing left in this bucket: become an end iterator.
+    _bucket_idx = BUCKET_NOT_FOUND;
+    _node = NULL;
+}
+
+// Advances the iterator to the next entry whose 'matched' flag is not set.
+// The rest of the current bucket's duplicate chain is scanned first, then the
+// following filled buckets. If no unmatched entry is found the loop ends with
+// the bucket index equal to BUCKET_NOT_FOUND.
+inline void PartitionedHashTable::Iterator::next_unmatched() {
+    DCHECK(!at_end());
+    // Phase 1: look for an unmatched node after the current one in this bucket.
+    if (_table->_buckets[_bucket_idx].hasDuplicates) {
+        while (_node->next != NULL) {
+            _node = _node->next;
+            if (!_node->matched) {
+                return;
+            }
+        }
+    }
+    // Phase 2: walk the remaining filled buckets. Stop at a bucket without
+    // duplicates that is unmatched, or at the first unmatched node of a
+    // bucket's duplicate chain.
+    for (_table->next_filled_bucket(&_bucket_idx, &_node);
+         _bucket_idx != Iterator::BUCKET_NOT_FOUND;
+         _table->next_filled_bucket(&_bucket_idx, &_node)) {
+        Bucket* candidate = &_table->_buckets[_bucket_idx];
+        if (candidate->hasDuplicates) {
+            // Skip over matched nodes; stops on an unmatched node or the last node.
+            while (_node->matched && _node->next != NULL) {
+                _node = _node->next;
+            }
+            if (!_node->matched) {
+                return;
+            }
+        } else if (!candidate->matched) {
+            return;
+        }
+    }
+}
+
+// Selects the current level of this context. In debug builds 'level' is checked
+// to lie in the range [0, _seeds.size()).
+inline void PartitionedHashTableCtx::set_level(int level) {
+    // Lower bound first, then upper bound against the number of seeds.
+    DCHECK_GE(level, 0);
+    DCHECK_LT(level, _seeds.size());
+    this->_level = level;
+}
+
+} // end namespace doris
+
+#endif // DORIS_BE_SRC_EXEC_PARTITIONED_HASH_TABLE_INLINE_H


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org