You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:34 UTC
[2/6] incubator-impala git commit: IMPALA-3567 Part 2,
IMPALA-3899: factor out PHJ builder
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index f634193..46b91ba 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -31,10 +31,7 @@
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "runtime/runtime-filter-bank.h"
#include "runtime/runtime-state.h"
-#include "util/bloom-filter.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
@@ -44,35 +41,26 @@
DEFINE_bool(enable_phj_probe_side_filtering, true, "Deprecated.");
-const string PREPARE_FOR_READ_FAILED_ERROR_MSG = "Failed to acquire initial read buffer "
- "for stream in hash join node $0. Reducing query concurrency or increasing the "
- "memory limit may help this query to complete successfully.";
+static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+ "Failed to acquire initial read buffer for stream in hash join node $0. Reducing "
+ "query concurrency or increasing the memory limit may help this query to complete "
+ "successfully.";
using namespace impala;
using namespace llvm;
using namespace strings;
+using std::unique_ptr;
PartitionedHashJoinNode::PartitionedHashJoinNode(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
- : BlockingJoinNode("PartitionedHashJoinNode", tnode.hash_join_node.join_op,
- pool, tnode, descs),
- is_not_distinct_from_(),
- block_mgr_client_(NULL),
- partition_build_timer_(NULL),
+ : BlockingJoinNode(
+ "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
+ num_probe_rows_partitioned_(NULL),
null_aware_eval_timer_(NULL),
state_(PARTITIONING_BUILD),
- partition_pool_(new ObjectPool()),
- input_partition_(NULL),
- null_aware_partition_(NULL),
- non_empty_build_(false),
- null_probe_rows_(NULL),
null_probe_output_idx_(-1),
- process_build_batch_fn_(NULL),
- process_build_batch_fn_level0_(NULL),
process_probe_batch_fn_(NULL),
- process_probe_batch_fn_level0_(NULL),
- insert_batch_fn_(NULL),
- insert_batch_fn_level0_(NULL) {
+ process_probe_batch_fn_level0_(NULL) {
memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
}
@@ -86,35 +74,22 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
DCHECK(tnode.__isset.hash_join_node);
const vector<TEqJoinCondition>& eq_join_conjuncts =
tnode.hash_join_node.eq_join_conjuncts;
- for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
+ // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
+ // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
+ // being separated out further.
+ builder_.reset(
+ new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
+ RETURN_IF_ERROR(builder_->Init(state, eq_join_conjuncts, tnode.runtime_filters));
+
+ for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
ExprContext* ctx;
- RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].left, &ctx));
+ RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.left, &ctx));
probe_expr_ctxs_.push_back(ctx);
- RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjuncts[i].right, &ctx));
+ RETURN_IF_ERROR(Expr::CreateExprTree(pool_, eq_join_conjunct.right, &ctx));
build_expr_ctxs_.push_back(ctx);
- is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
}
- RETURN_IF_ERROR(
- Expr::CreateExprTrees(pool_, tnode.hash_join_node.other_join_conjuncts,
- &other_join_conjunct_ctxs_));
-
- for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) {
- // If filter propagation not enabled, only consider building broadcast joins (that may
- // be consumed by this fragment).
- if (state->query_options().runtime_filter_mode != TRuntimeFilterMode::GLOBAL &&
- !filter.is_broadcast_join) {
- continue;
- }
- if (state->query_options().disable_row_runtime_filtering
- && !filter.applied_on_partition_columns) {
- continue;
- }
- FilterContext filter_ctx;
- filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, true);
- RETURN_IF_ERROR(Expr::CreateExprTree(pool_, filter.src_expr, &filter_ctx.expr));
- filters_.push_back(filter_ctx);
- }
-
+ RETURN_IF_ERROR(Expr::CreateExprTrees(
+ pool_, tnode.hash_join_node.other_join_conjuncts, &other_join_conjunct_ctxs_));
DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
eq_join_conjuncts.size() == 1);
return Status::OK();
@@ -134,20 +109,17 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(BlockingJoinNode::Prepare(state));
runtime_state_ = state;
+ RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
+ runtime_profile()->PrependChild(builder_->profile());
+
// build and probe exprs are evaluated in the context of the rows produced by our
// right and left children, respectively
RETURN_IF_ERROR(
Expr::Prepare(build_expr_ctxs_, state, child(1)->row_desc(), expr_mem_tracker()));
RETURN_IF_ERROR(
Expr::Prepare(probe_expr_ctxs_, state, child(0)->row_desc(), expr_mem_tracker()));
- for (const FilterContext& ctx: filters_) {
- RETURN_IF_ERROR(ctx.expr->Prepare(state, child(1)->row_desc(), expr_mem_tracker()));
- AddExprCtxToFree(ctx.expr);
- }
- // Although ProcessBuildInput() may be run in a separate thread, it is safe to free
- // local allocations in QueryMaintenance() since the build thread is not run
- // concurrently with other expr evaluation in this join node.
+ // Build expressions may be evaluated during probing, so must be freed.
// Probe side expr is not included in QueryMaintenance(). We cache the probe expression
// values in ExprValuesCache. Local allocations need to survive until the cache is reset
// so we need to manually free probe expr local allocations.
@@ -161,84 +133,25 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) {
Expr::Prepare(other_join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
AddExprCtxsToFree(other_join_conjunct_ctxs_);
- RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
- Substitute("PartitionedHashJoinNode id=$0 ptr=$1", id_, this),
- MinRequiredBuffers(), true, mem_tracker(), state, &block_mgr_client_));
-
- const bool should_store_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN ||
- join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN ||
- std::accumulate(is_not_distinct_from_.begin(), is_not_distinct_from_.end(), false,
- std::logical_or<bool>());
RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, probe_expr_ctxs_,
- should_store_nulls, is_not_distinct_from_, state->fragment_hash_seed(),
- MAX_PARTITION_DEPTH, child(1)->row_desc().tuple_descriptors().size(), mem_tracker(),
- &ht_ctx_));
+ builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
+ state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
+ child(1)->row_desc().tuple_descriptors().size(), mem_tracker(), &ht_ctx_));
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), "NullAwareAntiJoinEvalTime");
}
- partition_build_timer_ = ADD_TIMER(runtime_profile(), "BuildPartitionTime");
- num_hash_buckets_ =
- ADD_COUNTER(runtime_profile(), "HashBuckets", TUnit::UNIT);
- partitions_created_ =
- ADD_COUNTER(runtime_profile(), "PartitionsCreated", TUnit::UNIT);
- max_partition_level_ = runtime_profile()->AddHighWaterMarkCounter(
- "MaxPartitionLevel", TUnit::UNIT);
- num_build_rows_partitioned_ =
- ADD_COUNTER(runtime_profile(), "BuildRowsPartitioned", TUnit::UNIT);
num_probe_rows_partitioned_ =
ADD_COUNTER(runtime_profile(), "ProbeRowsPartitioned", TUnit::UNIT);
- num_repartitions_ =
- ADD_COUNTER(runtime_profile(), "NumRepartitions", TUnit::UNIT);
- num_spilled_partitions_ =
- ADD_COUNTER(runtime_profile(), "SpilledPartitions", TUnit::UNIT);
- largest_partition_percent_ = runtime_profile()->AddHighWaterMarkCounter(
- "LargestPartitionPercent", TUnit::UNIT);
- num_hash_collisions_ =
- ADD_COUNTER(runtime_profile(), "HashCollisions", TUnit::UNIT);
-
- bool build_codegen_enabled = false;
+
bool probe_codegen_enabled = false;
- bool ht_construction_codegen_enabled = false;
- Status codegen_status;
- Status build_codegen_status;
Status probe_codegen_status;
- Status insert_codegen_status;
if (state->codegen_enabled()) {
- // Codegen for hashing rows
- Function* hash_fn;
- codegen_status = ht_ctx_->CodegenHashRow(state, false, &hash_fn);
- Function* murmur_hash_fn;
- codegen_status.MergeStatus(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
-
- // Codegen for evaluating build rows
- Function* eval_build_row_fn;
- codegen_status.MergeStatus(ht_ctx_->CodegenEvalRow(state, true, &eval_build_row_fn));
-
- if (codegen_status.ok()) {
- // Codegen for build path
- build_codegen_status =
- CodegenProcessBuildBatch(state, hash_fn, murmur_hash_fn, eval_build_row_fn);
- if (build_codegen_status.ok()) build_codegen_enabled = true;
- // Codegen for probe path
- probe_codegen_status = CodegenProcessProbeBatch(state, hash_fn, murmur_hash_fn);
- if (probe_codegen_status.ok()) probe_codegen_enabled = true;
- // Codegen for InsertBatch()
- insert_codegen_status = CodegenInsertBatch(state, hash_fn, murmur_hash_fn,
- eval_build_row_fn);
- if (insert_codegen_status.ok()) ht_construction_codegen_enabled = true;
- } else {
- build_codegen_status = codegen_status;
- probe_codegen_status = codegen_status;
- insert_codegen_status = codegen_status;
- }
+ probe_codegen_status = CodegenProcessProbeBatch(state);
+ probe_codegen_enabled = probe_codegen_status.ok();
}
runtime_profile()->AddCodegenMsg(
- build_codegen_enabled, codegen_status, "Build Side");
- runtime_profile()->AddCodegenMsg(
- probe_codegen_enabled, codegen_status, "Probe Side");
- runtime_profile()->AddCodegenMsg(
- ht_construction_codegen_enabled, codegen_status, "Hash Table Construction");
+ probe_codegen_enabled, probe_codegen_status, "Probe Side");
return Status::OK();
}
@@ -248,19 +161,10 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
RETURN_IF_ERROR(Expr::Open(build_expr_ctxs_, state));
RETURN_IF_ERROR(Expr::Open(probe_expr_ctxs_, state));
RETURN_IF_ERROR(Expr::Open(other_join_conjunct_ctxs_, state));
- for (const FilterContext& filter: filters_) RETURN_IF_ERROR(filter.expr->Open(state));
- AllocateRuntimeFilters(state);
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- null_aware_partition_ = partition_pool_->Add(new Partition(state, this, 0));
- RETURN_IF_ERROR(
- null_aware_partition_->build_rows()->Init(id(), runtime_profile(), false));
- RETURN_IF_ERROR(
- null_aware_partition_->probe_rows()->Init(id(), runtime_profile(), false));
- null_probe_rows_ = new BufferedTupleStream(
- state, child(0)->row_desc(), state->block_mgr(), block_mgr_client_,
- true /* use_initial_small_buffers */, false /* read_write */ );
- RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+ RETURN_IF_ERROR(InitNullAwareProbePartition());
+ RETURN_IF_ERROR(InitNullProbeRows());
}
// Check for errors and free local allocations before opening children.
@@ -269,489 +173,116 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
// The prepare functions of probe expressions may have done local allocations implicitly
// (e.g. calling UdfBuiltins::Lower()). The probe expressions' local allocations need to
// be freed now as they don't get freed again till probing. Other exprs' local allocations
- // are freed in ExecNode::FreeLocalAllocations() in ProcessBuildInput().
+ // are freed in ExecNode::FreeLocalAllocations().
ExprContext::FreeLocalAllocations(probe_expr_ctxs_);
- RETURN_IF_ERROR(BlockingJoinNode::ConstructBuildAndOpenProbe(state, NULL));
+ RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
+ RETURN_IF_ERROR(PrepareForProbe());
+
+ UpdateState(PARTITIONING_PROBE);
RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
ResetForProbe();
- DCHECK(null_aware_partition_ == NULL || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+ DCHECK(null_aware_probe_partition_ == NULL
+ || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
return Status::OK();
}
Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- non_empty_build_ = false;
null_probe_output_idx_ = -1;
matched_null_probe_.clear();
nulls_build_batch_.reset();
}
state_ = PARTITIONING_BUILD;
ht_ctx_->set_level(0);
- ClosePartitions();
+ CloseAndDeletePartitions();
+ builder_->Reset();
memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
return ExecNode::Reset(state);
}
-void PartitionedHashJoinNode::ClosePartitions() {
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- hash_partitions_[i]->Close(NULL);
+Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
+ DCHECK(false) << "Should not be called, PHJ uses the BuildSink API";
+ return Status::OK();
+}
+
+void PartitionedHashJoinNode::CloseAndDeletePartitions() {
+ // Close all the partitions and clean up all references to them.
+ for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {
+ if (partition != NULL) partition->Close(NULL);
}
- hash_partitions_.clear();
- for (list<Partition*>::iterator it = spilled_partitions_.begin();
- it != spilled_partitions_.end(); ++it) {
- (*it)->Close(NULL);
+ probe_hash_partitions_.clear();
+ for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
+ partition->Close(NULL);
}
spilled_partitions_.clear();
- for (list<Partition*>::iterator it = output_build_partitions_.begin();
- it != output_build_partitions_.end(); ++it) {
- (*it)->Close(NULL);
- }
- output_build_partitions_.clear();
if (input_partition_ != NULL) {
input_partition_->Close(NULL);
- input_partition_ = NULL;
+ input_partition_.reset();
}
- if (null_aware_partition_ != NULL) {
- null_aware_partition_->Close(NULL);
- null_aware_partition_ = NULL;
+ if (null_aware_probe_partition_ != NULL) {
+ null_aware_probe_partition_->Close(NULL);
+ null_aware_probe_partition_.reset();
}
+ output_build_partitions_.clear();
if (null_probe_rows_ != NULL) {
null_probe_rows_->Close();
- delete null_probe_rows_;
- null_probe_rows_ = NULL;
+ null_probe_rows_.reset();
}
- partition_pool_->Clear();
}
void PartitionedHashJoinNode::Close(RuntimeState* state) {
if (is_closed()) return;
- if (ht_ctx_.get() != NULL) ht_ctx_->Close();
-
+ if (ht_ctx_ != NULL) ht_ctx_->Close();
nulls_build_batch_.reset();
-
- ClosePartitions();
-
- if (block_mgr_client_ != NULL) {
- state->block_mgr()->ClearReservations(block_mgr_client_);
- }
+ CloseAndDeletePartitions();
+ if (builder_ != NULL) builder_->Close(state);
Expr::Close(build_expr_ctxs_, state);
Expr::Close(probe_expr_ctxs_, state);
Expr::Close(other_join_conjunct_ctxs_, state);
- for (const FilterContext& ctx: filters_) {
- ctx.expr->Close(state);
- }
BlockingJoinNode::Close(state);
}
-PartitionedHashJoinNode::Partition::Partition(RuntimeState* state,
- PartitionedHashJoinNode* parent, int level)
+PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
+ PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
+ unique_ptr<BufferedTupleStream> probe_rows)
: parent_(parent),
- is_closed_(false),
- is_spilled_(false),
- level_(level) {
- build_rows_ = new BufferedTupleStream(state, parent_->child(1)->row_desc(),
- state->block_mgr(), parent_->block_mgr_client_,
- true /* use_initial_small_buffers */, false /* read_write */);
- DCHECK(build_rows_ != NULL);
- probe_rows_ = new BufferedTupleStream(state, parent_->child(0)->row_desc(),
- state->block_mgr(), parent_->block_mgr_client_,
- true /* use_initial_small_buffers */, false /* read_write */ );
- DCHECK(probe_rows_ != NULL);
-}
-
-PartitionedHashJoinNode::Partition::~Partition() {
- DCHECK(is_closed());
+ build_partition_(build_partition),
+ probe_rows_(std::move(probe_rows)) {
+ DCHECK(probe_rows_->has_write_block());
+ DCHECK(!probe_rows_->using_small_buffers());
+ DCHECK(!probe_rows_->is_pinned());
}
-int64_t PartitionedHashJoinNode::Partition::EstimatedInMemSize() const {
- return build_rows_->byte_size() + HashTable::EstimateSize(build_rows_->num_rows());
+PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
+ DCHECK(IsClosed());
}
-void PartitionedHashJoinNode::Partition::Close(RowBatch* batch) {
- if (is_closed()) return;
- is_closed_ = true;
-
- if (hash_tbl_.get() != NULL) {
- COUNTER_ADD(parent_->num_hash_collisions_, hash_tbl_->NumHashCollisions());
- hash_tbl_->Close();
- }
-
- // Transfer ownership of build_rows_/probe_rows_ to batch if batch is not NULL.
- // Otherwise, close the stream here.
- if (build_rows_ != NULL) {
- if (batch == NULL) {
- build_rows_->Close();
- delete build_rows_;
- } else {
- batch->AddTupleStream(build_rows_);
- }
- build_rows_ = NULL;
- }
- if (probe_rows_ != NULL) {
- if (batch == NULL) {
- probe_rows_->Close();
- delete probe_rows_;
- } else {
- batch->AddTupleStream(probe_rows_);
- }
- probe_rows_ = NULL;
- }
-}
-
-Status PartitionedHashJoinNode::Partition::Spill(bool unpin_all_build) {
- DCHECK(!is_closed_);
- // Spilling should occur before we start processing probe rows.
- DCHECK(parent_->state_ != PROCESSING_PROBE &&
- parent_->state_ != PROBING_SPILLED_PARTITION) << parent_->state_;
- DCHECK((is_spilled_ && parent_->state_ == REPARTITIONING) ||
- probe_rows_->num_rows() == 0);
- // Close the hash table as soon as possible to release memory.
- if (hash_tbl() != NULL) {
- hash_tbl_->Close();
- hash_tbl_.reset();
- }
-
- bool got_buffer = true;
- if (build_rows_->using_small_buffers()) {
- RETURN_IF_ERROR(build_rows_->SwitchToIoBuffers(&got_buffer));
- }
- // Unpin the stream as soon as possible to increase the chances that the
- // SwitchToIoBuffers() call below will succeed.
- RETURN_IF_ERROR(build_rows_->UnpinStream(unpin_all_build));
-
- if (got_buffer && probe_rows_->using_small_buffers()) {
- RETURN_IF_ERROR(probe_rows_->SwitchToIoBuffers(&got_buffer));
- }
- if (!got_buffer) {
- // We'll try again to get the buffers when the stream fills up the small buffers.
- VLOG_QUERY << "Not enough memory to switch to IO-sized buffer for partition "
- << this << " of join=" << parent_->id_ << " build small buffers="
- << build_rows_->using_small_buffers() << " probe small buffers="
- << probe_rows_->using_small_buffers();
- VLOG_FILE << GetStackTrace();
- }
-
- if (!is_spilled_) {
- COUNTER_ADD(parent_->num_spilled_partitions_, 1);
- if (parent_->num_spilled_partitions_->value() == 1) {
- parent_->runtime_profile()->AppendExecOption("Spilled");
- }
- }
-
- is_spilled_ = true;
- return Status::OK();
-}
-
-Status PartitionedHashJoinNode::Partition::BuildHashTable(RuntimeState* state,
- bool* built) {
- DCHECK(build_rows_ != NULL);
- *built = false;
-
- // TODO: estimate the entire size of the hash table and reserve all of it from
- // the block mgr.
-
- // We got the buffers we think we will need, try to build the hash table.
- RETURN_IF_ERROR(build_rows_->PinStream(false, built));
- if (!*built) return Status::OK();
+Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
bool got_read_buffer;
- RETURN_IF_ERROR(build_rows_->PrepareForRead(false, &got_read_buffer));
- DCHECK(got_read_buffer) << "Stream was already pinned.";
-
- RowBatch batch(parent_->child(1)->row_desc(), state->batch_size(),
- parent_->mem_tracker());
- HashTableCtx* ctx = parent_->ht_ctx_.get();
- // TODO: move the batch and indices as members to avoid reallocating.
- vector<BufferedTupleStream::RowIdx> indices;
- bool eos = false;
-
- // Allocate the partition-local hash table. Initialize the number of buckets based on
- // the number of build rows (the number of rows is known at this point). This assumes
- // there are no duplicates which can be wrong. However, the upside in the common case
- // (few/no duplicates) is large and the downside when there are is low (a bit more
- // memory; the bucket memory is small compared to the memory needed for all the build
- // side allocations).
- // One corner case is if the stream contains tuples with zero footprint (no materialized
- // slots). If the tuples occupy no space, this implies all rows will be duplicates, so
- // create a small hash table, IMPALA-2256.
- // We always start with small pages in the hash table.
- int64_t estimated_num_buckets = build_rows()->RowConsumesMemory() ?
- HashTable::EstimateNumBuckets(build_rows()->num_rows()) : state->batch_size() * 2;
- hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
- true /* store_duplicates */,
- parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(),
- 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
- if (!hash_tbl_->Init()) goto not_built;
-
- do {
- RETURN_IF_ERROR(build_rows_->GetNext(&batch, &eos, &indices));
- DCHECK_EQ(batch.num_rows(), indices.size());
- DCHECK_LE(batch.num_rows(), hash_tbl_->EmptyBuckets())
- << build_rows()->RowConsumesMemory();
- TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- SCOPED_TIMER(parent_->build_timer_);
- if (parent_->insert_batch_fn_ != NULL) {
- InsertBatchFn insert_batch_fn;
- if (ctx->level() == 0) {
- insert_batch_fn = parent_->insert_batch_fn_level0_;
- } else {
- insert_batch_fn = parent_->insert_batch_fn_;
- }
- DCHECK(insert_batch_fn != NULL);
- if (UNLIKELY(!insert_batch_fn(this, prefetch_mode, ctx, &batch, indices))) {
- goto not_built;
- }
- } else {
- if (UNLIKELY(!InsertBatch(prefetch_mode, ctx, &batch, indices))) {
- goto not_built;
- }
- }
- RETURN_IF_ERROR(state->GetQueryStatus());
- parent_->FreeLocalAllocations();
- batch.Reset();
- } while (!eos);
-
- // The hash table fits in memory and is built.
- DCHECK(*built);
- DCHECK(hash_tbl_.get() != NULL);
- is_spilled_ = false;
- COUNTER_ADD(parent_->num_hash_buckets_, hash_tbl_->num_buckets());
- return Status::OK();
-
-not_built:
- *built = false;
- if (hash_tbl_.get() != NULL) {
- hash_tbl_->Close();
- hash_tbl_.reset();
- }
- return Status::OK();
-}
-
-void PartitionedHashJoinNode::AllocateRuntimeFilters(RuntimeState* state) {
- DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || filters_.size() == 0)
- << "Runtime filters not supported with NULL_AWARE_LEFT_ANTI_JOIN";
- DCHECK(ht_ctx_.get() != NULL);
- for (int i = 0; i < filters_.size(); ++i) {
- filters_[i].local_bloom_filter =
- state->filter_bank()->AllocateScratchBloomFilter(filters_[i].filter->id());
- }
-}
-
-void PartitionedHashJoinNode::PublishRuntimeFilters(RuntimeState* state,
- int64_t total_build_rows) {
- int32_t num_enabled_filters = 0;
- // Use total_build_rows to estimate FP-rate of each Bloom filter, and publish
- // 'always-true' filters if it's too high. Doing so saves CPU at the coordinator,
- // serialisation time, and reduces the cost of applying the filter at the scan - most
- // significantly for per-row filters. However, the number of build rows could be a very
- // poor estimate of the NDV - particularly if the filter expression is a function of
- // several columns.
- // TODO: Better heuristic.
- for (const FilterContext& ctx: filters_) {
- // TODO: Consider checking actual number of bits set in filter to compute FP rate.
- // TODO: Consider checking this every few batches or so.
- bool fp_rate_too_high =
- state->filter_bank()->FpRateTooHigh(ctx.filter->filter_size(), total_build_rows);
- state->filter_bank()->UpdateFilterFromLocal(ctx.filter->id(),
- fp_rate_too_high ? BloomFilter::ALWAYS_TRUE_FILTER : ctx.local_bloom_filter);
-
- num_enabled_filters += !fp_rate_too_high;
- }
-
- if (filters_.size() > 0) {
- if (num_enabled_filters == filters_.size()) {
- runtime_profile()->AppendExecOption(
- Substitute("$0 of $0 Runtime Filter$1 Published", filters_.size(),
- filters_.size() == 1 ? "" : "s"));
- } else {
- string exec_option = Substitute("$0 of $1 Runtime Filter$2 Published, $3 Disabled",
- num_enabled_filters, filters_.size(), filters_.size() == 1 ? "" : "s",
- filters_.size() - num_enabled_filters);
- runtime_profile()->AppendExecOption(exec_option);
- }
- }
-}
-
-bool PartitionedHashJoinNode::AppendRowStreamFull(BufferedTupleStream* stream,
- TupleRow* row, Status* status) {
- while (status->ok()) {
- // Check if the stream is still using small buffers and try to switch to IO-buffers.
- if (stream->using_small_buffers()) {
- bool got_buffer;
- *status = stream->SwitchToIoBuffers(&got_buffer);
- if (!status->ok()) return false;
- if (got_buffer) {
- if (LIKELY(stream->AddRow(row, status))) return true;
- if (!status->ok()) return false;
- }
- }
- // We ran out of memory. Pick a partition to spill.
- Partition* spilled_partition;
- *status = SpillPartition(&spilled_partition);
- if (!status->ok()) return false;
- if (stream->AddRow(row, status)) return true;
- // Spilling one partition does not guarantee we can append a row. Keep
- // spilling until we can append this row.
- }
- return false;
-}
-
-// TODO: Can we do better with a different spilling heuristic?
-Status PartitionedHashJoinNode::SpillPartition(Partition** spilled_partition) {
- int64_t max_freed_mem = 0;
- int partition_idx = -1;
- *spilled_partition = NULL;
-
- // Iterate over the partitions and pick the largest partition to spill.
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* candidate = hash_partitions_[i];
- if (candidate->is_closed()) continue;
- if (candidate->is_spilled()) continue;
- int64_t mem = candidate->build_rows()->bytes_in_mem(false);
- // TODO: What should we do here if probe_rows()->num_rows() > 0 ? We should be able
- // to spill INNER JOINS, but not many of the other joins.
- if (candidate->hash_tbl() != NULL) {
- // IMPALA-1488: Do not spill partitions that already had matches, because we
- // are going to lose information and return wrong results.
- if (UNLIKELY(candidate->hash_tbl()->HasMatches())) continue;
- mem += candidate->hash_tbl()->ByteSize();
- }
- if (mem > max_freed_mem) {
- max_freed_mem = mem;
- partition_idx = i;
- }
- }
-
- if (partition_idx == -1) {
- // Could not find a partition to spill. This means the mem limit was just too low.
- return runtime_state_->block_mgr()->MemLimitTooLowError(block_mgr_client_, id());
- }
-
- VLOG(2) << "Spilling partition: " << partition_idx << endl << NodeDebugString();
- RETURN_IF_ERROR(hash_partitions_[partition_idx]->Spill(false));
- DCHECK(hash_partitions_[partition_idx]->probe_rows()->has_write_block());
- hash_tbls_[partition_idx] = NULL;
- *spilled_partition = hash_partitions_[partition_idx];
- return Status::OK();
-}
-
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state) {
- // Do a full scan of child(1) and partition the rows.
- {
- SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
- RETURN_IF_ERROR(child(1)->Open(state));
+ RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
+ if (!got_read_buffer) {
+ return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_,
+ Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_));
}
- RETURN_IF_ERROR(ProcessBuildInput(state, 0));
-
- UpdateState(PROCESSING_PROBE);
return Status::OK();
}
-Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level) {
- if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
- return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id_,
- MAX_PARTITION_DEPTH);
- }
-
- DCHECK(hash_partitions_.empty());
- if (input_partition_ != NULL) {
- DCHECK(input_partition_->build_rows() != NULL);
- DCHECK_EQ(input_partition_->build_rows()->blocks_pinned(), 0) << NodeDebugString();
- bool got_read_buffer;
- RETURN_IF_ERROR(
- input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer));
- if (!got_read_buffer) {
- return mem_tracker()->MemLimitExceeded(
- state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- }
- }
-
- for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* new_partition = new Partition(state, this, level);
- DCHECK(new_partition != NULL);
- hash_partitions_.push_back(partition_pool_->Add(new_partition));
- RETURN_IF_ERROR(new_partition->build_rows()->Init(id(), runtime_profile(), true));
- // Initialize a buffer for the probe here to make sure why have it if we need it.
- // While this is not strictly necessary (there are some cases where we won't need this
- // buffer), the benefit is low. Not grabbing this buffer means there is an additional
- // buffer that could be used for the build side. However since this is only one
- // buffer, there is only a small range of build input sizes where this is beneficial
- // (an IO buffer size). It makes the logic much more complex to enable this
- // optimization.
- RETURN_IF_ERROR(new_partition->probe_rows()->Init(id(), runtime_profile(), false));
- }
- COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
- COUNTER_SET(max_partition_level_, level);
-
- DCHECK_EQ(build_batch_->num_rows(), 0);
- bool eos = false;
- int64_t total_build_rows = 0;
- while (!eos) {
- RETURN_IF_CANCELLED(state);
- RETURN_IF_ERROR(QueryMaintenance(state));
- // 'probe_expr_ctxs_' should have made no local allocations in this function.
- DCHECK(!ExprContext::HasLocalAllocations(probe_expr_ctxs_))
- << Expr::DebugString(probe_expr_ctxs_);
- if (input_partition_ == NULL) {
- // If we are still consuming batches from the build side.
- {
- SCOPED_STOP_WATCH(&built_probe_overlap_stop_watch_);
- RETURN_IF_ERROR(child(1)->GetNext(state, build_batch_.get(), &eos));
- }
- COUNTER_ADD(build_row_counter_, build_batch_->num_rows());
- } else {
- // If we are consuming batches that have already been partitioned.
- RETURN_IF_ERROR(input_partition_->build_rows()->GetNext(build_batch_.get(), &eos));
- }
- total_build_rows += build_batch_->num_rows();
-
- SCOPED_TIMER(partition_build_timer_);
- if (process_build_batch_fn_ == NULL) {
- bool build_filters = ht_ctx_->level() == 0;
- RETURN_IF_ERROR(ProcessBuildBatch(build_batch_.get(), build_filters));
+void PartitionedHashJoinNode::ProbePartition::Close(RowBatch* batch) {
+ if (IsClosed()) return;
+ if (probe_rows_ != NULL) {
+ if (batch == NULL) {
+ probe_rows_->Close();
} else {
- DCHECK(process_build_batch_fn_level0_ != NULL);
- if (ht_ctx_->level() == 0) {
- RETURN_IF_ERROR(
- process_build_batch_fn_level0_(this, build_batch_.get(), true));
- } else {
- RETURN_IF_ERROR(process_build_batch_fn_(this, build_batch_.get(), false));
- }
+ batch->AddTupleStream(probe_rows_.release());
}
- build_batch_->Reset();
- DCHECK(!build_batch_->AtCapacity());
+ probe_rows_.reset();
}
-
- if (ht_ctx_->level() == 0) PublishRuntimeFilters(state, total_build_rows);
-
- if (input_partition_ != NULL) {
- // Done repartitioning build input, close it now.
- input_partition_->build_rows_->Close();
- input_partition_->build_rows_ = NULL;
- }
-
- stringstream ss;
- ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", id(),
- hash_partitions_[0]->level_, total_build_rows);
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- double percent =
- partition->build_rows()->num_rows() * 100 / static_cast<double>(total_build_rows);
- ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
- << " (fraction=" << fixed << setprecision(2) << percent << "%)" << endl
- << " #rows:" << partition->build_rows()->num_rows() << endl;
- COUNTER_SET(largest_partition_percent_, static_cast<int64_t>(percent));
- }
- VLOG(2) << ss.str();
-
- COUNTER_ADD(num_build_rows_partitioned_, total_build_rows);
- non_empty_build_ |= (total_build_rows > 0);
- RETURN_IF_ERROR(BuildHashTables(state));
- return Status::OK();
}
Status PartitionedHashJoinNode::NextProbeRowBatch(
RuntimeState* state, RowBatch* out_batch) {
+ DCHECK_EQ(state_, PARTITIONING_PROBE);
DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
do {
// Loop until we find a non-empty row batch.
@@ -777,6 +308,7 @@ Status PartitionedHashJoinNode::NextProbeRowBatch(
Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
RuntimeState* state, RowBatch* out_batch) {
DCHECK(input_partition_ != NULL);
+ DCHECK(state_ == PROBING_SPILLED_PARTITION || state_ == REPARTITIONING_PROBE);
probe_batch_->TransferResourceOwnership(out_batch);
if (out_batch->AtCapacity()) {
// The out_batch has resources associated with it that will be recycled on the
@@ -788,114 +320,108 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
// Continue from the current probe stream.
bool eos = false;
- RETURN_IF_ERROR(input_partition_->probe_rows()->GetNext(probe_batch_.get(), &eos));
+ RETURN_IF_ERROR(probe_rows->GetNext(probe_batch_.get(), &eos));
DCHECK_GT(probe_batch_->num_rows(), 0);
ResetForProbe();
} else {
- // Done with this partition.
- if (!input_partition_->is_spilled() &&
- (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
- join_op_ == TJoinOp::FULL_OUTER_JOIN)) {
+ // Finished processing spilled probe rows from this partition.
+ if (state_ == PROBING_SPILLED_PARTITION && NeedToProcessUnmatchedBuildRows()) {
+ // If the build partition was in memory, we are done probing this partition.
// In case of right-outer, right-anti and full-outer joins, we move this partition
// to the list of partitions that we need to output their unmatched build rows.
DCHECK(output_build_partitions_.empty());
- DCHECK(input_partition_->hash_tbl_.get() != NULL) << " id: " << id_
- << " Build: " << input_partition_->build_rows()->num_rows()
- << " Probe: " << probe_rows->num_rows() << endl
- << GetStackTrace();
+ DCHECK(input_partition_->build_partition()->hash_tbl() != NULL)
+ << " id: " << id_
+ << " Build: " << input_partition_->build_partition()->build_rows()->num_rows()
+ << " Probe: " << probe_rows->num_rows() << endl
+ << GetStackTrace();
hash_tbl_iterator_ =
- input_partition_->hash_tbl_->FirstUnmatched(ht_ctx_.get());
- output_build_partitions_.push_back(input_partition_);
+ input_partition_->build_partition()->hash_tbl()->FirstUnmatched(ht_ctx_.get());
+ output_build_partitions_.push_back(input_partition_->build_partition());
} else {
- // In any other case, just close the input partition.
- input_partition_->Close(out_batch);
- input_partition_ = NULL;
+ // In any other case, just close the input build partition.
+ input_partition_->build_partition()->Close(out_batch);
}
+ input_partition_->Close(out_batch);
+ input_partition_.reset();
current_probe_row_ = NULL;
probe_batch_pos_ = -1;
}
return Status::OK();
}
-Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
+Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
+ RuntimeState* state, bool* got_partition) {
+ VLOG(2) << "PrepareSpilledPartitionForProbe\n" << NodeDebugString();
DCHECK(input_partition_ == NULL);
- if (spilled_partitions_.empty()) return Status::OK();
- VLOG(2) << "PrepareNextPartition\n" << NodeDebugString();
+ DCHECK_EQ(builder_->num_hash_partitions(), 0);
+ DCHECK(probe_hash_partitions_.empty());
+ if (spilled_partitions_.empty()) {
+ *got_partition = false;
+ return Status::OK();
+ }
- input_partition_ = spilled_partitions_.front();
+ input_partition_ = std::move(spilled_partitions_.front());
spilled_partitions_.pop_front();
- DCHECK(input_partition_->is_spilled());
+ PhjBuilder::Partition* build_partition = input_partition_->build_partition();
+ DCHECK(build_partition->is_spilled());
- // Reserve one buffer to read the probe side.
- bool got_read_buffer;
- RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer));
- if (!got_read_buffer) {
- return mem_tracker()->MemLimitExceeded(
- state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- }
- ht_ctx_->set_level(input_partition_->level_);
-
- int64_t mem_limit = mem_tracker()->SpareCapacity();
- // Try to build a hash table on top the spilled build rows.
- bool built = false;
- int64_t estimated_memory = input_partition_->EstimatedInMemSize();
- if (estimated_memory < mem_limit) {
- ht_ctx_->set_level(input_partition_->level_);
- RETURN_IF_ERROR(input_partition_->BuildHashTable(state, &built));
- } else {
- LOG(INFO) << "In hash join id=" << id_ << " the estimated needed memory ("
- << estimated_memory << ") for partition " << input_partition_ << " with "
- << input_partition_->build_rows()->num_rows() << " build rows is larger "
- << " than the mem_limit (" << mem_limit << ").";
- }
+ // Make sure we have a buffer to read the probe rows before we build the hash table.
+ RETURN_IF_ERROR(input_partition_->PrepareForRead());
+ ht_ctx_->set_level(build_partition->level());
+
+ // Try to build a hash table for the spilled build partition.
+ bool built;
+ RETURN_IF_ERROR(build_partition->BuildHashTable(&built));
if (!built) {
// This build partition still does not fit in memory, repartition.
- UpdateState(REPARTITIONING);
- DCHECK(input_partition_->is_spilled());
- input_partition_->Spill(false);
- ht_ctx_->set_level(input_partition_->level_ + 1);
- int64_t num_input_rows = input_partition_->build_rows()->num_rows();
- RETURN_IF_ERROR(ProcessBuildInput(state, input_partition_->level_ + 1));
+ UpdateState(REPARTITIONING_BUILD);
+
+ int next_partition_level = build_partition->level() + 1;
+ if (UNLIKELY(next_partition_level >= MAX_PARTITION_DEPTH)) {
+ return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id(),
+ MAX_PARTITION_DEPTH);
+ }
+ ht_ctx_->set_level(next_partition_level);
+
+ // Spill to free memory from hash tables and pinned streams for use in new partitions.
+ build_partition->Spill(BufferedTupleStream::UNPIN_ALL);
+ // Temporarily free up the probe buffer to use when repartitioning.
+ input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL);
+ DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
+ DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
+ int64_t num_input_rows = build_partition->build_rows()->num_rows();
+ RETURN_IF_ERROR(builder_->RepartitionBuildInput(
+ build_partition, next_partition_level, input_partition_->probe_rows()));
// Check if there was any reduction in the size of partitions after repartitioning.
- int64_t largest_partition = LargestSpilledPartition();
- DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
- "more rows than the input";
- if (UNLIKELY(num_input_rows == largest_partition)) {
+ int64_t largest_partition_rows = builder_->LargestPartitionRows();
+ DCHECK_GE(num_input_rows, largest_partition_rows) << "Cannot have a partition with "
+ "more rows than the input";
+ if (UNLIKELY(num_input_rows == largest_partition_rows)) {
return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
- input_partition_->level_ + 1, num_input_rows);
+ next_partition_level, num_input_rows);
}
+
+ RETURN_IF_ERROR(PrepareForProbe());
+ UpdateState(REPARTITIONING_PROBE);
} else {
- DCHECK(hash_partitions_.empty());
- DCHECK(!input_partition_->is_spilled());
- DCHECK(input_partition_->hash_tbl() != NULL);
+ DCHECK(!input_partition_->build_partition()->is_spilled());
+ DCHECK(input_partition_->build_partition()->hash_tbl() != NULL);
// In this case, we did not have to partition the build again, we just built
// a hash table. This means the probe does not have to be partitioned either.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_tbls_[i] = input_partition_->hash_tbl();
+ hash_tbls_[i] = input_partition_->build_partition()->hash_tbl();
}
UpdateState(PROBING_SPILLED_PARTITION);
}
- COUNTER_ADD(num_repartitions_, 1);
COUNTER_ADD(num_probe_rows_partitioned_, input_partition_->probe_rows()->num_rows());
+ *got_partition = true;
return Status::OK();
}
-int64_t PartitionedHashJoinNode::LargestSpilledPartition() const {
- int64_t max_rows = 0;
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- DCHECK(partition != NULL) << i << " " << hash_partitions_.size();
- if (partition->is_closed() || !partition->is_spilled()) continue;
- int64_t rows = partition->build_rows()->num_rows();
- rows += partition->probe_rows()->num_rows();
- if (rows > max_rows) max_rows = rows;
- }
- return max_rows;
-}
-
int PartitionedHashJoinNode::ProcessProbeBatch(
const TJoinOp::type join_op, TPrefetchMode::type prefetch_mode,
RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status) {
@@ -948,24 +474,26 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
Status status = Status::OK();
while (true) {
+ DCHECK(!*eos);
DCHECK(status.ok());
DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
- if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
- join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
- !output_build_partitions_.empty()) {
- // In case of right-outer, right-anti and full-outer joins, flush the remaining
- // unmatched build rows of any partition we are done processing, before processing
- // the next batch.
+ if (!output_build_partitions_.empty()) {
+ DCHECK(NeedToProcessUnmatchedBuildRows());
+
+ // Flush the remaining unmatched build rows of any partitions we are done
+ // processing before moving on to the next partition.
OutputUnmatchedBuild(out_batch);
if (!output_build_partitions_.empty()) break;
- // Finished to output unmatched build rows, move to next partition.
- DCHECK(hash_partitions_.empty());
- RETURN_IF_ERROR(PrepareNextPartition(state));
- if (input_partition_ == NULL) {
+ // Finished outputting unmatched build rows, move to next partition.
+ DCHECK_EQ(builder_->num_hash_partitions(), 0);
+ DCHECK(probe_hash_partitions_.empty());
+ bool got_partition;
+ RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+ if (!got_partition) {
*eos = true;
break;
}
@@ -974,7 +502,8 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
// In this case, we want to output rows from the null aware partition.
- if (null_aware_partition_ == NULL) {
+ if (builder_->null_aware_partition() == NULL) {
+ DCHECK(null_aware_probe_partition_ == NULL);
*eos = true;
break;
}
@@ -985,14 +514,14 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
continue;
}
- if (nulls_build_batch_.get() != NULL) {
+ if (nulls_build_batch_ != NULL) {
RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
if (out_batch->AtCapacity()) break;
continue;
}
}
- // Finish up the current batch.
+ // Finish processing rows in the current probe batch.
if (probe_batch_pos_ != -1) {
// Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
// in the xcompiled function, so call it here instead.
@@ -1026,10 +555,16 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
}
// Try to continue from the current probe side input.
- if (input_partition_ == NULL) {
+ if (state_ == PARTITIONING_PROBE) {
+ DCHECK(input_partition_ == NULL);
RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
} else {
- RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+ DCHECK(state_ == REPARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION)
+ << state_;
+ DCHECK(probe_side_eos_);
+ if (input_partition_ != NULL) {
+ RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+ }
}
// Free local allocations of the probe side expressions only after ExprValuesCache
// has been reset.
@@ -1045,30 +580,34 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
if (probe_batch_pos_ == 0) continue;
DCHECK_EQ(probe_batch_pos_, -1);
- // Finished up all probe rows for hash_partitions_.
- RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
- if (out_batch->AtCapacity()) break;
+ // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
+ // the hash partitions, e.g. if we had to output some unmatched build rows below.
+ if (builder_->num_hash_partitions() != 0) {
+ RETURN_IF_ERROR(CleanUpHashPartitions(out_batch));
+ if (out_batch->AtCapacity()) break;
+ }
- if ((join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
- join_op_ == TJoinOp::FULL_OUTER_JOIN) &&
- !output_build_partitions_.empty()) {
+ if (!output_build_partitions_.empty()) {
+ DCHECK(NeedToProcessUnmatchedBuildRows());
// There are some partitions that need to flush their unmatched build rows.
- continue;
+ OutputUnmatchedBuild(out_batch);
+ if (!output_build_partitions_.empty()) break;
}
- // Move onto the next partition.
- RETURN_IF_ERROR(PrepareNextPartition(state));
+ // Move on to the next spilled partition.
+ bool got_partition;
+ RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
+ if (got_partition) continue; // Probe the spilled partition.
- if (input_partition_ == NULL) {
- if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- RETURN_IF_ERROR(PrepareNullAwarePartition());
- }
- if (null_aware_partition_ == NULL) {
- *eos = true;
- break;
- } else {
- *eos = false;
- }
+ if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ // Prepare the null-aware partitions, then resume at the top of the loop to output
+ // the rows.
+ RETURN_IF_ERROR(PrepareNullAwarePartition());
+ continue;
}
+ DCHECK(builder_->null_aware_partition() == NULL);
+
+ *eos = true;
+ break;
}
if (ReachedLimit()) *eos = true;
@@ -1077,8 +616,7 @@ Status PartitionedHashJoinNode::GetNext(RuntimeState* state, RowBatch* out_batch
void PartitionedHashJoinNode::OutputUnmatchedBuild(RowBatch* out_batch) {
SCOPED_TIMER(probe_timer_);
- DCHECK(join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
- join_op_ == TJoinOp::FULL_OUTER_JOIN);
+ DCHECK(NeedToProcessUnmatchedBuildRows());
DCHECK(!output_build_partitions_.empty());
ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
const int num_conjuncts = conjunct_ctxs_.size();
@@ -1141,8 +679,9 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
RowBatch* out_batch) {
- DCHECK(null_aware_partition_ != NULL);
- DCHECK(nulls_build_batch_.get() == NULL);
+ DCHECK(builder_->null_aware_partition() != NULL);
+ DCHECK(null_aware_probe_partition_ != NULL);
+ DCHECK(nulls_build_batch_ == NULL);
DCHECK_NE(probe_batch_pos_, -1);
if (probe_batch_pos_ == probe_batch_->num_rows()) {
@@ -1151,12 +690,12 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
if (out_batch->AtCapacity()) return Status::OK();
bool eos;
RETURN_IF_ERROR(null_probe_rows_->GetNext(probe_batch_.get(), &eos));
- if (probe_batch_->num_rows() == 0) {
+ if (probe_batch_->num_rows() == 0 && eos) {
// All done.
- null_aware_partition_->Close(out_batch);
- null_aware_partition_ = NULL;
- out_batch->AddTupleStream(null_probe_rows_);
- null_probe_rows_ = NULL;
+ builder_->CloseNullAwarePartition(out_batch);
+ null_aware_probe_partition_->Close(out_batch);
+ null_aware_probe_partition_.reset();
+ out_batch->AddTupleStream(null_probe_rows_.release());
return Status::OK();
}
}
@@ -1183,14 +722,54 @@ static Status NullAwareAntiJoinError(bool build) {
" many NULLs on the $0 side to perform this join.", build ? "build" : "probe"));
}
+Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
+ RuntimeState* state = runtime_state_;
+ unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>(
+ state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(),
+ false /* use_initial_small_buffers */, false /* read_write */);
+ Status status = probe_rows->Init(id(), runtime_profile(), false);
+ if (!status.ok()) goto error;
+ bool got_buffer;
+ status = probe_rows->PrepareForWrite(&got_buffer);
+ if (!status.ok()) goto error;
+ if (!got_buffer) {
+ status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+ goto error;
+ }
+ null_aware_probe_partition_.reset(new ProbePartition(
+ state, this, builder_->null_aware_partition(), std::move(probe_rows)));
+ return Status::OK();
+
+error:
+ DCHECK(!status.ok());
+ // Ensure the temporary 'probe_rows' stream is closed correctly on error.
+ probe_rows->Close();
+ return status;
+}
+
+Status PartitionedHashJoinNode::InitNullProbeRows() {
+ RuntimeState* state = runtime_state_;
+ null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
+ state->block_mgr(), builder_->block_mgr_client(),
+ false /* use_initial_small_buffers */, false /* read_write */);
+ RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+ bool got_buffer;
+ RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
+ if (!got_buffer) {
+ return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
+ }
+ return Status::OK();
+}
+
Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
- DCHECK(null_aware_partition_ != NULL);
- DCHECK(nulls_build_batch_.get() == NULL);
+ DCHECK(builder_->null_aware_partition() != NULL);
+ DCHECK(null_aware_probe_partition_ != NULL);
+ DCHECK(nulls_build_batch_ == NULL);
DCHECK_EQ(probe_batch_pos_, -1);
DCHECK_EQ(probe_batch_->num_rows(), 0);
- BufferedTupleStream* build_stream = null_aware_partition_->build_rows();
- BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+ BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
+ BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
if (build_stream->num_rows() == 0) {
// There were no build rows. Nothing to do. Just prepare to output the null
@@ -1219,14 +798,15 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
RowBatch* out_batch) {
- DCHECK(null_aware_partition_ != NULL);
- DCHECK(nulls_build_batch_.get() != NULL);
+ DCHECK(builder_->null_aware_partition() != NULL);
+ DCHECK(null_aware_probe_partition_ != NULL);
+ DCHECK(nulls_build_batch_ != NULL);
ExprContext* const* join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
int num_join_conjuncts = other_join_conjunct_ctxs_.size();
- DCHECK(probe_batch_.get() != NULL);
+ DCHECK(probe_batch_ != NULL);
- BufferedTupleStream* probe_stream = null_aware_partition_->probe_rows();
+ BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
if (probe_batch_pos_ == probe_batch_->num_rows()) {
probe_batch_pos_ = 0;
probe_batch_->TransferResourceOwnership(out_batch);
@@ -1237,7 +817,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
if (probe_batch_->num_rows() == 0) {
- RETURN_IF_ERROR(EvaluateNullProbe(null_aware_partition_->build_rows()));
+ RETURN_IF_ERROR(EvaluateNullProbe(builder_->null_aware_partition()->build_rows()));
nulls_build_batch_.reset();
RETURN_IF_ERROR(PrepareNullAwareNullProbe());
return Status::OK();
@@ -1269,100 +849,55 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
return Status::OK();
}
-// When this function is called, we've finished processing the current build input
-// (either from child(1) or from repartitioning a spilled partition). The build rows
-// have only been partitioned, we still need to build hash tables over them. Some
-// of the partitions could have already been spilled and attempting to build hash
-// tables over the non-spilled ones can cause them to spill.
-//
-// At the end of the function we'd like all partitions to either have a hash table
-// (and therefore not spilled) or be spilled. Partitions that have a hash table don't
-// need to spill on the probe side.
-//
-// This maps perfectly to a 0-1 knapsack where the weight is the memory to keep the
-// build rows and hash table and the value is the expected IO savings.
-// For now, we go with a greedy solution.
-//
-// TODO: implement the knapsack solution.
-Status PartitionedHashJoinNode::BuildHashTables(RuntimeState* state) {
- DCHECK_EQ(hash_partitions_.size(), PARTITION_FANOUT);
-
- // First loop over the partitions and build hash tables for the partitions that did
- // not already spill.
- for (Partition* partition: hash_partitions_) {
- if (partition->build_rows()->num_rows() == 0) {
- // This partition is empty, no need to do anything else.
- partition->Close(NULL);
- continue;
- }
-
- if (!partition->is_spilled()) {
- bool built = false;
- DCHECK(partition->build_rows()->is_pinned());
- RETURN_IF_ERROR(partition->BuildHashTable(state, &built));
- // If we did not have enough memory to build this hash table, we need to spill this
- // partition (clean up the hash table, unpin build).
- if (!built) RETURN_IF_ERROR(partition->Spill(true));
- }
- }
+Status PartitionedHashJoinNode::PrepareForProbe() {
+ DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+ DCHECK(probe_hash_partitions_.empty());
- // Collect all the spilled partitions that don't have an IO buffer. We need to reserve
- // an IO buffer for those partitions. Reserving an IO buffer can cause more partitions
- // to spill so this process is recursive.
- list<Partition*> spilled_partitions;
- for (Partition* partition: hash_partitions_) {
- if (partition->is_closed()) continue;
- if (partition->is_spilled() && partition->probe_rows()->using_small_buffers()) {
- spilled_partitions.push_back(partition);
- }
- }
- while (!spilled_partitions.empty()) {
- Partition* partition = spilled_partitions.front();
- spilled_partitions.pop_front();
-
- while (true) {
- bool got_buffer;
- RETURN_IF_ERROR(partition->probe_rows()->SwitchToIoBuffers(&got_buffer));
- if (got_buffer) break;
- Partition* spilled_partition;
- RETURN_IF_ERROR(SpillPartition(&spilled_partition));
- DCHECK(spilled_partition->is_spilled());
- if (spilled_partition->probe_rows()->using_small_buffers()) {
- spilled_partitions.push_back(spilled_partition);
- }
- }
+ // Initialize the probe partitions, providing them with probe streams.
+ vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams();
+ probe_hash_partitions_.resize(PARTITION_FANOUT);
+ for (int i = 0; i < PARTITION_FANOUT; ++i) {
+ PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+ if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
- DCHECK(partition->probe_rows()->has_write_block());
- DCHECK(!partition->probe_rows()->using_small_buffers());
+ DCHECK(!probe_streams.empty()) << "Builder should have created enough streams";
+ CreateProbePartition(i, std::move(probe_streams.back()));
+ probe_streams.pop_back();
}
+ DCHECK(probe_streams.empty()) << "Builder should not have created extra streams";
- // At this point, the partition is in one of these states:
- // 1. closed. All done, no buffers in either the build or probe stream.
- // 2. in_mem. The build side is pinned and has a hash table built.
- // 3. spilled. The build side is fully unpinned and the probe side has an io
- // sized buffer.
- for (Partition* partition: hash_partitions_) {
- if (partition->hash_tbl() != NULL) partition->probe_rows()->Close();
+ // Initialize the hash_tbl_ caching array.
+ for (int i = 0; i < PARTITION_FANOUT; ++i) {
+ hash_tbls_[i] = builder_->hash_partition(i)->hash_tbl();
}
- // TODO: at this point we could have freed enough memory to pin and build some
- // spilled partitions. This can happen, for example is there is a lot of skew.
- // Partition 1: 10GB (pinned initially).
- // Partition 2,3,4: 1GB (spilled during partitioning the build).
- // In the previous step, we could have unpinned 10GB (because there was not enough
- // memory to build a hash table over it) which can now free enough memory to
- // build hash tables over the remaining 3 partitions.
- // We start by spilling the largest partition though so the build input would have
- // to be pretty pathological.
- // Investigate if this is worthwhile.
-
- // Initialize the hash_tbl_ caching array.
+ // Validate the state of the partitions.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- hash_tbls_[i] = hash_partitions_[i]->hash_tbl();
+ PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+ ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+ if (build_partition->IsClosed()) {
+ DCHECK(hash_tbls_[i] == NULL);
+ DCHECK(probe_partition == NULL);
+ } else if (build_partition->is_spilled()) {
+ DCHECK(hash_tbls_[i] == NULL);
+ DCHECK(probe_partition != NULL);
+ } else {
+ DCHECK(hash_tbls_[i] != NULL);
+ DCHECK(probe_partition == NULL);
+ }
}
return Status::OK();
}
+void PartitionedHashJoinNode::CreateProbePartition(
+ int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
+ DCHECK_GE(partition_idx, 0);
+ DCHECK_LT(partition_idx, probe_hash_partitions_.size());
+ DCHECK(probe_hash_partitions_[partition_idx] == NULL);
+ probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_,
+ this, builder_->hash_partition(partition_idx), std::move(probe_rows));
+}
+
Status PartitionedHashJoinNode::EvaluateNullProbe(BufferedTupleStream* build) {
if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
return Status::OK();
@@ -1411,48 +946,59 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(RowBatch* batch) {
// add them to the list of partitions that need to output any unmatched build rows.
// This partition will be closed by the function that actually outputs unmatched build
// rows.
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- if (partition->is_closed()) continue;
- if (partition->is_spilled()) {
- DCHECK(partition->hash_tbl() == NULL) << NodeDebugString();
- // Unpin the build and probe stream to free up more memory. We need to free all
- // memory so we can recurse the algorithm and create new hash partitions from
- // spilled partitions.
- RETURN_IF_ERROR(partition->build_rows()->UnpinStream(true));
- RETURN_IF_ERROR(partition->probe_rows()->UnpinStream(true));
-
- // Push new created partitions at the front. This means a depth first walk
+ DCHECK_EQ(builder_->num_hash_partitions(), PARTITION_FANOUT);
+ DCHECK_EQ(probe_hash_partitions_.size(), PARTITION_FANOUT);
+ for (int i = 0; i < PARTITION_FANOUT; ++i) {
+ ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+ PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+ if (build_partition->IsClosed()) {
+ DCHECK(probe_partition == NULL);
+ continue;
+ }
+ if (build_partition->is_spilled()) {
+ DCHECK(probe_partition != NULL);
+ DCHECK(build_partition->hash_tbl() == NULL) << NodeDebugString();
+ // Unpin the probe stream to free up more memory. We need to free all memory so we
+ // can recurse the algorithm and create new hash partitions from spilled partitions.
+ // TODO: we shouldn't need to unpin the build stream if we stop spilling
+ // while probing.
+ RETURN_IF_ERROR(
+ build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+ DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0);
+ RETURN_IF_ERROR(
+ probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+
+ // Push newly created partitions at the front. This means a depth first walk
// (more finely partitioned partitions are processed first). This allows us
// to delete blocks earlier and bottom out the recursion earlier.
- spilled_partitions_.push_front(partition);
+ spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
} else {
- DCHECK_EQ(partition->probe_rows()->num_rows(), 0)
- << "No probe rows should have been spilled for this partition.";
- if (join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN ||
- join_op_ == TJoinOp::FULL_OUTER_JOIN) {
+ DCHECK(probe_partition == NULL);
+ if (NeedToProcessUnmatchedBuildRows()) {
if (output_build_partitions_.empty()) {
- hash_tbl_iterator_ = partition->hash_tbl_->FirstUnmatched(ht_ctx_.get());
+ hash_tbl_iterator_ = build_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
}
- output_build_partitions_.push_back(partition);
+ output_build_partitions_.push_back(build_partition);
} else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
// For NAAJ, we need to try to match all the NULL probe rows with this partition
// before closing it. The NULL probe rows could have come from any partition
// so we collect them all and match them at the end.
- RETURN_IF_ERROR(EvaluateNullProbe(partition->build_rows()));
- partition->Close(batch);
+ RETURN_IF_ERROR(EvaluateNullProbe(build_partition->build_rows()));
+ build_partition->Close(batch);
} else {
- partition->Close(batch);
+ build_partition->Close(batch);
}
}
}
// Just finished evaluating the null probe rows with all the non-spilled build
// partitions. Unpin this now to free this memory for repartitioning.
- if (null_probe_rows_ != NULL) RETURN_IF_ERROR(null_probe_rows_->UnpinStream());
+ if (null_probe_rows_ != NULL)
+ RETURN_IF_ERROR(
+ null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
- hash_partitions_.clear();
- input_partition_ = NULL;
+ builder_->ClearHashPartitions();
+ probe_hash_partitions_.clear();
return Status::OK();
}
@@ -1465,17 +1011,30 @@ void PartitionedHashJoinNode::AddToDebugString(int indent, stringstream* out) co
*out << ")";
}
-void PartitionedHashJoinNode::UpdateState(HashJoinState s) {
- state_ = s;
+void PartitionedHashJoinNode::UpdateState(HashJoinState next_state) {
+ // Validate the state transition.
+ switch (state_) {
+ case PARTITIONING_BUILD: DCHECK_EQ(next_state, PARTITIONING_PROBE); break;
+ case PARTITIONING_PROBE:
+ case REPARTITIONING_PROBE:
+ case PROBING_SPILLED_PARTITION:
+ DCHECK(
+ next_state == REPARTITIONING_BUILD || next_state == PROBING_SPILLED_PARTITION);
+ break;
+ case REPARTITIONING_BUILD: DCHECK_EQ(next_state, REPARTITIONING_PROBE); break;
+ default: DCHECK(false) << "Invalid state " << state_;
+ }
+ state_ = next_state;
VLOG(2) << "Transitioned State:" << endl << NodeDebugString();
}
string PartitionedHashJoinNode::PrintState() const {
switch (state_) {
case PARTITIONING_BUILD: return "PartitioningBuild";
- case PROCESSING_PROBE: return "ProcessingProbe";
- case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartitions";
- case REPARTITIONING: return "Repartitioning";
+ case PARTITIONING_PROBE: return "PartitioningProbe";
+ case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartition";
+ case REPARTITIONING_BUILD: return "RepartitioningBuild";
+ case REPARTITIONING_PROBE: return "RepartitioningProbe";
default: DCHECK(false);
}
return "";
@@ -1485,58 +1044,41 @@ string PartitionedHashJoinNode::NodeDebugString() const {
stringstream ss;
ss << "PartitionedHashJoinNode (id=" << id() << " op=" << join_op_
<< " state=" << PrintState()
- << " #partitions=" << hash_partitions_.size()
- << " #spilled_partitions=" << spilled_partitions_.size()
- << ")" << endl;
-
- for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
- ss << i << ": ptr=" << partition;
- DCHECK(partition != NULL);
- if (partition->is_closed()) {
- ss << " Closed" << endl;
- continue;
- }
- if (partition->is_spilled()) {
- ss << " Spilled" << endl;
- }
- DCHECK(partition->build_rows() != NULL);
- DCHECK(partition->probe_rows() != NULL);
- ss << endl
- << " Build Rows: " << partition->build_rows()->num_rows()
- << " (Blocks pinned: " << partition->build_rows()->blocks_pinned() << ")"
- << endl;
- ss << " Probe Rows: " << partition->probe_rows()->num_rows()
- << " (Blocks pinned: " << partition->probe_rows()->blocks_pinned() << ")"
+ << " #hash_partitions=" << builder_->num_hash_partitions()
+ << " #spilled_partitions=" << spilled_partitions_.size() << ")" << endl;
+
+ ss << "PhjBuilder: " << builder_->DebugString();
+
+ ss << "Probe hash partitions: " << probe_hash_partitions_.size() << ":" << endl;
+ for (int i = 0; i < probe_hash_partitions_.size(); ++i) {
+ ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+ ss << " Probe hash partition " << i << ": probe ptr=" << probe_partition
+ << " Probe Rows: " << probe_partition->probe_rows()->num_rows()
+ << " (Blocks pinned: " << probe_partition->probe_rows()->blocks_pinned() << ")"
<< endl;
- if (partition->hash_tbl() != NULL) {
- ss << " Hash Table Rows: " << partition->hash_tbl()->size() << endl;
- }
}
if (!spilled_partitions_.empty()) {
ss << "SpilledPartitions" << endl;
- for (list<Partition*>::const_iterator it = spilled_partitions_.begin();
- it != spilled_partitions_.end(); ++it) {
- DCHECK((*it)->is_spilled());
- DCHECK((*it)->hash_tbl() == NULL);
- DCHECK((*it)->build_rows() != NULL);
- DCHECK((*it)->probe_rows() != NULL);
- ss << " Partition=" << *it << endl
- << " Spilled Build Rows: "
- << (*it)->build_rows()->num_rows() << endl
- << " Spilled Probe Rows: "
- << (*it)->probe_rows()->num_rows() << endl;
+ for (const unique_ptr<ProbePartition>& probe_partition : spilled_partitions_) {
+ PhjBuilder::Partition* build_partition = probe_partition->build_partition();
+ DCHECK(build_partition->is_spilled());
+ DCHECK(build_partition->hash_tbl() == NULL);
+ DCHECK(build_partition->build_rows() != NULL);
+ DCHECK(probe_partition->probe_rows() != NULL);
+ ss << " Partition=" << probe_partition.get() << endl
+ << " Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl
+ << " Spilled Probe Rows: " << probe_partition->probe_rows()->num_rows()
+ << endl;
}
}
if (input_partition_ != NULL) {
- DCHECK(input_partition_->build_rows() != NULL);
+ DCHECK(input_partition_->build_partition()->build_rows() != NULL);
DCHECK(input_partition_->probe_rows() != NULL);
- ss << "InputPartition: " << input_partition_ << endl
+ ss << "InputPartition: " << input_partition_.get() << endl
<< " Spilled Build Rows: "
- << input_partition_->build_rows()->num_rows() << endl
- << " Spilled Probe Rows: "
- << input_partition_->probe_rows()->num_rows() << endl;
+ << input_partition_->build_partition()->build_rows()->num_rows() << endl
+ << " Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
} else {
ss << "InputPartition: NULL" << endl;
}
@@ -1648,81 +1190,14 @@ Status PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen,
return Status::OK();
}
-Status PartitionedHashJoinNode::CodegenProcessBuildBatch(RuntimeState* state,
- Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
- LlvmCodeGen* codegen;
- RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
- Function* process_build_batch_fn =
- codegen->GetFunction(IRFunction::PHJ_PROCESS_BUILD_BATCH, true);
- DCHECK(process_build_batch_fn != NULL);
-
- // Replace call sites
- int replaced = codegen->ReplaceCallSites(process_build_batch_fn, eval_row_fn,
- "EvalBuildRow");
- DCHECK_EQ(replaced, 1);
-
- // Replace some hash table parameters with constants.
- HashTableCtx::HashTableReplacedConstants replaced_constants;
- const bool stores_duplicates = true;
- const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
- RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
- num_build_tuples, process_build_batch_fn, &replaced_constants));
- DCHECK_GE(replaced_constants.stores_nulls, 1);
- DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
- DCHECK_EQ(replaced_constants.stores_duplicates, 0);
- DCHECK_EQ(replaced_constants.stores_tuples, 0);
- DCHECK_EQ(replaced_constants.quadratic_probing, 0);
-
- Function* process_build_batch_fn_level0 =
- codegen->CloneFunction(process_build_batch_fn);
-
- // Always build runtime filters at level0 (if there are any).
- // Note that the first argument of this function is the return value.
- Value* build_filters_l0_arg = codegen->GetArgument(process_build_batch_fn_level0, 3);
- build_filters_l0_arg->replaceAllUsesWith(
- ConstantInt::get(Type::getInt1Ty(codegen->context()), filters_.size() > 0));
-
- // process_build_batch_fn_level0 uses CRC hash if available,
- replaced = codegen->ReplaceCallSites(process_build_batch_fn_level0, hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
-
- // process_build_batch_fn uses murmur
- replaced = codegen->ReplaceCallSites(process_build_batch_fn, murmur_hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
-
- // Never build filters after repartitioning, as all rows have already been added to the
- // filters during the level0 build. Note that the first argument of this function is the
- // return value.
- Value* build_filters_arg = codegen->GetArgument(process_build_batch_fn, 3);
- build_filters_arg->replaceAllUsesWith(
- ConstantInt::get(Type::getInt1Ty(codegen->context()), false));
-
- // Finalize ProcessBuildBatch functions
- process_build_batch_fn = codegen->FinalizeFunction(process_build_batch_fn);
- if (process_build_batch_fn == NULL) {
- return Status("Codegen'd PartitionedHashJoinNode::ProcessBuildBatch() function "
- "failed verification, see log");
- }
- process_build_batch_fn_level0 =
- codegen->FinalizeFunction(process_build_batch_fn_level0);
- if (process_build_batch_fn == NULL) {
- return Status("Codegen'd level-zero PartitionedHashJoinNode::ProcessBuildBatch() "
- "function failed verification, see log");
- }
-
- // Register native function pointers
- codegen->AddFunctionToJit(process_build_batch_fn,
- reinterpret_cast<void**>(&process_build_batch_fn_));
- codegen->AddFunctionToJit(process_build_batch_fn_level0,
- reinterpret_cast<void**>(&process_build_batch_fn_level0_));
- return Status::OK();
-}
-
-Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
- RuntimeState* state, Function* hash_fn, Function* murmur_hash_fn) {
+Status PartitionedHashJoinNode::CodegenProcessProbeBatch(RuntimeState* state) {
LlvmCodeGen* codegen;
RETURN_IF_ERROR(state->GetCodegen(&codegen));
+ // Codegen for hashing rows
+ Function* hash_fn;
+ Function* murmur_hash_fn;
+ RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, false, &hash_fn));
+ RETURN_IF_ERROR(ht_ctx_->CodegenHashRow(state, true, &murmur_hash_fn));
// Get cross compiled function
IRFunction::Type ir_fn = IRFunction::FN_END;
@@ -1881,65 +1356,3 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
reinterpret_cast<void**>(&process_probe_batch_fn_level0_));
return Status::OK();
}
-
-Status PartitionedHashJoinNode::CodegenInsertBatch(RuntimeState* state,
- Function* hash_fn, Function* murmur_hash_fn, Function* eval_row_fn) {
- LlvmCodeGen* codegen;
- RETURN_IF_ERROR(state->GetCodegen(&codegen));
-
- Function* insert_batch_fn = codegen->GetFunction(IRFunction::PHJ_INSERT_BATCH, true);
- Function* build_equals_fn;
- RETURN_IF_ERROR(ht_ctx_->CodegenEquals(state, true, &build_equals_fn));
-
- // Replace the parameter 'prefetch_mode' with constant.
- Value* prefetch_mode_arg = codegen->GetArgument(insert_batch_fn, 1);
- TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
- DCHECK_GE(prefetch_mode, TPrefetchMode::NONE);
- DCHECK_LE(prefetch_mode, TPrefetchMode::HT_BUCKET);
- prefetch_mode_arg->replaceAllUsesWith(
- ConstantInt::get(Type::getInt32Ty(codegen->context()), prefetch_mode));
-
- // Use codegen'd EvalBuildRow() function
- int replaced = codegen->ReplaceCallSites(insert_batch_fn, eval_row_fn, "EvalBuildRow");
- DCHECK_EQ(replaced, 1);
-
- // Use codegen'd Equals() function
- replaced = codegen->ReplaceCallSites(insert_batch_fn, build_equals_fn, "Equals");
- DCHECK_EQ(replaced, 1);
-
- // Replace hash-table parameters with constants.
- HashTableCtx::HashTableReplacedConstants replaced_constants;
- const bool stores_duplicates = true;
- const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
- RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(state, stores_duplicates,
- num_build_tuples, insert_batch_fn, &replaced_constants));
- DCHECK_GE(replaced_constants.stores_nulls, 1);
- DCHECK_EQ(replaced_constants.finds_some_nulls, 0);
- DCHECK_GE(replaced_constants.stores_duplicates, 1);
- DCHECK_GE(replaced_constants.stores_tuples, 1);
- DCHECK_GE(replaced_constants.quadratic_probing, 1);
-
- Function* insert_batch_fn_level0 = codegen->CloneFunction(insert_batch_fn);
-
- // Use codegen'd hash functions
- replaced = codegen->ReplaceCallSites(insert_batch_fn_level0, hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
- replaced = codegen->ReplaceCallSites(insert_batch_fn, murmur_hash_fn, "HashRow");
- DCHECK_EQ(replaced, 1);
-
- insert_batch_fn = codegen->FinalizeFunction(insert_batch_fn);
- if (insert_batch_fn == NULL) {
- return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd "
- "InsertBatch() function failed verification, see log");
- }
- insert_batch_fn_level0 = codegen->FinalizeFunction(insert_batch_fn_level0);
- if (insert_batch_fn_level0 == NULL) {
- return Status("PartitionedHashJoinNode::CodegenInsertBatch(): codegen'd zero-level "
- "InsertBatch() function failed verification, see log");
- }
-
- codegen->AddFunctionToJit(insert_batch_fn, reinterpret_cast<void**>(&insert_batch_fn_));
- codegen->AddFunctionToJit(insert_batch_fn_level0,
- reinterpret_cast<void**>(&insert_batch_fn_level0_));
- return Status::OK();
-}