You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text rendering.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/02 06:28:27 UTC
[4/6] incubator-impala git commit: IMPALA-4674: Part 1: remove old aggs and joins
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
deleted file mode 100644
index c35d05d..0000000
--- a/be/src/exec/hash-join-node.cc
+++ /dev/null
@@ -1,673 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hash-join-node.h"
-
-#include <functional>
-#include <numeric>
-#include <sstream>
-
-#include "codegen/llvm-codegen.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr.h"
-#include "gutil/strings/substitute.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/runtime-state.h"
-#include "runtime/tuple-row.h"
-#include "util/debug-util.h"
-#include "util/bloom-filter.h"
-#include "util/runtime-profile-counters.h"
-
-#include "gen-cpp/PlanNodes_types.h"
-
-#include "common/names.h"
-
-DEFINE_bool(enable_probe_side_filtering, true, "Deprecated.");
-
-using namespace impala;
-using namespace llvm;
-using namespace strings;
-
-const char* HashJoinNode::LLVM_CLASS_NAME = "class.impala::HashJoinNode";
-
-HashJoinNode::HashJoinNode(
- ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : BlockingJoinNode("HashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
- is_not_distinct_from_(),
- codegen_process_build_batch_fn_(NULL),
- process_build_batch_fn_(NULL),
- process_probe_batch_fn_(NULL) {
- // The hash join node does not support cross or anti joins
- DCHECK_NE(join_op_, TJoinOp::CROSS_JOIN);
- DCHECK_NE(join_op_, TJoinOp::LEFT_ANTI_JOIN);
- DCHECK_NE(join_op_, TJoinOp::RIGHT_SEMI_JOIN);
- DCHECK_NE(join_op_, TJoinOp::RIGHT_ANTI_JOIN);
- DCHECK_NE(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
-
- match_all_probe_ =
- join_op_ == TJoinOp::LEFT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN;
- match_one_build_ = join_op_ == TJoinOp::LEFT_SEMI_JOIN;
- match_all_build_ =
- join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN;
-}
-
-Status HashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state) {
- RETURN_IF_ERROR(BlockingJoinNode::Init(tnode, state));
- DCHECK(tnode.__isset.hash_join_node);
- const vector<TEqJoinCondition>& eq_join_conjuncts =
- tnode.hash_join_node.eq_join_conjuncts;
-
- for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
- ScalarExpr* probe_expr;
- RETURN_IF_ERROR(ScalarExpr::Create(
- eq_join_conjuncts[i].left, *child(0)->row_desc(), state, &probe_expr));
- probe_exprs_.push_back(probe_expr);
- ScalarExpr* build_expr;
- RETURN_IF_ERROR(ScalarExpr::Create(
- eq_join_conjuncts[i].right, *child(1)->row_desc(), state, &build_expr));
- build_exprs_.push_back(build_expr);
- is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
- }
-
- // other_join_conjunct_evals_ are evaluated in the context of rows assembled from
- // all build and probe tuples; full_row_desc is not necessarily the same as the output
- // row desc, e.g., because semi joins only return the build xor probe tuples
- RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
- RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
- full_row_desc, state, &other_join_conjuncts_));
-
- for (const TRuntimeFilterDesc& tfilter: tnode.runtime_filters) {
- // If filter propagation not enabled, only consider building broadcast joins (that may
- // be consumed by this fragment).
- if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
- !tfilter.is_broadcast_join) {
- continue;
- }
- if (state->query_options().disable_row_runtime_filtering &&
- !tfilter.applied_on_partition_columns) {
- continue;
- }
- filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true));
- ScalarExpr* filter_expr;
- RETURN_IF_ERROR(
- ScalarExpr::Create(tfilter.src_expr, *child(1)->row_desc(), state, &filter_expr));
- filter_exprs_.push_back(filter_expr);
- }
- return Status::OK();
-}
-
-Status HashJoinNode::Prepare(RuntimeState* state) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
- RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
-
- build_buckets_counter_ =
- ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
- hash_tbl_load_factor_counter_ =
- ADD_COUNTER(runtime_profile(), "LoadFactor", TUnit::DOUBLE_VALUE);
-
- // build and probe exprs are evaluated in the context of the rows produced by our
- // right and left children, respectively
- RETURN_IF_ERROR(ScalarExprEvaluator::Create(other_join_conjuncts_, state, pool_,
- expr_mem_pool(), &other_join_conjunct_evals_));
- AddEvaluatorsToFree(other_join_conjunct_evals_);
-
- // TODO: default buckets
- const bool stores_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN
- || join_op_ == TJoinOp::FULL_OUTER_JOIN
- || std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(),
- false, std::logical_or<bool>());
-
- RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, probe_exprs_,
- filter_exprs_, child(1)->row_desc()->tuple_descriptors().size(), stores_nulls,
- is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), filters_,
- &hash_tbl_));
- build_pool_.reset(new MemPool(mem_tracker()));
- AddCodegenDisabledMessage(state);
- return Status::OK();
-}
-
-void HashJoinNode::Codegen(RuntimeState* state) {
- DCHECK(state->ShouldCodegen());
- ExecNode::Codegen(state);
- if (IsNodeCodegenDisabled()) return;
-
- LlvmCodeGen* codegen = state->codegen();
- DCHECK(codegen != NULL);
- bool build_codegen_enabled = false;
- bool probe_codegen_enabled = false;
-
- // Codegen for hashing rows
- Function* hash_fn = hash_tbl_->CodegenHashCurrentRow(codegen);
- if (hash_fn != NULL) {
- // Codegen for build path
- codegen_process_build_batch_fn_ = CodegenProcessBuildBatch(codegen, hash_fn);
- if (codegen_process_build_batch_fn_ != NULL) {
- codegen->AddFunctionToJit(codegen_process_build_batch_fn_,
- reinterpret_cast<void**>(&process_build_batch_fn_));
- build_codegen_enabled = true;
- }
-
- // Codegen for probe path (only for left joins)
- if (!match_all_build_) {
- Function* codegen_process_probe_batch_fn =
- CodegenProcessProbeBatch(codegen, hash_fn);
- if (codegen_process_probe_batch_fn != NULL) {
- codegen->AddFunctionToJit(codegen_process_probe_batch_fn,
- reinterpret_cast<void**>(&process_probe_batch_fn_));
- probe_codegen_enabled = true;
- }
- }
- }
- runtime_profile()->AddCodegenMsg(build_codegen_enabled, "", "Build Side");
- runtime_profile()->AddCodegenMsg(probe_codegen_enabled, "", "Probe Side");
-}
-
-Status HashJoinNode::Reset(RuntimeState* state) {
- DCHECK(false) << "NYI";
- return Status("NYI");
-}
-
-void HashJoinNode::Close(RuntimeState* state) {
- if (is_closed()) return;
- if (hash_tbl_.get() != NULL) hash_tbl_->Close(state);
- if (build_pool_.get() != NULL) build_pool_->FreeAll();
- ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
- ScalarExpr::Close(probe_exprs_);
- ScalarExpr::Close(build_exprs_);
- ScalarExpr::Close(other_join_conjuncts_);
- ScalarExpr::Close(filter_exprs_);
- BlockingJoinNode::Close(state);
-}
-
-Status HashJoinNode::Open(RuntimeState* state) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
- RETURN_IF_ERROR(BlockingJoinNode::Open(state));
- RETURN_IF_ERROR(hash_tbl_->Open(state));
- RETURN_IF_ERROR(ScalarExprEvaluator::Open(other_join_conjunct_evals_, state));
-
- // Check for errors and free local allocations before opening children.
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
-
- RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, NULL));
- RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
- InitGetNext();
- return Status::OK();
-}
-
-Status HashJoinNode::QueryMaintenance(RuntimeState* state) {
- if (hash_tbl_.get() != nullptr) hash_tbl_->FreeLocalAllocations();
- return ExecNode::QueryMaintenance(state);
-}
-
-Status HashJoinNode::ProcessBuildInput(RuntimeState* state) {
- // Do a full scan of child(1) and store everything in hash_tbl_
- // The hash join node needs to keep in memory all build tuples, including the tuple
- // row ptrs. The row ptrs are copied into the hash table's internal structure so they
- // don't need to be stored in the build_pool_.
- RowBatch build_batch(child(1)->row_desc(), state->batch_size(), mem_tracker());
- while (true) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
- bool eos;
- {
- SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
- RETURN_IF_ERROR(child(1)->GetNext(state, &build_batch, &eos));
- }
- SCOPED_TIMER(build_timer_);
- // take ownership of tuple data of build_batch
- build_pool_->AcquireData(build_batch.tuple_data_pool(), false);
- RETURN_IF_ERROR(QueryMaintenance(state));
-
- // Call codegen version if possible
- if (process_build_batch_fn_ == NULL) {
- ProcessBuildBatch(&build_batch);
- } else {
- process_build_batch_fn_(this, &build_batch);
- }
- VLOG_ROW << hash_tbl_->DebugString(true, false, child(1)->row_desc());
-
- COUNTER_SET(build_row_counter_, hash_tbl_->size());
- COUNTER_SET(build_buckets_counter_, hash_tbl_->num_buckets());
- COUNTER_SET(hash_tbl_load_factor_counter_, hash_tbl_->load_factor());
- build_batch.Reset();
- DCHECK(!build_batch.AtCapacity());
- if (eos) break;
- }
-
- if (filters_.size() > 0) {
- int num_enabled_filters = hash_tbl_->AddBloomFilters();
- if (num_enabled_filters == filters_.size()) {
- runtime_profile()->AppendExecOption(
- Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
- filters_.size() == 1 ? "" : "s"));
- } else {
- string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
- num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
- filters_.size() - num_enabled_filters);
- runtime_profile()->AppendExecOption(exec_option);
- }
- }
-
- return Status::OK();
-}
-
-void HashJoinNode::InitGetNext() {
- if (current_probe_row_ == NULL) {
- hash_tbl_iterator_ = hash_tbl_->Begin();
- } else {
- matched_probe_ = false;
- hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
- }
-}
-
-Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch, bool* eos) {
- SCOPED_TIMER(runtime_profile_->total_time_counter());
- RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
- if (ReachedLimit()) {
- *eos = true;
- return Status::OK();
- }
- *eos = false;
-
- // These cases are simpler and use a more efficient processing loop
- if (!match_all_build_) {
- if (eos_) {
- *eos = true;
- return Status::OK();
- }
- return LeftJoinGetNext(state, out_batch, eos);
- }
-
- const int num_other_conjuncts = other_join_conjuncts_.size();
- DCHECK_EQ(num_other_conjuncts, other_join_conjunct_evals_.size());
-
- const int num_conjuncts = conjuncts_.size();
- DCHECK_EQ(num_conjuncts, conjunct_evals_.size());
-
- // Explicitly manage the timer counter to avoid measuring time in the child
- // GetNext call.
- ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_);
-
- while (!eos_) {
- // create output rows as long as:
- // 1) we haven't already created an output row for the probe row and are doing
- // a semi-join;
- // 2) there are more matching build rows
- while (!hash_tbl_iterator_.AtEnd()) {
- int row_idx = out_batch->AddRow();
- TupleRow* out_row = out_batch->GetRow(row_idx);
-
- TupleRow* matched_build_row = hash_tbl_iterator_.GetRow();
- CreateOutputRow(out_row, current_probe_row_, matched_build_row);
- if (!EvalConjuncts(other_join_conjunct_evals_.data(),
- num_other_conjuncts, out_row)) {
- hash_tbl_iterator_.Next<true>();
- continue;
- }
- // we have a match for the purpose of the (outer?) join as soon as we
- // satisfy the JOIN clause conjuncts
- matched_probe_ = true;
- if (match_all_build_) {
- // remember that we matched this build row
- hash_tbl_iterator_.set_matched(true);
- VLOG_ROW << "joined build row: " << matched_build_row;
- }
-
- hash_tbl_iterator_.Next<true>();
- if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
- out_batch->CommitLastRow();
- VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
- ++num_rows_returned_;
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- if (out_batch->AtCapacity() || ReachedLimit()) {
- *eos = ReachedLimit();
- return Status::OK();
- }
- }
- }
-
- // If a probe row exists at this point, check whether we need to output the current
- // probe row before getting a new probe batch. (IMPALA-2440)
- bool probe_row_exists = probe_batch_->num_rows() > 0;
- if (match_all_probe_ && !matched_probe_ && probe_row_exists) {
- int row_idx = out_batch->AddRow();
- TupleRow* out_row = out_batch->GetRow(row_idx);
- CreateOutputRow(out_row, current_probe_row_, NULL);
- if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
- out_batch->CommitLastRow();
- VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
- ++num_rows_returned_;
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- matched_probe_ = true;
- if (out_batch->AtCapacity() || ReachedLimit()) {
- *eos = ReachedLimit();
- return Status::OK();
- }
- }
- }
-
- if (probe_batch_pos_ == probe_batch_->num_rows()) {
- // pass on resources, out_batch might still need them
- probe_batch_->TransferResourceOwnership(out_batch);
- probe_batch_pos_ = 0;
- if (out_batch->AtCapacity()) return Status::OK();
- // get new probe batch
- if (!probe_side_eos_) {
- while (true) {
- probe_timer.Stop();
- RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
- probe_timer.Start();
- if (probe_batch_->num_rows() == 0) {
- // Empty batches can still contain IO buffers, which need to be passed up to
- // the caller; transferring resources can fill up out_batch.
- probe_batch_->TransferResourceOwnership(out_batch);
- if (probe_side_eos_) {
- eos_ = true;
- break;
- }
- if (out_batch->AtCapacity()) return Status::OK();
- continue;
- } else {
- COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
- break;
- }
- }
- } else {
- eos_ = true;
- }
- // finish up right outer join
- if (eos_ && match_all_build_) {
- hash_tbl_iterator_ = hash_tbl_->Begin();
- }
- }
-
- if (eos_) break;
-
- // join remaining rows in probe batch_
- current_probe_row_ = probe_batch_->GetRow(probe_batch_pos_++);
- VLOG_ROW << "probe row: " << GetLeftChildRowString(current_probe_row_);
- matched_probe_ = false;
- hash_tbl_iterator_ = hash_tbl_->Find(current_probe_row_);
- }
-
- *eos = true;
- if (match_all_build_) {
- // output remaining unmatched build rows
- TupleRow* build_row = NULL;
- while (!out_batch->AtCapacity() && !hash_tbl_iterator_.AtEnd()) {
- build_row = hash_tbl_iterator_.GetRow();
- bool matched = hash_tbl_iterator_.matched();
- hash_tbl_iterator_.Next<false>();
- if (matched) continue;
-
- int row_idx = out_batch->AddRow();
- TupleRow* out_row = out_batch->GetRow(row_idx);
- CreateOutputRow(out_row, NULL, build_row);
- if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
- out_batch->CommitLastRow();
- VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
- ++num_rows_returned_;
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
- if (ReachedLimit()) {
- *eos = true;
- return Status::OK();
- }
- }
- }
- // we're done if there are no more rows left to check
- *eos = hash_tbl_iterator_.AtEnd();
- }
- return Status::OK();
-}
-
-Status HashJoinNode::LeftJoinGetNext(RuntimeState* state,
- RowBatch* out_batch, bool* eos) {
- *eos = eos_;
-
- ScopedTimer<MonotonicStopWatch> probe_timer(probe_timer_);
- while (!eos_) {
- // Compute max rows that should be added to out_batch
- int64_t max_added_rows = out_batch->capacity() - out_batch->num_rows();
- if (limit() != -1) max_added_rows = min(max_added_rows, limit() - rows_returned());
-
- // Continue processing this row batch
- if (process_probe_batch_fn_ == NULL) {
- num_rows_returned_ +=
- ProcessProbeBatch(out_batch, probe_batch_.get(), max_added_rows);
- } else {
- // Use codegen'd function
- num_rows_returned_ +=
- process_probe_batch_fn_(this, out_batch, probe_batch_.get(), max_added_rows);
- }
- COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
- if (ReachedLimit() || out_batch->AtCapacity()) {
- *eos = ReachedLimit();
- break;
- }
-
- // Check to see if we're done processing the current probe batch
- if (hash_tbl_iterator_.AtEnd() && probe_batch_pos_ == probe_batch_->num_rows()) {
- probe_batch_->TransferResourceOwnership(out_batch);
- probe_batch_pos_ = 0;
- if (out_batch->AtCapacity()) break;
- if (probe_side_eos_) {
- *eos = eos_ = true;
- break;
- } else {
- probe_timer.Stop();
- RETURN_IF_ERROR(child(0)->GetNext(state, probe_batch_.get(), &probe_side_eos_));
- probe_timer.Start();
- COUNTER_ADD(probe_row_counter_, probe_batch_->num_rows());
- }
- }
- }
-
- return Status::OK();
-}
-
-void HashJoinNode::AddToDebugString(int indentation_level, stringstream* out) const {
- *out << " hash_tbl=";
- *out << string(indentation_level * 2, ' ');
- *out << "HashTbl("
- << " build_exprs=" << ScalarExpr::DebugString(build_exprs_)
- << " probe_exprs=" << ScalarExpr::DebugString(probe_exprs_);
- *out << ")";
-}
-
-// This codegen'd function should only be used for left join cases so it assumes that
-// the probe row is non-null. For a left outer join, the IR looks like:
-// define void @CreateOutputRow(%"class.impala::BlockingJoinNode"* %this_ptr,
-// %"class.impala::TupleRow"* %out_arg,
-// %"class.impala::TupleRow"* %probe_arg,
-// %"class.impala::TupleRow"* %build_arg) {
-// entry:
-// %out = bitcast %"class.impala::TupleRow"* %out_arg to i8**
-// %probe = bitcast %"class.impala::TupleRow"* %probe_arg to i8**
-// %build = bitcast %"class.impala::TupleRow"* %build_arg to i8**
-// %0 = bitcast i8** %out to i8*
-// %1 = bitcast i8** %probe to i8*
-// call void @llvm.memcpy.p0i8.p0i8.i32(i8* %0, i8* %1, i32 16, i32 16, i1 false)
-// %is_build_null = icmp eq i8** %build, null
-// br i1 %is_build_null, label %build_null, label %build_not_null
-//
-// build_not_null: ; preds = %entry
-// %dst_tuple_ptr1 = getelementptr i8** %out, i32 1
-// %src_tuple_ptr = getelementptr i8** %build, i32 0
-// %2 = load i8** %src_tuple_ptr
-// store i8* %2, i8** %dst_tuple_ptr1
-// ret void
-//
-// build_null: ; preds = %entry
-// %dst_tuple_ptr = getelementptr i8** %out, i32 1
-// call void @llvm.memcpy.p0i8.p0i8.i32(
-// i8* %dst_tuple_ptr, i8* %1, i32 16, i32 16, i1 false)
-// ret void
-// }
-Function* HashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen) {
- Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
- DCHECK(tuple_row_type != NULL);
- PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
- Type* this_type = codegen->GetType(BlockingJoinNode::LLVM_CLASS_NAME);
- DCHECK(this_type != NULL);
- PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
- // TupleRows are really just an array of pointers. Easier to work with them
- // this way.
- PointerType* tuple_row_working_type = PointerType::get(codegen->ptr_type(), 0);
-
- // Construct function signature to match CreateOutputRow()
- LlvmCodeGen::FnPrototype prototype(codegen, "CreateOutputRow", codegen->void_type());
- prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("out_arg", tuple_row_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("probe_arg", tuple_row_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("build_arg", tuple_row_ptr_type));
-
- LLVMContext& context = codegen->context();
- LlvmBuilder builder(context);
- Value* args[4];
- Function* fn = prototype.GeneratePrototype(&builder, args);
- Value* out_row_arg = builder.CreateBitCast(args[1], tuple_row_working_type, "out");
- Value* probe_row_arg = builder.CreateBitCast(args[2], tuple_row_working_type, "probe");
- Value* build_row_arg = builder.CreateBitCast(args[3], tuple_row_working_type, "build");
-
- int num_probe_tuples = child(0)->row_desc()->tuple_descriptors().size();
- int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
-
- // Copy probe row
- codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, probe_tuple_row_size_);
- Value* build_row_idx[] = {codegen->GetIntConstant(TYPE_INT, num_probe_tuples)};
- Value* build_row_dst =
- builder.CreateInBoundsGEP(out_row_arg, build_row_idx, "build_dst_ptr");
-
- // Copy build row.
- BasicBlock* build_not_null_block = BasicBlock::Create(context, "build_not_null", fn);
- BasicBlock* build_null_block = NULL;
-
- if (match_all_probe_) {
- // build tuple can be null
- build_null_block = BasicBlock::Create(context, "build_null", fn);
- Value* is_build_null = builder.CreateIsNull(build_row_arg, "is_build_null");
- builder.CreateCondBr(is_build_null, build_null_block, build_not_null_block);
-
- // Set tuple build ptrs to NULL
- // TODO: this should be replaced with memset() but I can't get the llvm intrinsic
- // to work.
- builder.SetInsertPoint(build_null_block);
- for (int i = 0; i < num_build_tuples; ++i) {
- Value* array_idx[] = {codegen->GetIntConstant(TYPE_INT, i + num_probe_tuples)};
- Value* dst = builder.CreateInBoundsGEP(out_row_arg, array_idx, "dst_tuple_ptr");
- builder.CreateStore(codegen->null_ptr_value(), dst);
- }
- builder.CreateRetVoid();
- } else {
- // build row can't be NULL
- builder.CreateBr(build_not_null_block);
- }
-
- // Copy build tuple ptrs
- builder.SetInsertPoint(build_not_null_block);
- codegen->CodegenMemcpy(&builder, build_row_dst, build_row_arg, build_tuple_row_size_);
- builder.CreateRetVoid();
-
- return codegen->FinalizeFunction(fn);
-}
-
-Function* HashJoinNode::CodegenProcessBuildBatch(LlvmCodeGen* codegen,
- Function* hash_fn) {
- // Get cross compiled function
- Function* process_build_batch_fn =
- codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_BUILD_BATCH, true);
- DCHECK(process_build_batch_fn != NULL);
-
- // Codegen for evaluating build rows
- Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, true);
- if (eval_row_fn == NULL) return NULL;
-
- int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
- "EvalBuildRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_build_batch_fn, hash_fn, "HashCurrentRow");
- DCHECK_EQ(replaced, 1);
-
- return codegen->FinalizeFunction(process_build_batch_fn);
-}
-
-Function* HashJoinNode::CodegenProcessProbeBatch(LlvmCodeGen* codegen,
- Function* hash_fn) {
- // Get cross compiled function
- Function* process_probe_batch_fn =
- codegen->GetFunction(IRFunction::HASH_JOIN_PROCESS_PROBE_BATCH, true);
- DCHECK(process_probe_batch_fn != NULL);
-
- // Codegen HashTable::Equals()
- Function* equals_fn = hash_tbl_->CodegenEquals(codegen);
- if (equals_fn == NULL) return NULL;
-
- // Codegen for evaluating build rows
- Function* eval_row_fn = hash_tbl_->CodegenEvalTupleRow(codegen, false);
- if (eval_row_fn == NULL) return NULL;
-
- // Codegen CreateOutputRow()
- Function* create_output_row_fn = CodegenCreateOutputRow(codegen);
- if (create_output_row_fn == NULL) return NULL;
-
- // Codegen evaluating other join conjuncts
- Function* eval_other_conjuncts_fn;
- Status status = ExecNode::CodegenEvalConjuncts(codegen, other_join_conjuncts_,
- &eval_other_conjuncts_fn, "EvalOtherConjuncts");
- if (!status.ok()) return NULL;
-
- // Codegen evaluating conjuncts
- Function* eval_conjuncts_fn;
- status = ExecNode::CodegenEvalConjuncts(codegen, conjuncts_, &eval_conjuncts_fn);
- if (!status.ok()) return NULL;
-
- // Replace all call sites with codegen version
- int replaced = codegen->ReplaceCallSites(process_probe_batch_fn, hash_fn,
- "HashCurrentRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_row_fn,
- "EvalProbeRow");
- DCHECK_EQ(replaced, 1);
-
- replaced = codegen->ReplaceCallSites(process_probe_batch_fn, create_output_row_fn,
- "CreateOutputRow");
- DCHECK_EQ(replaced, 3);
-
- replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_conjuncts_fn,
- "EvalConjuncts");
- DCHECK_EQ(replaced, 2);
-
- replaced = codegen->ReplaceCallSites(process_probe_batch_fn, eval_other_conjuncts_fn,
- "EvalOtherJoinConjuncts");
- DCHECK_EQ(replaced, 2);
-
- replaced = codegen->ReplaceCallSites(process_probe_batch_fn, equals_fn, "Equals");
- DCHECK_EQ(replaced, 2);
-
- return codegen->FinalizeFunction(process_probe_batch_fn);
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.h b/be/src/exec/hash-join-node.h
deleted file mode 100644
index b49f8bb..0000000
--- a/be/src/exec/hash-join-node.h
+++ /dev/null
@@ -1,164 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_HASH_JOIN_NODE_H
-#define IMPALA_EXEC_HASH_JOIN_NODE_H
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread.hpp>
-#include <string>
-
-#include "exec/exec-node.h"
-#include "exec/old-hash-table.h"
-#include "exec/blocking-join-node.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "util/promise.h"
-
-#include "gen-cpp/PlanNodes_types.h" // for TJoinOp
-
-namespace impala {
-
-class MemPool;
-class RowBatch;
-class ScalarExpr;
-class ScalarExprEvaluator;
-class TupleRow;
-
-/// Node for in-memory hash joins:
-/// - builds up a hash table with the rows produced by our right input
-/// (child(1)); build exprs are the rhs exprs of our equi-join predicates
-/// - for each row from our left input, probes the hash table to retrieve
-/// matching entries; the probe exprs are the lhs exprs of our equi-join predicates
-//
-/// Row batches:
-/// - In general, we are not able to pass our output row batch on to our left child (when
-/// we're fetching the probe rows): if we have a 1xn join, our output will contain
-/// multiple rows per left input row
-/// - TODO: fix this, so in the case of 1x1/nx1 joins (for instance, fact to dimension tbl)
-/// we don't do these extra copies
-class HashJoinNode : public BlockingJoinNode {
- public:
- HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
-
- virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
- virtual Status Prepare(RuntimeState* state);
- virtual void Codegen(RuntimeState* state);
- virtual Status Open(RuntimeState* state);
- virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
- virtual Status Reset(RuntimeState* state);
- virtual void Close(RuntimeState* state);
-
- static const char* LLVM_CLASS_NAME;
-
- protected:
- virtual Status QueryMaintenance(RuntimeState* state);
- virtual void AddToDebugString(int indentation_level, std::stringstream* out) const;
- virtual Status ProcessBuildInput(RuntimeState* state);
-
- private:
- boost::scoped_ptr<OldHashTable> hash_tbl_;
- OldHashTable::Iterator hash_tbl_iterator_;
-
- /// holds everything referenced from build side
- boost::scoped_ptr<MemPool> build_pool_;
-
- /// our equi-join predicates "<lhs> = <rhs>" are separated into
- /// build_exprs_ (over child(1)) and probe_exprs_ (over child(0))
- std::vector<ScalarExpr*> probe_exprs_;
- std::vector<ScalarExpr*> build_exprs_;
-
- /// Expressions used to build runtime filters, one per entry in filters_.
- std::vector<ScalarExpr*> filter_exprs_;
-
- /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
- /// NOT DISTINCT FROM, rather than equality.
- std::vector<bool> is_not_distinct_from_;
-
- /// non-equi-join conjuncts from the JOIN clause
- std::vector<ScalarExpr*> other_join_conjuncts_;
- std::vector<ScalarExprEvaluator*> other_join_conjunct_evals_;
-
- /// Derived from join_op_
- /// Output all rows coming from the probe input. Used in LEFT_OUTER_JOIN and
- /// FULL_OUTER_JOIN.
- bool match_all_probe_;
-
- /// Match at most one build row to each probe row. Used in LEFT_SEMI_JOIN.
- bool match_one_build_;
-
- /// Output all rows coming from the build input. Used in RIGHT_OUTER_JOIN and
- /// FULL_OUTER_JOIN.
- bool match_all_build_;
-
- /// llvm function for build batch
- llvm::Function* codegen_process_build_batch_fn_;
-
- /// Function declaration for codegen'd function. Signature must match
- /// HashJoinNode::ProcessBuildBatch
- typedef void (*ProcessBuildBatchFn)(HashJoinNode*, RowBatch*);
- ProcessBuildBatchFn process_build_batch_fn_;
-
- /// HashJoinNode::ProcessProbeBatch() exactly
- typedef int (*ProcessProbeBatchFn)(HashJoinNode*, RowBatch*, RowBatch*, int);
- /// Jitted ProcessProbeBatch function pointer. Null if codegen is disabled.
- ProcessProbeBatchFn process_probe_batch_fn_;
-
- /// RuntimeFilters to build.
- std::vector<RuntimeFilter*> filters_;
-
- RuntimeProfile::Counter* build_buckets_counter_; // num buckets in hash table
- RuntimeProfile::Counter* hash_tbl_load_factor_counter_;
-
- /// Prepares for the first call to GetNext(). Must be called after GetFirstProbeRow().
- void InitGetNext();
-
- /// GetNext helper function for the common join cases: Inner join, left semi and left
- /// outer
- Status LeftJoinGetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-
- /// Processes a probe batch for the common (non right-outer join) cases.
- /// out_batch: the batch for resulting tuple rows
- /// probe_batch: the probe batch to process. This function can be called to
- /// continue processing a batch in the middle
- /// max_added_rows: maximum rows that can be added to out_batch
- /// return the number of rows added to out_batch
- int ProcessProbeBatch(RowBatch* out_batch, RowBatch* probe_batch, int max_added_rows);
-
- /// Construct the build hash table, adding all the rows in 'build_batch'
- void ProcessBuildBatch(RowBatch* build_batch);
-
- /// Codegen function to create output row
- llvm::Function* CodegenCreateOutputRow(LlvmCodeGen* codegen);
-
- /// Codegen processing build batches. Identical signature to ProcessBuildBatch.
- /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
- /// hash table.
- /// Returns NULL if codegen was not possible.
- llvm::Function* CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
-
- /// Codegen processing probe batches. Identical signature to ProcessProbeBatch.
- /// hash_fn is the codegen'd function for computing hashes over tuple rows in the
- /// hash table.
- /// Returns NULL if codegen was not possible.
- llvm::Function* CodegenProcessProbeBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn);
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 63298cb..e97572c 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -54,8 +54,6 @@ using namespace llvm;
DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of "
"rows rejected by a runtime filter drops below this value, the filter is disabled.");
-DECLARE_bool(enable_partitioned_aggregation);
-DECLARE_bool(enable_partitioned_hash_join);
// The number of row batches between checks to see if a filter is effective, and
// should be disabled. Must be a power of two.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 01a7f19..d377a22 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -186,11 +186,6 @@ Status NestedLoopJoinNode::ResetMatchingBuildRows(RuntimeState* state, int64_t n
return Status::OK();
}
-Status NestedLoopJoinNode::ProcessBuildInput(RuntimeState* state) {
- DCHECK(false) << "Should not be called, NLJ uses the BuildSink API";
- return Status::OK();
-}
-
void NestedLoopJoinNode::ResetForProbe() {
DCHECK(build_batches_ != NULL);
build_row_iterator_ = build_batches_->Iterator();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/nested-loop-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index 94f1dae..c94abbf 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -51,9 +51,6 @@ class NestedLoopJoinNode : public BlockingJoinNode {
virtual Status Reset(RuntimeState* state);
virtual void Close(RuntimeState* state);
- protected:
- virtual Status ProcessBuildInput(RuntimeState* state);
-
private:
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-ir.cc b/be/src/exec/old-hash-table-ir.cc
deleted file mode 100644
index 2436ef1..0000000
--- a/be/src/exec/old-hash-table-ir.cc
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifdef IR_COMPILE
-
-#include "exec/old-hash-table.h"
-
-namespace impala {
-
-uint8_t* OldHashTable::expr_values_buffer() const {
- return expr_values_buffer_;
-}
-
-uint8_t* OldHashTable::expr_value_null_bits() const {
- return expr_value_null_bits_;
-}
-
-ScalarExprEvaluator* const* OldHashTable::build_expr_evals() const {
- return build_expr_evals_.data();
-}
-
-ScalarExprEvaluator* const* OldHashTable::probe_expr_evals() const {
- return probe_expr_evals_.data();
-}
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table-test.cc b/be/src/exec/old-hash-table-test.cc
deleted file mode 100644
index e873791..0000000
--- a/be/src/exec/old-hash-table-test.cc
+++ /dev/null
@@ -1,337 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <iostream>
-#include <vector>
-
-#include "common/compiler-util.h"
-#include "exec/old-hash-table.inline.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/mem-pool.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/string-value.h"
-#include "runtime/tuple-row.h"
-#include "testutil/gtest-util.h"
-#include "util/cpu-info.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-class OldHashTableTest : public testing::Test {
- public:
- OldHashTableTest() : mem_pool_(&tracker_) {}
-
- protected:
- ObjectPool pool_;
- MemTracker tracker_;
- MemPool mem_pool_;
-
- vector<ScalarExpr*> build_exprs_;
- vector<ScalarExprEvaluator*> build_expr_evals_;
- vector<ScalarExpr*> probe_exprs_;
- vector<ScalarExprEvaluator*> probe_expr_evals_;
-
- virtual void SetUp() {
- RowDescriptor desc;
- // Not very easy to test complex tuple layouts so this test will use the
- // simplest. The purpose of these tests is to exercise the hash map
- // internals so a simple build/probe expr is fine.
- ScalarExpr* build_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
- ASSERT_OK(build_expr->Init(desc, nullptr));
- build_exprs_.push_back(build_expr);
- ASSERT_OK(ScalarExprEvaluator::Create(build_exprs_, nullptr, &pool_, &mem_pool_,
- &build_expr_evals_));
- ASSERT_OK(ScalarExprEvaluator::Open(build_expr_evals_, nullptr));
-
- ScalarExpr* probe_expr = pool_.Add(new SlotRef(TYPE_INT, 0));
- ASSERT_OK(probe_expr->Init(desc, nullptr));
- probe_exprs_.push_back(probe_expr);
- ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
- &probe_expr_evals_));
- ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
- }
-
- virtual void TearDown() {
- ScalarExprEvaluator::Close(build_expr_evals_, nullptr);
- ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
- ScalarExpr::Close(build_exprs_);
- ScalarExpr::Close(probe_exprs_);
- }
-
- TupleRow* CreateTupleRow(int32_t val) {
- uint8_t* tuple_row_mem = mem_pool_.Allocate(sizeof(int32_t*));
- Tuple* tuple_mem = Tuple::Create(sizeof(int32_t), &mem_pool_);
- *reinterpret_cast<int32_t*>(tuple_mem) = val;
- TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
- row->SetTuple(0, tuple_mem);
- return row;
- }
-
- // Wrapper to call private methods on OldHashTable
- // TODO: understand google testing, there must be a more natural way to do this
- void ResizeTable(OldHashTable* table, int64_t new_size) {
- table->ResizeBuckets(new_size);
- }
-
- // Do a full table scan on table. All values should be between [min,max). If
- // all_unique, then each key(int value) should only appear once. Results are
- // stored in results, indexed by the key. Results must have been preallocated to
- // be at least max size.
- void FullScan(OldHashTable* table, int min, int max, bool all_unique,
- TupleRow** results, TupleRow** expected) {
- OldHashTable::Iterator iter = table->Begin();
- while (iter != table->End()) {
- TupleRow* row = iter.GetRow();
- int32_t val = *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(row));
- EXPECT_GE(val, min);
- EXPECT_LT(val, max);
- if (all_unique) EXPECT_TRUE(results[val] == nullptr);
- EXPECT_EQ(row->GetTuple(0), expected[val]->GetTuple(0));
- results[val] = row;
- iter.Next<false>();
- }
- }
-
- // Validate that probe_row evaluates overs probe_exprs is equal to build_row
- // evaluated over build_exprs
- void ValidateMatch(TupleRow* probe_row, TupleRow* build_row) {
- EXPECT_TRUE(probe_row != build_row);
- int32_t build_val =
- *reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(probe_row));
- int32_t probe_val =
- *reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(build_row));
- EXPECT_EQ(build_val, probe_val);
- }
-
- struct ProbeTestData {
- TupleRow* probe_row;
- vector<TupleRow*> expected_build_rows;
- };
-
- void ProbeTest(OldHashTable* table, ProbeTestData* data, int num_data, bool scan) {
- for (int i = 0; i < num_data; ++i) {
- TupleRow* row = data[i].probe_row;
-
- OldHashTable::Iterator iter;
- iter = table->Find(row);
-
- if (data[i].expected_build_rows.size() == 0) {
- EXPECT_TRUE(iter == table->End());
- } else {
- if (scan) {
- map<TupleRow*, bool> matched;
- while (iter != table->End()) {
- EXPECT_TRUE(matched.find(iter.GetRow()) == matched.end());
- matched[iter.GetRow()] = true;
- iter.Next<true>();
- }
- EXPECT_EQ(matched.size(), data[i].expected_build_rows.size());
- for (int j = 0; i < data[j].expected_build_rows.size(); ++j) {
- EXPECT_TRUE(matched[data[i].expected_build_rows[j]]);
- }
- } else {
- EXPECT_EQ(data[i].expected_build_rows.size(), 1);
- EXPECT_TRUE(
- data[i].expected_build_rows[0]->GetTuple(0) == iter.GetRow()->GetTuple(0));
- ValidateMatch(row, iter.GetRow());
- }
- }
- }
- }
-};
-
-TEST_F(OldHashTableTest, SetupTest) {
- TupleRow* build_row1 = CreateTupleRow(1);
- TupleRow* build_row2 = CreateTupleRow(2);
- TupleRow* probe_row3 = CreateTupleRow(3);
- TupleRow* probe_row4 = CreateTupleRow(4);
-
- int32_t* val_row1 =
- reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row1));
- EXPECT_EQ(*val_row1, 1);
- int32_t* val_row2 =
- reinterpret_cast<int32_t*>(build_expr_evals_[0]->GetValue(build_row2));
- EXPECT_EQ(*val_row2, 2);
- int32_t* val_row3 =
- reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row3));
- EXPECT_EQ(*val_row3, 3);
- int32_t* val_row4 =
- reinterpret_cast<int32_t*>(probe_expr_evals_[0]->GetValue(probe_row4));
- EXPECT_EQ(*val_row4, 4);
-
- mem_pool_.FreeAll();
-}
-
-// This tests inserts the build rows [0->5) to hash table. It validates that they
-// are all there using a full table scan. It also validates that Find() is correct
-// testing for probe rows that are both there and not.
-// The hash table is rehashed a few times and the scans/finds are tested again.
-TEST_F(OldHashTableTest, BasicTest) {
- TupleRow* build_rows[5];
- TupleRow* scan_rows[5] = {0};
- for (int i = 0; i < 5; ++i) {
- build_rows[i] = CreateTupleRow(i);
- }
-
- ProbeTestData probe_rows[10];
- for (int i = 0; i < 10; ++i) {
- probe_rows[i].probe_row = CreateTupleRow(i);
- if (i < 5) {
- probe_rows[i].expected_build_rows.push_back(build_rows[i]);
- }
- }
-
- // Create the hash table and insert the build rows
- MemTracker tracker;
- scoped_ptr<OldHashTable> hash_table;
- EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
- vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
- 0, &tracker, vector<RuntimeFilter*>(), &hash_table));
- EXPECT_OK(hash_table->Open(nullptr));
- for (int i = 0; i < 5; ++i) {
- hash_table->Insert(build_rows[i]);
- }
- EXPECT_EQ(hash_table->size(), 5);
-
- // Do a full table scan and validate returned pointers
- FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), probe_rows, 10, false);
-
- // Resize and scan again
- ResizeTable(hash_table.get(), 64);
- EXPECT_EQ(hash_table->num_buckets(), 64);
- EXPECT_EQ(hash_table->size(), 5);
- memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), probe_rows, 10, false);
-
- // Resize to two and cause some collisions
- ResizeTable(hash_table.get(), 2);
- EXPECT_EQ(hash_table->num_buckets(), 2);
- EXPECT_EQ(hash_table->size(), 5);
- memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), probe_rows, 10, false);
-
- // Resize to one and turn it into a linked list
- ResizeTable(hash_table.get(), 1);
- EXPECT_EQ(hash_table->num_buckets(), 1);
- EXPECT_EQ(hash_table->size(), 5);
- memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), probe_rows, 10, false);
-
- hash_table->Close(nullptr);
- mem_pool_.FreeAll();
-}
-
-// This tests makes sure we can scan ranges of buckets
-TEST_F(OldHashTableTest, ScanTest) {
- MemTracker tracker;
- scoped_ptr<OldHashTable> hash_table;
- EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
- vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
- 0, &tracker, vector<RuntimeFilter*>(), &hash_table));
- EXPECT_OK(hash_table->Open(nullptr));
- // Add 1 row with val 1, 2 with val 2, etc
- vector<TupleRow*> build_rows;
- ProbeTestData probe_rows[15];
- probe_rows[0].probe_row = CreateTupleRow(0);
- for (int val = 1; val <= 10; ++val) {
- probe_rows[val].probe_row = CreateTupleRow(val);
- for (int i = 0; i < val; ++i) {
- TupleRow* row = CreateTupleRow(val);
- hash_table->Insert(row);
- build_rows.push_back(row);
- probe_rows[val].expected_build_rows.push_back(row);
- }
- }
-
- // Add some more probe rows that aren't there
- for (int val = 11; val < 15; ++val) {
- probe_rows[val].probe_row = CreateTupleRow(val);
- }
-
- // Test that all the builds were found
- ProbeTest(hash_table.get(), probe_rows, 15, true);
-
- // Resize and try again
- ResizeTable(hash_table.get(), 128);
- EXPECT_EQ(hash_table->num_buckets(), 128);
- ProbeTest(hash_table.get(), probe_rows, 15, true);
-
- ResizeTable(hash_table.get(), 16);
- EXPECT_EQ(hash_table->num_buckets(), 16);
- ProbeTest(hash_table.get(), probe_rows, 15, true);
-
- ResizeTable(hash_table.get(), 2);
- EXPECT_EQ(hash_table->num_buckets(), 2);
- ProbeTest(hash_table.get(), probe_rows, 15, true);
-
- hash_table->Close(nullptr);
- mem_pool_.FreeAll();
-}
-
-// This test continues adding to the hash table to trigger the resize code paths
-TEST_F(OldHashTableTest, GrowTableTest) {
- int num_to_add = 4;
- int expected_size = 0;
- MemTracker tracker(100 * 1024 * 1024);
- scoped_ptr<OldHashTable> hash_table;
- EXPECT_OK(OldHashTable::Create(&pool_, nullptr, build_exprs_, probe_exprs_,
- vector<ScalarExpr*>(), 1, false, std::vector<bool>(build_exprs_.size(), false),
- 0, &tracker, vector<RuntimeFilter*>(), &hash_table, false, num_to_add));
- EXPECT_OK(hash_table->Open(nullptr));
- EXPECT_FALSE(hash_table->mem_limit_exceeded());
- EXPECT_TRUE(!tracker.LimitExceeded());
-
- // This inserts about 5M entries
- int build_row_val = 0;
- for (int i = 0; i < 20; ++i) {
- for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
- hash_table->Insert(CreateTupleRow(build_row_val));
- }
- expected_size += num_to_add;
- num_to_add *= 2;
- }
- EXPECT_TRUE(hash_table->mem_limit_exceeded());
- EXPECT_TRUE(tracker.LimitExceeded());
-
- // Validate that we can find the entries before we went over the limit
- for (int i = 0; i < expected_size * 5; i += 100000) {
- TupleRow* probe_row = CreateTupleRow(i);
- OldHashTable::Iterator iter = hash_table->Find(probe_row);
- if (i < hash_table->size()) {
- EXPECT_TRUE(iter != hash_table->End());
- ValidateMatch(probe_row, iter.GetRow());
- } else {
- EXPECT_TRUE(iter == hash_table->End());
- }
- }
- hash_table->Close(nullptr);
- mem_pool_.FreeAll();
-}
-
-}
-
-IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/507bd8be/be/src/exec/old-hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/old-hash-table.cc b/be/src/exec/old-hash-table.cc
deleted file mode 100644
index 9105226..0000000
--- a/be/src/exec/old-hash-table.cc
+++ /dev/null
@@ -1,872 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/old-hash-table.inline.h"
-
-#include <functional>
-#include <numeric>
-
-#include "codegen/codegen-anyval.h"
-#include "codegen/llvm-codegen.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "exprs/slot-ref.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/raw-value.inline.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/tuple-row.h"
-#include "util/bloom-filter.h"
-#include "runtime/tuple.h"
-#include "util/debug-util.h"
-#include "util/error-util.h"
-#include "util/impalad-metrics.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace llvm;
-
-const char* OldHashTable::LLVM_CLASS_NAME = "class.impala::OldHashTable";
-
-const float OldHashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f;
-static const int HT_PAGE_SIZE = 8 * 1024 * 1024;
-
-// Put a non-zero constant in the result location for NULL.
-// We don't want(NULL, 1) to hash to the same as (0, 1).
-// This needs to be as big as the biggest primitive type since the bytes
-// get copied directly.
-// TODO find a better approach, since primitives like CHAR(N) can be up to 128 bytes
-static int64_t NULL_VALUE[] = { HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED,
- HashUtil::FNV_SEED, HashUtil::FNV_SEED };
-
-OldHashTable::OldHashTable(RuntimeState* state,
- const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs,
- const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls,
- const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker,
- const vector<RuntimeFilter*>& runtime_filters, bool stores_tuples,
- int64_t num_buckets)
- : state_(state),
- build_exprs_(build_exprs),
- probe_exprs_(probe_exprs),
- filter_exprs_(filter_exprs),
- filters_(runtime_filters),
- num_build_tuples_(num_build_tuples),
- stores_nulls_(stores_nulls),
- finds_nulls_(finds_nulls),
- finds_some_nulls_(std::accumulate(
- finds_nulls_.begin(), finds_nulls_.end(), false, std::logical_or<bool>())),
- stores_tuples_(stores_tuples),
- initial_seed_(initial_seed),
- num_filled_buckets_(0),
- num_nodes_(0),
- mem_pool_(new MemPool(mem_tracker)),
- num_data_pages_(0),
- next_node_(NULL),
- node_remaining_current_page_(0),
- mem_tracker_(mem_tracker),
- mem_limit_exceeded_(false) {
- DCHECK(mem_tracker != NULL);
- DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
- DCHECK_EQ(build_exprs_.size(), finds_nulls_.size());
- DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2";
- buckets_.resize(num_buckets);
- num_buckets_ = num_buckets;
- num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_;
- mem_tracker_->Consume(buckets_.capacity() * sizeof(Bucket));
-
- // Compute the layout and buffer size to store the evaluated expr results
- results_buffer_size_ = ScalarExpr::ComputeResultsLayout(build_exprs_,
- &expr_values_buffer_offsets_, &var_result_begin_);
- expr_values_buffer_= new uint8_t[results_buffer_size_];
- memset(expr_values_buffer_, 0, sizeof(uint8_t) * results_buffer_size_);
- expr_value_null_bits_ = new uint8_t[build_exprs_.size()];
-
- GrowNodeArray();
-}
-
-Status OldHashTable::Init(ObjectPool* pool, RuntimeState* state) {
- RETURN_IF_ERROR(ScalarExprEvaluator::Create(build_exprs_, state, pool,
- mem_pool_.get(), &build_expr_evals_));
- DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
- RETURN_IF_ERROR(ScalarExprEvaluator::Create(probe_exprs_, state, pool,
- mem_pool_.get(), &probe_expr_evals_));
- DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
- RETURN_IF_ERROR(ScalarExprEvaluator::Create(filter_exprs_, state, pool,
- mem_pool_.get(), &filter_expr_evals_));
- DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size());
- return Status::OK();
-}
-
-Status OldHashTable::Create(ObjectPool* pool, RuntimeState* state,
- const vector<ScalarExpr*>& build_exprs, const vector<ScalarExpr*>& probe_exprs,
- const vector<ScalarExpr*>& filter_exprs, int num_build_tuples, bool stores_nulls,
- const vector<bool>& finds_nulls, int32_t initial_seed, MemTracker* mem_tracker,
- const vector<RuntimeFilter*>& runtime_filters, scoped_ptr<OldHashTable>* hash_tbl,
- bool stores_tuples, int64_t num_buckets) {
- hash_tbl->reset(new OldHashTable(state, build_exprs, probe_exprs, filter_exprs,
- num_build_tuples, stores_nulls, finds_nulls, initial_seed, mem_tracker,
- runtime_filters, stores_tuples, num_buckets));
- return (*hash_tbl)->Init(pool, state);
-}
-
-Status OldHashTable::Open(RuntimeState* state) {
- RETURN_IF_ERROR(ScalarExprEvaluator::Open(build_expr_evals_, state));
- DCHECK_EQ(build_exprs_.size(), build_expr_evals_.size());
- RETURN_IF_ERROR(ScalarExprEvaluator::Open(probe_expr_evals_, state));
- DCHECK_EQ(probe_exprs_.size(), probe_expr_evals_.size());
- RETURN_IF_ERROR(ScalarExprEvaluator::Open(filter_expr_evals_, state));
- DCHECK_EQ(filter_exprs_.size(), filter_expr_evals_.size());
- return Status::OK();
-}
-
-void OldHashTable::Close(RuntimeState* state) {
- // TODO: use tr1::array?
- delete[] expr_values_buffer_;
- delete[] expr_value_null_bits_;
- expr_values_buffer_ = NULL;
- expr_value_null_bits_ = NULL;
- ScalarExprEvaluator::Close(build_expr_evals_, state);
- ScalarExprEvaluator::Close(probe_expr_evals_, state);
- ScalarExprEvaluator::Close(filter_expr_evals_, state);
- mem_pool_->FreeAll();
- if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
- ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-num_data_pages_ * HT_PAGE_SIZE);
- }
- mem_tracker_->Release(buckets_.capacity() * sizeof(Bucket));
- buckets_.clear();
-}
-
-void OldHashTable::FreeLocalAllocations() {
- ScalarExprEvaluator::FreeLocalAllocations(build_expr_evals_);
- ScalarExprEvaluator::FreeLocalAllocations(probe_expr_evals_);
- ScalarExprEvaluator::FreeLocalAllocations(filter_expr_evals_);
-}
-
-bool OldHashTable::EvalRow(
- TupleRow* row, const vector<ScalarExprEvaluator*>& evals) {
- bool has_null = false;
- for (int i = 0; i < evals.size(); ++i) {
- void* loc = expr_values_buffer_ + expr_values_buffer_offsets_[i];
- void* val = evals[i]->GetValue(row);
- if (val == NULL) {
- // If the table doesn't store nulls, no reason to keep evaluating
- if (!stores_nulls_) return true;
-
- expr_value_null_bits_[i] = true;
- val = &NULL_VALUE;
- has_null = true;
- } else {
- expr_value_null_bits_[i] = false;
- }
- RawValue::Write(val, loc, build_exprs_[i]->type(), NULL);
- }
- return has_null;
-}
-
-int OldHashTable::AddBloomFilters() {
- int num_enabled_filters = 0;
- vector<BloomFilter*> bloom_filters;
- bloom_filters.resize(filters_.size());
- for (int i = 0; i < filters_.size(); ++i) {
- if (state_->filter_bank()->FpRateTooHigh(filters_[i]->filter_size(), size())) {
- bloom_filters[i] = BloomFilter::ALWAYS_TRUE_FILTER;
- } else {
- bloom_filters[i] =
- state_->filter_bank()->AllocateScratchBloomFilter(filters_[i]->id());
- ++num_enabled_filters;
- }
- }
-
- OldHashTable::Iterator iter = Begin();
- while (iter != End()) {
- TupleRow* row = iter.GetRow();
- for (int i = 0; i < filters_.size(); ++i) {
- if (bloom_filters[i] == NULL) continue;
- void* e = filter_expr_evals_[i]->GetValue(row);
- uint32_t h = RawValue::GetHashValue(e, filter_exprs_[i]->type(),
- RuntimeFilterBank::DefaultHashSeed());
- bloom_filters[i]->Insert(h);
- }
- iter.Next<false>();
- }
-
- // Update all the local filters in the filter bank.
- for (int i = 0; i < filters_.size(); ++i) {
- state_->filter_bank()->UpdateFilterFromLocal(filters_[i]->id(), bloom_filters[i]);
- }
-
- return num_enabled_filters;
-}
-
-// Helper function to store a value into the results buffer if the expr
-// evaluated to NULL. We don't want (NULL, 1) to hash to the same as (0,1) so
-// we'll pick a more random value.
-static void CodegenAssignNullValue(
- LlvmCodeGen* codegen, LlvmBuilder* builder, Value* dst, const ColumnType& type) {
- uint64_t fnv_seed = HashUtil::FNV_SEED;
-
- if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR) {
- Value* dst_ptr = builder->CreateStructGEP(NULL, dst, 0, "string_ptr");
- Value* dst_len = builder->CreateStructGEP(NULL, dst, 1, "string_len");
- Value* null_len = codegen->GetIntConstant(TYPE_INT, fnv_seed);
- Value* null_ptr = builder->CreateIntToPtr(null_len, codegen->ptr_type());
- builder->CreateStore(null_ptr, dst_ptr);
- builder->CreateStore(null_len, dst_len);
- return;
- } else {
- Value* null_value = NULL;
- int byte_size = type.GetByteSize();
- // Get a type specific representation of fnv_seed
- switch (type.type) {
- case TYPE_BOOLEAN:
- // In results, booleans are stored as 1 byte
- dst = builder->CreateBitCast(dst, codegen->ptr_type());
- null_value = codegen->GetIntConstant(TYPE_TINYINT, fnv_seed);
- break;
- case TYPE_TIMESTAMP: {
- // Cast 'dst' to 'i128*'
- DCHECK_EQ(byte_size, 16);
- PointerType* fnv_seed_ptr_type =
- codegen->GetPtrType(Type::getIntNTy(codegen->context(), byte_size * 8));
- dst = builder->CreateBitCast(dst, fnv_seed_ptr_type);
- null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
- break;
- }
- case TYPE_TINYINT:
- case TYPE_SMALLINT:
- case TYPE_INT:
- case TYPE_BIGINT:
- case TYPE_DECIMAL:
- null_value = codegen->GetIntConstant(byte_size, fnv_seed, fnv_seed);
- break;
- case TYPE_FLOAT: {
- // Don't care about the value, just the bit pattern
- float fnv_seed_float = *reinterpret_cast<float*>(&fnv_seed);
- null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_float));
- break;
- }
- case TYPE_DOUBLE: {
- // Don't care about the value, just the bit pattern
- double fnv_seed_double = *reinterpret_cast<double*>(&fnv_seed);
- null_value = ConstantFP::get(codegen->context(), APFloat(fnv_seed_double));
- break;
- }
- default:
- DCHECK(false);
- }
- builder->CreateStore(null_value, dst);
- }
-}
-
-// Codegen for evaluating a tuple row over either build_exprs_ or probe_exprs_.
-// For the case where we are joining on a single int, the IR looks like
-// define i1 @EvaBuildRow(%"class.impala::OldHashTable"* %this_ptr,
-// %"class.impala::TupleRow"* %row) {
-// entry:
-// %null_ptr = alloca i1
-// %0 = bitcast %"class.impala::TupleRow"* %row to i8**
-// %eval = call i32 @SlotRef(i8** %0, i8* null, i1* %null_ptr)
-// %1 = load i1* %null_ptr
-// br i1 %1, label %null, label %not_null
-//
-// null: ; preds = %entry
-// ret i1 true
-//
-// not_null: ; preds = %entry
-// store i32 %eval, i32* inttoptr (i64 46146336 to i32*)
-// br label %continue
-//
-// continue: ; preds = %not_null
-// %2 = zext i1 %1 to i8
-// store i8 %2, i8* inttoptr (i64 46146248 to i8*)
-// ret i1 false
-// }
-// For each expr, we create 3 code blocks. The null, not null and continue blocks.
-// Both the null and not null branch into the continue block. The continue block
-// becomes the start of the next block for codegen (either the next expr or just the
-// end of the function).
-Function* OldHashTable::CodegenEvalTupleRow(LlvmCodeGen* codegen, bool build) {
- DCHECK_EQ(build_exprs_.size(), probe_exprs_.size());
- const vector<ScalarExpr*>& exprs = build ? build_exprs_ : probe_exprs_;
- for (int i = 0; i < exprs.size(); ++i) {
- PrimitiveType type = exprs[i]->type().type;
- if (type == TYPE_CHAR) return NULL;
- }
-
- // Get types to generate function prototype
- Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
- DCHECK(tuple_row_type != NULL);
- PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
- Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
- DCHECK(this_type != NULL);
- PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
- LlvmCodeGen::FnPrototype prototype(codegen, build ? "EvalBuildRow" : "EvalProbeRow",
- codegen->GetType(TYPE_BOOLEAN));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
- LLVMContext& context = codegen->context();
- LlvmBuilder builder(context);
- Value* args[2];
- Function* fn = prototype.GeneratePrototype(&builder, args);
- Value* this_ptr = args[0];
- Value* row = args[1];
- Value* has_null = codegen->false_value();
-
- IRFunction::Type fn_name = build ?
- IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
- IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS;
- Function* get_expr_eval_fn = codegen->GetFunction(fn_name, false);
- DCHECK(get_expr_eval_fn != NULL);
-
- // Aggregation with no grouping exprs also use the hash table interface for
- // code simplicity. In that case, there are no build exprs.
- if (!exprs.empty()) {
- // Load build_expr_evals_.data() / probe_expr_evals_.data()
- Value* eval_vector = codegen->CodegenCallFunction(&builder, build ?
- IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS :
- IRFunction::OLD_HASH_TABLE_GET_PROBE_EXPR_EVALUATORS,
- this_ptr, "eval_vector");
-
- // Load expr_values_buffer_
- Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER,
- this_ptr, "expr_values_buffer");
-
- // Load expr_values_null_bits_
- Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
- this_ptr, "expr_value_null_bits");
-
- for (int i = 0; i < exprs.size(); ++i) {
- BasicBlock* null_block = BasicBlock::Create(context, "null", fn);
- BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn);
- BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn);
-
- // loc_addr = expr_values_buffer_ + expr_values_buffer_offsets_[i]
- Value* llvm_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
- codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "loc_addr");
- llvm_loc = builder.CreatePointerCast(llvm_loc,
- codegen->GetPtrType(exprs[i]->type()), "loc");
-
- // Codegen GetValue() for exprs[i]
- Function* expr_fn;
- Status status = exprs[i]->GetCodegendComputeFn(codegen, &expr_fn);
- if (!status.ok()) {
- fn->eraseFromParent(); // deletes function
- VLOG_QUERY << "Failed to codegen EvalTupleRow(): " << status.GetDetail();
- return NULL;
- }
-
- // Load evals[i] and call GetValue()
- Value* eval_arg =
- codegen->CodegenArrayAt(&builder, eval_vector, i, "eval");
- DCHECK(eval_arg->getType()->isPointerTy());
- CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
- exprs[i]->type(), expr_fn, {eval_arg, row}, "result");
- Value* is_null = result.GetIsNull();
-
- // Set null-byte result
- Value* null_bits = builder.CreateZExt(is_null, codegen->GetType(TYPE_TINYINT));
- Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
- codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
- builder.CreateStore(null_bits, llvm_null_bits_loc);
- builder.CreateCondBr(is_null, null_block, not_null_block);
-
- // Null block
- builder.SetInsertPoint(null_block);
- if (!stores_nulls_) {
- // hash table doesn't store nulls, no reason to keep evaluating exprs
- builder.CreateRet(codegen->true_value());
- } else {
- CodegenAssignNullValue(codegen, &builder, llvm_loc, exprs[i]->type());
- has_null = codegen->true_value();
- builder.CreateBr(continue_block);
- }
-
- // Not null block
- builder.SetInsertPoint(not_null_block);
- result.ToNativePtr(llvm_loc);
- builder.CreateBr(continue_block);
-
- builder.SetInsertPoint(continue_block);
- }
- }
- builder.CreateRet(has_null);
- return codegen->FinalizeFunction(fn);
-}
-
-// Interpreted hash of the row currently materialized in expr_values_buffer_. The bytes
-// before var_result_begin_ are hashed as one block (when there are any), then each
-// STRING/VARCHAR slot is folded into the running hash in expression order.
-uint32_t OldHashTable::HashVariableLenRow() {
-  uint32_t running_hash = initial_seed_;
-  if (var_result_begin_ != 0) {
-    running_hash = HashUtil::Hash(expr_values_buffer_, var_result_begin_, running_hash);
-  }
-
-  for (int expr_idx = 0; expr_idx < build_exprs_.size(); ++expr_idx) {
-    const auto slot_type = build_exprs_[expr_idx]->type().type;
-    const bool is_var_len = (slot_type == TYPE_STRING || slot_type == TYPE_VARCHAR);
-    // Non-string slots were already covered by the fixed-length block above.
-    if (!is_var_len) continue;
-
-    void* slot = expr_values_buffer_ + expr_values_buffer_offsets_[expr_idx];
-    if (!expr_value_null_bits_[expr_idx]) {
-      // Non-NULL: hash the string payload the StringValue points at.
-      StringValue* str = reinterpret_cast<StringValue*>(slot);
-      running_hash = HashUtil::Hash(str->ptr, str->len, running_hash);
-    } else {
-      // NULL: hash the sizeof(StringValue) bytes stored in the slot itself.
-      running_hash = HashUtil::Hash(slot, sizeof(StringValue), running_hash);
-    }
-  }
-  return running_hash;
-}
-
-// Codegen for hashing the current row. In the case with both string and non-string data
-// (group by int_col, string_col), the IR looks like:
-// define i32 @HashCurrentRow(%"class.impala::OldHashTable"* %this_ptr) {
-// entry:
-// %0 = call i32 @IrCrcHash(i8* inttoptr (i64 51107808 to i8*), i32 16, i32 0)
-// %1 = load i8* inttoptr (i64 29500112 to i8*)
-// %2 = icmp ne i8 %1, 0
-// br i1 %2, label %null, label %not_null
-//
-// null: ; preds = %entry
-// %3 = call i32 @IrCrcHash(i8* inttoptr (i64 51107824 to i8*), i32 16, i32 %0)
-// br label %continue
-//
-// not_null: ; preds = %entry
-// %4 = load i8** getelementptr inbounds (
-// %"struct.impala::StringValue"* inttoptr
-// (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 0)
-// %5 = load i32* getelementptr inbounds (
-// %"struct.impala::StringValue"* inttoptr
-// (i64 51107824 to %"struct.impala::StringValue"*), i32 0, i32 1)
-// %6 = call i32 @IrCrcHash(i8* %4, i32 %5, i32 %0)
-// br label %continue
-//
-// continue: ; preds = %not_null, %null
-// %7 = phi i32 [ %6, %not_null ], [ %3, %null ]
-// ret i32 %7
-// }
-// TODO: can this be cross-compiled?
-//
-// Returns NULL, generating nothing, if any build expr has type CHAR. Otherwise it
-// builds a function taking the OldHashTable 'this' pointer and returning an i32 hash
-// seeded with initial_seed_, and returns the result of codegen->FinalizeFunction().
-// NOTE(review): the sample IR above uses inttoptr constants for the buffers. The code
-// below instead loads expr_values_buffer_ / expr_value_null_bits_ by emitting calls to
-// the OLD_HASH_TABLE_GET_* IR getters, so the sample appears to predate that change.
-Function* OldHashTable::CodegenHashCurrentRow(LlvmCodeGen* codegen) {
- for (int i = 0; i < build_exprs_.size(); ++i) {
- // Disable codegen for CHAR
- if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL;
- }
-
- // Get types to generate function prototype
- Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
- DCHECK(this_type != NULL);
- PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
- LlvmCodeGen::FnPrototype prototype(codegen, "HashCurrentRow",
- codegen->GetType(TYPE_INT));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
-
- LLVMContext& context = codegen->context();
- LlvmBuilder builder(context);
- Value* this_ptr;
- Function* fn = prototype.GeneratePrototype(&builder, &this_ptr);
-
- // Load expr_values_buffer_
- Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER, this_ptr, "expr_values_buffer");
-
- // 'hash_result' always holds the running hash value at the builder's current
- // insertion point; it starts as the constant initial_seed_.
- Value* hash_result = codegen->GetIntConstant(TYPE_INT, initial_seed_);
- // var_result_begin_ == -1 is treated here as "no variable-length slots". Otherwise
- // the first var_result_begin_ bytes of the buffer are hashed as a single block.
- if (var_result_begin_ == -1) {
- // No variable length slots, just hash what is in 'expr_values_buffer_'
- if (results_buffer_size_ > 0) {
- Function* hash_fn = codegen->GetHashFunction(results_buffer_size_);
- Value* len = codegen->GetIntConstant(TYPE_INT, results_buffer_size_);
- hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result});
- }
- } else {
- if (var_result_begin_ > 0) {
- Function* hash_fn = codegen->GetHashFunction(var_result_begin_);
- Value* len = codegen->GetIntConstant(TYPE_INT, var_result_begin_);
- hash_result = builder.CreateCall(hash_fn, {expr_values_buffer, len, hash_result});
- }
-
- // Load expr_value_null_bits_
- Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
- this_ptr, "expr_value_null_bits");
-
- // Hash string slots
- for (int i = 0; i < build_exprs_.size(); ++i) {
- if (build_exprs_[i]->type().type != TYPE_STRING
- && build_exprs_[i]->type().type != TYPE_VARCHAR) continue;
-
- BasicBlock* null_block = NULL;
- BasicBlock* not_null_block = NULL;
- BasicBlock* continue_block = NULL;
- Value* str_null_result = NULL;
-
- Value* llvm_buffer_loc = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
- codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "buffer_loc");
-
- // If the hash table stores nulls, we need to check if the stringval
- // evaluated to NULL
- if (stores_nulls_) {
- null_block = BasicBlock::Create(context, "null", fn);
- not_null_block = BasicBlock::Create(context, "not_null", fn);
- continue_block = BasicBlock::Create(context, "continue", fn);
-
- // Load expr_values_null_bits_[i] and check if it's set.
- Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
- codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
- Value* null_bits = builder.CreateLoad(llvm_null_bits_loc);
- Value* is_null = builder.CreateICmpNE(null_bits,
- codegen->GetIntConstant(TYPE_TINYINT, 0));
- builder.CreateCondBr(is_null, null_block, not_null_block);
-
- // For null, we just want to call the hash function on a portion of the data.
- builder.SetInsertPoint(null_block);
- Function* null_hash_fn = codegen->GetHashFunction(sizeof(StringValue));
- Value* len = codegen->GetIntConstant(TYPE_INT, sizeof(StringValue));
- str_null_result = builder.CreateCall(null_hash_fn,
- ArrayRef<Value*>({llvm_buffer_loc, len, hash_result}));
- builder.CreateBr(continue_block);
-
- // The non-NULL string hashing below is emitted into not_null_block.
- builder.SetInsertPoint(not_null_block);
- }
-
- // Convert expr_values_buffer_ loc to llvm value
- Value* str_val = builder.CreatePointerCast(llvm_buffer_loc,
- codegen->GetPtrType(TYPE_STRING), "str_val");
-
- // Fields 0 and 1 of the StringValue struct are read as (ptr, len).
- Value* ptr = builder.CreateStructGEP(NULL, str_val, 0, "ptr");
- Value* len = builder.CreateStructGEP(NULL, str_val, 1, "len");
- ptr = builder.CreateLoad(ptr);
- len = builder.CreateLoad(len);
-
- // Call hash(ptr, len, hash_result);
- Function* general_hash_fn = codegen->GetHashFunction();
- Value* string_hash_result =
- builder.CreateCall(general_hash_fn, ArrayRef<Value*>({ptr, len, hash_result}));
-
- if (stores_nulls_) {
- builder.CreateBr(continue_block);
- builder.SetInsertPoint(continue_block);
- // Use phi node to reconcile that we could have come from the string-null
- // path and string not null paths.
- // No blocks are created between SetInsertPoint(not_null_block) and the branch
- // above, so not_null_block is the correct incoming block for string_hash_result.
- PHINode* phi_node = builder.CreatePHI(codegen->GetType(TYPE_INT), 2);
- phi_node->addIncoming(string_hash_result, not_null_block);
- phi_node->addIncoming(str_null_result, null_block);
- hash_result = phi_node;
- } else {
- hash_result = string_hash_result;
- }
- }
- }
-
- builder.CreateRet(hash_result);
- return codegen->FinalizeFunction(fn);
-}
-
-// Interpreted comparison of 'build_row' against the probe values currently held in
-// expr_values_buffer_ / expr_value_null_bits_. Returns true only if every build expr
-// matches its probe slot.
-bool OldHashTable::Equals(TupleRow* build_row) {
-  for (int i = 0; i < build_exprs_.size(); ++i) {
-    void* build_val = build_expr_evals_[i]->GetValue(build_row);
-    const bool probe_is_null = expr_value_null_bits_[i];
-
-    if (build_val == NULL) {
-      // A NULL build value matches only a NULL probe value, and only when this table
-      // stores NULLs and expr i is configured to find them.
-      const bool nulls_match = stores_nulls_ && finds_nulls_[i] && probe_is_null;
-      if (!nulls_match) return false;
-      continue;
-    }
-
-    // A non-NULL build value never matches a NULL probe value.
-    if (probe_is_null) return false;
-
-    void* probe_val = expr_values_buffer_ + expr_values_buffer_offsets_[i];
-    if (!RawValue::Eq(probe_val, build_val, build_exprs_[i]->type())) return false;
-  }
-  return true;
-}
-
-// Codegen for OldHashTable::Equals. For a hash table with two exprs (string,int), the
-// IR looks like:
-//
-// define i1 @Equals(%"class.impala::OldHashTable"* %this_ptr,
-// %"class.impala::TupleRow"* %row) {
-// entry:
-// %result = call i64 @GetSlotRef(%"class.impala::ScalarExpr"* inttoptr
-// (i64 146381856 to %"class.impala::ScalarExpr"*),
-// %"class.impala::TupleRow"* %row)
-// %0 = trunc i64 %result to i1
-// br i1 %0, label %null, label %not_null
-//
-// false_block: ; preds = %not_null2, %null1, %not_null, %null
-// ret i1 false
-//
-// null: ; preds = %entry
-// br i1 false, label %continue, label %false_block
-//
-// not_null: ; preds = %entry
-// %1 = load i32* inttoptr (i64 104774368 to i32*)
-// %2 = ashr i64 %result, 32
-// %3 = trunc i64 %2 to i32
-// %cmp_raw = icmp eq i32 %3, %1
-// br i1 %cmp_raw, label %continue, label %false_block
-//
-// continue: ; preds = %not_null, %null
-// %result4 = call { i64, i8* } @GetSlotRef1(
-// %"class.impala::ScalarExpr"* inttoptr
-// (i64 146381696 to %"class.impala::ScalarExpr"*),
-// %"class.impala::TupleRow"* %row)
-// %4 = extractvalue { i64, i8* } %result4, 0
-// %5 = trunc i64 %4 to i1
-// br i1 %5, label %null1, label %not_null2
-//
-// null1: ; preds = %continue
-// br i1 false, label %continue3, label %false_block
-//
-// not_null2: ; preds = %continue
-// %6 = extractvalue { i64, i8* } %result4, 0
-// %7 = ashr i64 %6, 32
-// %8 = trunc i64 %7 to i32
-// %result5 = extractvalue { i64, i8* } %result4, 1
-// %cmp_raw6 = call i1 @_Z11StringValEQPciPKN6impala11StringValueE(
-// i8* %result5, i32 %8, %"struct.impala::StringValue"* inttoptr
-// (i64 104774384 to %"struct.impala::StringValue"*))
-// br i1 %cmp_raw6, label %continue3, label %false_block
-//
-// continue3: ; preds = %not_null2, %null1
-// ret i1 true
-// }
-//
-// Returns NULL if any build expr has type CHAR. It also returns NULL if codegen of a
-// build expr's compute function fails; the partially built function is erased first
-// in that case. The generated function evaluates each build expr on 'row' and compares
-// the result with the probe value in expr_values_buffer_. It returns false on the first
-// mismatch and true otherwise.
-// NOTE(review): the sample IR above passes ScalarExpr* constants to the compute
-// functions. The code below passes the evaluator loaded from build_expr_evals_, so the
-// sample appears to predate that change.
-Function* OldHashTable::CodegenEquals(LlvmCodeGen* codegen) {
- for (int i = 0; i < build_exprs_.size(); ++i) {
- // Disable codegen for CHAR
- if (build_exprs_[i]->type().type == TYPE_CHAR) return NULL;
- }
-
- // Get types to generate function prototype
- Type* tuple_row_type = codegen->GetType(TupleRow::LLVM_CLASS_NAME);
- DCHECK(tuple_row_type != NULL);
- PointerType* tuple_row_ptr_type = PointerType::get(tuple_row_type, 0);
-
- Type* this_type = codegen->GetType(OldHashTable::LLVM_CLASS_NAME);
- DCHECK(this_type != NULL);
- PointerType* this_ptr_type = PointerType::get(this_type, 0);
-
- LlvmCodeGen::FnPrototype prototype(codegen, "Equals", codegen->GetType(TYPE_BOOLEAN));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("this_ptr", this_ptr_type));
- prototype.AddArgument(LlvmCodeGen::NamedVariable("row", tuple_row_ptr_type));
-
- LLVMContext& context = codegen->context();
- LlvmBuilder builder(context);
- Value* args[2];
- Function* fn = prototype.GeneratePrototype(&builder, args);
- Value* this_ptr = args[0];
- Value* row = args[1];
-
- // With no build exprs, every row trivially compares equal.
- if (!build_exprs_.empty()) {
- // Single shared exit block; every failed comparison branches here.
- BasicBlock* false_block = BasicBlock::Create(context, "false_block", fn);
-
- // Load build_expr_evals_.data()
- Value* eval_vector = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_BUILD_EXPR_EVALUATORS,
- this_ptr, "eval_vector");
-
- // Load expr_values_buffer_
- Value* expr_values_buffer = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUES_BUFFER,
- this_ptr, "expr_values_buffer");
-
- // Load expr_value_null_bits_
- Value* expr_value_null_bits = codegen->CodegenCallFunction(&builder,
- IRFunction::OLD_HASH_TABLE_GET_EXPR_VALUE_NULL_BITS,
- this_ptr, "expr_value_null_bits");
-
- for (int i = 0; i < build_exprs_.size(); ++i) {
- BasicBlock* null_block = BasicBlock::Create(context, "null", fn);
- BasicBlock* not_null_block = BasicBlock::Create(context, "not_null", fn);
- BasicBlock* continue_block = BasicBlock::Create(context, "continue", fn);
-
- // Generate GetValue() of build_expr_evals_[i]
- Function* expr_fn;
- Status status = build_exprs_[i]->GetCodegendComputeFn(codegen, &expr_fn);
- if (!status.ok()) {
- fn->eraseFromParent(); // deletes function
- VLOG_QUERY << "Failed to codegen Equals(): " << status.GetDetail();
- return NULL;
- }
-
- // Call GetValue() on build_expr_evals_[i]
- Value* eval_arg =
- codegen->CodegenArrayAt(&builder, eval_vector, i, "eval");
- CodegenAnyVal result = CodegenAnyVal::CreateCallWrapped(codegen, &builder,
- build_exprs_[i]->type(), expr_fn, {eval_arg, row}, "result");
- Value* is_null = result.GetIsNull();
-
- // Determine if probe is null (i.e. expr_value_null_bits_[i] == true). In
- // the case where the hash table does not store nulls, this is always false.
- // NOTE(review): the interpreted Equals() rejects a non-NULL build value whenever
- // expr_value_null_bits_[i] is set. Here the null bit is only consulted when
- // stores_nulls_ && finds_nulls_[i]; confirm the two paths agree for callers.
- Value* probe_is_null = codegen->false_value();
- if (stores_nulls_ && finds_nulls_[i]) {
- Value* llvm_null_bits_loc = builder.CreateInBoundsGEP(NULL, expr_value_null_bits,
- codegen->GetIntConstant(TYPE_INT, i), "null_bits_loc");
- Value* null_bits = builder.CreateLoad(llvm_null_bits_loc, "null_bits");
- probe_is_null = builder.CreateICmpNE(null_bits,
- codegen->GetIntConstant(TYPE_TINYINT, 0));
- }
-
- // Get llvm value for probe_val from 'expr_values_buffer_'
- Value* probe_val = builder.CreateInBoundsGEP(NULL, expr_values_buffer,
- codegen->GetIntConstant(TYPE_INT, expr_values_buffer_offsets_[i]), "probe_val");
- probe_val = builder.CreatePointerCast(
- probe_val, codegen->GetPtrType(build_exprs_[i]->type()));
-
- // Branch for GetValue() returning NULL
- builder.CreateCondBr(is_null, null_block, not_null_block);
-
- // Null block
- // A NULL build value matches only if the probe value is NULL too.
- builder.SetInsertPoint(null_block);
- builder.CreateCondBr(probe_is_null, continue_block, false_block);
-
- // Not-null block
- builder.SetInsertPoint(not_null_block);
- if (stores_nulls_) {
- BasicBlock* cmp_block = BasicBlock::Create(context, "cmp", fn);
- // First need to compare that probe_expr[i] is not null
- builder.CreateCondBr(probe_is_null, false_block, cmp_block);
- builder.SetInsertPoint(cmp_block);
- }
- // Check result == probe_val
- Value* is_equal = result.EqToNativePtr(probe_val);
- builder.CreateCondBr(is_equal, continue_block, false_block);
-
- // The next expr's code (or the final 'ret true') is emitted into continue_block.
- builder.SetInsertPoint(continue_block);
- }
- builder.CreateRet(codegen->true_value());
-
- builder.SetInsertPoint(false_block);
- builder.CreateRet(codegen->false_value());
- } else {
- builder.CreateRet(codegen->true_value());
- }
- return codegen->FinalizeFunction(fn);
-}
-
-// Grows the bucket array to 'num_buckets' (a power of 2) and redistributes the existing
-// node chains by hash. If the memory tracker refuses the extra bucket memory, this
-// calls MemLimitExceeded() and returns with the table unchanged.
-void OldHashTable::ResizeBuckets(int64_t num_buckets) {
-  DCHECK_EQ((num_buckets & (num_buckets-1)), 0)
-      << "num_buckets=" << num_buckets << " must be a power of 2";
-  // Only growing is supported: buckets_.resize() below would discard the chains of any
-  // truncated buckets before they could be redistributed.
-  DCHECK_GE(num_buckets, num_buckets_);
-
-  int64_t old_num_buckets = num_buckets_;
-  // This can be a rather large allocation so check the limit before (to prevent
-  // us from going over the limits too much).
-  int64_t delta_size = (num_buckets - old_num_buckets) * sizeof(Bucket);
-  if (!mem_tracker_->TryConsume(delta_size)) {
-    MemLimitExceeded(delta_size);
-    return;
-  }
-  buckets_.resize(num_buckets);
-
-  // If we're doubling the number of buckets, all nodes in a particular bucket
-  // either remain there, or move down to an analogous bucket in the other half.
-  // In order to efficiently check which of the two buckets a node belongs in, the number
-  // of buckets must be a power of 2.
-  bool doubled_buckets = (num_buckets == old_num_buckets * 2);
-  // Only the pre-existing buckets can hold nodes, so only they are walked. The index is
-  // int64_t to match the bucket counts.
-  for (int64_t i = 0; i < old_num_buckets; ++i) {
-    Bucket* bucket = &buckets_[i];
-    // Bucket i + old_num_buckets is only guaranteed to exist when the bucket count
-    // doubled, so only index it in that case (indexing past size() is undefined).
-    Bucket* sister_bucket = doubled_buckets ? &buckets_[i + old_num_buckets] : NULL;
-    Node* last_node = NULL;
-    Node* node = bucket->node;
-
-    while (node != NULL) {
-      Node* next = node->next;
-      uint32_t hash = node->hash;
-
-      bool node_must_move;
-      Bucket* move_to;
-      if (doubled_buckets) {
-        node_must_move = ((hash & old_num_buckets) != 0);
-        move_to = sister_bucket;
-      } else {
-        int64_t bucket_idx = hash & (num_buckets - 1);
-        node_must_move = (bucket_idx != i);
-        move_to = &buckets_[bucket_idx];
-      }
-
-      if (node_must_move) {
-        MoveNode(bucket, move_to, node, last_node);
-      } else {
-        last_node = node;
-      }
-
-      node = next;
-    }
-  }
-
-  num_buckets_ = num_buckets;
-  num_buckets_till_resize_ = MAX_BUCKET_OCCUPANCY_FRACTION * num_buckets_;
-}
-
-// Takes a new HT_PAGE_SIZE page from mem_pool_ to hand out as Nodes. It also updates
-// the page count and the global hash-table byte metric, and flags the table if the
-// memory tracker is now over its limit.
-void OldHashTable::GrowNodeArray() {
-  next_node_ = reinterpret_cast<Node*>(mem_pool_->Allocate(HT_PAGE_SIZE));
-  node_remaining_current_page_ = HT_PAGE_SIZE / sizeof(Node);
-  num_data_pages_ += 1;
-
-  if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
-    ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(HT_PAGE_SIZE);
-  }
-
-  if (!mem_tracker_->LimitExceeded()) return;
-  MemLimitExceeded(HT_PAGE_SIZE);
-}
-
-// Records a failed allocation of 'allocation_size' bytes on this table. It also reports
-// the failure to the RuntimeState when one is attached.
-void OldHashTable::MemLimitExceeded(int64_t allocation_size) {
-  mem_limit_exceeded_ = true;
-  if (state_ == NULL) return;
-  state_->SetMemLimitExceeded(mem_tracker_, allocation_size);
-}
-
-// Renders the bucket array one bucket per line as "<idx>: node(data)[,node(data)...]".
-// Empty buckets are omitted if 'skip_empty'. Each node is followed by its row when 'desc'
-// is non-NULL and by a [M]/[U] matched marker when 'show_match' is set.
-string OldHashTable::DebugString(bool skip_empty, bool show_match,
-    const RowDescriptor* desc) {
-  stringstream out;
-  out << endl;
-  for (int bucket_idx = 0; bucket_idx < buckets_.size(); ++bucket_idx) {
-    Node* head = buckets_[bucket_idx].node;
-    if (head == NULL && skip_empty) continue;
-    out << bucket_idx << ": ";
-    for (Node* cur = head; cur != NULL; cur = cur->next) {
-      // Separate every node after the chain head with a comma.
-      if (cur != head) out << ",";
-      out << cur << "(" << cur->data << ")";
-      if (desc != NULL) out << " " << PrintRow(GetRow(cur), *desc);
-      if (show_match) out << (cur->matched ? " [M]" : " [U]");
-    }
-    out << endl;
-  }
-  return out.str();
-}