You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this text.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/17 08:30:19 UTC
[3/4] incubator-impala git commit: IMPALA-3286: Prefetching for PHJ probing.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index 45b5a0e..d575c01 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -53,13 +53,13 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
template<bool AGGREGATED_ROWS>
Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
HashTableCtx* __restrict__ ht_ctx) {
- uint32_t hash = 0;
if (AGGREGATED_ROWS) {
- if (!ht_ctx->EvalAndHashBuild(row, &hash)) return Status::OK();
+ if (!ht_ctx->EvalAndHashBuild(row)) return Status::OK();
} else {
- if (!ht_ctx->EvalAndHashProbe(row, &hash)) return Status::OK();
+ if (!ht_ctx->EvalAndHashProbe(row)) return Status::OK();
}
+ uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
// To process this row, we first see if it can be aggregated or inserted into this
// partition's hash table. If we need to insert it and that fails, due to OOM, we
// spill the partition. The partition to spill is not necessarily dst_partition,
@@ -76,7 +76,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
bool found;
// Find the appropriate bucket in the hash table. There will always be a free
// bucket because we checked the size above.
- HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, hash, &found);
+ HashTable::Iterator it = ht->FindBuildRowBucket(ht_ctx, &found);
DCHECK(!it.AtEnd()) << "Hash table had no free buckets";
if (AGGREGATED_ROWS) {
// If the row is already an aggregate row, it cannot match anything in the
@@ -149,9 +149,9 @@ Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
FOREACH_ROW(in_batch, 0, in_batch_iter) {
- uint32_t hash;
TupleRow* in_row = in_batch_iter.Get();
- if (!ht_ctx->EvalAndHashProbe(in_row, &hash)) continue;
+ if (!ht_ctx->EvalAndHashProbe(in_row)) continue;
+ uint32_t hash = ht_ctx->expr_values_cache()->ExprValuesHash();
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
if (TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx], in_row, hash,
@@ -192,7 +192,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
DCHECK_GE(*remaining_capacity, 0);
bool found;
// This is called from ProcessBatchStreaming() so the rows are not aggregated.
- HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, hash, &found);
+ HashTable::Iterator it = partition->hash_tbl->FindBuildRowBucket(ht_ctx, &found);
Tuple* intermediate_tuple;
if (found) {
intermediate_tuple = it.GetTuple();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b7dca61..2cde059 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -252,9 +252,9 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(state_->GetQueryStatus());
singleton_output_tuple_returned_ = false;
} else {
- ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, grouping_expr_ctxs_, true,
- std::vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
- MAX_PARTITION_DEPTH, 1));
+ RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, grouping_expr_ctxs_,
+ true, vector<bool>(build_expr_ctxs_.size(), true), state->fragment_hash_seed(),
+ MAX_PARTITION_DEPTH, 1, mem_tracker(), &ht_ctx_));
RETURN_IF_ERROR(state_->block_mgr()->RegisterClient(
Substitute("PartitionedAggregationNode id=$0 ptr=$1", id_, this),
MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
@@ -984,9 +984,9 @@ int PartitionedAggregationNode::GroupingExprsVarlenSize() {
int varlen_size = 0;
// TODO: The hash table could compute this as it hashes.
for (int expr_idx: string_grouping_exprs_) {
- StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->last_expr_value(expr_idx));
+ StringValue* sv = reinterpret_cast<StringValue*>(ht_ctx_->ExprValue(expr_idx));
// Avoid branching by multiplying length by null bit.
- varlen_size += sv->len * !ht_ctx_->last_expr_value_null(expr_idx);
+ varlen_size += sv->len * !ht_ctx_->ExprValueNull(expr_idx);
}
return varlen_size;
}
@@ -997,17 +997,17 @@ void PartitionedAggregationNode::CopyGroupingValues(Tuple* intermediate_tuple,
// Copy over all grouping slots (the variable length data is copied below).
for (int i = 0; i < grouping_expr_ctxs_.size(); ++i) {
SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[i];
- if (ht_ctx_->last_expr_value_null(i)) {
+ if (ht_ctx_->ExprValueNull(i)) {
intermediate_tuple->SetNull(slot_desc->null_indicator_offset());
} else {
- void* src = ht_ctx_->last_expr_value(i);
+ void* src = ht_ctx_->ExprValue(i);
void* dst = intermediate_tuple->GetSlot(slot_desc->tuple_offset());
memcpy(dst, src, slot_desc->slot_size());
}
}
for (int expr_idx: string_grouping_exprs_) {
- if (ht_ctx_->last_expr_value_null(expr_idx)) continue;
+ if (ht_ctx_->ExprValueNull(expr_idx)) continue;
SlotDescriptor* slot_desc = intermediate_tuple_desc_->slots()[expr_idx];
// ptr and len were already copied to the fixed-len part of string value
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index ef4c010..d66eda0 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -219,67 +219,145 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowOuterJoins(
}
+// Probes the current probe row by forwarding to the ProcessProbeRow*() variant that
+// handles the join type 'JoinOp'. Since 'JoinOp' is a compile-time constant, each
+// instantiation only ever takes one of the branches below. Returns the variant's
+// result. ProcessProbeBatch() stops on false, at which point either the output batch
+// has no remaining capacity or 'status' holds an error. 'status' is only forwarded to
+// the left semi/anti join variant.
template<int const JoinOp>
+bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRow(
+ ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+ ExprContext* const* conjunct_ctxs, int num_conjuncts,
+ RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status) {
+ if (JoinOp == TJoinOp::INNER_JOIN) {
+ return ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
+ conjunct_ctxs, num_conjuncts, out_batch_iterator, remaining_capacity);
+ } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
+ JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
+ return ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+ num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+ remaining_capacity);
+ } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
+ JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ return ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
+ num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+ remaining_capacity, status);
+ } else {
+ // Only the outer join types are left. Every alternative must compare against
+ // 'JoinOp': a bare enumerator would not test 'JoinOp' at all and make the check
+ // vacuous.
+ DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
+ JoinOp == TJoinOp::LEFT_OUTER_JOIN || JoinOp == TJoinOp::FULL_OUTER_JOIN);
+ return ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
+ num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, out_batch_iterator,
+ remaining_capacity);
+ }
+}
+
+// Advances through the current prefetch group (the ExprValuesCache filled by
+// EvalAndHashProbePrefetchGroup()) until it finds a probe row that needs probing.
+// For every row examined it sets 'current_probe_row_' and resets 'matched_probe_':
+// - Non-NULL rows whose partition has an in-memory hash table get 'hash_tbl_iterator_'
+//   from FindProbeRow() and are returned for probing.
+// - Non-NULL rows whose partition is closed (empty build) are returned without a lookup.
+// - Non-NULL rows whose partition is spilled are appended to that partition's probe
+//   stream and skipped.
+// - NULL rows are returned without a lookup, except for NAAJ with a non-empty build,
+//   where they are skipped (and saved to 'null_probe_rows_' if there are other join
+//   conjuncts).
+// Returns true if a row is ready for probing. Returns false when the prefetch group is
+// exhausted, or when appending a row fails (with 'status' set to the error).
+// 'remaining_capacity' is not used by this function.
+template<int const JoinOp>
bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
- int* remaining_capacity, int num_other_join_conjuncts, Status* status) {
- while (!probe_batch_iterator->AtEnd()) {
+ int* remaining_capacity, Status* status) {
+ HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+ while (!expr_vals_cache->AtEnd()) {
// Establish current_probe_row_ and find its corresponding partition.
+ DCHECK(!probe_batch_iterator->AtEnd());
current_probe_row_ = probe_batch_iterator->Get();
- probe_batch_iterator->Next();
matched_probe_ = false;
- uint32_t hash;
- if (!ht_ctx->EvalAndHashProbe(current_probe_row_, &hash)) {
+ // True if the current row should be skipped for probing.
+ bool skip_row = false;
+
+ // The cached hash of the probe expression values for the current row.
+ uint32_t hash = expr_vals_cache->ExprValuesHash();
+ // Hoist the following out of the else branch below to speed up the non-NULL case.
+ const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
+ HashTable* hash_tbl = hash_tbls_[partition_idx];
+
+ // The row was marked NULL in the cache if EvalAndHashProbe() returned false for it.
+ if (expr_vals_cache->IsRowNull()) {
if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+ const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
// For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
- // 1. No build rows -> Return this row.
+ // 1. No build rows -> Return this row. The check for 'non_empty_build_'
+ // is for this case.
// 2. Has build rows & no other join predicates, skip row.
// 3. Has build rows & other join predicates, we need to evaluate against all
// build rows. First evaluate it against this partition, and if there is not
// a match, save it to evaluate against other partitions later. If there
// is a match, the row is skipped.
- if (num_other_join_conjuncts > 0) {
- if (UNLIKELY(!AppendRow(null_probe_rows_, current_probe_row_, status))) {
+ if (num_other_join_conjuncts == 0) {
+ // Condition 2 above.
+ skip_row = true;
+ } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+ // Condition 3 above.
+ matched_null_probe_.push_back(false);
+ skip_row = true;
+ } else {
+ // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
+ DCHECK(!status->ok());
+ return false;
+ }
+ }
+ } else {
+ // The build partition is in memory. Return this row for probing.
+ if (LIKELY(hash_tbl != NULL)) {
+ hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
+ } else {
+ // The build partition is either empty or spilled.
+ Partition* partition = hash_partitions_[partition_idx];
+ // This partition is closed, meaning the build side for this partition was empty.
+ if (UNLIKELY(partition->is_closed())) {
+ DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+ } else {
+ // This partition is not in memory, spill the probe row and move to the next row.
+ DCHECK(partition->is_spilled());
+ DCHECK(partition->probe_rows() != NULL);
+ // Skip the current row if we manage to append to the spilled partition's BTS.
+ // Otherwise, we need to bail out and report the failure.
+ if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
DCHECK(!status->ok());
return false;
}
- matched_null_probe_.push_back(false);
+ skip_row = true;
}
- continue;
}
- return true;
- }
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- // The build partition is in memory. Return this row for probing.
- if (LIKELY(hash_tbls_[partition_idx] != NULL)) {
- hash_tbl_iterator_ = hash_tbls_[partition_idx]->FindProbeRow(ht_ctx, hash);
- return true;
- }
- // The build partition is either empty or spilled.
- Partition* partition = hash_partitions_[partition_idx];
- // This partition is closed, meaning the build side for this partition was empty.
- if (UNLIKELY(partition->is_closed())) {
- DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
- return true;
- }
- // This partition is not in memory, spill the probe row and move to the next row.
- DCHECK(partition->is_spilled());
- DCHECK(partition->probe_rows() != NULL);
- if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
- DCHECK(!status->ok());
- return false;
}
+ // Move to the next probe row and hash table context's cached value.
+ probe_batch_iterator->Next();
+ expr_vals_cache->NextRow();
+ if (skip_row) continue;
+ DCHECK(status->ok());
+ return true;
+ }
+ // The prefetch group is exhausted. 'current_probe_row_' still points at the last row
+ // examined above, whether that row was returned earlier or skipped. It is cleared
+ // only once the whole probe batch has been consumed.
+ // NOTE(review): ProcessProbeBatch() keeps going while 'current_probe_row_' is
+ // non-NULL and calls ProcessProbeRow() on it before consuming the next group. For a
+ // skipped row (spilled, or a NAAJ NULL row), 'matched_probe_' is still false. Confirm
+ // this cannot emit that row a second time (e.g. as an unmatched outer/anti join row).
+ if (probe_batch_iterator->AtEnd()) {
+ // No more probe rows in this batch.
+ current_probe_row_ = NULL;
}
- // Finished this batch.
- current_probe_row_ = NULL;
return false;
}
-// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by
-// codegen.
+// Fills the probe-side ExprValuesCache with the next prefetch group. It evaluates and
+// hashes the probe expressions of up to capacity() rows of 'probe_batch_', starting at
+// 'probe_batch_pos_'. Rows for which EvalAndHashProbe() returns false are marked NULL
+// in the cache. Unless 'prefetch_mode' is NONE, each non-NULL row's hash bucket is
+// prefetched from its partition's hash table when that table is in memory. The cache
+// must be exhausted on entry; ResetForRead() is called on it before returning.
+void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
+ TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx) {
+ HashTableCtx::ExprValuesCache* cache = ht_ctx->expr_values_cache();
+ RowBatch* batch = probe_batch_.get();
+ const int group_size = cache->capacity();
+ const bool do_prefetch = prefetch_mode != TPrefetchMode::NONE;
+ DCHECK(cache->AtEnd());
+
+ cache->Reset();
+ FOREACH_ROW_LIMIT(batch, probe_batch_pos_, group_size, row_iter) {
+ if (!ht_ctx->EvalAndHashProbe(row_iter.Get())) {
+ // No usable hash for this row: record it as NULL, nothing to prefetch.
+ cache->SetRowNull();
+ } else if (do_prefetch) {
+ const uint32_t row_hash = cache->ExprValuesHash();
+ HashTable* tbl = hash_tbls_[row_hash >> (32 - NUM_PARTITIONING_BITS)];
+ // Partitions without an in-memory hash table have nothing to prefetch.
+ if (LIKELY(tbl != NULL)) tbl->PrefetchBucket<true>(row_hash);
+ }
+ cache->NextRow();
+ }
+ cache->ResetForRead();
+}
+
+// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
template<int const JoinOp>
-int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
- HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
+ RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -292,48 +370,51 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
// Note that 'probe_batch_pos_' is the row no. of the row after 'current_probe_row_'.
RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), probe_batch_pos_);
int remaining_capacity = max_rows;
+ bool has_probe_rows = current_probe_row_ != NULL || !probe_batch_iterator.AtEnd();
+ HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- do {
- if (current_probe_row_ != NULL) {
- if (JoinOp == TJoinOp::INNER_JOIN) {
- if (!ProcessProbeRowInnerJoin(other_join_conjunct_ctxs, num_other_join_conjuncts,
- conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity)) {
- break;
- }
- } else if (JoinOp == TJoinOp::RIGHT_SEMI_JOIN ||
- JoinOp == TJoinOp::RIGHT_ANTI_JOIN) {
- if (!ProcessProbeRowRightSemiJoins<JoinOp>(other_join_conjunct_ctxs,
- num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
- &remaining_capacity)) {
- break;
- }
- } else if (JoinOp == TJoinOp::LEFT_SEMI_JOIN ||
- JoinOp == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- if (!ProcessProbeRowLeftSemiJoins<JoinOp>(other_join_conjunct_ctxs,
- num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
- &remaining_capacity, status)) {
- break;
- }
- } else {
- DCHECK(JoinOp == TJoinOp::RIGHT_OUTER_JOIN ||
- JoinOp == TJoinOp::LEFT_OUTER_JOIN || TJoinOp::FULL_OUTER_JOIN);
- if (!ProcessProbeRowOuterJoins<JoinOp>(other_join_conjunct_ctxs,
- num_other_join_conjuncts, conjunct_ctxs, num_conjuncts, &out_batch_iterator,
- &remaining_capacity)) {
+ // Keep processing more probe rows if there are more to process and the output batch
+ // has room and we haven't hit any error yet.
+ while (has_probe_rows && remaining_capacity > 0 && status->ok()) {
+ // Prefetch for the current hash_tbl_iterator_.
+ if (prefetch_mode != TPrefetchMode::NONE) {
+ hash_tbl_iterator_.PrefetchBucket<true>();
+ }
+ // Evaluate and hash more rows if prefetch group is empty. A prefetch group is a cache
+ // of probe expressions results, nullness of the expression values and hash values
+ // against some consecutive number of rows in the probe batch. Prefetching, if
+ // enabled, is interleaved with the rows' evaluation and hashing. If the prefetch
+ // group is partially full (e.g. we returned before the current prefetch group was
+ // exhausted in the previous iteration), we will proceed with the remaining items in
+ // the values cache.
+ if (expr_vals_cache->AtEnd()) {
+ EvalAndHashProbePrefetchGroup(prefetch_mode, ht_ctx);
+ }
+ // Process the prefetch group.
+ do {
+ // 'current_probe_row_' can be NULL on the first iteration through this loop.
+ if (current_probe_row_ != NULL) {
+ if (!ProcessProbeRow<JoinOp>(other_join_conjunct_ctxs, num_other_join_conjuncts,
+ conjunct_ctxs, num_conjuncts, &out_batch_iterator, &remaining_capacity,
+ status)) {
+ if (status->ok()) DCHECK_EQ(remaining_capacity, 0);
break;
}
}
- }
- // Must have reached the end of the hash table iterator for the current row before
- // moving to the next row.
- DCHECK(hash_tbl_iterator_.AtEnd());
- DCHECK(status->ok());
- } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
- num_other_join_conjuncts, status));
- // Update where we are in the probe batch.
- probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
- probe_batch_->num_tuples_per_row();
+ // Must have reached the end of the hash table iterator for the current row before
+ // moving to the next row.
+ DCHECK(hash_tbl_iterator_.AtEnd());
+ DCHECK(status->ok());
+ } while (NextProbeRow<JoinOp>(ht_ctx, &probe_batch_iterator, &remaining_capacity,
+ status));
+ // Update whether there are more probe rows to process in the current batch.
+ has_probe_rows = current_probe_row_ != NULL;
+ if (!has_probe_rows) DCHECK(probe_batch_iterator.AtEnd());
+ // Update where we are in the probe batch.
+ probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0)) /
+ probe_batch_->num_tuples_per_row();
+ }
+
int num_rows_added;
if (LIKELY(status->ok())) {
num_rows_added = max_rows - remaining_capacity;
@@ -346,42 +427,14 @@ int PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch,
return num_rows_added;
}
-int PartitionedHashJoinNode::ProcessProbeBatch(
- const TJoinOp::type join_op, RowBatch* out_batch,
- HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
- switch (join_op) {
- case TJoinOp::INNER_JOIN:
- return ProcessProbeBatch<TJoinOp::INNER_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::LEFT_OUTER_JOIN:
- return ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::LEFT_SEMI_JOIN:
- return ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::LEFT_ANTI_JOIN:
- return ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
- return ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(out_batch, ht_ctx,
- status);
- case TJoinOp::RIGHT_OUTER_JOIN:
- return ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::RIGHT_SEMI_JOIN:
- return ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::RIGHT_ANTI_JOIN:
- return ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(out_batch, ht_ctx, status);
- case TJoinOp::FULL_OUTER_JOIN:
- return ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(out_batch, ht_ctx, status);
- default:
- DCHECK(false) << "Unknown join type";
- return -1;
- }
-}
-
Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
bool build_filters) {
+ HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
+ expr_vals_cache->Reset();
FOREACH_ROW(build_batch, 0, build_batch_iter) {
DCHECK(build_status_.ok());
- uint32_t hash;
TupleRow* build_row = build_batch_iter.Get();
- if (!ht_ctx_->EvalAndHashBuild(build_row, &hash)) {
+ if (!ht_ctx_->EvalAndHashBuild(build_row)) {
if (null_aware_partition_ != NULL) {
// TODO: remove with codegen/template
// If we are NULL aware and this build row has NULL in the eq join slot,
@@ -405,6 +458,7 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
ctx.local_bloom_filter->Insert(filter_hash);
}
}
+ const uint32_t hash = expr_vals_cache->ExprValuesHash();
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
Partition* partition = hash_partitions_[partition_idx];
const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
@@ -413,37 +467,69 @@ Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
return Status::OK();
}
-bool PartitionedHashJoinNode::Partition::InsertBatch(HashTableCtx* ht_ctx,
- RowBatch* batch, const vector<BufferedTupleStream::RowIdx>& indices) {
- DCHECK_LE(batch->num_rows(), hash_values_.size());
- DCHECK_LE(batch->num_rows(), null_bitmap_.num_bits());
+// Inserts the rows of 'batch' into this partition's hash table. 'indices[i]' is the
+// RowIdx passed to Insert() for row i (presumably its position in the build stream;
+// confirm against the caller). Rows for which EvalAndHashBuild() returns false are not
+// inserted. Returns false as soon as an Insert() fails, otherwise true.
+bool PartitionedHashJoinNode::Partition::InsertBatch(
+ TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
+ const vector<BufferedTupleStream::RowIdx>& indices) {
// Compute the hash values and prefetch the hash table buckets.
- int i = 0;
- uint32_t* hash_values = hash_values_.data();
- null_bitmap_.SetAllBits(false);
- FOREACH_ROW(batch, 0, batch_iter) {
- if (ht_ctx->EvalAndHashBuild(batch_iter.Get(), &hash_values[i])) {
- // TODO: Find the optimal prefetch batch size. This may be something
- // processor dependent so we may need calibration at Impala startup time.
- hash_tbl_->PrefetchBucket<false>(hash_values[i]);
- } else {
- null_bitmap_.Set<false>(i, true);
+ const int num_rows = batch->num_rows();
+ HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
+ const int prefetch_size = expr_vals_cache->capacity();
+ const BufferedTupleStream::RowIdx* row_indices = indices.data();
+ // Work through the batch one prefetch group (up to 'prefetch_size' rows) at a time.
+ // First evaluate and hash the whole group, prefetching buckets unless
+ // 'prefetch_mode' is NONE. Then insert the group's non-NULL rows.
+ for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
+ prefetch_group_row += prefetch_size) {
+ int cur_row = prefetch_group_row;
+ expr_vals_cache->Reset();
+ FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
+ if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
+ if (prefetch_mode != TPrefetchMode::NONE) {
+ hash_tbl_->PrefetchBucket<false>(expr_vals_cache->ExprValuesHash());
+ }
+ } else {
+ expr_vals_cache->SetRowNull();
+ }
+ expr_vals_cache->NextRow();
}
- ++i;
- }
- // Do the insertion.
- i = 0;
- const BufferedTupleStream::RowIdx* row_idx = indices.data();
- FOREACH_ROW(batch, 0, batch_iter) {
- if (LIKELY(!null_bitmap_.Get<false>(i))) {
+ // Do the insertion.
+ expr_vals_cache->ResetForRead();
+ FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
TupleRow* row = batch_iter.Get();
- if (UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx[i], row, hash_values[i]))) {
+ // 'cur_row' is the batch index of 'row'; it selects the row's entry in 'indices'.
+ BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
+ // NOTE(review): returning false here stops partway through the group, so
+ // 'expr_vals_cache' is left mid-group rather than exhausted. Confirm callers
+ // reset it before it is reused.
+ if (!expr_vals_cache->IsRowNull() &&
+ UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
return false;
}
+ expr_vals_cache->NextRow();
+ ++cur_row;
}
- ++i;
}
return true;
}
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
+template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>(
+ TPrefetchMode::type prefetch_mode, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 47c118a..1888e06 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -143,7 +143,9 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
// Although ConstructBuildSide() maybe be run in a separate thread, it is safe to free
// local allocations in QueryMaintenance() since the build thread is not run
// concurrently with other expr evaluation in this join node.
- AddExprCtxsToFree(probe_expr_ctxs_);
+ // Probe side expr is not included in QueryMaintenance(). We cache the probe expression
+ // values in ExprValuesCache. Local allocations need to survive until the cache is reset
+ // so we need to manually free probe expr local allocations.
AddExprCtxsToFree(build_expr_ctxs_);
// other_join_conjunct_ctxs_ are evaluated in the context of rows assembled from all
@@ -162,10 +164,10 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
std::logical_or<bool>());
- ht_ctx_.reset(new HashTableCtx(build_expr_ctxs_, probe_expr_ctxs_, should_store_nulls,
- is_not_distinct_from_, state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
- child(1)->row_desc().tuple_descriptors().size()));
-
+ RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
+ should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
+ MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
+ &ht_ctx_));
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
}
@@ -206,20 +208,20 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
ht_ctx_->CodegenHashCurrentRow(state, true, &murmur_hash_fn));
// Codegen for evaluating build rows
- Function* eval_row_fn;
- codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_row_fn));
+ Function* eval_build_row_fn;
+ codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
if (codegen_status.ok()) {
// Codegen for build path
build_codegen_status =
- CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_row_fn);
+ CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
if (build_codegen_status.ok()) build_codegen_enabled = true;
// Codegen for probe path
probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
if (probe_codegen_status.ok()) probe_codegen_enabled = true;
// Codegen for InsertBatch()
insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
- eval_row_fn);
+ eval_build_row_fn);
if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
} else {
build_codegen_status = codegen_status;
@@ -324,8 +326,7 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
: parent_(parent),
is_closed_(false),
is_spilled_(false),
- level_(level),
- null_bitmap_(state->batch_size()) {
+ level_(level) {
build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
state->block_mgr(), parent_->block_mgr_client_,
true /* use_initial_small_buffers */, false /* read_write */);
@@ -334,8 +335,6 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
state->block_mgr(), parent_->block_mgr_client_,
true /* use_initial_small_buffers */, false /* read_write */ );
DCHECK(probe_rows_ != NULL);
- hash_values_.resize(state->batch_size());
- null_bitmap_.SetAllBits(false);
}
PartitionedHashJoinNode::Partition::~Partition() {
@@ -465,6 +464,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
DCHECK_EQ(batch.num_rows(), indices.size());
DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
<< build_rows()->RowConsumesMemory();
+ TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
SCOPED_TIMER(parent_->build_timer_);
if (parent_->insert_batch_fn_ != NULL) {
InsertBatchFn insert_batch_fn;
@@ -474,9 +474,13 @@ Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
insert_batch_fn = parent_->insert_batch_fn_;
}
DCHECK(insert_batch_fn != NULL);
- if (UNLIKELY(!insert_batch_fn(this, ctx, &batch, indices))) goto not_built;
+ if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
+ goto not_built;
+ }
} else {
- if (UNLIKELY(!InsertBatch(ctx, &batch, indices))) goto not_built;
+ if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
+ goto not_built;
+ }
}
RETURN_IF_ERROR(state->GetQueryStatus());
parent_->FreeLocalAllocations();
@@ -652,7 +656,6 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
DCHECK(new_partition != NULL);
hash_partitions_.push_back(partition_pool_->Add(new_partition));
RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
-
// Initialize a buffer for the probe here to make sure why have it if we need it.
// While this is not strictly necessary (there are some cases where we won't need this
// buffer), the benefit is low. Not grabbing this buffer means there is an additional
@@ -671,6 +674,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
while (!eos) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
+ // 'probe_expr_ctxs_' should have made no local allocations in this function.
+ DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_));
if (input_partition_ == NULL) {
// If we are still consuming batches from the build side.
{
@@ -887,6 +892,43 @@ int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
return max_rows;
}
+// Maps the runtime join type 'join_op' onto the matching ProcessProbeBatch<JoinOp>()
+// instantiation and invokes it with the remaining arguments. Returns that call's
+// result. For an unsupported 'join_op' it fails a DCHECK and returns -1.
+int PartitionedHashJoinNode::ProcessProbeBatch(
+ const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
+ RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
+ typedef int (PartitionedHashJoinNode::*ProbeBatchFn)(TPrefetchMode::type, RowBatch*,
+ HashTableCtx*, Status*);
+ ProbeBatchFn probe_fn = NULL;
+ switch (join_op) {
+ case TJoinOp::INNER_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>;
+ break;
+ case TJoinOp::LEFT_OUTER_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_OUTER_JOIN>;
+ break;
+ case TJoinOp::LEFT_SEMI_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_SEMI_JOIN>;
+ break;
+ case TJoinOp::LEFT_ANTI_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::LEFT_ANTI_JOIN>;
+ break;
+ case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN:
+ probe_fn =
+ &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN>;
+ break;
+ case TJoinOp::RIGHT_OUTER_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_OUTER_JOIN>;
+ break;
+ case TJoinOp::RIGHT_SEMI_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_SEMI_JOIN>;
+ break;
+ case TJoinOp::RIGHT_ANTI_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::RIGHT_ANTI_JOIN>;
+ break;
+ case TJoinOp::FULL_OUTER_JOIN:
+ probe_fn = &PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::FULL_OUTER_JOIN>;
+ break;
+ default:
+ DCHECK(false) << "Unknown join type";
+ return -1;
+ }
+ return (this->*probe_fn)(prefetch_mode, out_batch, ht_ctx, status);
+}
+
Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch,
bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
@@ -951,16 +993,19 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
// Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
// in the xcompiled function, so call it here instead.
int rows_added = 0;
+ TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
SCOPED_TIMER(probe_timer_);
if (process_probe_batch_fn_ == NULL) {
- rows_added = ProcessProbeBatch(join_op_, out_batch, ht_ctx_.get(), &status);
+ rows_added = ProcessProbeBatch(join_op_, prefetch_mode, out_batch, ht_ctx_.get(),
+ &status);
} else {
DCHECK(process_probe_batch_fn_level0_ != NULL);
if (ht_ctx_->level() == 0) {
- rows_added = process_probe_batch_fn_level0_(this, out_batch, ht_ctx_.get(),
- &status);
+ rows_added = process_probe_batch_fn_level0_(this, prefetch_mode, out_batch,
+ ht_ctx_.get(), &status);
} else {
- rows_added = process_probe_batch_fn_(this, out_batch, ht_ctx_.get(), &status);
+ rows_added = process_probe_batch_fn_(this, prefetch_mode, out_batch,
+ ht_ctx_.get(), &status);
}
}
if (UNLIKELY(rows_added < 0)) {
@@ -982,6 +1027,10 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
} else {
RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
}
+ // Free local allocations of the probe side expressions only after ExprValuesCache
+ // has been reset.
+ DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
+ ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
// We want to return as soon as we have attached a tuple stream to the out_batch
// (before preparing a new partition). The attached tuple stream will be recycled
@@ -1615,6 +1664,7 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
codegen->CloneFunction(process_build_batch_fn);
// Always build runtime filters at level0 (if there are any).
+ // Note that the first argument of this function is the return value.
Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
build_filters_l0_arg->replaceAllUsesWith(
ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
@@ -1630,7 +1680,8 @@ Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
DCHECK_EQ(replaced, 1);
// Never build filters after repartitioning, as all rows have already been added to the
- // filters during the level0 build.
+ // filters during the level0 build. Note that the first argument of this function is the
+ // return value.
Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
build_filters_arg->replaceAllUsesWith(
ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
@@ -1698,21 +1749,26 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
DCHECK(process_probe_batch_fn != NULL);
process_probe_batch_fn->setName("ProcessProbeBatch");
- // Since ProcessProbeBatch() is a templated function, it has linkonce_odr linkage, which
- // means the function can be removed if it's not referenced. Change to weak_odr, which
- // has the same semantics except it can't be removed.
- // See http://llvm.org/docs/LangRef.html#linkage-types
- DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::LinkOnceODRLinkage)
+ // Verifies that ProcessProbeBatch() has weak_odr linkage so it's not discarded even
+ // if it's not referenced. See http://llvm.org/docs/LangRef.html#linkage-types
+ DCHECK(process_probe_batch_fn->getLinkage() == GlobalValue::WeakODRLinkage)
<< LlvmCodeGen::Print(process_probe_batch_fn);
- process_probe_batch_fn->setLinkage(GlobalValue::WeakODRLinkage);
// Bake in %this pointer argument to process_probe_batch_fn.
Value* this_arg = codegen->GetArgument(process_probe_batch_fn, 0);
Value* this_loc = codegen->CastPtrToLlvmPtr(this_arg->getType(), this);
this_arg->replaceAllUsesWith(this_loc);
+ // Replace the parameter 'prefetch_mode' with constant.
+ Value* prefetch_mode_arg = codegen->GetArgument(process_probe_batch_fn, 1);
+ TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+ DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+ DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+ prefetch_mode_arg->replaceAllUsesWith(
+ ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
// Bake in %ht_ctx pointer argument to process_probe_batch_fn
- Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 2);
+ Value* ht_ctx_arg = codegen->GetArgument(process_probe_batch_fn, 3);
Value* ht_ctx_loc = codegen->CastPtrToLlvmPtr(ht_ctx_arg->getType(), ht_ctx_.get());
ht_ctx_arg->replaceAllUsesWith(ht_ctx_loc);
@@ -1823,9 +1879,17 @@ Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
Function* build_equals_fn;
RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
+ // Replace the parameter 'prefetch_mode' with constant.
+ Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
+ TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+ DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
+ DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
+ prefetch_mode_arg->replaceAllUsesWith(
+ ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
+
// Use codegen'd EvalBuildRow() function
int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
- DCHECK_EQ(replaced, 2);
+ DCHECK_EQ(replaced, 1);
// Use codegen'd Equals() function
replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 37c2c4f..a76dced 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -189,7 +189,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Probes and updates the hash table for the current probe row for either
/// RIGHT_SEMI_JOIN or RIGHT_ANTI_JOIN. For RIGHT_SEMI_JOIN, all matching build
- /// rows will be appended to the 'out_batch'; For RIGHT_ANTI_JOIN, update the
+ /// rows will be appended to the output batch; For RIGHT_ANTI_JOIN, update the
/// hash table only if matches are found. The actual output happens in
/// OutputUnmatchedBuild(). Returns true if probing is done for the current
/// probe row and should continue to next row.
@@ -207,8 +207,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Probes the hash table for the current probe row for LEFT_SEMI_JOIN,
/// LEFT_ANTI_JOIN or NULL_AWARE_LEFT_ANTI_JOIN. The probe row will be appended
- /// to 'out_batch' if it's part of the output. Returns true if probing
- /// is done for the current probe row and should continue to next row.
+ /// to the output batch if there is a match (for LEFT_SEMI_JOIN) or if there is no
+ /// match (for LEFT_ANTI_JOIN). Returns true if probing is done for the current
+ /// probe row and should continue to next row.
///
/// 'out_batch_iterator' is the iterator for the output batch.
/// 'remaining_capacity' tracks the number of additional rows that can be added to
@@ -223,7 +224,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Probes the hash table for the current probe row for LEFT_OUTER_JOIN,
/// RIGHT_OUTER_JOIN or FULL_OUTER_JOIN. The matching build and/or probe row
- /// will appended to 'out_batch'. For RIGHT/FULL_OUTER_JOIN, some of the outputs
+ /// will be appended to output batch. For RIGHT/FULL_OUTER_JOIN, some of the outputs
/// are added in OutputUnmatchedBuild(). Returns true if probing is done for the
/// current probe row and should continue to next row.
///
@@ -239,6 +240,25 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
ExprContext* const* conjunct_ctxs, int num_conjuncts,
RowBatch::Iterator* out_batch_iterator, int* remaining_capacity);
+ /// Probes 'current_probe_row_' against the hash tables and appends outputs
+ /// to the output batch. Wrapper around the join-type-specific probe row functions
+ /// declared above.
+ template<int const JoinOp>
+ bool inline ProcessProbeRow(
+ ExprContext* const* other_join_conjunct_ctxs, int num_other_join_conjuncts,
+ ExprContext* const* conjunct_ctxs, int num_conjuncts,
+ RowBatch::Iterator* out_batch_iterator, int* remaining_capacity, Status* status);
+
+ /// Evaluates some number of rows in 'probe_batch_' against the probe expressions
+ /// and hashes the results to 32-bit hash values. The evaluation results and the hash
+ /// values are stored in the expression values cache in 'ht_ctx'. The number of rows
+ /// processed depends on the capacity available in 'ht_ctx->expr_values_cache_'.
+ /// 'prefetch_mode' is the prefetching mode in use. If it's not TPrefetchMode::NONE,
+ /// hash table buckets will be prefetched based on the computed hash values. Note
+ /// that 'prefetch_mode' will be substituted with a constant during codegen.
+ void EvalAndHashProbePrefetchGroup(TPrefetchMode::type prefetch_mode,
+ HashTableCtx* ctx);
+
/// Find the next probe row. Returns true if a probe row is found. In which case,
/// 'current_probe_row_' and 'hash_tbl_iterator_' have been set up to point to the
/// next probe row and its corresponding partition. 'status' may be updated if
@@ -246,7 +266,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
template<int const JoinOp>
bool inline NextProbeRow(
HashTableCtx* ht_ctx, RowBatch::Iterator* probe_batch_iterator,
- int* remaining_capacity, int num_other_join_conjuncts, Status* status);
+ int* remaining_capacity, Status* status);
/// Process probe rows from probe_batch_. Returns either if out_batch is full or
/// probe_batch_ is entirely consumed.
@@ -256,11 +276,12 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// set). This function doesn't commit rows to the output batch so it's the caller's
/// responsibility to do so.
template<int const JoinOp>
- int ProcessProbeBatch(RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
+ int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
+ Status* status);
/// Wrapper that calls the templated version of ProcessProbeBatch() based on 'join_op'.
- int ProcessProbeBatch(const TJoinOp::type join_op, RowBatch* out_batch,
- HashTableCtx* ht_ctx, Status* status);
+ int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
+ RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
/// Sweep the hash_tbl_ of the partition that is at the front of
/// output_build_partitions_, using hash_tbl_iterator_ and output any unmatched build
@@ -553,9 +574,13 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
/// containing the index of each row's index into the hash table's tuple stream.
- /// This function is replaced with a codegen'd version.
- bool InsertBatch(HashTableCtx* ctx, RowBatch* batch,
- const std::vector<BufferedTupleStream::RowIdx>& indices);
+ /// 'prefetch_mode' is the prefetching mode in use. If it's not TPrefetchMode::NONE,
+ /// hash table buckets that the rows hash to will be prefetched. This parameter is
+ /// replaced with a constant during codegen time. This function may be replaced with
+ /// a codegen'd version. Returns true if all rows in 'batch' are successfully
+ /// inserted.
+ bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+ RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
PartitionedHashJoinNode* parent_;
@@ -581,13 +606,6 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// If NULL, ownership has been transfered.
BufferedTupleStream* build_rows_;
BufferedTupleStream* probe_rows_;
-
- /// Store hash values of each row for the current batch computed during prefetching.
- std::vector<uint32_t> hash_values_;
-
- /// Bitmap to indicate rows evaluated to NULL for the current batch when building
- /// hash tables.
- Bitmap null_bitmap_;
};
/// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
@@ -600,14 +618,14 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
ProcessBuildBatchFn process_build_batch_fn_;
ProcessBuildBatchFn process_build_batch_fn_level0_;
- typedef int (*ProcessProbeBatchFn)(
- PartitionedHashJoinNode*, RowBatch*, HashTableCtx*, Status*);
+ typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
+ TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
/// Jitted ProcessProbeBatch function pointers. NULL if codegen is disabled.
ProcessProbeBatchFn process_probe_batch_fn_;
ProcessProbeBatchFn process_probe_batch_fn_level0_;
- typedef bool (*InsertBatchFn)(Partition*, HashTableCtx*, RowBatch*,
- const std::vector<BufferedTupleStream::RowIdx>&);
+ typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*,
+ RowBatch*, const std::vector<BufferedTupleStream::RowIdx>&);
/// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
InsertBatchFn insert_batch_fn_;
InsertBatchFn insert_batch_fn_level0_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index d63df1e..8ebeab3 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -26,6 +26,7 @@ inline void PartitionedHashJoinNode::ResetForProbe() {
probe_batch_pos_ = 0;
matched_probe_ = true;
hash_tbl_iterator_.SetAtEnd();
+ ht_ctx_->expr_values_cache()->Reset();
}
inline bool PartitionedHashJoinNode::AppendRow(BufferedTupleStream* stream,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.cc b/be/src/exprs/expr-context.cc
index 7231c5e..2a01bfa 100644
--- a/be/src/exprs/expr-context.cc
+++ b/be/src/exprs/expr-context.cc
@@ -110,8 +110,23 @@ Status ExprContext::Clone(RuntimeState* state, ExprContext** new_ctx) {
return root_->Open(state, *new_ctx, FunctionContext::THREAD_LOCAL);
}
-void ExprContext::FreeLocalAllocations() {
- FreeLocalAllocations(fn_contexts_);
+bool ExprContext::HasLocalAllocations(const vector<ExprContext*>& ctxs) {
+ for (int i = 0; i < ctxs.size(); ++i) {
+ if (ctxs[i]->HasLocalAllocations()) return true;
+ }
+ return false;
+}
+
+bool ExprContext::HasLocalAllocations() {
+ return HasLocalAllocations(fn_contexts_);
+}
+
+bool ExprContext::HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs) {
+ for (int i = 0; i < fn_ctxs.size(); ++i) {
+ if (fn_ctxs[i]->impl()->closed()) continue;
+ if (fn_ctxs[i]->impl()->HasLocalAllocations()) return true;
+ }
+ return false;
}
void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
@@ -120,6 +135,10 @@ void ExprContext::FreeLocalAllocations(const vector<ExprContext*>& ctxs) {
}
}
+void ExprContext::FreeLocalAllocations() {
+ FreeLocalAllocations(fn_contexts_);
+}
+
void ExprContext::FreeLocalAllocations(const vector<FunctionContext*>& fn_ctxs) {
for (int i = 0; i < fn_ctxs.size(); ++i) {
if (fn_ctxs[i]->impl()->closed()) continue;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/exprs/expr-context.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h
index 0816774..9078ce6 100644
--- a/be/src/exprs/expr-context.h
+++ b/be/src/exprs/expr-context.h
@@ -123,10 +123,16 @@ class ExprContext {
TimestampVal GetTimestampVal(TupleRow* row);
DecimalVal GetDecimalVal(TupleRow* row);
+ /// Returns true if any of the expression contexts in the array has local allocations.
+ /// The last two are helper functions.
+ static bool HasLocalAllocations(const std::vector<ExprContext*>& ctxs);
+ bool HasLocalAllocations();
+ static bool HasLocalAllocations(const std::vector<FunctionContext*>& fn_ctxs);
+
/// Frees all local allocations made by fn_contexts_. This can be called when result
- /// data from this context is no longer needed.
- void FreeLocalAllocations();
+ /// data from this context is no longer needed. The last two are helper functions.
static void FreeLocalAllocations(const std::vector<ExprContext*>& ctxs);
+ void FreeLocalAllocations();
static void FreeLocalAllocations(const std::vector<FunctionContext*>& ctxs);
static const char* LLVM_CLASS_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index aac0043..3f2ba29 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -143,16 +143,21 @@ class RowBatch {
}
/// An iterator for going through a row batch, starting at 'row_idx'.
- /// This is more efficient than using GetRow() as it avoids loading the
- /// row batch state and doing multiplication on each loop with GetRow().
+ /// If 'limit' is not -1, it iterates over at most 'limit' rows, stopping before row
+ /// 'row_idx + limit' or at the end of the batch, whichever comes first. Otherwise,
+ /// it iterates until the last row in the batch. This is more efficient than using
+ /// GetRow() as it avoids loading the batch state and doing a multiplication per row.
class Iterator {
public:
- IR_ALWAYS_INLINE Iterator(RowBatch* parent, int row_idx) :
+ Iterator(RowBatch* parent, int row_idx, int limit = -1) :
num_tuples_per_row_(parent->num_tuples_per_row_),
- row_(parent->tuple_ptrs_ + row_idx * num_tuples_per_row_),
- row_batch_end_(parent->tuple_ptrs_ + parent->num_rows_ * num_tuples_per_row_),
+ row_(parent->tuple_ptrs_ + num_tuples_per_row_ * row_idx),
+ row_batch_end_(parent->tuple_ptrs_ + num_tuples_per_row_ *
+ (limit == -1 ? parent->num_rows_ :
+ std::min<int>(row_idx + limit, parent->num_rows_))),
parent_(parent) {
DCHECK_GE(row_idx, 0);
+ DCHECK_GT(num_tuples_per_row_, 0);
/// We allow empty row batches with num_rows_ == capacity_ == 0.
/// That's why we cannot call GetRow() above to initialize 'row_'.
DCHECK_LE(row_idx, parent->capacity_);
@@ -397,12 +402,17 @@ class RowBatch {
}
-/// Macro for iterating through '_row_batch', starting at '_start_row_idx'.
+/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
/// '_row_batch' is the row batch to iterate through.
/// '_start_row_idx' is the starting row index.
/// '_iter' is the iterator.
+/// '_limit' (FOREACH_ROW_LIMIT only) is the max number of rows to iterate over.
#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); \
!_iter.AtEnd(); _iter.Next())
+#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter) \
+ for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); \
+ !_iter.AtEnd(); _iter.Next())
+
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 99b227c..39fc7cd 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -78,6 +78,7 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
block_mgr_parent_tracker_.get(), (*runtime_state)->runtime_profile(),
tmp_file_mgr_.get(), CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
(*runtime_state)->set_block_mgr(mgr);
+ (*runtime_state)->InitMemTrackers(TUniqueId(), NULL, -1);
query_states_.push_back(shared_ptr<RuntimeState>(*runtime_state));
return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 274776c..d12ce1f 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -390,6 +390,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
iequals(value, "true") || iequals(value, "1"));
break;
}
+ case TImpalaQueryOptions::PREFETCH_MODE: {
+ if (iequals(value, "NONE") || iequals(value, "0")) {
+ query_options->__set_prefetch_mode(TPrefetchMode::NONE);
+ } else if (iequals(value, "HT_BUCKET") || iequals(value, "1")) {
+ query_options->__set_prefetch_mode(TPrefetchMode::HT_BUCKET);
+ } else {
+ return Status(Substitute("Invalid prefetch mode '$0'. Valid modes are "
+ "NONE(0) or HT_BUCKET(1)", value));
+ }
+ break;
+ }
default:
// We hit this DCHECK(false) if we forgot to add the corresponding entry here
// when we add a new query option.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index fb24530..60c122d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -32,7 +32,7 @@ class TQueryOptions;
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::RUNTIME_FILTER_MIN_SIZE + 1);\
+ TImpalaQueryOptions::PREFETCH_MODE + 1);\
QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -78,7 +78,8 @@ class TQueryOptions;
QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
- QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE);
+ QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
+ QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE);
/// Converts a TQueryOptions struct into a map of key, value pairs.
void TQueryOptionsToMap(const TQueryOptions& query_options,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/be/src/udf/udf-internal.h
----------------------------------------------------------------------
diff --git a/be/src/udf/udf-internal.h b/be/src/udf/udf-internal.h
index 3e5993b..10ce43b 100644
--- a/be/src/udf/udf-internal.h
+++ b/be/src/udf/udf-internal.h
@@ -84,6 +84,9 @@ class FunctionContextImpl {
/// Frees all allocations returned by AllocateLocal().
void FreeLocalAllocations();
+ /// Returns true if there are any allocations returned by AllocateLocal().
+ bool HasLocalAllocations() const { return !local_allocations_.empty(); }
+
/// Sets constant_args_. The AnyVal* values are owned by the caller.
void SetConstantArgs(const std::vector<impala_udf::AnyVal*>& constant_args);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 74bef28..ada1a20 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -193,6 +193,9 @@ struct TQueryOptions {
// Maximum runtime filter size, in bytes
47: optional i32 runtime_filter_max_size = 16777216
+
+ // Prefetching behavior when building and probing hash tables.
+ 48: optional Types.TPrefetchMode prefetch_mode = Types.TPrefetchMode.HT_BUCKET
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 9421ec4..29ae4cc 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -219,7 +219,10 @@ enum TImpalaQueryOptions {
RUNTIME_FILTER_MAX_SIZE,
// Minimum runtime filter size, in bytes.
- RUNTIME_FILTER_MIN_SIZE
+ RUNTIME_FILTER_MIN_SIZE,
+
+ // Prefetching behavior when building and probing hash tables.
+ PREFETCH_MODE
}
// The summary of an insert.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a59408b5/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 85a14dc..24f4524 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -120,6 +120,14 @@ enum TRuntimeFilterMode {
GLOBAL
}
+enum TPrefetchMode {
+ // No prefetching at all.
+ NONE,
+
+ // Prefetch the hash table buckets.
+ HT_BUCKET
+}
+
// A TNetworkAddress is the standard host, port representation of a
// network address. The hostname field must be resolvable to an IPv4
// address.