Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:22 UTC
[11/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool
IMPALA-4674: Part 2: port backend exec to BufferPool
Always create the global BufferPool at startup using 80% of memory, and
limit reservations to 80% of query memory (the same as BufferedBlockMgr).
The query's initial reservation is computed in the planner, claimed
centrally (managed by the InitialReservations class) and distributed
to query operators from there.
The min_spillable_buffer_size and default_spillable_buffer_size query
options control the buffer size that the planner selects for
spilling operators.
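For illustration only - a minimal sketch of how a planner could pick a
spillable buffer size between those two options (hypothetical function and
parameter names; this is not the actual Java planner code):

// Sketch: start at the minimum, double toward the estimated per-operator
// data size, and cap at the default. All names here are illustrative.
#include <algorithm>
#include <cstdint>

int64_t ChooseSpillableBufferSize(int64_t min_spillable_buffer_size,
    int64_t default_spillable_buffer_size, int64_t estimated_input_bytes) {
  int64_t size = min_spillable_buffer_size;
  // Keep the size a power-of-two multiple of the minimum buffer size.
  while (size < estimated_input_bytes && size < default_spillable_buffer_size) {
    size *= 2;
  }
  return std::min(size, default_spillable_buffer_size);
}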
Port ExecNodes to use BufferPool:
* Each ExecNode has to claim its reservation during Open() (see the sketch
after this list)
* Port Sorter to use BufferPool.
* Switch from BufferedTupleStream to BufferedTupleStreamV2
* Port HashTable to use BufferPool via a Suballocator.
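The per-node pattern, condensed into a sketch (based on the
AnalyticEvalNode::Open() changes further down; 'MyExecNode' and 'stream_'
are illustrative names, not real classes in this patch):

// Sketch: claim the buffer reservation lazily in Open(), after opening the
// child, then hand the registered client to a BufferedTupleStreamV2.
Status MyExecNode::Open(RuntimeState* state) {
  RETURN_IF_ERROR(ExecNode::Open(state));
  RETURN_IF_ERROR(child(0)->Open(state));
  // Claiming after the child is open reduces the peak reservation requirement.
  if (!buffer_pool_client_.is_registered()) {
    RETURN_IF_ERROR(ClaimBufferReservation(state));
  }
  stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
      &buffer_pool_client_, resource_profile_.spillable_buffer_size,
      resource_profile_.spillable_buffer_size));
  RETURN_IF_ERROR(stream_->Init(id(), true));
  bool got_buffer;
  RETURN_IF_ERROR(stream_->PrepareForReadWrite(true, &got_buffer));
  DCHECK(got_buffer) << "Initial reservation should guarantee a buffer";
  return Status::OK();
}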
This also makes PAGG memory consumption more efficient (avoiding wasted
buffers) and improves the spilling algorithm:
* Allow preaggs to execute with 0 reservation - if streams and hash tables
cannot be allocated, the preagg passes rows through unaggregated (sketched
below).
* Halve the buffer requirement for spilling aggs - avoid allocating
buffers for aggregated and unaggregated streams simultaneously.
* Rebuild spilled partitions instead of repartitioning (IMPALA-2708)
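A rough sketch of that pass-through behaviour (hypothetical names; the real
logic lives in be/src/exec/partitioned-aggregation-node.cc):

// Sketch: a streaming preagg that failed to allocate its hash tables
// forwards input rows unchanged; the merge aggregation above it still
// produces correct results. 'ht_allocation_failed_', 'PassThrough' and
// 'AggregateBatch' are illustrative names.
Status StreamingPreAgg::GetNextInternal(
    RuntimeState* state, RowBatch* batch, bool* eos) {
  RETURN_IF_ERROR(child(0)->GetNext(state, child_batch_.get(), &child_eos_));
  if (ht_allocation_failed_) {
    // Executing with 0 reservation: skip aggregation entirely.
    return PassThrough(child_batch_.get(), batch, eos);
  }
  return AggregateBatch(child_batch_.get(), batch, eos);
}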
TODO in follow-up patches:
* Rename BufferedTupleStreamV2 to BufferedTupleStream
* Implement max_row_size query option.
Testing:
* Updated tests to reflect new memory requirements
Change-Id: I7fc7fe1c04e9dfb1a0c749fb56a5e0f2bf9c6c3e
Reviewed-on: http://gerrit.cloudera.org:8080/5801
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a98b90bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a98b90bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a98b90bd
Branch: refs/heads/master
Commit: a98b90bd3877886e97dc2385cfdf5e3f95245533
Parents: d5b0c6b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 16 16:09:36 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Aug 5 01:03:02 2017 +0000
----------------------------------------------------------------------
be/src/codegen/gen_ir_descriptions.py | 2 +-
be/src/exec/analytic-eval-node.cc | 53 +-
be/src/exec/analytic-eval-node.h | 25 +-
be/src/exec/exec-node.cc | 45 +-
be/src/exec/exec-node.h | 17 +
be/src/exec/hash-table-test.cc | 251 ++-
be/src/exec/hash-table.cc | 149 +-
be/src/exec/hash-table.h | 140 +-
be/src/exec/hash-table.inline.h | 20 +-
be/src/exec/nested-loop-join-builder.cc | 3 +-
be/src/exec/partial-sort-node.cc | 7 +-
be/src/exec/partial-sort-node.h | 1 -
be/src/exec/partitioned-aggregation-node-ir.cc | 20 +-
be/src/exec/partitioned-aggregation-node.cc | 639 ++++----
be/src/exec/partitioned-aggregation-node.h | 192 ++-
be/src/exec/partitioned-hash-join-builder-ir.cc | 12 +-
be/src/exec/partitioned-hash-join-builder.cc | 159 +-
be/src/exec/partitioned-hash-join-builder.h | 76 +-
be/src/exec/partitioned-hash-join-node-ir.cc | 7 +-
be/src/exec/partitioned-hash-join-node.cc | 136 +-
be/src/exec/partitioned-hash-join-node.h | 26 +-
be/src/exec/partitioned-hash-join-node.inline.h | 2 +-
be/src/exec/sort-node.cc | 15 +-
be/src/exec/sort-node.h | 3 +-
be/src/runtime/CMakeLists.txt | 5 +-
be/src/runtime/buffered-block-mgr-test.cc | 1547 ------------------
be/src/runtime/buffered-block-mgr.cc | 1254 --------------
be/src/runtime/buffered-block-mgr.h | 606 -------
be/src/runtime/buffered-tuple-stream-test.cc | 1264 --------------
be/src/runtime/buffered-tuple-stream.cc | 903 ----------
be/src/runtime/buffered-tuple-stream.h | 561 -------
be/src/runtime/buffered-tuple-stream.inline.h | 59 -
be/src/runtime/bufferpool/buffer-pool.cc | 12 +-
be/src/runtime/bufferpool/buffer-pool.h | 8 +
be/src/runtime/bufferpool/reservation-tracker.h | 4 +
be/src/runtime/disk-io-mgr.cc | 7 +-
be/src/runtime/exec-env.cc | 35 +-
be/src/runtime/exec-env.h | 4 +-
be/src/runtime/fragment-instance-state.cc | 2 -
be/src/runtime/initial-reservations.cc | 90 +
be/src/runtime/initial-reservations.h | 79 +
be/src/runtime/query-exec-mgr.cc | 2 +
be/src/runtime/query-state.cc | 91 +-
be/src/runtime/query-state.h | 51 +-
be/src/runtime/row-batch.cc | 19 -
be/src/runtime/row-batch.h | 13 -
be/src/runtime/runtime-filter.h | 1 +
be/src/runtime/runtime-state.cc | 52 +-
be/src/runtime/runtime-state.h | 32 +-
be/src/runtime/sorter.cc | 1058 ++++++------
be/src/runtime/sorter.h | 65 +-
be/src/runtime/test-env.cc | 23 +-
be/src/runtime/test-env.h | 9 +-
be/src/runtime/tmp-file-mgr-test.cc | 10 +-
be/src/runtime/tmp-file-mgr.h | 23 +-
be/src/service/client-request-state.cc | 4 +-
be/src/service/query-options.cc | 28 +-
be/src/service/query-options.h | 6 +-
be/src/util/bloom-filter.h | 2 +-
be/src/util/static-asserts.cc | 2 -
common/thrift/Frontend.thrift | 16 +-
common/thrift/ImpalaInternalService.thrift | 22 +-
common/thrift/ImpalaService.thrift | 8 +-
common/thrift/PlanNodes.thrift | 18 +
common/thrift/generate_error_codes.py | 10 +-
.../org/apache/impala/common/RuntimeEnv.java | 10 -
.../apache/impala/planner/AggregationNode.java | 11 +-
.../apache/impala/planner/AnalyticEvalNode.java | 7 +-
.../impala/planner/DataSourceScanNode.java | 2 +-
.../apache/impala/planner/DataStreamSink.java | 2 +-
.../org/apache/impala/planner/EmptySetNode.java | 2 +-
.../org/apache/impala/planner/ExchangeNode.java | 2 +-
.../apache/impala/planner/HBaseScanNode.java | 2 +-
.../apache/impala/planner/HBaseTableSink.java | 2 +-
.../org/apache/impala/planner/HashJoinNode.java | 8 +-
.../org/apache/impala/planner/HdfsScanNode.java | 4 +-
.../apache/impala/planner/HdfsTableSink.java | 2 +-
.../apache/impala/planner/JoinBuildSink.java | 2 +-
.../org/apache/impala/planner/KuduScanNode.java | 2 +-
.../apache/impala/planner/KuduTableSink.java | 2 +-
.../impala/planner/NestedLoopJoinNode.java | 5 +-
.../org/apache/impala/planner/PlanNode.java | 14 +-
.../org/apache/impala/planner/PlanRootSink.java | 2 +-
.../java/org/apache/impala/planner/Planner.java | 12 +-
.../apache/impala/planner/ResourceProfile.java | 72 +-
.../org/apache/impala/planner/SelectNode.java | 2 +-
.../impala/planner/SingularRowSrcNode.java | 2 +-
.../org/apache/impala/planner/SortNode.java | 47 +-
.../org/apache/impala/planner/SubplanNode.java | 2 +-
.../org/apache/impala/planner/UnionNode.java | 2 +-
.../org/apache/impala/planner/UnnestNode.java | 2 +-
.../org/apache/impala/service/Frontend.java | 2 +-
.../org/apache/impala/planner/PlannerTest.java | 2 -
.../queries/PlannerTest/constant-folding.test | 32 +-
.../queries/PlannerTest/disable-codegen.test | 2 +-
.../PlannerTest/fk-pk-join-detection.test | 52 +-
.../queries/PlannerTest/mt-dop-validation.test | 30 +-
.../queries/PlannerTest/parquet-filtering.test | 6 +-
.../PlannerTest/resource-requirements.test | 418 ++---
.../PlannerTest/sort-expr-materialization.test | 30 +-
.../PlannerTest/spillable-buffer-sizing.test | 112 +-
.../queries/PlannerTest/tablesample.test | 4 +-
.../queries/QueryTest/analytic-fns.test | 12 +-
.../queries/QueryTest/explain-level0.test | 2 +-
.../queries/QueryTest/explain-level1.test | 2 +-
.../queries/QueryTest/explain-level2.test | 6 +-
.../queries/QueryTest/explain-level3.test | 6 +-
.../queries/QueryTest/nested-types-tpch.test | 24 +-
.../QueryTest/runtime_row_filters_phj.test | 5 +-
...ingle-node-joins-with-limits-exhaustive.test | 2 +-
.../QueryTest/single-node-large-sorts.test | 2 +-
.../queries/QueryTest/spilling.test | 87 +-
.../targeted-stress/queries/agg_stress.test | 2 +-
.../workloads/tpch/queries/insert_parquet.test | 2 +
tests/comparison/discrepancy_searcher.py | 4 +-
tests/custom_cluster/test_scratch_disk.py | 12 +-
tests/custom_cluster/test_spilling.py | 47 -
tests/query_test/test_cancellation.py | 10 +-
tests/query_test/test_mem_usage_scaling.py | 31 +-
tests/query_test/test_nested_types.py | 1 -
tests/query_test/test_scratch_limit.py | 12 +-
tests/query_test/test_sort.py | 26 +-
tests/query_test/test_spilling.py | 39 +
123 files changed, 2885 insertions(+), 8366 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 94dc86a..be4be80 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -119,7 +119,7 @@ ir_functions = [
["PHJ_PROCESS_PROBE_BATCH_FULL_OUTER_JOIN",
"_ZN6impala23PartitionedHashJoinNode17ProcessProbeBatchILi8EEEiNS_13TPrefetchMode4typeEPNS_8RowBatchEPNS_12HashTableCtxEPNS_6StatusE"],
["PHJ_INSERT_BATCH",
- "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorINS_19BufferedTupleStream6RowIdxESaISA_EE"],
+ "_ZN6impala10PhjBuilder9Partition11InsertBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEPNS_8RowBatchERKSt6vectorIPhSaIS9_EEPNS_6StatusE"],
["HASH_TABLE_GET_HASH_SEED",
"_ZNK6impala12HashTableCtx11GetHashSeedEv"],
["HASH_TABLE_GET_BUILD_EXPR_EVALUATORS",
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index b789188..f6d96ae 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -23,9 +23,10 @@
#include "exprs/agg-fn-evaluator.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
#include "runtime/descriptors.h"
#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "udf/udf-internal.h"
@@ -34,13 +35,14 @@
#include "common/names.h"
static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
+static const int MIN_REQUIRED_BUFFERS = 2;
using namespace strings;
namespace impala {
-AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
- const DescriptorTbl& descs)
+AnalyticEvalNode::AnalyticEvalNode(
+ ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
window_(tnode.analytic_node.window),
intermediate_tuple_desc_(
@@ -51,7 +53,6 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
rows_end_offset_(0),
has_first_val_null_offset_(false),
first_val_null_offset_(0),
- client_(nullptr),
child_tuple_cmp_row_(nullptr),
last_result_idx_(-1),
prev_pool_last_result_idx_(-1),
@@ -110,6 +111,7 @@ AnalyticEvalNode::~AnalyticEvalNode() {
Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Init(tnode, state));
DCHECK_EQ(conjunct_evals_.size(), 0);
+ state_ = state;
const TAnalyticNode& analytic_node = tnode.analytic_node;
bool has_lead_fn = false;
@@ -154,6 +156,8 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(ExecNode::Prepare(state));
DCHECK(child(0)->row_desc()->IsPrefixOf(*row_desc()));
+ DCHECK_GE(resource_profile_.min_reservation,
+ resource_profile_.spillable_buffer_size * MIN_REQUIRED_BUFFERS);
curr_tuple_pool_.reset(new MemPool(mem_tracker()));
prev_tuple_pool_.reset(new MemPool(mem_tracker()));
mem_pool_.reset(new MemPool(mem_tracker()));
@@ -175,12 +179,6 @@ Status AnalyticEvalNode::Prepare(RuntimeState* state) {
fn_pool_.get(), &order_by_eq_expr_eval_));
AddEvaluatorToFree(order_by_eq_expr_eval_);
}
-
- // Must be kept in sync with AnalyticEvalNode.computeResourceProfile() in fe.
- const int MIN_REQUIRED_BUFFERS = 2;
- RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
- Substitute("AnalyticEvalNode id=$0 ptr=$1", id_, this),
- MIN_REQUIRED_BUFFERS, false, mem_tracker(), state, &client_));
return Status::OK();
}
@@ -190,22 +188,20 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->Open(state));
- DCHECK(client_ != nullptr);
- DCHECK(input_stream_ == nullptr);
- input_stream_.reset(
- new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_,
- false /* use_initial_small_buffers */, true /* read_write */));
- RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
- bool got_write_buffer;
- RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer));
- if (!got_write_buffer) {
- return state->block_mgr()->MemLimitTooLowError(client_, id());
- }
- bool got_read_buffer;
- RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
- if (!got_read_buffer) {
- return state->block_mgr()->MemLimitTooLowError(client_, id());
+
+ // Claim reservation after the child has been opened to reduce the peak reservation
+ // requirement.
+ if (!buffer_pool_client_.is_registered()) {
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
}
+ DCHECK(input_stream_ == nullptr);
+ input_stream_.reset(new BufferedTupleStreamV2(state, child(0)->row_desc(),
+ &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+ resource_profile_.spillable_buffer_size));
+ RETURN_IF_ERROR(input_stream_->Init(id(), true));
+ bool success;
+ RETURN_IF_ERROR(input_stream_->PrepareForReadWrite(true, &success));
+ DCHECK(success) << "Had reservation: " << buffer_pool_client_.DebugString();
for (int i = 0; i < analytic_fn_evals_.size(); ++i) {
RETURN_IF_ERROR(analytic_fn_evals_[i]->Open(state));
@@ -366,8 +362,8 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, TupleRow* row) {
// the stream and continue writing/reading in unpinned mode.
// TODO: Consider re-pinning later if the output stream is fully consumed.
RETURN_IF_ERROR(status);
- RETURN_IF_ERROR(
- input_stream_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
+ RETURN_IF_ERROR(state_->StartSpilling(mem_tracker()));
+ input_stream_->UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;
if (!input_stream_->AddRow(row, &status)) {
// Rows should be added in unpinned mode unless an error occurs.
@@ -627,7 +623,7 @@ Status AnalyticEvalNode::ProcessChildBatch(RuntimeState* state) {
<< " tuple pool size:" << curr_tuple_pool_->total_allocated_bytes();
SCOPED_TIMER(evaluation_timer_);
- // BufferedTupleStream::num_rows() returns the total number of rows that have been
+ // BufferedTupleStreamV2::num_rows() returns the total number of rows that have been
// inserted into the stream (it does not decrease when we read rows), so the index of
// the next input row that will be inserted will be the current size of the stream.
int64_t stream_idx = input_stream_->num_rows();
@@ -857,7 +853,6 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
void AnalyticEvalNode::Close(RuntimeState* state) {
if (is_closed()) return;
- if (client_ != nullptr) state->block_mgr()->ClearReservations(client_);
// We may need to clean up input_stream_ if an error occurred at some point.
if (input_stream_ != nullptr) {
input_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 89c5cf3..eab9198 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -19,8 +19,7 @@
#define IMPALA_EXEC_ANALYTIC_EVAL_NODE_H
#include "exec/exec-node.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
+#include "runtime/buffered-tuple-stream-v2.h"
#include "runtime/tuple.h"
namespace impala {
@@ -189,6 +188,10 @@ class AnalyticEvalNode : public ExecNode {
/// Debug string containing the window definition.
std::string DebugWindowString() const;
+ /// The RuntimeState for the fragment instance containing this AnalyticEvalNode. Set
+ /// in Init().
+ RuntimeState* state_;
+
/// Window over which the analytic functions are evaluated. Only used if fn_scope_
/// is ROWS or RANGE.
/// TODO: fn_scope_ and window_ are candidates to be removed during codegen
@@ -254,9 +257,6 @@ class AnalyticEvalNode : public ExecNode {
boost::scoped_ptr<MemPool> curr_tuple_pool_;
boost::scoped_ptr<MemPool> prev_tuple_pool_;
- /// Block manager client used by input_stream_. Not owned.
- BufferedBlockMgr::Client* client_ = nullptr;
-
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
@@ -330,15 +330,16 @@ class AnalyticEvalNode : public ExecNode {
/// Buffers input rows added in ProcessChildBatch() until enough rows are able to
/// be returned by GetNextOutputBatch(), in which case row batches are returned from
- /// the front of the stream and the underlying buffered blocks are deleted once read.
+ /// the front of the stream and the underlying buffers are deleted once read.
/// The number of rows that must be buffered may vary from an entire partition (e.g.
- /// no order by clause) to a single row (e.g. ROWS windows). When the amount of
- /// buffered data exceeds the available memory in the underlying BufferedBlockMgr,
- /// input_stream_ is unpinned (i.e., possibly spilled to disk if necessary).
- /// The input stream owns tuple data backing rows returned in GetNext(). The blocks
- /// with tuple data are attached to an output row batch on eos or ReachedLimit().
+ /// no order by clause) to a single row (e.g. ROWS windows). If the amount of buffered
+ /// data in 'input_stream_' exceeds the ExecNode's buffer reservation and the stream
+ /// cannot increase the reservation, then 'input_stream_' is unpinned (i.e., spilled to
+ /// disk). The input stream owns tuple data backing rows returned in GetNext(). The
+ /// buffers with tuple data are attached to an output row batch on eos or
+ /// ReachedLimit().
/// TODO: Consider re-pinning unpinned streams when possible.
- boost::scoped_ptr<BufferedTupleStream> input_stream_;
+ boost::scoped_ptr<BufferedTupleStreamV2> input_stream_;
/// Pool used for O(1) allocations that live until Close() or Reset().
/// Does not own data backing tuples returned in GetNext(), so it does not
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c3d9946..61c8d40 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -34,10 +34,10 @@
#include "exec/empty-set-node.h"
#include "exec/exchange-node.h"
#include "exec/hbase-scan-node.h"
-#include "exec/hdfs-scan-node.h"
#include "exec/hdfs-scan-node-mt.h"
-#include "exec/kudu-scan-node.h"
+#include "exec/hdfs-scan-node.h"
#include "exec/kudu-scan-node-mt.h"
+#include "exec/kudu-scan-node.h"
#include "exec/kudu-util.h"
#include "exec/nested-loop-join-node.h"
#include "exec/partial-sort-node.h"
@@ -50,9 +50,14 @@
#include "exec/topn-node.h"
#include "exec/union-node.h"
#include "exec/unnest-node.h"
+#include "exprs/expr.h"
+#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
-#include "runtime/mem-tracker.h"
+#include "runtime/exec-env.h"
+#include "runtime/initial-reservations.h"
#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "util/debug-util.h"
@@ -61,7 +66,10 @@
#include "common/names.h"
using namespace llvm;
+using strings::Substitute;
+DECLARE_int32(be_port);
+DECLARE_string(hostname);
DEFINE_bool(enable_partitioned_hash_join, true, "Deprecated - has no effect");
DEFINE_bool(enable_partitioned_aggregation, true, "Deprecated - has no effect");
@@ -116,6 +124,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
type_(tnode.node_type),
pool_(pool),
row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples),
+ resource_profile_(tnode.resource_profile),
debug_phase_(TExecNodePhase::INVALID),
debug_action_(TDebugAction::WAIT),
limit_(tnode.limit),
@@ -195,7 +204,12 @@ void ExecNode::Close(RuntimeState* state) {
ScalarExprEvaluator::Close(conjunct_evals_, state);
ScalarExpr::Close(conjuncts_);
if (expr_mem_pool() != nullptr) expr_mem_pool_->FreeAll();
-
+ if (buffer_pool_client_.is_registered()) {
+ VLOG_FILE << id_ << " returning reservation " << resource_profile_.min_reservation;
+ state->query_state()->initial_reservations()->Return(
+ &buffer_pool_client_, resource_profile_.min_reservation);
+ state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);
+ }
if (mem_tracker() != NULL && mem_tracker()->consumption() != 0) {
LOG(WARNING) << "Query " << state->query_id() << " may have leaked memory." << endl
<< state->instance_mem_tracker()->LogUsage();
@@ -204,6 +218,29 @@ void ExecNode::Close(RuntimeState* state) {
}
}
+Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
+ DCHECK(!buffer_pool_client_.is_registered());
+ BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+ // Check the minimum buffer size in case the minimum buffer size used by the planner
+ // doesn't match this backend's.
+ if (resource_profile_.__isset.spillable_buffer_size &&
+ resource_profile_.spillable_buffer_size < buffer_pool->min_buffer_len()) {
+ return Status(Substitute("Spillable buffer size for node $0 of $1 bytes is less "
+ "than the minimum buffer pool buffer size of $2 bytes",
+ id_, resource_profile_.spillable_buffer_size, buffer_pool->min_buffer_len()));
+ }
+
+ RETURN_IF_ERROR(buffer_pool->RegisterClient(
+ Substitute("$0 id=$1 ptr=$2", PrintPlanNodeType(type_), id_, this),
+ state->query_state()->file_group(), state->instance_buffer_reservation(),
+ mem_tracker(), resource_profile_.max_reservation, runtime_profile(),
+ &buffer_pool_client_));
+ VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation;
+ state->query_state()->initial_reservations()->Claim(
+ &buffer_pool_client_, resource_profile_.min_reservation);
+ return Status::OK();
+}
+
Status ExecNode::CreateTree(
RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
if (plan.nodes.size() == 0) {
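Taken together with Close() above, the reservation lifecycle each node now
follows looks roughly like this (condensed from the two hunks; not verbatim):

// In ClaimBufferReservation(): register a buffer pool client, then move
// this node's minimum reservation out of the query-wide InitialReservations
// pool and into the client.
state->query_state()->initial_reservations()->Claim(
    &buffer_pool_client_, resource_profile_.min_reservation);
// ... node executes, possibly growing its reservation up to
// resource_profile_.max_reservation ...
// In Close(): give the minimum back so later nodes can claim it, then
// deregister the client.
state->query_state()->initial_reservations()->Return(
    &buffer_pool_client_, resource_profile_.min_reservation);
state->exec_env()->buffer_pool()->DeregisterClient(&buffer_pool_client_);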
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a107f62..60efff0 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -26,6 +26,8 @@
#include "common/status.h"
#include "exprs/scalar-expr-evaluator.h"
#include "gen-cpp/PlanNodes_types.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/descriptors.h" // for RowDescriptor
#include "util/blocking-queue.h"
#include "util/runtime-profile.h"
@@ -227,6 +229,12 @@ class ExecNode {
protected:
friend class DataSink;
+ /// Initialize 'buffer_pool_client_' and claim the initial reservation for this
+ /// ExecNode. Only needs to be called by ExecNodes that will use the client.
+ /// The client is automatically cleaned up in Close(). Should not be called if
+ /// the client is already open.
+ Status ClaimBufferReservation(RuntimeState* state);
+
/// Extends blocking queue for row batches. Row batches have a property that
/// they must be processed in the order they were produced, even in cancellation
/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
@@ -276,6 +284,9 @@ class ExecNode {
std::vector<ExecNode*> children_;
RowDescriptor row_descriptor_;
+ /// Resource information sent from the frontend.
+ const TBackendResourceProfile resource_profile_;
+
/// debug-only: if debug_action_ is not INVALID, node will perform action in
/// debug_phase_
TExecNodePhase::type debug_phase_;
@@ -298,6 +309,12 @@ class ExecNode {
/// Created in Prepare().
boost::scoped_ptr<MemPool> expr_mem_pool_;
+ /// Buffer pool client for this node. Initialized with the node's minimum reservation
+ /// in ClaimBufferReservation(). After initialization, the client must hold onto at
+ /// least the minimum reservation so that it can be returned to the initial
+ /// reservations pool in Close().
+ BufferPool::ClientHandle buffer_pool_client_;
+
bool is_closed() const { return is_closed_; }
/// Pointer to the containing SubplanNode or NULL if not inside a subplan.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 42bc7e1..7a6ec9d 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -17,24 +17,27 @@
#include <boost/scoped_ptr.hpp>
-#include <stdlib.h>
#include <stdio.h>
+#include <stdlib.h>
#include <iostream>
+#include <limits>
#include <vector>
-#include "testutil/gtest-util.h"
#include "common/compiler-util.h"
#include "common/init.h"
#include "exec/hash-table.inline.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/slot-ref.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "runtime/test-env.h"
#include "runtime/tuple-row.h"
#include "service/fe-support.h"
+#include "testutil/gtest-util.h"
#include "util/cpu-info.h"
#include "util/runtime-profile-counters.h"
#include "util/test-info.h"
@@ -51,9 +54,16 @@ class HashTableTest : public testing::Test {
HashTableTest() : mem_pool_(&tracker_) {}
protected:
+ /// Temporary runtime environment for the hash table.
scoped_ptr<TestEnv> test_env_;
RuntimeState* runtime_state_;
+
+ /// Hash tables and associated clients - automatically closed in TearDown().
+ vector<BufferPool::ClientHandle*> clients_;
+ vector<HashTable*> hash_tables_;
+
ObjectPool pool_;
+ /// A dummy MemTracker used for exprs and other things we don't need to have limits on.
MemTracker tracker_;
MemPool mem_pool_;
vector<ScalarExpr*> build_exprs_;
@@ -83,6 +93,8 @@ class HashTableTest : public testing::Test {
ASSERT_OK(ScalarExprEvaluator::Create(probe_exprs_, nullptr, &pool_, &mem_pool_,
&probe_expr_evals_));
ASSERT_OK(ScalarExprEvaluator::Open(probe_expr_evals_, nullptr));
+
+ CreateTestEnv();
}
virtual void TearDown() {
@@ -90,9 +102,34 @@ class HashTableTest : public testing::Test {
ScalarExprEvaluator::Close(probe_expr_evals_, nullptr);
ScalarExpr::Close(build_exprs_);
ScalarExpr::Close(probe_exprs_);
+
+ for (HashTable* hash_table : hash_tables_) hash_table->Close();
+ hash_tables_.clear();
+
+ for (BufferPool::ClientHandle* client : clients_) {
+ test_env_->exec_env()->buffer_pool()->DeregisterClient(client);
+ }
+ clients_.clear();
+
runtime_state_ = nullptr;
test_env_.reset();
mem_pool_.FreeAll();
+ pool_.Clear();
+ }
+
+ /// Initialize test_env_ and runtime_state_ with the given page size and capacity
+ /// for the given number of pages. If test_env_ was already created, then re-creates it.
+ void CreateTestEnv(int64_t min_page_size = 64 * 1024,
+ int64_t buffer_bytes_limit = 4L * 1024 * 1024 * 1024) {
+ test_env_.reset(new TestEnv());
+ test_env_->SetBufferPoolArgs(min_page_size, buffer_bytes_limit);
+ ASSERT_OK(test_env_->Init());
+
+ TQueryOptions query_options;
+ query_options.__set_default_spillable_buffer_size(min_page_size);
+ query_options.__set_min_spillable_buffer_size(min_page_size);
+ query_options.__set_buffer_pool_limit(buffer_bytes_limit);
+ ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
}
TupleRow* CreateTupleRow(int32_t val) {
@@ -116,8 +153,9 @@ class HashTableTest : public testing::Test {
// Wrapper to call private methods on HashTable
// TODO: understand google testing, there must be a more natural way to do this
- void ResizeTable(HashTable* table, int64_t new_size, HashTableCtx* ht_ctx) {
- table->ResizeBuckets(new_size, ht_ctx);
+ Status ResizeTable(
+ HashTable* table, int64_t new_size, HashTableCtx* ht_ctx, bool* success) {
+ return table->ResizeBuckets(new_size, ht_ctx, success);
}
// Do a full table scan on table. All values should be between [min,max). If
@@ -188,24 +226,41 @@ class HashTableTest : public testing::Test {
}
}
- // Construct hash table with custom block manager. Returns result of HashTable::Init()
- bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
- scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
- int max_num_blocks = 100, int reserved_blocks = 10) {
- EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
- next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
+ /// Construct hash table and buffer pool client.
+ /// Returns true if HashTable::Init() was successful. Created objects
+ /// and resources (e.g. reservations) are automatically freed in TearDown().
+ bool CreateHashTable(bool quadratic, int64_t initial_num_buckets, HashTable** table,
+ int64_t block_size = 8 * 1024 * 1024, int max_num_blocks = 100,
+ int initial_reserved_blocks = 10, int64_t suballocator_buffer_len = 64 * 1024) {
+ BufferPool* buffer_pool = test_env_->exec_env()->buffer_pool();
+ RuntimeProfile* profile = pool_.Add(new RuntimeProfile(&pool_, "ht"));
+
+ // Set up memory tracking for the hash table.
MemTracker* client_tracker =
pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
- BufferedBlockMgr::Client* client;
- EXPECT_OK(runtime_state_->block_mgr()->RegisterClient(
- "", reserved_blocks, false, client_tracker, runtime_state_, &client));
+ int64_t initial_reservation_bytes = block_size * initial_reserved_blocks;
+ int64_t max_reservation_bytes = block_size * max_num_blocks;
+
+ // Set up the memory allocator.
+ BufferPool::ClientHandle* client = pool_.Add(new BufferPool::ClientHandle);
+ clients_.push_back(client);
+ EXPECT_OK(buffer_pool->RegisterClient("", nullptr,
+ runtime_state_->instance_buffer_reservation(), client_tracker,
+ max_reservation_bytes, profile, client));
+ EXPECT_TRUE(client->IncreaseReservation(initial_reservation_bytes));
+ Suballocator* allocator =
+ pool_.Add(new Suballocator(buffer_pool, client, suballocator_buffer_len));
// Initial_num_buckets must be a power of two.
EXPECT_EQ(initial_num_buckets, BitUtil::RoundUpToPowerOfTwo(initial_num_buckets));
int64_t max_num_buckets = 1L << 31;
- table->reset(new HashTable(quadratic, runtime_state_, client, true, 1, nullptr,
- max_num_buckets, initial_num_buckets));
- return (*table)->Init();
+ *table = pool_.Add(new HashTable(
+ quadratic, allocator, true, 1, nullptr, max_num_buckets, initial_num_buckets));
+ hash_tables_.push_back(*table);
+ bool success;
+ Status status = (*table)->Init(&success);
+ EXPECT_OK(status);
+ return status.ok() && success;
}
// Constructs and closes a hash table.
@@ -229,14 +284,12 @@ class HashTableTest : public testing::Test {
EXPECT_EQ(*val_row4, 4);
// Create and close the hash table.
- scoped_ptr<HashTable> hash_table;
+ HashTable* hash_table;
bool initialized = CreateHashTable(quadratic, initial_num_buckets, &hash_table);
EXPECT_EQ(too_big, !initialized);
if (initialized && initial_num_buckets > 0) {
EXPECT_NE(hash_table->ByteSize(), 0);
}
-
- hash_table->Close();
}
// IMPALA-2897: Build rows that are equivalent (where nullptrs are counted as equivalent)
@@ -246,7 +299,7 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < 2; ++i) build_rows[i] = CreateNullTupleRow();
// Create the hash table and insert the build rows
- scoped_ptr<HashTable> hash_table;
+ HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(true, 1024, &hash_table));
scoped_ptr<HashTableCtx> ht_ctx;
EXPECT_OK(HashTableCtx::Create(&pool_, runtime_state_,
@@ -256,13 +309,15 @@ class HashTableTest : public testing::Test {
for (int i = 0; i < 2; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
- BufferedTupleStream::RowIdx dummy_row_idx;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
- bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
+ Status status;
+ bool inserted =
+ hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
EXPECT_TRUE(inserted);
+ ASSERT_OK(status);
}
EXPECT_EQ(hash_table->num_buckets() - hash_table->EmptyBuckets(), 1);
- hash_table->Close();
ht_ctx->Close(runtime_state_);
}
@@ -282,7 +337,7 @@ class HashTableTest : public testing::Test {
}
// Create the hash table and insert the build rows
- scoped_ptr<HashTable> hash_table;
+ HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, initial_num_buckets, &hash_table));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
@@ -290,52 +345,57 @@ class HashTableTest : public testing::Test {
vector<bool>(build_exprs_.size(), false), 1, 0, 1, &mem_pool_, &ht_ctx);
EXPECT_OK(status);
EXPECT_OK(ht_ctx->Open(runtime_state_));
- bool success = hash_table->CheckAndResize(5, ht_ctx.get());
+ bool success;
+ EXPECT_OK(hash_table->CheckAndResize(5, ht_ctx.get(), &success));
ASSERT_TRUE(success);
for (int i = 0; i < 5; ++i) {
if (!ht_ctx->EvalAndHashBuild(build_rows[i])) continue;
- BufferedTupleStream::RowIdx dummy_row_idx;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
- bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, build_rows[i]);
+ bool inserted =
+ hash_table->Insert(ht_ctx.get(), dummy_flat_row, build_rows[i], &status);
EXPECT_TRUE(inserted);
+ ASSERT_OK(status);
}
EXPECT_EQ(hash_table->size(), 5);
// Do a full table scan and validate returned pointers
- FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+ FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Double the size of the hash table and scan again.
- ResizeTable(hash_table.get(), 2048, ht_ctx.get());
+ EXPECT_OK(ResizeTable(hash_table, 2048, ht_ctx.get(), &success));
+ EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 2048);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+ FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Try to shrink and scan again.
- ResizeTable(hash_table.get(), 64, ht_ctx.get());
+ EXPECT_OK(ResizeTable(hash_table, 64, ht_ctx.get(), &success));
+ EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 64);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+ FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
// Resize to 8, which is the smallest value to fit the number of filled buckets.
- ResizeTable(hash_table.get(), 8, ht_ctx.get());
+ EXPECT_OK(ResizeTable(hash_table, 8, ht_ctx.get(), &success));
+ EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), 8);
EXPECT_EQ(hash_table->size(), 5);
memset(scan_rows, 0, sizeof(scan_rows));
- FullScan(hash_table.get(), ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, 10, false);
+ FullScan(hash_table, ht_ctx.get(), 0, 5, true, scan_rows, build_rows);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, 10, false);
- hash_table->Close();
ht_ctx->Close(runtime_state_);
}
- void ScanTest(bool quadratic, int initial_size, int rows_to_insert,
- int additional_rows) {
- scoped_ptr<HashTable> hash_table;
+ void ScanTest(
+ bool quadratic, int initial_size, int rows_to_insert, int additional_rows) {
+ HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, initial_size, &hash_table));
int total_rows = rows_to_insert + additional_rows;
@@ -347,19 +407,21 @@ class HashTableTest : public testing::Test {
EXPECT_OK(ht_ctx->Open(runtime_state_));
// Add 1 row with val 1, 2 with val 2, etc.
+ bool success;
vector<TupleRow*> build_rows;
ProbeTestData* probe_rows = new ProbeTestData[total_rows];
probe_rows[0].probe_row = CreateTupleRow(0);
for (int val = 1; val <= rows_to_insert; ++val) {
- bool success = hash_table->CheckAndResize(val, ht_ctx.get());
+ EXPECT_OK(hash_table->CheckAndResize(val, ht_ctx.get(), &success));
EXPECT_TRUE(success) << " failed to resize: " << val;
probe_rows[val].probe_row = CreateTupleRow(val);
for (int i = 0; i < val; ++i) {
TupleRow* row = CreateTupleRow(val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
- BufferedTupleStream::RowIdx dummy_row_idx;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
- hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+ ASSERT_TRUE(hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status));
+ ASSERT_OK(status);
build_rows.push_back(row);
probe_rows[val].expected_build_rows.push_back(row);
}
@@ -371,21 +433,22 @@ class HashTableTest : public testing::Test {
}
// Test that all the builds were found.
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
// Resize and try again.
int target_size = BitUtil::RoundUpToPowerOfTwo(2 * total_rows);
- ResizeTable(hash_table.get(), target_size, ht_ctx.get());
+ EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
+ EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), target_size);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
target_size = BitUtil::RoundUpToPowerOfTwo(total_rows + 1);
- ResizeTable(hash_table.get(), target_size, ht_ctx.get());
+ EXPECT_OK(ResizeTable(hash_table, target_size, ht_ctx.get(), &success));
+ EXPECT_TRUE(success);
EXPECT_EQ(hash_table->num_buckets(), target_size);
- ProbeTest(hash_table.get(), ht_ctx.get(), probe_rows, total_rows, true);
+ ProbeTest(hash_table, ht_ctx.get(), probe_rows, total_rows, true);
delete [] probe_rows;
- hash_table->Close();
ht_ctx->Close(runtime_state_);
}
@@ -395,9 +458,11 @@ class HashTableTest : public testing::Test {
uint64_t num_to_add = 4;
int expected_size = 0;
- MemTracker tracker(100 * 1024 * 1024);
- scoped_ptr<HashTable> hash_table;
- ASSERT_TRUE(CreateHashTable(quadratic, num_to_add, &hash_table));
+ // Need enough memory for two hash table bucket directories during resize.
+ const int64_t mem_limit_mb = 128 + 64;
+ HashTable* hash_table;
+ ASSERT_TRUE(
+ CreateHashTable(quadratic, num_to_add, &hash_table, 1024 * 1024, mem_limit_mb));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
@@ -408,27 +473,32 @@ class HashTableTest : public testing::Test {
// entries. When num_to_add == 4, then the total number of inserts is 4194300.
int build_row_val = 0;
for (int i = 0; i < 20; ++i) {
- // Currently the mem used for the bucket is not being tracked by the mem tracker.
- // Thus the resize is expected to be successful.
- // TODO: Keep track of the mem used for the buckets and test cases where we actually
- // hit OOM.
- // TODO: Insert duplicates to also hit OOM.
- bool success = hash_table->CheckAndResize(num_to_add, ht_ctx.get());
- EXPECT_TRUE(success) << " failed to resize: " << num_to_add;
+ bool success;
+ EXPECT_OK(hash_table->CheckAndResize(num_to_add, ht_ctx.get(), &success));
+ EXPECT_TRUE(success) << " failed to resize: " << num_to_add << "\n"
+ << tracker_.LogUsage() << "\n"
+ << clients_.back()->DebugString();
for (int j = 0; j < num_to_add; ++build_row_val, ++j) {
TupleRow* row = CreateTupleRow(build_row_val);
if (!ht_ctx->EvalAndHashBuild(row)) continue;
- BufferedTupleStream::RowIdx dummy_row_idx;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
- bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+ bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
+ ASSERT_OK(status);
if (!inserted) goto done_inserting;
}
expected_size += num_to_add;
num_to_add *= 2;
}
- done_inserting:
- EXPECT_FALSE(tracker.LimitExceeded());
+ done_inserting:
EXPECT_EQ(hash_table->size(), 4194300);
+
+ // The next allocation should put us over the limit, since we'll need 128MB for
+ // the old buckets and 256MB for the new buckets.
+ bool success;
+ EXPECT_OK(hash_table->CheckAndResize(num_to_add * 2, ht_ctx.get(), &success));
+ EXPECT_FALSE(success);
+
// Validate that we can find the entries before we went over the limit
for (int i = 0; i < expected_size * 5; i += 100000) {
TupleRow* probe_row = CreateTupleRow(i);
@@ -441,7 +511,34 @@ class HashTableTest : public testing::Test {
EXPECT_TRUE(iter.AtEnd()) << " i: " << i;
}
}
- hash_table->Close();
+
+ // Insert duplicates to also hit OOM.
+ int64_t num_duplicates_inserted = 0;
+ const int DUPLICATE_VAL = 1234;
+ while (true) {
+ TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
+ if (!ht_ctx->EvalAndHashBuild(duplicate_row)) continue;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
+ bool inserted =
+ hash_table->Insert(ht_ctx.get(), dummy_flat_row, duplicate_row, &status);
+ ASSERT_OK(status);
+ if (!inserted) break;
+ ++num_duplicates_inserted;
+ }
+
+ // Check that the duplicates that we successfully inserted are all present.
+ TupleRow* duplicate_row = CreateTupleRow(DUPLICATE_VAL);
+ ASSERT_TRUE(ht_ctx->EvalAndHashProbe(duplicate_row));
+ HashTable::Iterator iter = hash_table->FindProbeRow(ht_ctx.get());
+ ValidateMatch(duplicate_row, iter.GetRow());
+ for (int64_t i = 0; i < num_duplicates_inserted; ++i) {
+ ASSERT_FALSE(iter.AtEnd());
+ iter.NextDuplicate();
+ ValidateMatch(duplicate_row, iter.GetRow());
+ }
+ iter.NextDuplicate();
+ EXPECT_TRUE(iter.AtEnd());
+
ht_ctx->Close(runtime_state_);
}
@@ -450,7 +547,7 @@ class HashTableTest : public testing::Test {
// enough space in the hash table (it is also expected to be slow). It also expects that
// a probe for a N+1 element will return BUCKET_NOT_FOUND.
void InsertFullTest(bool quadratic, int table_size) {
- scoped_ptr<HashTable> hash_table;
+ HashTable* hash_table;
ASSERT_TRUE(CreateHashTable(quadratic, table_size, &hash_table));
EXPECT_EQ(hash_table->EmptyBuckets(), table_size);
scoped_ptr<HashTableCtx> ht_ctx;
@@ -472,10 +569,11 @@ class HashTableTest : public testing::Test {
// Insert using both Insert() and FindBucket() methods.
if (build_row_val % 2 == 0) {
- BufferedTupleStream::RowIdx dummy_row_idx;
+ BufferedTupleStreamV2::FlatRowPtr dummy_flat_row = nullptr;
EXPECT_TRUE(hash_table->stores_tuples_);
- bool inserted = hash_table->Insert(ht_ctx.get(), dummy_row_idx, row);
+ bool inserted = hash_table->Insert(ht_ctx.get(), dummy_flat_row, row, &status);
EXPECT_TRUE(inserted);
+ ASSERT_OK(status);
} else {
iter = hash_table->FindBuildRowBucket(ht_ctx.get(), &found);
EXPECT_FALSE(iter.AtEnd());
@@ -511,20 +609,20 @@ class HashTableTest : public testing::Test {
EXPECT_TRUE(iter.AtEnd());
EXPECT_FALSE(found);
- hash_table->Close();
ht_ctx->Close(runtime_state_);
}
// This test makes sure we can tolerate the low memory case where we do not have enough
// memory to allocate the array of buckets for the hash table.
void VeryLowMemTest(bool quadratic) {
- const int block_size = 2 * 1024;
+ const int64_t block_size = 2 * 1024;
const int max_num_blocks = 1;
- const int reserved_blocks = 0;
const int table_size = 1024;
- scoped_ptr<HashTable> hash_table;
- ASSERT_FALSE(CreateHashTable(quadratic, table_size, &hash_table, block_size,
- max_num_blocks, reserved_blocks));
+ CreateTestEnv(block_size, block_size * max_num_blocks);
+
+ HashTable* hash_table;
+ ASSERT_FALSE(CreateHashTable(
+ quadratic, table_size, &hash_table, block_size, max_num_blocks, 0, 1024));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */, vector<bool>(build_exprs_.size(), false), 1, 0, 1,
@@ -532,7 +630,6 @@ class HashTableTest : public testing::Test {
EXPECT_OK(status);
HashTable::Iterator iter = hash_table->Begin(ht_ctx.get());
EXPECT_TRUE(iter.AtEnd());
- hash_table->Close();
ht_ctx->Close(runtime_state_);
}
};
@@ -612,8 +709,6 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
// Test that hashing empty string updates hash value.
TEST_F(HashTableTest, HashEmpty) {
- EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
- 0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_));
scoped_ptr<HashTableCtx> ht_ctx;
Status status = HashTableCtx::Create(&pool_, runtime_state_, build_exprs_,
probe_exprs_, false /* !stores_nulls_ */,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index a4856e9..aacedc2 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -26,7 +26,7 @@
#include "exprs/slot-ref.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/reservation-tracker.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.inline.h"
#include "runtime/runtime-state.h"
@@ -37,8 +37,17 @@
#include "common/names.h"
using namespace impala;
-using namespace llvm;
-using namespace strings;
+using llvm::APFloat;
+using llvm::ArrayRef;
+using llvm::BasicBlock;
+using llvm::ConstantFP;
+using llvm::Function;
+using llvm::LLVMContext;
+using llvm::PHINode;
+using llvm::PointerType;
+using llvm::Type;
+using llvm::Value;
+using strings::Substitute;
DEFINE_bool(enable_quadratic_probing, true, "Enable quadratic probing hash table");
@@ -85,12 +94,6 @@ static int64_t NULL_VALUE[] = {
static_assert(sizeof(NULL_VALUE) >= ColumnType::MAX_CHAR_LENGTH,
"NULL_VALUE must be at least as large as the largest possible slot");
-// The first NUM_SMALL_BLOCKS of nodes_ are made of blocks less than the IO size (of 8MB)
-// to reduce the memory footprint of small queries. In particular, we always first use a
-// 64KB and a 512KB block before starting using IO-sized blocks.
-static const int64_t INITIAL_DATA_PAGE_SIZES[] = { 64 * 1024, 512 * 1024 };
-static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t);
-
HashTableCtx::HashTableCtx(const std::vector<ScalarExpr*>& build_exprs,
const std::vector<ScalarExpr*>& probe_exprs, bool stores_nulls,
const std::vector<bool>& finds_nulls, int32_t initial_seed,
@@ -378,21 +381,20 @@ void HashTableCtx::ExprValuesCache::ResetForRead() {
ResetIterators();
}
-const double HashTable::MAX_FILL_FACTOR = 0.75f;
+constexpr double HashTable::MAX_FILL_FACTOR;
+constexpr int64_t HashTable::DATA_PAGE_SIZE;
-HashTable* HashTable::Create(RuntimeState* state,
- BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples,
- BufferedTupleStream* tuple_stream, int64_t max_num_buckets,
+HashTable* HashTable::Create(Suballocator* allocator, bool stores_duplicates,
+ int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
int64_t initial_num_buckets) {
- return new HashTable(FLAGS_enable_quadratic_probing, state, client, stores_duplicates,
+ return new HashTable(FLAGS_enable_quadratic_probing, allocator, stores_duplicates,
num_build_tuples, tuple_stream, max_num_buckets, initial_num_buckets);
}
-HashTable::HashTable(bool quadratic_probing, RuntimeState* state,
- BufferedBlockMgr::Client* client, bool stores_duplicates, int num_build_tuples,
- BufferedTupleStream* stream, int64_t max_num_buckets, int64_t num_buckets)
- : state_(state),
- block_mgr_client_(client),
+HashTable::HashTable(bool quadratic_probing, Suballocator* allocator,
+ bool stores_duplicates, int num_build_tuples, BufferedTupleStreamV2* stream,
+ int64_t max_num_buckets, int64_t num_buckets)
+ : allocator_(allocator),
tuple_stream_(stream),
stores_tuples_(num_build_tuples == 1),
stores_duplicates_(stores_duplicates),
@@ -410,26 +412,23 @@ HashTable::HashTable(bool quadratic_probing, RuntimeState* state,
has_matches_(false),
num_probes_(0), num_failed_probes_(0), travel_length_(0), num_hash_collisions_(0),
num_resizes_(0) {
- DCHECK_EQ((num_buckets & (num_buckets-1)), 0) << "num_buckets must be a power of 2";
+ DCHECK_EQ((num_buckets & (num_buckets - 1)), 0) << "num_buckets must be a power of 2";
DCHECK_GT(num_buckets, 0) << "num_buckets must be larger than 0";
DCHECK(stores_tuples_ || stream != NULL);
- DCHECK(client != NULL);
}
-bool HashTable::Init() {
+Status HashTable::Init(bool* got_memory) {
int64_t buckets_byte_size = num_buckets_ * sizeof(Bucket);
- if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, buckets_byte_size)) {
- num_buckets_ = 0;
- return false;
- }
- buckets_ = reinterpret_cast<Bucket*>(malloc(buckets_byte_size));
- if (buckets_ == NULL) {
- state_->block_mgr()->ReleaseMemory(block_mgr_client_, buckets_byte_size);
+ RETURN_IF_ERROR(allocator_->Allocate(buckets_byte_size, &bucket_allocation_));
+ if (bucket_allocation_ == nullptr) {
num_buckets_ = 0;
- return false;
+ *got_memory = false;
+ return Status::OK();
}
+ buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data());
memset(buckets_, 0, buckets_byte_size);
- return true;
+ *got_memory = true;
+ return Status::OK();
}
void HashTable::Close() {
@@ -439,36 +438,39 @@ void HashTable::Close() {
const int64_t HEAVILY_USED = 1024 * 1024;
// TODO: These statistics should go to the runtime profile as well.
if ((num_buckets_ > LARGE_HT) || (num_probes_ > HEAVILY_USED)) VLOG(2) << PrintStats();
- for (int i = 0; i < data_pages_.size(); ++i) {
- data_pages_[i]->Delete();
- }
+ for (auto& data_page : data_pages_) allocator_->Free(move(data_page));
+ data_pages_.clear();
if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) {
ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-total_data_page_size_);
}
- data_pages_.clear();
- if (buckets_ != NULL) free(buckets_);
- state_->block_mgr()->ReleaseMemory(block_mgr_client_, num_buckets_ * sizeof(Bucket));
+ if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_));
}
-bool HashTable::CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx) {
+Status HashTable::CheckAndResize(
+ uint64_t buckets_to_fill, const HashTableCtx* ht_ctx, bool* got_memory) {
uint64_t shift = 0;
while (num_filled_buckets_ + buckets_to_fill >
(num_buckets_ << shift) * MAX_FILL_FACTOR) {
- // TODO: next prime instead of double?
++shift;
}
- if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx);
- return true;
+ if (shift > 0) return ResizeBuckets(num_buckets_ << shift, ht_ctx, got_memory);
+ *got_memory = true;
+ return Status::OK();
}
-bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
- DCHECK_EQ((num_buckets & (num_buckets-1)), 0)
+Status HashTable::ResizeBuckets(
+ int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory) {
+ DCHECK_EQ((num_buckets & (num_buckets - 1)), 0)
<< "num_buckets=" << num_buckets << " must be a power of 2";
- DCHECK_GT(num_buckets, num_filled_buckets_) << "Cannot shrink the hash table to "
- "smaller number of buckets than the number of filled buckets.";
- VLOG(2) << "Resizing hash table from "
- << num_buckets_ << " to " << num_buckets << " buckets.";
- if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) return false;
+ DCHECK_GT(num_buckets, num_filled_buckets_)
+ << "Cannot shrink the hash table to smaller number of buckets than the number of "
+ << "filled buckets.";
+ VLOG(2) << "Resizing hash table from " << num_buckets_ << " to " << num_buckets
+ << " buckets.";
+ if (max_num_buckets_ != -1 && num_buckets > max_num_buckets_) {
+ *got_memory = false;
+ return Status::OK();
+ }
++num_resizes_;
// All memory that can grow proportional to the input should come from the block mgrs
@@ -476,14 +478,16 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
// Note that while we copying over the contents of the old hash table, we need to have
// allocated both the old and the new hash table. Once we finish, we return the memory
// of the old hash table.
- int64_t old_size = num_buckets_ * sizeof(Bucket);
+ // int64_t old_size = num_buckets_ * sizeof(Bucket);
int64_t new_size = num_buckets * sizeof(Bucket);
- if (!state_->block_mgr()->ConsumeMemory(block_mgr_client_, new_size)) return false;
- Bucket* new_buckets = reinterpret_cast<Bucket*>(malloc(new_size));
- if (new_buckets == NULL) {
- state_->block_mgr()->ReleaseMemory(block_mgr_client_, new_size);
- return false;
+
+ unique_ptr<Suballocation> new_allocation;
+ RETURN_IF_ERROR(allocator_->Allocate(new_size, &new_allocation));
+ if (new_allocation == NULL) {
+ *got_memory = false;
+ return Status::OK();
}
+ Bucket* new_buckets = reinterpret_cast<Bucket*>(new_allocation->data());
memset(new_buckets, 0, new_size);
// Walk the old table and copy all the filled buckets to the new (resized) table.
@@ -503,28 +507,22 @@ bool HashTable::ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx) {
}
num_buckets_ = num_buckets;
- free(buckets_);
- buckets_ = new_buckets;
- state_->block_mgr()->ReleaseMemory(block_mgr_client_, old_size);
- return true;
+ allocator_->Free(move(bucket_allocation_));
+ bucket_allocation_ = move(new_allocation);
+ buckets_ = reinterpret_cast<Bucket*>(bucket_allocation_->data());
+ *got_memory = true;
+ return Status::OK();
}
-bool HashTable::GrowNodeArray() {
- int64_t page_size = 0;
- page_size = state_->block_mgr()->max_block_size();
- if (data_pages_.size() < NUM_SMALL_DATA_PAGES) {
- page_size = min(page_size, INITIAL_DATA_PAGE_SIZES[data_pages_.size()]);
- }
- BufferedBlockMgr::Block* block = NULL;
- Status status = state_->block_mgr()->GetNewBlock(
- block_mgr_client_, NULL, &block, page_size);
- DCHECK(status.ok() || block == NULL);
- if (block == NULL) return false;
- data_pages_.push_back(block);
- next_node_ = block->Allocate<DuplicateNode>(page_size);
- ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(page_size);
- node_remaining_current_page_ = page_size / sizeof(DuplicateNode);
- total_data_page_size_ += page_size;
+bool HashTable::GrowNodeArray(Status* status) {
+ unique_ptr<Suballocation> allocation;
+ *status = allocator_->Allocate(DATA_PAGE_SIZE, &allocation);
+ if (!status->ok() || allocation == nullptr) return false;
+ next_node_ = reinterpret_cast<DuplicateNode*>(allocation->data());
+ data_pages_.push_back(move(allocation));
+ ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(DATA_PAGE_SIZE);
+ node_remaining_current_page_ = DATA_PAGE_SIZE / sizeof(DuplicateNode);
+ total_data_page_size_ += DATA_PAGE_SIZE;
return true;
}
@@ -533,8 +531,7 @@ void HashTable::DebugStringTuple(stringstream& ss, HtData& htdata,
if (stores_tuples_) {
ss << "(" << htdata.tuple << ")";
} else {
- ss << "(" << htdata.idx.block() << ", " << htdata.idx.idx()
- << ", " << htdata.idx.offset() << ")";
+ ss << "(" << htdata.flat_row << ")";
}
if (desc != NULL) {
Tuple* row[num_build_tuples_];
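The Suballocator protocol used throughout the new hash table code above, as
one condensed pattern (drawn from this file's hunks):

// Sketch: Allocate() returns a non-OK Status only for real errors; running
// out of reservation yields OK with a null allocation, so callers can
// distinguish the two and react (e.g. spill) instead of failing the query.
std::unique_ptr<Suballocation> allocation;
RETURN_IF_ERROR(allocator_->Allocate(DATA_PAGE_SIZE, &allocation));
if (allocation == nullptr) {
  *got_memory = false;  // Out of reservation - the caller decides what to do.
  return Status::OK();
}
memset(allocation->data(), 0, DATA_PAGE_SIZE);
data_pages_.push_back(std::move(allocation));
// ... later, in Close(): return every allocation to the pool.
for (auto& page : data_pages_) allocator_->Free(std::move(page));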
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 9ba5b04..297e619 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -15,19 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-
#ifndef IMPALA_EXEC_HASH_TABLE_H
#define IMPALA_EXEC_HASH_TABLE_H
+#include <memory>
#include <vector>
#include <boost/cstdint.hpp>
#include <boost/scoped_ptr.hpp>
+
#include "codegen/impala-ir.h"
-#include "common/logging.h"
#include "common/compiler-util.h"
-#include "runtime/buffered-block-mgr.h"
-#include "runtime/buffered-tuple-stream.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "common/logging.h"
+#include "runtime/buffered-tuple-stream-v2.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/suballocator.h"
#include "runtime/tuple-row.h"
#include "util/bitmap.h"
#include "util/hash-util.h"
@@ -101,7 +103,6 @@ class HashTable;
/// Inserts(). We may want to optimize joins more heavily for Inserts() (in particular
/// growing).
/// TODO: Batched interface for inserts and finds.
-/// TODO: Do we need to check mem limit exceeded so often. Check once per batch?
/// TODO: as an optimization, compute variable-length data size for the agg node.
/// Control block for a hash table. This class contains the logic as well as the variables
@@ -525,13 +526,15 @@ class HashTableCtx {
/// nodes do not contain the hash value, because all the linked nodes have the same hash
/// value, the one in the bucket. The data is either a tuple stream index or a Tuple*.
/// This array of buckets is sparse; we are shooting for up to 3/4 fill factor (75%). The
-/// data allocated by the hash table comes from the BufferedBlockMgr.
+/// data allocated by the hash table comes from the BufferPool.
class HashTable {
private:
-
- /// Either the row in the tuple stream or a pointer to the single tuple of this row.
+ /// Rows are represented as pointers into the BufferedTupleStream data with one
+ /// of two formats, depending on the number of tuples in the row.
union HtData {
- BufferedTupleStream::RowIdx idx;
+ // For rows with multiple tuples per row, a pointer to the flattened TupleRow.
+ BufferedTupleStreamV2::FlatRowPtr flat_row;
+ // For rows with one tuple per row, a pointer to the Tuple itself.
Tuple* tuple;
};
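
To make the two representations concrete, a sketch of how a reader resolves an
HtData entry; this is a simplified restatement of GetRow() further down in this
patch, with 'Resolve' and 'scratch' being illustrative names:

    TupleRow* Resolve(HtData& htdata, TupleRow* scratch) const {
      if (stores_tuples_) {
        // One tuple per row: the Tuple* slot itself acts as the TupleRow.
        return reinterpret_cast<TupleRow*>(&htdata.tuple);
      }
      // Multiple tuples per row: rebuild the row from the stream.
      tuple_stream_->GetTupleRow(htdata.flat_row, scratch);
      return scratch;
    }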
@@ -584,7 +587,7 @@ class HashTable {
/// Returns a newly allocated HashTable. The probing algorithm is set by the
/// FLAG_enable_quadratic_probing.
- /// - client: block mgr client to allocate data pages from.
+ /// - allocator: allocator to allocate bucket directory and data pages from.
/// - stores_duplicates: true if rows with duplicate keys may be inserted into the
/// hash table.
/// - num_build_tuples: number of Tuples in the build tuple row.
@@ -596,31 +599,35 @@ class HashTable {
/// -1 if it is unlimited.
/// - initial_num_buckets: number of buckets that the hash table should be initialized
/// with.
- static HashTable* Create(RuntimeState* state, BufferedBlockMgr::Client* client,
- bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
- int64_t max_num_buckets, int64_t initial_num_buckets);
+ static HashTable* Create(Suballocator* allocator, bool stores_duplicates,
+ int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+ int64_t initial_num_buckets);
- /// Allocates the initial bucket structure. Returns false if OOM.
- bool Init();
+ /// Allocates the initial bucket structure. Returns a non-OK status if an error is
+ /// encountered. If an OK status is returned, 'got_memory' is set to indicate whether
+ /// enough memory for the initial buckets was allocated from the Suballocator.
+ Status Init(bool* got_memory) WARN_UNUSED_RESULT;
/// Call to cleanup any resources. Must be called once.
void Close();
- /// Inserts the row to the hash table. Returns true if the insertion was successful.
- /// Always returns true if the table has free buckets and the key is not a duplicate.
- /// The caller is responsible for ensuring that the table has free buckets
- /// 'idx' is the index into tuple_stream_ for this row. If the row contains more than
- /// one tuple, the 'idx' is stored instead of the 'row'. The 'row' is not copied by the
- /// hash table and the caller must guarantee it stays in memory. This will not grow the
- /// hash table. In the case that there is a need to insert a duplicate node, instead of
- /// filling a new bucket, and there is not enough memory to insert a duplicate node,
- /// the insert fails and this function returns false.
- /// Used during the build phase of hash joins.
+ /// Inserts the row to the hash table. The caller is responsible for ensuring that the
+ /// table has free buckets. Returns true if the insertion was successful. Always
+ /// returns true if the table has free buckets and the key is not a duplicate. If the
+ /// key was a duplicate and memory could not be allocated for the new duplicate node,
+ /// returns false. If an error is encountered while creating a duplicate node, returns
+ /// false and sets 'status' to the error.
+ ///
+ /// 'flat_row' is a pointer to the flattened row in 'tuple_stream_'. If the row contains
+ /// only one tuple, a pointer to that tuple is stored. Otherwise the 'flat_row' pointer
+ /// is stored. The 'row' is not copied by the hash table and the caller must guarantee
+ /// it stays in memory. This will not grow the hash table.
bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
- const BufferedTupleStream::RowIdx& idx, TupleRow* row);
+ BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row,
+ Status* status) WARN_UNUSED_RESULT;
/// Prefetch the hash table bucket which the given hash value 'hash' maps to.
- template<const bool READ>
+ template <const bool READ>
void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
/// Returns an iterator to the bucket that matches the probe expression results that
@@ -680,12 +687,17 @@ class HashTable {
/// Calculates the fill factor if 'buckets_to_fill' additional buckets were to be
/// filled and resizes the hash table so that the projected fill factor is below the
/// max fill factor.
- /// If it returns true, then it is guaranteed at least 'rows_to_add' rows can be
- /// inserted without need to resize.
- bool CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx);
+ /// If 'got_memory' is true, then it is guaranteed at least 'buckets_to_fill' rows can
+ /// be inserted without needing to resize. If there is not enough memory available to
+ /// resize the hash table, Status::OK() is returned and 'got_memory' is false. If
+ /// another error occurs, an error status is returned.
+ Status CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx,
+ bool* got_memory) WARN_UNUSED_RESULT;
/// Returns the number of bytes allocated to the hash table from the Suballocator.
- int64_t ByteSize() const { return num_buckets_ * sizeof(Bucket) + total_data_page_size_; }
+ int64_t ByteSize() const {
+ return num_buckets_ * sizeof(Bucket) + total_data_page_size_;
+ }
/// Returns an iterator at the beginning of the hash table. Advancing this iterator
/// will traverse all elements.
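
A sketch of the intended call pattern for the new CheckAndResize() signature,
mirroring the aggregation-node caller later in this patch (SpillPartition()
belongs to the aggregation node, not to HashTable):

    bool got_memory;
    RETURN_IF_ERROR(
        hash_tbl->CheckAndResize(batch->num_rows(), ht_ctx, &got_memory));
    if (!got_memory) {
      // Not enough reservation to resize: spill a partition and append the
      // rows to its stream instead of failing the query.
      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
    }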
@@ -792,7 +804,6 @@ class HashTable {
TupleRow* scratch_row_;
/// Current bucket idx.
- /// TODO: Use uint32_t?
int64_t bucket_idx_;
/// Pointer to the current duplicate node.
@@ -807,9 +818,9 @@ class HashTable {
/// of calling this constructor directly.
/// - quadratic_probing: set to true when the probing algorithm is quadratic, as
/// opposed to linear.
- HashTable(bool quadratic_probing, RuntimeState* state, BufferedBlockMgr::Client* client,
- bool stores_duplicates, int num_build_tuples, BufferedTupleStream* tuple_stream,
- int64_t max_num_buckets, int64_t initial_num_buckets);
+ HashTable(bool quadratic_probing, Suballocator* allocator, bool stores_duplicates,
+ int num_build_tuples, BufferedTupleStreamV2* tuple_stream, int64_t max_num_buckets,
+ int64_t initial_num_buckets);
/// Performs the probing operation according to the probing algorithm (linear or
/// quadratic). Returns one of the following:
@@ -839,8 +850,10 @@ class HashTable {
HashTableCtx* ht_ctx, uint32_t hash, bool* found);
/// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
- /// where the data should be inserted. Returns NULL if the insert was not successful.
- HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx);
+ /// where the data should be inserted. Returns NULL if the insert was not successful,
+ /// setting 'status' to OK if it failed because not enough reservation was available,
+ /// or to the error if an error was encountered.
+ HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, Status* status);
/// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
/// duplicates, 'node' will be pointing to the head of the linked list of duplicates.
@@ -848,8 +861,8 @@ class HashTable {
/// 'bucket_idx' to BUCKET_NOT_FOUND.
void NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node);
- /// Resize the hash table to 'num_buckets'. Returns false on OOM.
- bool ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx);
+ /// Resizes the hash table to 'num_buckets'. Sets 'got_memory' to false on OOM.
+ Status ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory);
/// Appends the DuplicateNode pointed by next_node_ to 'bucket' and moves the next_node_
/// pointer to the next DuplicateNode in the page, updating the remaining node counter.
@@ -862,9 +875,10 @@ class HashTable {
/// the bucket is converted to a DuplicateNode. That is, the contents of 'data' of the
/// bucket are copied to a DuplicateNode and 'data' is updated to pointing to a
/// DuplicateNode.
- /// Returns NULL if the node array could not grow, i.e. there was not enough memory to
- /// allocate a new DuplicateNode.
- DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx);
+ /// Returns NULL and sets 'status' to OK if the node array could not grow, i.e. there
+ /// was not enough memory to allocate a new DuplicateNode. Returns NULL and sets
+ /// 'status' to an error if another error was encountered.
+ DuplicateNode* IR_ALWAYS_INLINE InsertDuplicateNode(int64_t bucket_idx, Status* status);
/// Resets the contents of the empty bucket with index 'bucket_idx', in preparation for
/// an insert. Sets all the fields of the bucket other than 'data'.
@@ -877,8 +891,10 @@ class HashTable {
/// returns the content of the first chained duplicate node of the bucket.
TupleRow* GetRow(Bucket* bucket, TupleRow* row) const;
- /// Grow the node array. Returns false on OOM.
- bool GrowNodeArray();
+ /// Grow the node array. Returns true and sets 'status' to OK on success. Returns false
+ /// and sets 'status' to OK if we can't get sufficient reservation to allocate the next
+ /// data page. Returns false and sets 'status' to the error if another error is
+ /// encountered.
+ bool GrowNodeArray(Status* status);
/// Functions to be replaced by codegen to specialize the hash table.
bool IR_NO_INLINE stores_tuples() const { return stores_tuples_; }
@@ -887,20 +903,26 @@ class HashTable {
/// Load factor that will trigger growing the hash table on insert. This is
/// defined as the number of non-empty buckets / total_buckets
- static const double MAX_FILL_FACTOR;
+ static constexpr double MAX_FILL_FACTOR = 0.75;
+
+ /// The size in bytes of each page of duplicate nodes. Should be large enough to fit
+ /// enough DuplicateNodes to amortise the overhead of allocating each page, and small
+ /// enough not to waste excessive memory on internal fragmentation.
+ static constexpr int64_t DATA_PAGE_SIZE = 64L * 1024;
RuntimeState* state_;
- /// Client to allocate data pages with.
- BufferedBlockMgr::Client* block_mgr_client_;
+ /// Suballocator to allocate data pages and hash table buckets with.
+ Suballocator* allocator_;
/// Stream contains the rows referenced by the hash table. Can be NULL if the
/// row only contains a single tuple, in which case the TupleRow indirection
/// is removed by the hash table.
- BufferedTupleStream* tuple_stream_;
+ BufferedTupleStreamV2* tuple_stream_;
- /// Constants on how the hash table should behave. Joins and aggs have slightly
- /// different behavior.
+ /// Constants on how the hash table should behave.
+
+ /// True if the HtData uses the Tuple* representation, or false if it uses FlatRowPtr.
const bool stores_tuples_;
/// True if duplicates may be inserted into hash table.
@@ -909,8 +931,9 @@ class HashTable {
/// Quadratic probing enabled (as opposed to linear).
const bool quadratic_probing_;
- /// Data pages for all nodes. These are always pinned.
- std::vector<BufferedBlockMgr::Block*> data_pages_;
+ /// Data pages for all nodes. Allocated from suballocator to reduce memory
+ /// consumption of small tables.
+ std::vector<std::unique_ptr<Suballocation>> data_pages_;
/// Byte size of all buffers in data_pages_.
int64_t total_data_page_size_;
@@ -926,8 +949,10 @@ class HashTable {
const int64_t max_num_buckets_;
- /// Array of all buckets. Owned by this node. Using c-style array to control
- /// control memory footprint.
+ /// Allocation containing all buckets.
+ std::unique_ptr<Suballocation> bucket_allocation_;
+
+ /// Pointer to the 'buckets_' array from 'bucket_allocation_'.
Bucket* buckets_;
/// Total number of buckets (filled and empty).
@@ -943,9 +968,8 @@ class HashTable {
/// Number of build tuples, used for constructing temp row* for probes.
const int num_build_tuples_;
- /// Flag used to disable spilling hash tables that already had matches in case of
- /// right joins (IMPALA-1488).
- /// TODO: Not fail when spilling hash tables with matches in right joins
+ /// Flag used to check that we don't lose stored matches when spilling hash tables
+ /// (IMPALA-1488).
bool has_matches_;
/// The stats below can be used for debugging perf.
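
Putting the header changes together, a hedged sketch of constructing and
initializing a table under the new contract (assumes a valid Suballocator*
'alloc' and BufferedTupleStreamV2* 'stream'; the parameter values are
illustrative only):

    HashTable* ht = HashTable::Create(alloc, /* stores_duplicates */ true,
        /* num_build_tuples */ 1, stream, /* max_num_buckets */ -1,
        /* initial_num_buckets */ 1024);
    bool got_memory;
    RETURN_IF_ERROR(ht->Init(&got_memory));
    if (!got_memory) {
      // Initial buckets could not be reserved. This is not an error: the
      // operator falls back to spilling (or, for streaming preaggs, to
      // passing rows through unaggregated).
    }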
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/hash-table.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index aff7c14..ce2f784 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -90,7 +90,8 @@ inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
return Iterator::BUCKET_NOT_FOUND;
}
-inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
+inline HashTable::HtData* HashTable::InsertInternal(
+ HashTableCtx* ht_ctx, Status* status) {
++num_probes_;
bool found = false;
uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
@@ -98,7 +99,7 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
if (found) {
// We need to insert a duplicate node; note that this may fail to allocate memory.
- DuplicateNode* new_node = InsertDuplicateNode(bucket_idx);
+ DuplicateNode* new_node = InsertDuplicateNode(bucket_idx, status);
if (UNLIKELY(new_node == NULL)) return NULL;
return &new_node->htdata;
} else {
@@ -108,14 +109,14 @@ inline HashTable::HtData* HashTable::InsertInternal(HashTableCtx* ht_ctx) {
}
inline bool HashTable::Insert(HashTableCtx* ht_ctx,
- const BufferedTupleStream::RowIdx& idx, TupleRow* row) {
- HtData* htdata = InsertInternal(ht_ctx);
+ BufferedTupleStreamV2::FlatRowPtr flat_row, TupleRow* row, Status* status) {
+ HtData* htdata = InsertInternal(ht_ctx, status);
// If the insert succeeded, update the contents of the newly inserted entry with
// 'flat_row' or the row's tuple.
if (LIKELY(htdata != NULL)) {
if (stores_tuples()) {
htdata->tuple = row->GetTuple(0);
} else {
- htdata->idx = idx;
+ htdata->flat_row = flat_row;
}
return true;
}
@@ -213,7 +214,8 @@ inline HashTable::DuplicateNode* HashTable::AppendNextNode(Bucket* bucket) {
return next_node_++;
}
-inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_idx) {
+inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(
+ int64_t bucket_idx, Status* status) {
DCHECK_GE(bucket_idx, 0);
DCHECK_LT(bucket_idx, num_buckets_);
Bucket* bucket = &buckets_[bucket_idx];
@@ -222,12 +224,12 @@ inline HashTable::DuplicateNode* HashTable::InsertDuplicateNode(int64_t bucket_i
// Allocate one duplicate node for the new data and one for the preexisting data,
// if needed.
while (node_remaining_current_page_ < 1 + !bucket->hasDuplicates) {
- if (UNLIKELY(!GrowNodeArray())) return NULL;
+ if (UNLIKELY(!GrowNodeArray(status))) return NULL;
}
if (!bucket->hasDuplicates) {
// This is the first duplicate in this bucket. It means that we need to convert
// the current entry in the bucket to a node and link it from the bucket.
- next_node_->htdata.idx = bucket->bucketData.htdata.idx;
+ next_node_->htdata.flat_row = bucket->bucketData.htdata.flat_row;
DCHECK(!bucket->matched);
next_node_->matched = false;
next_node_->next = NULL;
@@ -246,7 +248,7 @@ inline TupleRow* IR_ALWAYS_INLINE HashTable::GetRow(HtData& htdata, TupleRow* ro
return reinterpret_cast<TupleRow*>(&htdata.tuple);
} else {
// TODO: GetTupleRow() has interpreted code that iterates over the row's descriptor.
- tuple_stream_->GetTupleRow(htdata.idx, row);
+ tuple_stream_->GetTupleRow(htdata.flat_row, row);
return row;
}
}
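
The same OK-but-false convention shows up at Insert() call sites. A sketch,
with the spill reaction simplified from how the aggregation node in this patch
responds:

    Status status;
    if (UNLIKELY(!hash_tbl->Insert(ht_ctx, flat_row, row, &status))) {
      // Distinguish a real error from running out of duplicate-node memory.
      RETURN_IF_ERROR(status);
      RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
      // ... then append the row to the spilled partition's stream.
    }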
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 67e6ed6..fdd94ee 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -45,8 +45,7 @@ Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
build_batch->AcquireState(batch);
AddBuildBatch(build_batch);
- if (build_batch->needs_deep_copy() || build_batch->num_blocks() > 0
- || build_batch->num_buffers() > 0) {
+ if (build_batch->needs_deep_copy() || build_batch->num_buffers() > 0) {
// This batch and earlier batches may refer to resources passed from the child
// that aren't owned by the row batch itself. Deep copying ensures that the row
// batches are backed by memory owned by this node that is safe to hold on to.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 4f485d5..88b2f26 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -58,8 +58,10 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::Prepare(state));
less_than_.reset(new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
sorter_.reset(new Sorter(*less_than_, sort_tuple_exprs_, &row_descriptor_,
- mem_tracker(), runtime_profile(), state, false));
+ mem_tracker(), &buffer_pool_client_, resource_profile_.spillable_buffer_size,
+ runtime_profile(), state, id(), false));
RETURN_IF_ERROR(sorter_->Prepare(pool_, expr_mem_pool()));
+ DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
AddCodegenDisabledMessage(state);
input_batch_.reset(
new RowBatch(child(0)->row_desc(), state->batch_size(), mem_tracker()));
@@ -81,6 +83,9 @@ Status PartialSortNode::Open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->Open(state));
+ if (!buffer_pool_client_.is_registered()) {
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
+ }
return Status::OK();
}
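
The guarded claim in Open() above is the pattern the ported ExecNodes share; an
annotated sketch (the comments are editorial and assume Open() can run more
than once for a node):

    if (!buffer_pool_client_.is_registered()) {
      // First Open(): claim the planner-computed initial reservation from
      // the global BufferPool. The guard keeps a later re-Open() from
      // claiming the reservation a second time.
      RETURN_IF_ERROR(ClaimBufferReservation(state));
    }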
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index ab4c547..d40d653 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -19,7 +19,6 @@
#define IMPALA_EXEC_PARTIAL_SORT_NODE_H
#include "exec/exec-node.h"
-#include "runtime/buffered-block-mgr.h"
#include "runtime/sorter.h"
namespace impala {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/exec/partitioned-aggregation-node-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node-ir.cc b/be/src/exec/partitioned-aggregation-node-ir.cc
index cd5d336..126a2a5 100644
--- a/be/src/exec/partitioned-aggregation-node-ir.cc
+++ b/be/src/exec/partitioned-aggregation-node-ir.cc
@@ -21,7 +21,7 @@
#include "exprs/agg-fn-evaluator.h"
#include "exprs/scalar-expr.h"
#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/buffered-tuple-stream.inline.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
#include "runtime/row-batch.h"
#include "runtime/tuple-row.h"
@@ -46,7 +46,8 @@ Status PartitionedAggregationNode::ProcessBatch(RowBatch* batch,
// will end up in the same partition.
// TODO: Once we have a histogram with the number of rows per partition, we will have
// accurate resize calls.
- RETURN_IF_ERROR(CheckAndResizeHashPartitions(batch->num_rows(), ht_ctx));
+ RETURN_IF_ERROR(
+ CheckAndResizeHashPartitions(AGGREGATED_ROWS, batch->num_rows(), ht_ctx));
HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
const int cache_size = expr_vals_cache->capacity();
@@ -108,6 +109,7 @@ Status PartitionedAggregationNode::ProcessRow(TupleRow* __restrict__ row,
// so we can try again to insert the row.
HashTable* hash_tbl = GetHashTable(partition_idx);
Partition* dst_partition = hash_partitions_[partition_idx];
+ DCHECK(dst_partition != nullptr);
DCHECK_EQ(dst_partition->is_spilled(), hash_tbl == NULL);
if (hash_tbl == NULL) {
// This partition is already spilled, just append the row.
@@ -155,24 +157,13 @@ Status PartitionedAggregationNode::AddIntermediateTuple(Partition* __restrict__
}
// We did not have enough memory to add intermediate_tuple to the stream.
- RETURN_IF_ERROR(SpillPartition());
+ RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
if (partition->is_spilled()) {
return AppendSpilledRow<AGGREGATED_ROWS>(partition, row);
}
}
}
-template<bool AGGREGATED_ROWS>
-Status PartitionedAggregationNode::AppendSpilledRow(Partition* __restrict__ partition,
- TupleRow* __restrict__ row) {
- DCHECK(!is_streaming_preagg_);
- DCHECK(partition->is_spilled());
- BufferedTupleStream* stream = AGGREGATED_ROWS ?
- partition->aggregated_row_stream.get() :
- partition->unaggregated_row_stream.get();
- return AppendSpilledRow(stream, row);
-}
-
Status PartitionedAggregationNode::ProcessBatchStreaming(bool needs_serialize,
TPrefetchMode::type prefetch_mode, RowBatch* in_batch, RowBatch* out_batch,
HashTableCtx* __restrict__ ht_ctx, int remaining_capacity[PARTITION_FANOUT]) {
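
ProcessBatchStreaming(), whose signature appears above, is where the
zero-reservation pass-through lands: the nullptr check added in the next hunk
lets TryAddToHashTable() report failure cleanly. A hedged sketch of the caller
shape (the parameter order is approximate):

    Status status;
    if (!TryAddToHashTable(ht_ctx, hash_partitions_[partition_idx],
            GetHashTable(partition_idx), in_row, hash,
            &remaining_capacity[partition_idx], &status)) {
      RETURN_IF_ERROR(status);
      // Pass through: copy the row to the output batch unaggregated; a
      // downstream merge aggregation produces the final result.
    }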
@@ -230,6 +221,7 @@ bool PartitionedAggregationNode::TryAddToHashTable(
DCHECK(remaining_capacity != NULL);
DCHECK_EQ(hash_tbl, partition->hash_tbl.get());
DCHECK_GE(*remaining_capacity, 0);
+ if (hash_tbl == nullptr) return false; // Hash table was not created - pass through.
bool found;
// This is called from ProcessBatchStreaming() so the rows are not aggregated.
HashTable::Iterator it = hash_tbl->FindBuildRowBucket(ht_ctx, &found);