You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:20 UTC
[09/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index e0393b5..912613d 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -26,8 +26,9 @@
#include "exec/data-sink.h"
#include "exec/filter-context.h"
#include "exec/hash-table.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/suballocator.h"
#include "gen-cpp/PlanNodes_types.h"
@@ -56,7 +57,7 @@ class ScalarExprEvaluator;
/// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1
/// partitions.
///
-/// Both the PartitionedHashJoinNode and the builder share a BufferedBlockMgr client
+/// Both the PartitionedHashJoinNode and the builder share a BufferPool client
/// and the corresponding reservations. Different stages of the spilling algorithm
/// require different mixes of build and probe buffers and hash tables, so we can
/// share the reservation to minimize the combined memory requirement. Initial probe-side
@@ -72,7 +73,8 @@ class PhjBuilder : public DataSink {
class Partition;
PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
- const RowDescriptor* build_row_desc, RuntimeState* state);
+ const RowDescriptor* build_row_desc, RuntimeState* state,
+ BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size);
Status InitExprsAndFilters(RuntimeState* state,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -101,7 +103,7 @@ class PhjBuilder : public DataSink {
/// Transfer ownership of the probe streams to the caller. One stream was allocated per
/// spilled partition in FlushFinal(). The probe streams are empty but prepared for
/// writing with a write buffer allocated.
- std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
+ std::vector<std::unique_ptr<BufferedTupleStreamV2>> TransferProbeStreams();
/// Clears the current list of hash partitions. Called after probing of the partitions
/// is done. The partitions are not closed or destroyed, since they may be spilled or
@@ -122,7 +124,7 @@ class PhjBuilder : public DataSink {
/// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
/// has enough buffers preallocated to execute successfully.
Status RepartitionBuildInput(Partition* input_partition, int level,
- BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
+ BufferedTupleStreamV2* input_probe_rows) WARN_UNUSED_RESULT;
/// Returns the largest build row count out of the current hash partitions.
int64_t LargestPartitionRows() const;
@@ -132,7 +134,6 @@ class PhjBuilder : public DataSink {
bool HashTableStoresNulls() const;
/// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
- inline BufferedBlockMgr::Client* block_mgr_client() const { return block_mgr_client_; }
inline bool non_empty_build() const { return non_empty_build_; }
inline const std::vector<bool>& is_not_distinct_from() const {
return is_not_distinct_from_;
@@ -200,24 +201,27 @@ class PhjBuilder : public DataSink {
/// Spills this partition, the partition's stream is unpinned with 'mode' and
/// its hash table is destroyed if it was built.
- Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
+ Status Spill(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
- BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
+ BufferedTupleStreamV2* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
int ALWAYS_INLINE level() const { return level_; }
private:
- /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'indices' is an array
- /// containing the index of each row's index into the hash table's tuple stream.
+ /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array
+ /// containing the rows in the hash table's tuple stream.
/// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
/// table buckets which the rows hashes to will be prefetched. This parameter is
/// replaced with a constant during codegen time. This function may be replaced with
/// a codegen'd version. Returns true if all rows in 'batch' are successfully
- /// inserted.
+ /// inserted and false otherwise. If inserting failed, 'status' indicates why it
+ /// failed: if 'status' is ok, inserting failed because not enough reservation
+ /// was available and if 'status' is an error, inserting failed because of that error.
bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
- RowBatch* batch, const std::vector<BufferedTupleStream::RowIdx>& indices);
+ RowBatch* batch, const std::vector<BufferedTupleStreamV2::FlatRowPtr>& flat_rows,
+ Status* status);
const PhjBuilder* parent_;
@@ -235,16 +239,9 @@ class PhjBuilder : public DataSink {
/// Stream of build tuples in this partition. Initially owned by this object but
/// transferred to the parent exec node (via the row batch) when the partition
/// is closed. If NULL, ownership has been transferred and the partition is closed.
- std::unique_ptr<BufferedTupleStream> build_rows_;
+ std::unique_ptr<BufferedTupleStreamV2> build_rows_;
};
- protected:
- /// Init() function inherited from DataSink. Overridden to be a no-op for now.
- /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink.
- virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
- const TDataSink& tsink, RuntimeState* state) override;
-
- private:
/// Computes the minimum number of buffers required to execute the spilling partitioned
/// hash algorithm successfully for any input size (assuming enough disk space is
/// available for spilled rows). The buffers are used for buffering both build and
@@ -255,15 +252,22 @@ class PhjBuilder : public DataSink {
/// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
/// 'null_aware_probe_partition_' and 'null_probe_rows_'.
int MinRequiredBuffers() const {
- // Must be kept in sync with HashJoinNode.computeResourceProfile() in fe.
+ // Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe.
int num_reserved_buffers = PARTITION_FANOUT + 1;
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
return num_reserved_buffers;
}
+ protected:
+ /// Init() function inherited from DataSink. Overridden to be a no-op for now.
+ /// TODO: Merge with InitExprsAndFilters() once this class becomes a true data sink.
+ virtual Status Init(const std::vector<TExpr>& thrift_output_exprs,
+ const TDataSink& tsink, RuntimeState* state) override;
+
/// Free local allocations made from expr evaluators during hash table construction.
void FreeLocalAllocations() const;
+ private:
/// Create and initialize a set of hash partitions for partitioning level 'level'.
/// The previous hash partitions must have been cleared with ClearHashPartitions().
/// After calling this, batches are added to the new partitions by calling Send().
@@ -284,19 +288,19 @@ class PhjBuilder : public DataSink {
/// partitions. This odd return convention is used to avoid emitting unnecessary code
/// for ~Status in perf-critical code.
bool AppendRow(
- BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+ BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
/// Slow path for AppendRow() above. It is called when the stream has failed to append
/// the row. We need to find more memory by either switching to IO-buffers, in case the
/// stream still uses small buffers, or spilling a partition. Returns false and sets
/// 'status' if it was unable to append the row, even after spilling partitions.
- bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
+ bool AppendRowStreamFull(BufferedTupleStreamV2* stream, TupleRow* row,
Status* status) noexcept WARN_UNUSED_RESULT;
/// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
/// to the Spill() call for the selected partition. The current policy is to spill the
/// largest partition. Returns non-ok status if we couldn't spill a partition.
- Status SpillPartition(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
+ Status SpillPartition(BufferedTupleStreamV2::UnpinMode mode) WARN_UNUSED_RESULT;
/// Tries to build hash tables for all unspilled hash partitions. Called after
/// FlushFinal() when all build rows have been partitioned and added to the appropriate
@@ -358,14 +362,20 @@ class PhjBuilder : public DataSink {
/// Pool for objects with same lifetime as builder.
ObjectPool pool_;
- /// Client to the buffered block mgr, used to allocate build partition buffers and hash
- /// tables. When probing, the spilling algorithm keeps some build partitions in memory
- /// while using memory for probe buffers for spilled partitions. To support dynamically
- /// dividing memory between build and probe, this client is owned by the builder but
- /// shared with the PartitionedHashJoinNode.
+ /// Client to the buffer pool, used to allocate build partition buffers and hash tables.
+ /// When probing, the spilling algorithm keeps some build partitions in memory while
+ /// using memory for probe buffers for spilled partitions. To support dynamically
+ /// dividing memory between build and probe, this client is shared between the builder
+ /// and the PartitionedHashJoinNode.
/// TODO: this approach to sharing will not work for spilling broadcast joins with a
/// 1:N relationship from builders to join nodes.
- BufferedBlockMgr::Client* block_mgr_client_;
+ BufferPool::ClientHandle* buffer_pool_client_;
+
+ /// The size of buffers to use in the build and probe streams.
+ const int64_t spillable_buffer_size_;
+
+ /// Allocator for hash table memory.
+ boost::scoped_ptr<Suballocator> ht_allocator_;
/// If true, the build side has at least one row.
bool non_empty_build_;
@@ -454,7 +464,7 @@ class PhjBuilder : public DataSink {
///
/// Because of this, at the end of the build phase, we always have sufficient memory
/// to execute the probe phase of the algorithm without spilling more partitions.
- std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
+ std::vector<std::unique_ptr<BufferedTupleStreamV2>> spilled_partition_probe_streams_;
/// END: Members that must be Reset()
/////////////////////////////////////////
@@ -469,7 +479,7 @@ class PhjBuilder : public DataSink {
ProcessBuildBatchFn process_build_batch_fn_level0_;
typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
- const std::vector<BufferedTupleStream::RowIdx>&);
+ const std::vector<BufferedTupleStreamV2::FlatRowPtr>&, Status*);
/// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
InsertBatchFn insert_batch_fn_;
InsertBatchFn insert_batch_fn_level0_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 2c951d1..b890eb9 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -313,7 +313,7 @@ bool IR_ALWAYS_INLINE PartitionedHashJoinNode::NextProbeRow(
// The partition is not in memory, spill the probe row and move to the next row.
// Skip the current row if we manage to append to the spilled partition's BTS.
// Otherwise, we need to bail out and report the failure.
- BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+ BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
if (UNLIKELY(!AppendProbeRow(probe_rows, current_probe_row_, status))) {
DCHECK(!status->ok());
return false;
@@ -438,9 +438,8 @@ int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode
}
inline bool PartitionedHashJoinNode::AppendProbeRow(
- BufferedTupleStream* stream, TupleRow* row, Status* status) {
- DCHECK(stream->has_write_block());
- DCHECK(!stream->using_small_buffers());
+ BufferedTupleStreamV2* stream, TupleRow* row, Status* status) {
+ DCHECK(stream->has_write_iterator());
DCHECK(!stream->is_pinned());
return stream->AddRow(row, status);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a5c9897..2db9e00 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -27,8 +27,7 @@
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
@@ -47,9 +46,15 @@ static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
"successfully.";
using namespace impala;
-using namespace llvm;
-using namespace strings;
-using std::unique_ptr;
+using llvm::BasicBlock;
+using llvm::ConstantInt;
+using llvm::Function;
+using llvm::GlobalValue;
+using llvm::LLVMContext;
+using llvm::PointerType;
+using llvm::Type;
+using llvm::Value;
+using strings::Substitute;
PartitionedHashJoinNode::PartitionedHashJoinNode(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -77,8 +82,9 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
// TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
// owned by this node, but duplicates some state (exprs, etc) in anticipation of it
// being separated out further.
- builder_.reset(
- new PhjBuilder(id(), join_op_, child(0)->row_desc(), child(1)->row_desc(), state));
+ builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(),
+ child(1)->row_desc(), state, &buffer_pool_client_,
+ resource_profile_.spillable_buffer_size));
RETURN_IF_ERROR(
builder_->InitExprsAndFilters(state, eq_join_conjuncts, tnode.runtime_filters));
@@ -177,6 +183,11 @@ Status PartitionedHashJoinNode::Open(RuntimeState* state) {
}
Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
+ DCHECK_GE(resource_profile_.min_reservation,
+ resource_profile_.spillable_buffer_size * builder_->MinRequiredBuffers());
+ if (!buffer_pool_client_.is_registered()) {
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
+ }
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
// Initialize these partitions before doing the build so that the build does not
// use the reservation intended for them.
@@ -254,12 +265,10 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
- unique_ptr<BufferedTupleStream> probe_rows)
- : parent_(parent),
- build_partition_(build_partition),
+ unique_ptr<BufferedTupleStreamV2> probe_rows)
+ : build_partition_(build_partition),
probe_rows_(std::move(probe_rows)) {
- DCHECK(probe_rows_->has_write_block());
- DCHECK(!probe_rows_->using_small_buffers());
+ DCHECK(probe_rows_->has_write_iterator());
DCHECK(!probe_rows_->is_pinned());
}
@@ -270,10 +279,7 @@ PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
bool got_read_buffer;
RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
- if (!got_read_buffer) {
- return parent_->mem_tracker()->MemLimitExceeded(parent_->runtime_state_,
- Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, parent_->id_));
- }
+ DCHECK(got_read_buffer) << "Accounted in min reservation";
return Status::OK();
}
@@ -322,7 +328,7 @@ Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
probe_batch_pos_ = -1;
return Status::OK();
}
- BufferedTupleStream* probe_rows = input_partition_->probe_rows();
+ BufferedTupleStreamV2* probe_rows = input_partition_->probe_rows();
if (LIKELY(probe_rows->rows_returned() < probe_rows->num_rows())) {
// Continue from the current probe stream.
bool eos = false;
@@ -414,12 +420,11 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
ht_ctx_->set_level(next_partition_level);
// Spill to free memory from hash tables and pinned streams for use in new partitions.
- RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStream::UNPIN_ALL));
+ RETURN_IF_ERROR(build_partition->Spill(BufferedTupleStreamV2::UNPIN_ALL));
// Temporarily free up the probe buffer to use when repartitioning.
- RETURN_IF_ERROR(
- input_partition_->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
- DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0) << NodeDebugString();
- DCHECK_EQ(input_partition_->probe_rows()->blocks_pinned(), 0) << NodeDebugString();
+ input_partition_->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
+ DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
int64_t num_input_rows = build_partition->build_rows()->num_rows();
RETURN_IF_ERROR(builder_->RepartitionBuildInput(
build_partition, next_partition_level, input_partition_->probe_rows()));
@@ -430,7 +435,8 @@ Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
"more rows than the input";
if (UNLIKELY(num_input_rows == largest_partition_rows)) {
return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
- next_partition_level, num_input_rows);
+ next_partition_level, num_input_rows, NodeDebugString(),
+ buffer_pool_client_.DebugString());
}
RETURN_IF_ERROR(PrepareForProbe());
@@ -816,18 +822,18 @@ static Status NullAwareAntiJoinError(bool build) {
Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
RuntimeState* state = runtime_state_;
- unique_ptr<BufferedTupleStream> probe_rows = std::make_unique<BufferedTupleStream>(
- state, child(0)->row_desc(), state->block_mgr(), builder_->block_mgr_client(),
- false /* use_initial_small_buffers */, false /* read_write */);
- Status status = probe_rows->Init(id(), runtime_profile(), false);
+ unique_ptr<BufferedTupleStreamV2> probe_rows = make_unique<BufferedTupleStreamV2>(
+ state, child(0)->row_desc(), &buffer_pool_client_,
+ resource_profile_.spillable_buffer_size,
+ resource_profile_.spillable_buffer_size);
+ // TODO: this should be pinned if spilling is disabled.
+ Status status = probe_rows->Init(id(), false);
if (!status.ok()) goto error;
bool got_buffer;
status = probe_rows->PrepareForWrite(&got_buffer);
if (!status.ok()) goto error;
- if (!got_buffer) {
- status = state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
- goto error;
- }
+ DCHECK(got_buffer)
+ << "Accounted in min reservation" << buffer_pool_client_.DebugString();
null_aware_probe_partition_.reset(new ProbePartition(
state, this, builder_->null_aware_partition(), std::move(probe_rows)));
return Status::OK();
@@ -841,15 +847,15 @@ error:
Status PartitionedHashJoinNode::InitNullProbeRows() {
RuntimeState* state = runtime_state_;
- null_probe_rows_ = std::make_unique<BufferedTupleStream>(state, child(0)->row_desc(),
- state->block_mgr(), builder_->block_mgr_client(),
- false /* use_initial_small_buffers */, false /* read_write */);
- RETURN_IF_ERROR(null_probe_rows_->Init(id(), runtime_profile(), false));
+ null_probe_rows_ = make_unique<BufferedTupleStreamV2>(state, child(0)->row_desc(),
+ &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+ resource_profile_.spillable_buffer_size);
+ // TODO: we shouldn't start with this unpinned if spilling is disabled.
+ RETURN_IF_ERROR(null_probe_rows_->Init(id(), false));
bool got_buffer;
RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
- if (!got_buffer) {
- return state->block_mgr()->MemLimitTooLowError(builder_->block_mgr_client(), id());
- }
+ DCHECK(got_buffer)
+ << "Accounted in min reservation" << buffer_pool_client_.DebugString();
return Status::OK();
}
@@ -860,8 +866,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
DCHECK_EQ(probe_batch_pos_, -1);
DCHECK_EQ(probe_batch_->num_rows(), 0);
- BufferedTupleStream* build_stream = builder_->null_aware_partition()->build_rows();
- BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
+ BufferedTupleStreamV2* build_stream = builder_->null_aware_partition()->build_rows();
+ BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
if (build_stream->num_rows() == 0) {
// There were no build rows. Nothing to do. Just prepare to output the null
@@ -874,7 +880,7 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
// Bring the entire spilled build stream into memory and read into a single batch.
bool got_rows;
- RETURN_IF_ERROR(build_stream->GetRows(&nulls_build_batch_, &got_rows));
+ RETURN_IF_ERROR(build_stream->GetRows(mem_tracker(), &nulls_build_batch_, &got_rows));
if (!got_rows) return NullAwareAntiJoinError(true);
// Initialize the streams for read.
@@ -898,7 +904,7 @@ Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
int num_join_conjuncts = other_join_conjuncts_.size();
DCHECK(probe_batch_ != NULL);
- BufferedTupleStream* probe_stream = null_aware_probe_partition_->probe_rows();
+ BufferedTupleStreamV2* probe_stream = null_aware_probe_partition_->probe_rows();
if (probe_batch_pos_ == probe_batch_->num_rows()) {
probe_batch_pos_ = 0;
probe_batch_->TransferResourceOwnership(out_batch);
@@ -946,7 +952,8 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
DCHECK(probe_hash_partitions_.empty());
// Initialize the probe partitions, providing them with probe streams.
- vector<unique_ptr<BufferedTupleStream>> probe_streams = builder_->TransferProbeStreams();
+ vector<unique_ptr<BufferedTupleStreamV2>> probe_streams =
+ builder_->TransferProbeStreams();
probe_hash_partitions_.resize(PARTITION_FANOUT);
for (int i = 0; i < PARTITION_FANOUT; ++i) {
PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
@@ -982,16 +989,16 @@ Status PartitionedHashJoinNode::PrepareForProbe() {
}
void PartitionedHashJoinNode::CreateProbePartition(
- int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
+ int partition_idx, unique_ptr<BufferedTupleStreamV2> probe_rows) {
DCHECK_GE(partition_idx, 0);
DCHECK_LT(partition_idx, probe_hash_partitions_.size());
DCHECK(probe_hash_partitions_[partition_idx] == NULL);
- probe_hash_partitions_[partition_idx] = std::make_unique<ProbePartition>(runtime_state_,
+ probe_hash_partitions_[partition_idx] = make_unique<ProbePartition>(runtime_state_,
this, builder_->hash_partition(partition_idx), std::move(probe_rows));
}
Status PartitionedHashJoinNode::EvaluateNullProbe(
- RuntimeState* state, BufferedTupleStream* build) {
+ RuntimeState* state, BufferedTupleStreamV2* build) {
if (null_probe_rows_ == NULL || null_probe_rows_->num_rows() == 0) {
return Status::OK();
}
@@ -1000,10 +1007,10 @@ Status PartitionedHashJoinNode::EvaluateNullProbe(
// Bring both the build and probe side into memory and do a pairwise evaluation.
bool got_rows;
scoped_ptr<RowBatch> build_rows;
- RETURN_IF_ERROR(build->GetRows(&build_rows, &got_rows));
+ RETURN_IF_ERROR(build->GetRows(mem_tracker(), &build_rows, &got_rows));
if (!got_rows) return NullAwareAntiJoinError(true);
scoped_ptr<RowBatch> probe_rows;
- RETURN_IF_ERROR(null_probe_rows_->GetRows(&probe_rows, &got_rows));
+ RETURN_IF_ERROR(null_probe_rows_->GetRows(mem_tracker(), &probe_rows, &got_rows));
if (!got_rows) return NullAwareAntiJoinError(false);
ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
@@ -1060,11 +1067,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
// can recurse the algorithm and create new hash partitions from spilled partitions.
// TODO: we shouldn't need to unpin the build stream if we stop spilling
// while probing.
- RETURN_IF_ERROR(
- build_partition->build_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
- DCHECK_EQ(build_partition->build_rows()->blocks_pinned(), 0);
- RETURN_IF_ERROR(
- probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
+ build_partition->build_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
+ DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0);
+ probe_partition->probe_rows()->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL);
if (probe_partition->probe_rows()->num_rows() != 0
|| NeedToProcessUnmatchedBuildRows()) {
@@ -1102,9 +1107,9 @@ Status PartitionedHashJoinNode::CleanUpHashPartitions(
// Just finished evaluating the null probe rows with all the non-spilled build
// partitions. Unpin this now to free this memory for repartitioning.
- if (null_probe_rows_ != NULL)
- RETURN_IF_ERROR(
- null_probe_rows_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+ if (null_probe_rows_ != NULL) {
+ null_probe_rows_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+ }
builder_->ClearHashPartitions();
probe_hash_partitions_.clear();
@@ -1165,10 +1170,10 @@ string PartitionedHashJoinNode::NodeDebugString() const {
ss << " Probe hash partition " << i << ": ";
if (probe_partition != NULL) {
ss << "probe ptr=" << probe_partition;
- BufferedTupleStream* probe_rows = probe_partition->probe_rows();
+ BufferedTupleStreamV2* probe_rows = probe_partition->probe_rows();
if (probe_rows != NULL) {
- ss << " Probe Rows: " << probe_rows->num_rows()
- << " (Blocks pinned: " << probe_rows->blocks_pinned() << ")";
+ ss << " Probe Rows: " << probe_rows->num_rows()
+ << " (Bytes pinned: " << probe_rows->BytesPinned(false) << ")";
}
}
ss << endl;
@@ -1189,12 +1194,15 @@ string PartitionedHashJoinNode::NodeDebugString() const {
}
}
if (input_partition_ != NULL) {
- DCHECK(input_partition_->build_partition()->build_rows() != NULL);
DCHECK(input_partition_->probe_rows() != NULL);
- ss << "InputPartition: " << input_partition_.get() << endl
- << " Spilled Build Rows: "
- << input_partition_->build_partition()->build_rows()->num_rows() << endl
- << " Spilled Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
+ ss << "InputPartition: " << input_partition_.get() << endl;
+ PhjBuilder::Partition* build_partition = input_partition_->build_partition();
+ if (build_partition->IsClosed()) {
+ ss << " Build Partition Closed" << endl;
+ } else {
+ ss << " Build Rows: " << build_partition->build_rows()->num_rows() << endl;
+ }
+ ss << " Probe Rows: " << input_partition_->probe_rows()->num_rows() << endl;
} else {
ss << "InputPartition: NULL" << endl;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 73e0dd5..b3f663e 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -15,28 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-
#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_NODE_H
-#include <boost/scoped_ptr.hpp>
-#include <boost/thread.hpp>
#include <list>
#include <memory>
#include <string>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread.hpp>
#include "exec/blocking-join-node.h"
#include "exec/exec-node.h"
#include "exec/partitioned-hash-join-builder.h"
-#include "runtime/buffered-block-mgr.h"
#include "gen-cpp/Types_types.h"
namespace impala {
class BloomFilter;
-class BufferedBlockMgr;
-class BufferedTupleStream;
class MemPool;
class RowBatch;
class RuntimeFilter;
@@ -100,8 +96,6 @@ class TupleRow;
/// NULLs into several different streams, which are processed in a separate step to
/// produce additional output rows. The NAAJ algorithm is documented in more detail in
/// header comments for the null aware functions and data structures.
-///
-/// TODO: don't copy tuple rows so often.
class PartitionedHashJoinNode : public BlockingJoinNode {
public:
PartitionedHashJoinNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -168,7 +162,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// Creates an initialized probe partition at 'partition_idx' in
/// 'probe_hash_partitions_'.
void CreateProbePartition(
- int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
+ int partition_idx, std::unique_ptr<BufferedTupleStreamV2> probe_rows);
/// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
/// a write buffer allocated, so this will succeed unless an error is encountered.
@@ -176,7 +170,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// return convention is used to avoid emitting unnecessary code for ~Status in perf-
/// critical code.
bool AppendProbeRow(
- BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
+ BufferedTupleStreamV2* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
/// Probes the hash table for rows matching the current probe row and appends
/// all the matching build rows (with probe row) to output batch. Returns true
@@ -331,7 +325,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// conjuncts pass (i.e. there is a match).
/// This is used for NAAJ, when there are NULL probe rows.
Status EvaluateNullProbe(
- RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
+ RuntimeState* state, BufferedTupleStreamV2* build) WARN_UNUSED_RESULT;
/// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
/// matched_null_probe_ should have been fully evaluated.
@@ -478,7 +472,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// For NAAJ, this stream contains all probe rows that had NULL on the hash table
/// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
- std::unique_ptr<BufferedTupleStream> null_probe_rows_;
+ std::unique_ptr<BufferedTupleStreamV2> null_probe_rows_;
/// For each row in null_probe_rows_, true if this row has matched any build row
/// (i.e. the resulting joined row passes other_join_conjuncts).
@@ -510,7 +504,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// that has been prepared for writing with an I/O-sized write buffer.
ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
PhjBuilder::Partition* build_partition,
- std::unique_ptr<BufferedTupleStream> probe_rows);
+ std::unique_ptr<BufferedTupleStreamV2> probe_rows);
~ProbePartition();
/// Prepare to read the probe rows. Allocates the first read block, so reads will
@@ -523,21 +517,19 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
/// resources if 'batch' is NULL. Idempotent.
void Close(RowBatch* batch);
- BufferedTupleStream* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
+ BufferedTupleStreamV2* ALWAYS_INLINE probe_rows() { return probe_rows_.get(); }
PhjBuilder::Partition* build_partition() { return build_partition_; }
inline bool IsClosed() const { return probe_rows_ == NULL; }
private:
- PartitionedHashJoinNode* parent_;
-
/// The corresponding build partition. Not NULL. Owned by PhjBuilder.
PhjBuilder::Partition* build_partition_;
/// Stream of probe tuples in this partition. Initially owned by this object but
/// transferred to the parent exec node (via the row batch) when the partition
/// is complete. If NULL, ownership was transferred and the partition is closed.
- std::unique_ptr<BufferedTupleStream> probe_rows_;
+ std::unique_ptr<BufferedTupleStreamV2> probe_rows_;
};
/// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-hash-join-node.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.inline.h b/be/src/exec/partitioned-hash-join-node.inline.h
index a53b40e..3441aac 100644
--- a/be/src/exec/partitioned-hash-join-node.inline.h
+++ b/be/src/exec/partitioned-hash-join-node.inline.h
@@ -20,7 +20,7 @@
#include "exec/partitioned-hash-join-node.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index fd42124..440f809 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -52,9 +52,12 @@ Status SortNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(ExecNode::Prepare(state));
less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
- sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_,
- &row_descriptor_, mem_tracker(), runtime_profile(), state));
+ sorter_.reset(
+ new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_, mem_tracker(),
+ &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+ runtime_profile(), state, id(), true));
RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+ DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
AddCodegenDisabledMessage(state);
return Status::OK();
}
@@ -69,9 +72,13 @@ void SortNode::Codegen(RuntimeState* state) {
Status SortNode::Open(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
- // Open the child before consuming resources in this node.
- RETURN_IF_ERROR(child(0)->Open(state));
RETURN_IF_ERROR(ExecNode::Open(state));
+ RETURN_IF_ERROR(child(0)->Open(state));
+ // Claim reservation after the child has been opened to reduce the peak reservation
+ // requirement.
+ if (!buffer_pool_client_.is_registered()) {
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
+ }
RETURN_IF_ERROR(less_than_->Open(pool_, state, expr_mem_pool()));
RETURN_IF_ERROR(sorter_->Open());
RETURN_IF_CANCELLED(state);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 8b3de11..a11d424 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -20,13 +20,12 @@
#include "exec/exec-node.h"
#include "runtime/sorter.h"
-#include "runtime/buffered-block-mgr.h"
namespace impala {
/// Node that implements a full sort of its input with a fixed memory budget, spilling
/// to disk if the input is larger than available memory.
-/// Uses Sorter and BufferedBlockMgr for the external sort implementation.
+/// Uses Sorter for the external sort implementation.
/// Input rows to SortNode are materialized by the Sorter into a single tuple
/// using the expressions specified in sort_tuple_exprs_.
/// In GetNext(), SortNode passes in the output batch to the sorter instance created
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 2de0f2e..92af968 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -24,8 +24,6 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
add_library(Runtime
- buffered-block-mgr.cc
- buffered-tuple-stream.cc
buffered-tuple-stream-v2.cc
client-cache.cc
coordinator.cc
@@ -45,6 +43,7 @@ add_library(Runtime
hbase-table.cc
hbase-table-factory.cc
hdfs-fs-cache.cc
+ initial-reservations.cc
lib-cache.cc
mem-tracker.cc
mem-pool.cc
@@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test)
ADD_BE_TEST(data-stream-test)
ADD_BE_TEST(timestamp-test)
ADD_BE_TEST(disk-io-mgr-test)
-ADD_BE_TEST(buffered-block-mgr-test)
ADD_BE_TEST(parallel-executor-test)
ADD_BE_TEST(raw-value-test)
ADD_BE_TEST(string-compare-test)
@@ -93,7 +91,6 @@ ADD_BE_TEST(thread-resource-mgr-test)
ADD_BE_TEST(mem-tracker-test)
ADD_BE_TEST(multi-precision-test)
ADD_BE_TEST(decimal-test)
-ADD_BE_TEST(buffered-tuple-stream-test)
ADD_BE_TEST(buffered-tuple-stream-v2-test)
ADD_BE_TEST(hdfs-fs-cache-test)
ADD_BE_TEST(tmp-file-mgr-test)