You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:35 UTC
[3/6] incubator-impala git commit: IMPALA-3567 Part 2,
IMPALA-3899: factor out PHJ builder
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
new file mode 100644
index 0000000..23822b2
--- /dev/null
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
+#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
+
+#include <boost/scoped_ptr.hpp>
+#include <memory>
+
+#include "common/object-pool.h"
+#include "common/status.h"
+#include "exec/data-sink.h"
+#include "exec/filter-context.h"
+#include "exec/hash-table.h"
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/buffered-tuple-stream.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+namespace impala {
+
+class ExprContext;
+class RowDescriptor;
+class RuntimeState;
+
+/// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned
+/// into PARTITION_FANOUT partitions, with partitions spilled if the full build side
+/// does not fit in memory. Spilled partitions can be repartitioned with a different
+/// hash function per level of repartitioning.
+///
+/// The builder owns the hash tables and build row streams. The builder first does the
+/// level 0 partitioning of build rows. After FlushFinal() the builder has produced some
+/// in-memory partitions and some spilled partitions. The in-memory partitions have hash
+/// tables and the spilled partitions have a probe-side stream prepared with one write
+/// buffer, which is sufficient to spill the partition's probe rows to disk without
+/// allocating additional buffers.
+///
+/// After this initial partitioning, the join node probes the in-memory hash partitions.
+/// The join node then drives processing of any spilled partitions, calling
+/// Partition::BuildHashTable() to build hash tables for a spilled partition or calling
+/// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1
+/// partitions.
+///
+/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client
+/// and the corresponding reservations. Different stages of the spilling algorithm
+/// require different mixes of build and probe buffers and hash tables, so we can
+/// share the reservation to minimize the combined memory requirement. Initial probe-side
+/// buffers are allocated in the builder then handed off to the probe side to implement
+/// this reservation sharing.
+///
+/// TODO: after we have reliable reservations (IMPALA-3200), we can simplify the handoff
+/// to the probe side by using reservations instead of preparing the streams.
+///
+/// The full hash join algorithm is documented in PartitionedHashJoinNode.
+class PhjBuilder : public DataSink {
+ public:
+ class Partition;
+
+ PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& probe_row_desc,
+ const RowDescriptor& build_row_desc, RuntimeState* state);
+
+ Status Init(RuntimeState* state, const std::vector<TEqJoinCondition>& eq_join_conjuncts,
+ const std::vector<TRuntimeFilterDesc>& filters);
+
+ /// Implementations of DataSink interface methods.
+ virtual std::string GetName() override;
+ virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) override;
+ virtual Status Open(RuntimeState* state) override;
+ virtual Status Send(RuntimeState* state, RowBatch* batch) override;
+ virtual Status FlushFinal(RuntimeState* state) override;
+ virtual void Close(RuntimeState* state) override;
+
+ /////////////////////////////////////////
+ // The following functions are used only by PartitionedHashJoinNode.
+ /////////////////////////////////////////
+
+ /// Reset the builder to the same state as it was in after calling Open().
+ void Reset();
+
+ /// Transfer ownership of the probe streams to the caller. One stream was allocated per
+ /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
+ /// writing with a write buffer allocated.
+ std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
+
+ /// Clears the current list of hash partitions. Called after probing of the partitions
+ /// is done. The partitions are not closed or destroyed, since they may be spilled or
+ /// may contain unmatched build rows for certain join modes (e.g. right outer join).
+ void ClearHashPartitions() { hash_partitions_.clear(); }
+
+ /// Close the null aware partition (if there is one) and set it to NULL.
+ void CloseNullAwarePartition(RowBatch* out_batch) {
+ if (null_aware_partition_ != NULL) {
+ null_aware_partition_->Close(out_batch);
+ null_aware_partition_ = NULL;
+ }
+ }
+
+ /// Creates new hash partitions and repartitions 'input_partition'. The previous
+ /// hash partitions must have been cleared with ClearHashPartitions().
+ /// 'level' is the level new partitions should be created with. This functions prepares
+ /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
+ /// has enough buffers preallocated to execute successfully.
+ Status RepartitionBuildInput(
+ Partition* input_partition, int level, BufferedTupleStream* input_probe_rows);
+
+ /// Returns the largest build row count out of the current hash partitions.
+ int64_t LargestPartitionRows() const;
+
+ /// True if the hash table may contain rows with one or more NULL join keys. This
+ /// depends on the join type and the equijoin conjuncts.
+ bool HashTableStoresNulls() const;
+
+ /// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
+ inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; }
+ inline bool non_empty_build() const { return non_empty_build_; }
+ inline const std::vector<bool>& is_not_distinct_from() const {
+ return is_not_distinct_from_;
+ }
+ inline int num_hash_partitions() const { return hash_partitions_.size(); }
+ inline Partition* hash_partition(int partition_idx) const {
+ DCHECK_GE(partition_idx, 0);
+ DCHECK_LT(partition_idx, hash_partitions_.size());
+ return hash_partitions_[partition_idx];
+ }
+ inline Partition* null_aware_partition() const { return null_aware_partition_; }
+
+ std::string DebugString() const;
+
+ /// Number of initial partitions to create. Must be a power of two.
+ static const int PARTITION_FANOUT = 16;
+
+ /// Needs to be log2(PARTITION_FANOUT).
+ static const int NUM_PARTITIONING_BITS = 4;
+
+ /// Maximum number of times we will repartition. The maximum build table we
+ /// can process is:
+ /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB
+ /// limit and 64 fanout, we can support 256TB build tables in the case where
+ /// there is no skew.
+ /// In the case where there is skew, repartitioning is unlikely to help (assuming a
+ /// reasonable hash function).
+ /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
+ /// TODO: we can revisit and try harder to explicitly detect skew.
+ static const int MAX_PARTITION_DEPTH = 16;
+
+ /// A partition containing a subset of build rows.
+ ///
+ /// A partition may contain two data structures: the build rows and optionally a hash
+ /// table built over the build rows. Building the hash table requires all build rows to
+ /// be pinned in memory. The build rows are kept in memory if all partitions fit in
+ /// memory, but can be unpinned and spilled to disk to free up memory. Reading or
+ /// writing the unpinned rows requires a single read or write buffer. If the unpinned
+ /// rows are not being read or written, they can be completely unpinned, requiring no
+ /// buffers.
+ ///
+ /// The build input is first partitioned by hash function level 0 into the level 0
+ /// partitions. Then, if the join spills and the size of a level n partition is too
+ /// large to fit in memory, the partition's rows can be repartitioned with the level
+ /// n + 1 hash function into the level n + 1 partitions.
+ class Partition {
+ public:
+ Partition(RuntimeState* state, PhjBuilder* parent, int level);
+ ~Partition();
+
+ /// Close the partition and attach resources to 'batch' if non-NULL or free the
+ /// resources if 'batch' is NULL. Idempotent.
+ void Close(RowBatch* batch);
+
+ /// Returns the estimated byte size of the in-memory data structures for this
+ /// partition. This includes all build rows and the hash table.
+ int64_t EstimatedInMemSize() const;
+
+ /// Pins the build tuples for this partition and constructs the hash table from it.
+ /// Build rows cannot be added after calling this. If the build rows could not be
+ /// pinned or the hash table could not be built due to memory pressure, sets *built
+ /// to false and returns OK. Returns an error status if any other error is
+ /// encountered.
+ Status BuildHashTable(bool* built);
+
+ /// Spills this partition, the partition's stream is unpinned with 'mode' and
+ /// its hash table is destroyed if it was built.
+ Status Spill(BufferedTupleStream::UnpinMode mode);
+
+ bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
+ BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+ HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
+ bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
+ int ALWAYS_INLINE level() const { return level_; }
+
+ private:
+ /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
+ /// containing the index of each row's index into the hash table's tuple stream.
+ /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
+ /// table buckets which the rows hashes to will be prefetched. This parameter is
+ /// replaced with a constant during codegen time. This function may be replaced with
+ /// a codegen'd version. Returns true if all rows in 'batch' are successfully
+ /// inserted.
+ bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
+ RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
+
+ const PhjBuilder* parent_;
+
+ /// True if this partition is spilled.
+ bool is_spilled_;
+
+ /// How many times rows in this partition have been repartitioned. Partitions created
+ /// from the node's children's input is level 0, 1 after the first repartitioning,
+ /// etc.
+ const int level_;
+
+ /// The hash table for this partition.
+ boost::scoped_ptr<HashTable> hash_tbl_;
+
+ /// Stream of build tuples in this partition. Initially owned by this object but
+ /// transferred to the parent exec node (via the row batch) when the partition
+ /// is closed. If NULL, ownership has been transferred and the partition is closed.
+ std::unique_ptr<BufferedTupleStream> build_rows_;
+ };
+
+ private:
+ /// Computes the minimum number of buffers required to execute the spilling partitioned
+ /// hash algorithm successfully for any input size (assuming enough disk space is
+ /// available for spilled rows). The buffers are used for buffering both build and
+ /// probe rows at different times, so the total requirement is the peak sum of build
+ /// and probe buffers required.
+ /// We need one output buffer per partition to partition the build or probe side. We
+ /// need one additional buffer for the input while repartitioning the build or probe.
+ /// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
+ /// 'null_aware_probe_partition_' and 'null_probe_rows_'.
+ int MinRequiredBuffers() const {
+ int num_reserved_buffers = PARTITION_FANOUT + 1;
+ if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
+ return num_reserved_buffers;
+ }
+
+ /// Create and initialize a set of hash partitions for partitioning level 'level'.
+ /// The previous hash partitions must have been cleared with ClearHashPartitions().
+ /// After calling this, batches are added to the new partitions by calling Send().
+ Status CreateHashPartitions(int level);
+
+ /// Create a new partition in 'all_partitions_' and prepare it for writing.
+ Status CreateAndPreparePartition(int level, Partition** partition);
+
+ /// Reads the rows in build_batch and partitions them into hash_partitions_. If
+ /// 'build_filters' is true, runtime filters are populated.
+ Status ProcessBuildBatch(RowBatch* build_batch, HashTableCtx* ctx, bool build_filters);
+
+ /// Append 'row' to 'stream'. In the common case, appending the row to the stream
+ /// immediately succeeds. Otherwise this function falls back to the slower path of
+ /// AppendRowStreamFull(), which may spill partitions to free memory. Returns an error
+ /// if it was unable to append the row, even after spilling partitions.
+ Status AppendRow(BufferedTupleStream* stream, TupleRow* row);
+
+ /// Slow path for AppendRow() above. It is called when the stream has failed to append
+ /// the row. We need to find more memory by either switching to IO-buffers, in case the
+ /// stream still uses small buffers, or spilling a partition.
+ Status AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row);
+
+ /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
+ /// to the Spill() call for the selected partition. The current policy is to spill the
+ /// largest partition. Returns non-ok status if we couldn't spill a partition.
+ Status SpillPartition(BufferedTupleStream::UnpinMode mode);
+
+ /// Tries to build hash tables for all unspilled hash partitions. Called after
+ /// FlushFinal() when all build rows have been partitioned and added to the appropriate
+ /// streams. If the hash table could not be built for a partition, the partition is
+ /// spilled (with all build blocks unpinned) and the probe stream is prepared for
+ /// writing (i.e. has an initial probe buffer allocated).
+ ///
+ /// When this function returns successfully, each partition is in one of these states:
+ /// 1. closed. No probe partition is created and the build partition is closed.
+ /// 2. in-memory. The build rows are pinned and has a hash table built. No probe
+ /// partition is created.
+ /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared.
+ Status BuildHashTablesAndPrepareProbeStreams();
+
+ /// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition
+ /// in 'hash_partitions_'. May spill additional partitions until it can create enough
+ /// probe streams with write buffers. Returns an error if an error is encountered or
+ /// if it runs out of partitions to spill.
+ Status InitSpilledPartitionProbeStreams();
+
+ /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
+ /// may reference them. Also cleans up 'spilled_partition_probe_streams_'.
+ void CloseAndDeletePartitions();
+
+ /// For each filter in filters_, allocate a bloom_filter from the fragment-local
+ /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
+ /// phase.
+ void AllocateRuntimeFilters();
+
+ /// Publish the runtime filters to the fragment-local RuntimeFilterBank.
+ /// 'num_build_rows' is used to determine whether the computed filters have an
+ /// unacceptably high false-positive rate.
+ void PublishRuntimeFilters(int64_t num_build_rows);
+
+ /// Does all codegen for the builder (if codegen is enabled).
+ /// Updates the the builder's runtime profile with info about whether the codegen was
+ /// enabled and whether any errors occured during codegen.
+ void Codegen(RuntimeState* state);
+
+ /// Codegen processing build batches. Identical signature to ProcessBuildBatch().
+ /// Returns non-OK status if codegen was not possible.
+ Status CodegenProcessBuildBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
+ llvm::Function* eval_row_fn);
+
+ /// Codegen inserting batches into a partition's hash table. Identical signature to
+ /// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
+ Status CodegenInsertBatch(llvm::Function* hash_fn, llvm::Function* murmur_hash_fn,
+ llvm::Function* eval_row_fn);
+
+ RuntimeState* const runtime_state_;
+
+ // The ID of the plan join node this is associated with.
+ // TODO: we may want to replace this with a sink ID once we progress further with
+ // multithreading.
+ const int join_node_id_;
+
+ /// The join operation this is building for.
+ const TJoinOp::type join_op_;
+
+ /// Descriptor for the probe rows, needed to initialize probe streams.
+ const RowDescriptor& probe_row_desc_;
+
+ /// Pool for objects with same lifetime as builder.
+ ObjectPool pool_;
+
+ /// Client to the buffered block mgr, used to allocate build partition buffers and hash
+ /// tables. When probing, the spilling algorithm keeps some build partitions in memory
+ /// while using memory for probe buffers for spilled partitions. To support dynamically
+ /// dividing memory between build and probe, this client is owned by the builder but
+ /// shared with the PartitionedHashJoinNode.
+ /// TODO: this approach to sharing will not work for spilling broadcast joins with a
+ /// 1:N relationship from builders to join nodes.
+ BufferedBlockMgr::Client* block_mgr_client_;
+
+ /// If true, the build side has at least one row.
+ bool non_empty_build_;
+
+ /// Expr contexts to free after partitioning or inserting each batch.
+ std::vector<ExprContext*> expr_ctxs_to_free_;
+
+ /// Expression contexts over input rows for hash table build.
+ std::vector<ExprContext*> build_expr_ctxs_;
+
+ /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
+ /// NOT DISTINCT FROM, rather than equality.
+ std::vector<bool> is_not_distinct_from_;
+
+ /// List of filters to build.
+ std::vector<FilterContext> filters_;
+
+ /// Used for hash-related functionality, such as evaluating rows and calculating hashes.
+ /// The level is set to the same level as 'hash_partitions_'.
+ boost::scoped_ptr<HashTableCtx> ht_ctx_;
+
+ /// Total number of partitions created.
+ RuntimeProfile::Counter* partitions_created_;
+
+ /// The largest fraction (of build side) after repartitioning. This is expected to be
+ /// 1 / PARTITION_FANOUT. A value much larger indicates skew.
+ RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
+
+ /// Level of max partition (i.e. number of repartitioning steps).
+ RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
+
+ /// Number of build rows that have been partitioned.
+ RuntimeProfile::Counter* num_build_rows_partitioned_;
+
+ /// Number of hash collisions - unequal rows that have identical hash values
+ RuntimeProfile::Counter* num_hash_collisions_;
+
+ /// Total number of hash buckets across all partitions.
+ RuntimeProfile::Counter* num_hash_buckets_;
+
+ /// Number of partitions that have been spilled.
+ RuntimeProfile::Counter* num_spilled_partitions_;
+
+ /// Number of partitions that have been repartitioned.
+ RuntimeProfile::Counter* num_repartitions_;
+
+ /// Time spent partitioning build rows.
+ RuntimeProfile::Counter* partition_build_rows_timer_;
+
+ /// Time spent building hash tables.
+ RuntimeProfile::Counter* build_hash_table_timer_;
+
+ /// Time spent repartitioning and building hash tables of any resulting partitions
+ /// that were not spilled.
+ RuntimeProfile::Counter* repartition_timer_;
+
+ /////////////////////////////////////////
+ /// BEGIN: Members that must be Reset()
+
+ /// Vector that owns all of the Partition objects.
+ std::vector<std::unique_ptr<Partition>> all_partitions_;
+
+ /// The current set of partitions that are being built or probed. This vector is
+ /// initialized before partitioning or re-partitioning the build input
+ /// and cleared after we've finished probing the partitions.
+ /// This is not used when processing a single spilled partition.
+ std::vector<Partition*> hash_partitions_;
+
+ /// Partition used for null-aware joins. This partition is always processed at the end
+ /// after all build and probe rows are processed. In this partition's 'build_rows_', we
+ /// store all the rows for which 'build_expr_ctxs_' evaluated over the row returns NULL
+ /// (i.e. it has a NULL on the eq join slot).
+ /// NULL if the join is not null aware or we are done processing this partition.
+ Partition* null_aware_partition_;
+
+ /// Populated during the hash table building phase if any partitions spilled.
+ /// One probe stream per spilled partition is prepared for writing so that the
+ /// initial write buffer is allocated.
+ ///
+ /// These streams are handed off to PartitionedHashJoinNode for use in buffering
+ /// spilled probe rows. The allocation is done in the builder so that it can divide
+ /// memory between the in-memory build partitions and write buffers based on the size
+ /// of the partitions and available memory. E.g. if all the partitions fit in memory, no
+ /// write buffers need to be allocated, but if some partitions are spilled, more build
+ /// partitions may be spilled to free up memory for write buffers.
+ ///
+ /// Because of this, at the end of the build phase, we always have sufficient memory
+ /// to execute the probe phase of the algorithm without spilling more partitions.
+ std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
+
+ /// END: Members that must be Reset()
+ /////////////////////////////////////////
+
+ /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
+ /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is
+ /// used for subsequent levels.
+ typedef Status (*ProcessBuildBatchFn)(
+ PhjBuilder*, RowBatch*, HashTableCtx*, bool build_filters);
+ /// Jitted ProcessBuildBatch function pointers. NULL if codegen is disabled.
+ ProcessBuildBatchFn process_build_batch_fn_;
+ ProcessBuildBatchFn process_build_batch_fn_level0_;
+
+ typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
+ const std::vector<BufferedTupleStream::RowIdx>&);
+ /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
+ InsertBatchFn insert_batch_fn_;
+ InsertBatchFn insert_batch_fn_level0_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1ccd83b2/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index bab1bf8..44cb14b 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -19,10 +19,7 @@
#include "codegen/impala-ir.h"
#include "exec/hash-table.inline.h"
-#include "runtime/raw-value.inline.h"
#include "runtime/row-batch.h"
-#include "runtime/runtime-filter.h"
-#include "util/bloom-filter.h"
#include "common/names.h"
@@ -151,12 +148,11 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::ProcessProbeRowLeftSemiJoins(
// should interpret the hash table probe as "unknown" if there are nulls on the
// build side. For those rows, we need to process the remaining join
// predicates later.
- if (null_aware_partition_->build_rows()->num_rows() != 0) {
- if (num_other_join_conjuncts > 0 &&
- UNLIKELY(!AppendRow(null_aware_partition_->probe_rows(),
- current_probe_row_, status))) {
- DCHECK(!status->ok());
- return false;
+ if (builder_->null_aware_partition()->build_rows()->num_rows() != 0) {
+ if (num_other_join_conjuncts > 0) {
+ *status = AppendProbeRow(
+ null_aware_probe_partition_->probe_rows(), current_probe_row_);
+ if (UNLIKELY(!status->ok())) return false;
}
return true;
}
@@ -271,7 +267,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
// Fetch the hash and expr values' nullness for this row.
if (expr_vals_cache->IsRowNull()) {
- if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && non_empty_build_) {
+ if (JoinOp == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && builder_->non_empty_build()) {
const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
// For NAAJ, we need to treat NULLs on the probe carefully. The logic is:
// 1. No build rows -> Return this row. The check for 'non_empty_build_'
@@ -284,14 +280,12 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
if (num_other_join_conjuncts == 0) {
// Condition 2 above.
skip_row = true;
- } else if (LIKELY(AppendRow(null_probe_rows_, current_probe_row_, status))) {
+ } else {
// Condition 3 above.
+ *status = AppendProbeRow(null_probe_rows_.get(), current_probe_row_);
+ if (UNLIKELY(!status->ok())) return false;
matched_null_probe_.push_back(false);
skip_row = true;
- } else {
- // Condition 3 above but failed to append to 'null_probe_rows_'. Bail out.
- DCHECK(!status->ok());
- return false;
}
}
} else {
@@ -300,20 +294,20 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
hash_tbl_iterator_ = hash_tbl->FindProbeRow(ht_ctx);
} else {
// The build partition is either empty or spilled.
- Partition* partition = hash_partitions_[partition_idx];
- // This partition is closed, meaning the build side for this partition was empty.
- if (UNLIKELY(partition->is_closed())) {
- DCHECK(state_ == PROCESSING_PROBE || state_ == REPARTITIONING);
+ PhjBuilder::Partition* build_partition = builder_->hash_partition(partition_idx);
+ ProbePartition* probe_partition = probe_hash_partitions_[partition_idx].get();
+ DCHECK((build_partition->IsClosed() && probe_partition == NULL)
+ || (build_partition->is_spilled() && probe_partition != NULL));
+
+ if (UNLIKELY(probe_partition == NULL)) {
+ // A closed partition implies that the build side for this partition was empty.
} else {
- // This partition is not in memory, spill the probe row and move to the next row.
- DCHECK(partition->is_spilled());
- DCHECK(partition->probe_rows() != NULL);
+ // The partition is not in memory, spill the probe row and move to the next row.
// Skip the current row if we manage to append to the spilled partition's BTS.
// Otherwise, we need to bail out and report the failure.
- if (UNLIKELY(!AppendRow(partition->probe_rows(), current_probe_row_, status))) {
- DCHECK(!status->ok());
- return false;
- }
+ BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+ *status = AppendProbeRow(probe_rows, current_probe_row_);
+ if (UNLIKELY(!status->ok())) return false;
skip_row = true;
}
}
@@ -358,9 +352,11 @@ void IR_ALWAYS_INLINE PartitionedHashJoinNode::EvalAndHashProbePrefetchGroup(
}
// CreateOutputRow, EvalOtherJoinConjuncts, and EvalConjuncts are replaced by codegen.
-template<int const JoinOp>
+template <int const JoinOp>
int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
+ DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION
+ || state_ == REPARTITIONING_PROBE);
ExprContext* const* other_join_conjunct_ctxs = &other_join_conjunct_ctxs_[0];
const int num_other_join_conjuncts = other_join_conjunct_ctxs_.size();
ExprContext* const* conjunct_ctxs = &conjunct_ctxs_[0];
@@ -430,82 +426,15 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
return num_rows_added;
}
-Status PartitionedHashJoinNode::ProcessBuildBatch(RowBatch* build_batch,
- bool build_filters) {
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx_->expr_values_cache();
- expr_vals_cache->Reset();
- FOREACH_ROW(build_batch, 0, build_batch_iter) {
- DCHECK(build_status_.ok());
- TupleRow* build_row = build_batch_iter.Get();
- if (!ht_ctx_->EvalAndHashBuild(build_row)) {
- if (null_aware_partition_ != NULL) {
- // TODO: remove with codegen/template
- // If we are NULL aware and this build row has NULL in the eq join slot,
- // append it to the null_aware partition. We will need it later.
- if (UNLIKELY(!AppendRow(null_aware_partition_->build_rows(),
- build_row, &build_status_))) {
- return build_status_;
- }
- }
- continue;
- }
- if (build_filters) {
- DCHECK_EQ(ht_ctx_->level(), 0)
- << "Runtime filters should not be built during repartitioning.";
- for (const FilterContext& ctx: filters_) {
- // TODO: codegen expr evaluation and hashing
- if (ctx.local_bloom_filter == NULL) continue;
- void* e = ctx.expr->GetValue(build_row);
- uint32_t filter_hash = RawValue::GetHashValue(e, ctx.expr->root()->type(),
- RuntimeFilterBank::DefaultHashSeed());
- ctx.local_bloom_filter->Insert(filter_hash);
- }
- }
- const uint32_t hash = expr_vals_cache->CurExprValuesHash();
- const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- Partition* partition = hash_partitions_[partition_idx];
- const bool result = AppendRow(partition->build_rows(), build_row, &build_status_);
- if (UNLIKELY(!result)) return build_status_;
- }
- return Status::OK();
-}
-
-bool PartitionedHashJoinNode::Partition::InsertBatch(
- TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx, RowBatch* batch,
- const vector<BufferedTupleStream::RowIdx>& indices) {
- // Compute the hash values and prefetch the hash table buckets.
- const int num_rows = batch->num_rows();
- HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
- const int prefetch_size = expr_vals_cache->capacity();
- const BufferedTupleStream::RowIdx* row_indices = indices.data();
- for (int prefetch_group_row = 0; prefetch_group_row < num_rows;
- prefetch_group_row += prefetch_size) {
- int cur_row = prefetch_group_row;
- expr_vals_cache->Reset();
- FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
- if (ht_ctx->EvalAndHashBuild(batch_iter.Get())) {
- if (prefetch_mode != TPrefetchMode::NONE) {
- hash_tbl_->PrefetchBucket<false>(expr_vals_cache->CurExprValuesHash());
- }
- } else {
- expr_vals_cache->SetRowNull();
- }
- expr_vals_cache->NextRow();
- }
- // Do the insertion.
- expr_vals_cache->ResetForRead();
- FOREACH_ROW_LIMIT(batch, cur_row, prefetch_size, batch_iter) {
- TupleRow* row = batch_iter.Get();
- BufferedTupleStream::RowIdx row_idx = row_indices[cur_row];
- if (!expr_vals_cache->IsRowNull() &&
- UNLIKELY(!hash_tbl_->Insert(ht_ctx, row_idx, row))) {
- return false;
- }
- expr_vals_cache->NextRow();
- ++cur_row;
- }
- }
- return true;
+inline Status PartitionedHashJoinNode::AppendProbeRow(
+ BufferedTupleStream* stream, TupleRow* row) {
+ DCHECK(stream->has_write_block());
+ DCHECK(!stream->using_small_buffers());
+ DCHECK(!stream->is_pinned());
+ Status status;
+ if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+ DCHECK(!status.ok());
+ return status;
}
template int PartitionedHashJoinNode::ProcessProbeBatch<TJoinOp::INNER_JOIN>(