Posted to commits@impala.apache.org by ta...@apache.org on 2016/08/19 05:48:42 UTC

[2/2] incubator-impala git commit: IMPALA-3090: always log memory limit errors

IMPALA-3090: always log memory limit errors

Consistently log memory limit errors so that the error message contains
a dump of the query memory trackers at the time the memory limit was hit
(instead of after the fact, when the query has already been partially
cleaned up).
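
The change applies one pattern across the hunks below: instead of attaching
the detail message to a bare Status::MemLimitExceeded(), call sites now go
through MemTracker::MemLimitExceeded(), which also logs the query memory
trackers at the moment the limit is hit. Roughly, with the message text
abbreviated and the names taken from the analytic-eval-node.cc hunk:

    // Before: detail attached to a bare status; nothing about current
    // memory usage is logged at the point of failure.
    Status status = Status::MemLimitExceeded();
    status.AddDetail("Failed to acquire initial read buffer ...");
    return status;

    // After: same status, but MemTracker::MemLimitExceeded() also dumps
    // the query memory trackers to the error log right away.
    return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG);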

Testing:
Exhaustive build passed. Also ran the local stress test for a short while.

Change-Id: If5ec5572b0e26898da352b7e6b11eb01c6edb2e5
Reviewed-on: http://gerrit.cloudera.org:8080/4049
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/7eb30309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/7eb30309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/7eb30309

Branch: refs/heads/master
Commit: 7eb30309f3847f416f204bd5f7d6925102e94b67
Parents: d113205
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 18 12:17:27 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Fri Aug 19 05:36:02 2016 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc         | 12 +++++++-----
 be/src/exec/partitioned-hash-join-node.cc | 20 ++++++++------------
 be/src/runtime/buffered-block-mgr.cc      | 20 +++++++++-----------
 be/src/runtime/mem-tracker.h              |  4 +++-
 be/src/runtime/plan-fragment-executor.cc  |  3 ---
 be/src/runtime/row-batch.cc               |  1 -
 be/src/runtime/runtime-state.h            |  2 ++
 be/src/runtime/sorter.cc                  | 21 +++++++++------------
 8 files changed, 38 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index d401909..c9e35d8 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -22,6 +22,7 @@
 #include "exprs/agg-fn-evaluator.h"
 #include "runtime/buffered-tuple-stream.inline.h"
 #include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "udf/udf-internal.h"
@@ -31,6 +32,11 @@
 
 static const int MAX_TUPLE_POOL_SIZE = 8 * 1024 * 1024; // 8MB
 
+const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
+    "Failed to acquire initial read buffer for analytic function evaluation. Reducing "
+    "query concurrency or increasing the memory limit may help this query to complete "
+    "successfully.";
+
 using namespace strings;
 
 namespace impala {
@@ -197,11 +203,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
   bool got_read_buffer;
   RETURN_IF_ERROR(input_stream_->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    Status status = Status::MemLimitExceeded();
-    status.AddDetail("Failed to acquire initial read buffer for analytic function "
-        "evaluation. Reducing query concurrency or increasing the memory limit may "
-        "help this query to complete successfully.");
-    return status;
+    return mem_tracker()->MemLimitExceeded(state, PREPARE_FOR_READ_FAILED_ERROR_MSG);
   }
 
   DCHECK_EQ(evaluators_.size(), fn_ctxs_.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a2b5001..d3aaf3d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -659,9 +659,8 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level
     RETURN_IF_ERROR(
         input_partition_->build_rows()->PrepareForRead(true, &got_read_buffer));
     if (!got_read_buffer) {
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-      return status;
+      return mem_tracker()->MemLimitExceeded(
+          state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
     }
   }
 
@@ -829,9 +828,8 @@ Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
   bool got_read_buffer;
   RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    Status status = Status::MemLimitExceeded();
-    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-    return status;
+    return mem_tracker()->MemLimitExceeded(
+        state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
   }
   ht_ctx_->set_level(input_partition_->level_);
 
@@ -1130,9 +1128,8 @@ Status PartitionedHashJoinNode::PrepareNullAwareNullProbe() {
   bool got_read_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    Status status = Status::MemLimitExceeded();
-    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-    return status;
+    return mem_tracker()->MemLimitExceeded(
+        runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
   }
   DCHECK_EQ(probe_batch_->num_rows(), 0);
   probe_batch_pos_ = 0;
@@ -1211,9 +1208,8 @@ Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
   bool got_read_buffer;
   RETURN_IF_ERROR(probe_stream->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
-    Status status = Status::MemLimitExceeded();
-    status.AddDetail(Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
-    return status;
+    return mem_tracker()->MemLimitExceeded(
+        runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
   }
   probe_batch_pos_ = 0;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index db62922..90c1041 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -403,16 +403,15 @@ bool BufferedBlockMgr::IsCancelled() {
 }
 
 Status BufferedBlockMgr::MemLimitTooLowError(Client* client, int node_id) {
-  // TODO: what to print here. We can't know the value of the entire query here.
-  Status status = Status::MemLimitExceeded();
-  status.AddDetail(Substitute("The memory limit is set too low to initialize spilling"
-      " operator (id=$0). The minimum required memory to spill this operator is $1.",
-      node_id, PrettyPrinter::Print(client->num_reserved_buffers_ * max_block_size(),
-      TUnit::BYTES)));
   VLOG_QUERY << "Query: " << query_id_ << ". Node=" << node_id
              << " ran out of memory: " << endl
              << DebugInternal() << endl << client->DebugString();
-  return status;
+  int64_t min_memory = client->num_reserved_buffers_ * max_block_size();
+  string msg = Substitute(
+      "The memory limit is set too low to initialize spilling operator (id=$0). The "
+      "minimum required memory to spill this operator is $1.",
+      node_id, PrettyPrinter::Print(min_memory, TUnit::BYTES));
+  return client->tracker_->MemLimitExceeded(client->state_, msg);
 }
 
 Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block,
@@ -1064,10 +1063,9 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) {
            << endl << DebugInternal() << endl << client->DebugString();
         VLOG_QUERY << ss.str();
       }
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail("Query did not have enough memory to get the minimum required "
-          "buffers in the block manager.");
-      return status;
+      return client->tracker_->MemLimitExceeded(client->state_,
+          "Query did not have enough memory to get the minimum required buffers in the "
+          "block manager.");
     }
 
     DCHECK(buffer_desc != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 687ca18..2c8d0a9 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -344,8 +344,10 @@ class MemTracker {
 
   /// Log the memory usage when memory limit is exceeded and return a status object with
   /// details of the allocation which caused the limit to be exceeded.
+  /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
+  /// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
   Status MemLimitExceeded(RuntimeState* state, const std::string& details,
-      int64_t failed_allocation);
+      int64_t failed_allocation = 0);
 
   static const std::string COUNTER_NAME;
 

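A short usage sketch of the defaulted argument; both forms appear elsewhere in
this diff, and the names below are taken from those call sites. Pass the failed
allocation size when it is known so that it shows up in the log, and omit it
otherwise:

    // Size of the failed allocation is known (row-batch.cc below): the size
    // is included in the logged message.
    return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer",
        *tuple_buffer_size);

    // No meaningful allocation size (e.g. buffered-block-mgr.cc above): the
    // new default of 0 means nothing about an allocation size is logged.
    return client->tracker_->MemLimitExceeded(client->state_, msg);
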
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 35ac757..c7bc3a6 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -523,9 +523,6 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
   {
     lock_guard<mutex> l(status_lock_);
     if (status_.ok()) {
-      // TODO: remove this once all locations which exceed query or process memory limit
-      // will log query memory usages with MemTracker::MemLimitExceeded().
-      if (status.IsMemLimitExceeded()) runtime_state_->LogMemLimitExceeded(NULL, 0);
       status_ = status;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index e602293..3d076bf 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -448,7 +448,6 @@ Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state,
   *tuple_buffer_size = static_cast<int64_t>(row_size) * capacity_;
   *buffer = tuple_data_pool_.TryAllocate(*tuple_buffer_size);
   if (*buffer == NULL) {
-    Status status = Status::MemLimitExceeded();
     return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer",
         *tuple_buffer_size);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 4d425b6..f86fa77 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -240,6 +240,8 @@ class RuntimeState {
   }
 
   /// Function for logging memory usages to the error log when memory limit is exceeded.
+  /// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
+  /// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
   void LogMemLimitExceeded(const MemTracker* tracker, int64_t failed_allocation_size);
 
   /// Sets query_status_ to MEM_LIMIT_EXCEEDED and logs all the registered trackers.
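
A minimal sketch of the calling convention documented above; 'state', 'tracker',
and 'failed_allocation_size' stand in for values in scope, and the second form
mirrors the workaround removed from plan-fragment-executor.cc earlier in this
diff:

    // A failed allocation of known size: the size appears in the error log.
    state->LogMemLimitExceeded(tracker, failed_allocation_size);

    // No specific tracker and a size of zero, as in the removed workaround:
    // nothing about an allocation size is logged.
    state->LogMemLimitExceeded(NULL, 0);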

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/7eb30309/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index d04389a..6757be0 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -22,6 +22,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/sorted-run-merger.h"
@@ -493,18 +494,16 @@ Status Sorter::Run::Init() {
   RETURN_IF_ERROR(
       sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
   if (block == NULL) {
-    Status status = Status::MemLimitExceeded();
-    status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
-    return status;
+    return sorter_->mem_tracker_->MemLimitExceeded(
+        sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "fixed"));
   }
   fixed_len_blocks_.push_back(block);
   if (has_var_len_slots_) {
     RETURN_IF_ERROR(
         sorter_->block_mgr_->GetNewBlock(sorter_->block_mgr_client_, NULL, &block));
     if (block == NULL) {
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
-      return status;
+      return sorter_->mem_tracker_->MemLimitExceeded(
+          sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
     }
     var_len_blocks_.push_back(block);
     if (initial_run_) {
@@ -512,9 +511,8 @@ Status Sorter::Run::Init() {
       RETURN_IF_ERROR(sorter_->block_mgr_->GetNewBlock(
           sorter_->block_mgr_client_, NULL, &var_len_copy_block_));
       if (var_len_copy_block_ == NULL) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
-        return status;
+        return sorter_->mem_tracker_->MemLimitExceeded(
+            sorter_->state_, Substitute(MEM_ALLOC_FAILED_ERROR_MSG, "variable"));
       }
     }
   }
@@ -1549,9 +1547,8 @@ Status Sorter::CreateMerger(int max_num_runs) {
       // TODO: IMPALA-3200: we should not need this logic once we have reliable
       // reservations (IMPALA-3200).
       if (merging_runs_.size() < 2) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
-        return status;
+        return mem_tracker_->MemLimitExceeded(
+            state_, Substitute(MERGE_FAILED_ERROR_MSG, merging_runs_.size()));
       }
       // Merge the runs that we were able to prepare.
       break;