You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/08/19 05:48:42 UTC
[2/2] incubator-impala git commit: IMPALA-3090: always log memory
limit errors
IMPALA-3090: always log memory limit errors
Consistently log memory limit errors so that the error message contains
a dump of the query memory trackers at the time that the memory limit
was hit (instead of after the fact when the query is already partially
cleaned up).
Testing:
Exhaustive build passed. Ran local stress test for a bit.
Change-Id: If5ec5572b0e26898da352b7e6b11eb01c6edb2e5
Reviewed-on: http://gerrit.cloudera.org:8080/4049
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7eb30309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7eb30309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7eb30309
Branch: refs/heads/master
Commit: 7eb30309f3847f416f204bd5f7d6925102e94b67
Parents: d113205
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 18 12:17:27 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Aug 19 05:36:02 2016 +0000
----------------------------------------------------------------------
be/src/exec/analytic-eval-node.cc | 12 +++++++-----
be/src/exec/partitioned-hash-join-node.cc | 20 ++++++++------------
be/src/runtime/buffered-block-mgr.cc | 20 +++++++++-----------
be/src/runtime/mem-tracker.h | 4 +++-
be/src/runtime/plan-fragment-executor.cc | 3 ---
be/src/runtime/row-batch.cc | 1 -
be/src/runtime/runtime-state.h | 2 ++
be/src/runtime/sorter.cc | 21 +++++++++------------
8 files changed, 38 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index d401909..c9e35d8 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -22,6 +22,7 @@
#include "exprs/agg-fn-evaluator.h"
#include "runtime/buffered-tuple-stream.inline.h"
#include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "udf/udf-internal.h"
@@ -31,6 +32,11 @@
static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
+const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+ "Failed to acquire initial read buffer for analytic function evaluation. Reducing "
+ "query concurrency or increasing the memory limit may help this query to complete "
+ "successfully.";
+
using namespace strings;
namespace impala {
@@ -197,11 +203,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
bool got_read_buffer;
RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
if (!got_read_buffer) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail("Failed to acquire initial read buffer for analytic function "
- "evaluation. Reducing query concurrency or increasing the memory limit may "
- "help this query to complete successfully.");
- return status;
+ return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG);
}
DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a2b5001..d3aaf3d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -659,9 +659,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
RETURN_IF_ERROR(
input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer));
if (!got_read_buffer) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- return status;
+ return mem_tracker()->MemLimitExceeded(
+ state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
}
}
@@ -829,9 +828,8 @@ Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
bool got_read_buffer;
RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer));
if (!got_read_buffer) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- return status;
+ return mem_tracker()->MemLimitExceeded(
+ state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
}
ht_ctx_->set_level(input_partition_->level_);
@@ -1130,9 +1128,8 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
bool got_read_buffer;
RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(true, &got_read_buffer));
if (!got_read_buffer) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- return status;
+ return mem_tracker()->MemLimitExceeded(
+ runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
}
DCHECK_EQ(probe_batch_->num_rows(), 0);
probe_batch_pos_ = 0;
@@ -1211,9 +1208,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
bool got_read_buffer;
RETURN_IF_ERROR(probe_stream->PrepareForRead(true, &got_read_buffer));
if (!got_read_buffer) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
- return status;
+ return mem_tracker()->MemLimitExceeded(
+ runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
}
probe_batch_pos_ = 0;
return Status::OK();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index db62922..90c1041 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -403,16 +403,15 @@ bool BufferedBlockMgr::IsCancelled() {
}
Status BufferedBlockMgr::MemLimitTooLowError(Client* client, int node_id) {
- // TODO: what to print here. We can't know the value of the entire query here.
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute("The memory limit is set too low to initialize spilling"
- " operator (id=$0). The minimum required memory to spill this operator is $1.",
- node_id, PrettyPrinter::Print(client->num_reserved_buffers_ * max_block_size(),
- TUnit::BYTES)));
VLOG_QUERY << "Query: " << query_id_ << ". Node=" << node_id
<< " ran out of memory: " << endl
<< DebugInternal() << endl << client->DebugString();
- return status;
+ int64_t min_memory = client->num_reserved_buffers_ * max_block_size();
+ string msg = Substitute(
+ "The memory limit is set too low to initialize spilling operator (id=$0). The "
+ "minimum required memory to spill this operator is $1.",
+ node_id, PrettyPrinter::Print(min_memory, TUnit::BYTES));
+ return client->tracker_->MemLimitExceeded(client->state_, msg);
}
Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block,
@@ -1064,10 +1063,9 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
<< endl << DebugInternal() << endl << client->DebugString();
VLOG_QUERY << ss.str();
}
- Status status = Status::MemLimitExceeded();
- status.AddDetail("Query did not have enough memory to get the minimum required "
- "buffers in the block manager.");
- return status;
+ return client->tracker_->MemLimitExceeded(client->state_,
+ "Query did not have enough memory to get the minimum required buffers in the "
+ "block manager.");
}
DCHECK(buffer_desc != NULL);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 687ca18..2c8d0a9 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -344,8 +344,10 @@ class MemTracker {
/// Log the memory usage when memory limit is exceeded and return a status object with
/// details of the allocation which caused the limit to be exceeded.
+ /// If 'failed_allocation' is greater than zero, logs the allocation size. If
+ /// 'failed_allocation' is zero, nothing about the allocation size is logged.
Status MemLimitExceeded(RuntimeState* state, const std::string& details,
- int64_t failed_allocation);
+ int64_t failed_allocation = 0);
static const std::string COUNTER_NAME;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 35ac757..c7bc3a6 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -523,9 +523,6 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
{
lock_guard<mutex> l(status_lock_);
if (status_.ok()) {
- // TODO: remove this once all locations which exceed query or process memory limit
- // will log query memory usages with MemTracker::MemLimitExceeded().
- if (status.IsMemLimitExceeded()) runtime_state_->LogMemLimitExceeded(NULL, 0);
status_ = status;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index e602293..3d076bf 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -448,7 +448,6 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state,
*tuple_buffer_size = static_cast<int64_t>(row_size) * capacity_;
*buffer = tuple_data_pool_.TryAllocate(*tuple_buffer_size);
if (*buffer == NULL) {
- Status status = Status::MemLimitExceeded();
return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer",
*tuple_buffer_size);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4d425b6..f86fa77 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -240,6 +240,8 @@ class RuntimeState {
}
/// Function for logging memory usages to the error log when memory limit is exceeded.
+ /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
+ /// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
void LogMemLimitExceeded(const MemTracker* tracker, int64_t failed_allocation_size);
/// Sets query_status_ to MEM_LIMIT_EXCEEDED and logs all the registered trackers.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index d04389a..6757be0 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -22,6 +22,7 @@
#include <gutil/strings/substitute.h>
#include "runtime/buffered-block-mgr.h"
+#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/sorted-run-merger.h"
@@ -493,18 +494,16 @@ Status Sorter::Run::Init() {
RETURN_IF_ERROR(
sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
if (block == NULL) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
- return status;
+ return sorter_->mem_tracker_->MemLimitExceeded(
+ sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
}
fixed_len_blocks_.push_back(block);
if (has_var_len_slots_) {
RETURN_IF_ERROR(
sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
if (block == NULL) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
- return status;
+ return sorter_->mem_tracker_->MemLimitExceeded(
+ sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
}
var_len_blocks_.push_back(block);
if (initial_run_) {
@@ -512,9 +511,8 @@ Status Sorter::Run::Init() {
RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
if (var_len_copy_block_ == NULL) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
- return status;
+ return sorter_->mem_tracker_->MemLimitExceeded(
+ sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
}
}
}
@@ -1549,9 +1547,8 @@ Status Sorter::CreateMerger(int max_num_runs) {
// TODO: IMPALA-3200: we should not need this logic once we have reliable
// reservations (IMPALA-3200).
if (merging_runs_.size() < 2) {
- Status status = Status::MemLimitExceeded();
- status.AddDetail(Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
- return status;
+ return mem_tracker_->MemLimitExceeded(
+ state_, Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
}
// Merge the runs that we were able to prepare.
break;