Posted to commits@impala.apache.org by ta...@apache.org on 2016/05/23 15:40:36 UTC

[10/17] incubator-impala git commit: IMPALA-3242: Remove most usages of RuntimeState::SetMemLimitExceeded()

IMPALA-3242: Remove most usages of RuntimeState::SetMemLimitExceeded()

There are multiple places in the code that call
RuntimeState::SetMemLimitExceeded(). Most of them are
unnecessary because the error status they construct will
eventually be propagated up the tree of exec nodes, and there
is no obvious reason to treat exceeding the query memory limit
differently from other errors. In some cases, such as the scan
node, calling SetMemLimitExceeded() is actually confusing: all
scanner threads may pick up the error status when any one
thread exceeds the query memory limit, causing a lot of noise
in the log.
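
For reference, the pattern being removed typically looked like the
following sketch (adapted from the partitioned-aggregation-node hunk
below; illustrative, not an exact excerpt from the old code):

    // Old pattern: build a MEM_LIMIT_EXCEEDED status by hand, then also
    // flag the whole RuntimeState before returning the status.
    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
    if (new_ptr == NULL) {
      Status s = Status::MemLimitExceeded();
      s.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
          " Failed to allocate $1 output bytes.", id_, sv->len));
      state_->SetMemLimitExceeded();  // Redundant: 's' propagates up anyway.
      return s;
    }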

This change replaces most calls to RuntimeState::SetMemLimitExceeded()
with MemTracker::MemLimitExceeded(). The remaining call sites are
the old hash table code, the UDF framework, and QueryMaintenance(),
which checks the memory limit periodically. The query maintenance
case will eventually be removed once IMPALA-2399 is fixed.
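
The replacement pattern at a typical allocation site is sketched below
(names follow the partitioned-aggregation-node hunk; the MemTracker that
failed the allocation builds, logs and returns the error status itself):

    // New pattern: ask the failing MemTracker to construct the
    // MEM_LIMIT_EXCEEDED status and log query memory usage.
    char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
    if (UNLIKELY(new_ptr == NULL)) {
      string details = Substitute("Cannot perform aggregation at node with id $0."
          " Failed to allocate $1 output bytes.", id_, sv->len);
      return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
    }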

Change-Id: Ic0ca128c768d1e73713866e8c513a1b75e6b4b59
Reviewed-on: http://gerrit.cloudera.org:8080/3140
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0243a21d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0243a21d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0243a21d

Branch: refs/heads/master
Commit: 0243a21da874f850381d49ad4fed273a1b1fa997
Parents: 7d5d36a
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu May 19 01:14:10 2016 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon May 23 08:40:19 2016 -0700

----------------------------------------------------------------------
 be/src/exec/hash-table.cc                       |  2 +-
 be/src/exec/hash-table.h                        |  3 ++
 be/src/exec/hdfs-rcfile-scanner.cc              |  7 +--
 be/src/exec/hdfs-scan-node.cc                   |  4 +-
 be/src/exec/partitioned-aggregation-node.cc     | 56 ++++++++------------
 be/src/exec/partitioned-aggregation-node.h      |  4 ++
 be/src/exec/partitioned-hash-join-node.cc       | 17 +++---
 be/src/runtime/plan-fragment-executor.cc        |  4 +-
 be/src/runtime/runtime-state.h                  |  6 +--
 be/src/service/query-exec-state.cc              |  7 +--
 common/thrift/generate_error_codes.py           |  9 +++-
 .../tpch/queries/tpch-outer-joins.test          |  2 +-
 12 files changed, 60 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/hash-table.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 7ff2886..76c9505 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -261,7 +261,7 @@ Status HashTableCtx::ExprValuesCache::Init(RuntimeState* state,
       MAX_EXPR_VALUES_ARRAY_SIZE / expr_values_bytes_per_row_));
 
   int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_, num_exprs_);
-  if (!tracker->TryConsume(mem_usage)) {
+  if (UNLIKELY(!tracker->TryConsume(mem_usage))) {
     capacity_ = 0;
     string details = Substitute("HashTableCtx::ExprValuesCache failed to allocate $0 bytes.",
         mem_usage);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/hash-table.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index 614917a..55dd120 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -612,6 +612,9 @@ class HashTable {
     return num_buckets * sizeof(Bucket);
   }
 
+  /// Return the size of a hash table bucket in bytes.
+  static int64_t BucketSize() { return sizeof(Bucket); }
+
   /// Returns the memory occupied by the hash table, takes into account the number of
   /// duplicates.
   int64_t CurrentMemSize() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index bd37900..ca24394 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -263,9 +263,10 @@ Status HdfsRCFileScanner::ReadRowGroup() {
       // The row group length depends on the user data and can be very big. This
       // can cause us to go way over the mem limit so use TryAllocate instead.
       row_group_buffer_ = data_buffer_pool_->TryAllocate(row_group_length_);
-      if (row_group_length_ > 0 && row_group_buffer_ == NULL) {
-        return state_->SetMemLimitExceeded(
-            scan_node_->mem_tracker(), row_group_length_);
+      if (UNLIKELY(row_group_buffer_ == NULL)) {
+        string details("RC file scanner failed to allocate row group buffer.");
+        return scan_node_->mem_tracker()->MemLimitExceeded(state_, details,
+            row_group_length_);
       }
       row_group_buffer_size_ = row_group_length_;
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index a4ecbc7..4511867 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -266,7 +266,6 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   }
 
   Status status = GetNextInternal(state, row_batch, eos);
-  if (status.IsMemLimitExceeded()) state->SetMemLimitExceeded();
   if (!status.ok() || *eos) StopAndFinalizeCounters();
   return status;
 }
@@ -1093,7 +1092,6 @@ void HdfsScanNode::ScannerThread() {
         status_ = status;
       }
 
-      if (status.IsMemLimitExceeded()) runtime_state_->SetMemLimitExceeded();
       SetDone();
       break;
     }
@@ -1198,7 +1196,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   }
 
   status = scanner->ProcessSplit();
-  if (VLOG_QUERY_IS_ON && !status.ok()) {
+  if (VLOG_QUERY_IS_ON && !status.ok() && !status.IsCancelled()) {
     // This thread hit an error, record it and bail
     stringstream ss;
     ss << "Scan node (id=" << id() << ") ran into a parse error for scan range "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index c2c884a..51478fc 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -392,12 +392,10 @@ Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor* slot_des
         tuple->GetSlot(slot_desc->tuple_offset()));
     if (sv == NULL || sv->len == 0) continue;
     char* new_ptr = reinterpret_cast<char*>(pool->TryAllocate(sv->len));
-    if (new_ptr == NULL) {
-      Status s = Status::MemLimitExceeded();
-      s.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
-              " Failed to allocate $1 output bytes.", id_, sv->len));
-      state_->SetMemLimitExceeded();
-      return s;
+    if (UNLIKELY(new_ptr == NULL)) {
+      string details = Substitute("Cannot perform aggregation at node with id $0."
+          " Failed to allocate $1 output bytes.", id_, sv->len);
+      return pool->mem_tracker()->MemLimitExceeded(state_, details, sv->len);
     }
     memcpy(new_ptr, sv->ptr, sv->len);
     sv->ptr = new_ptr;
@@ -761,11 +759,11 @@ bool PartitionedAggregationNode::Partition::InitHashTable() {
   // TODO: we could switch to 64 bit hashes and then we don't need a max size.
   // It might be reasonable to limit individual hash table size for other reasons
   // though. Always start with small buffers.
-  // TODO: How many buckets? We currently use a default value, 1024.
-  static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
   hash_tbl.reset(HashTable::Create(parent->state_, parent->block_mgr_client_,
       false, 1, NULL, 1L << (32 - NUM_PARTITIONING_BITS),
       PAGG_DEFAULT_HASH_TABLE_SZ));
+  // Please update the error message in CreateHashPartitions() if initial size of
+  // hash table changes.
   return hash_tbl->Init();
 }
 
@@ -927,13 +925,12 @@ Tuple* PartitionedAggregationNode::ConstructIntermediateTuple(
     const vector<FunctionContext*>& agg_fn_ctxs, MemPool* pool, Status* status) {
   const int fixed_size = intermediate_tuple_desc_->byte_size();
   const int varlen_size = GroupingExprsVarlenSize();
-  uint8_t* tuple_data = pool->TryAllocate(fixed_size + varlen_size);
-  if (tuple_data == NULL) {
-    *status = Status::MemLimitExceeded();
-    status->AddDetail(Substitute("Cannot perform aggregation at node with id $0."
-        " Failed to allocate $1 bytes for intermediate tuple.", id_,
-        fixed_size + varlen_size));
-    state_->SetMemLimitExceeded();
+  const int tuple_data_size = fixed_size + varlen_size;
+  uint8_t* tuple_data = pool->TryAllocate(tuple_data_size);
+  if (UNLIKELY(tuple_data == NULL)) {
+    string details = Substitute("Cannot perform aggregation at node with id $0. Failed "
+        "to allocate $1 bytes for intermediate tuple.", id_, tuple_data_size);
+    *status = pool->mem_tracker()->MemLimitExceeded(state_, details, tuple_data_size);
     return NULL;
   }
   memset(tuple_data, 0, fixed_size);
@@ -1127,9 +1124,8 @@ void PartitionedAggregationNode::DebugString(int indentation_level,
 
 Status PartitionedAggregationNode::CreateHashPartitions(int level) {
   if (is_streaming_preagg_) DCHECK_EQ(level, 0);
-  if (level >= MAX_PARTITION_DEPTH) {
-    return state_->SetMemLimitExceeded(ErrorMsg(
-        TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH));
+  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
+    return Status(TErrorCode::PARTITIONED_AGG_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH);
   }
   ht_ctx_->set_level(level);
 
@@ -1148,16 +1144,15 @@ Status PartitionedAggregationNode::CreateHashPartitions(int level) {
   // Now that all the streams are reserved (meaning we have enough memory to execute
   // the algorithm), allocate the hash tables. These can fail and we can still continue.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    if (!hash_partitions_[i]->InitHashTable()) {
+    if (UNLIKELY(!hash_partitions_[i]->InitHashTable())) {
       // We don't spill on preaggregations. If we have so little memory that we can't
       // allocate small hash tables, the mem limit is just too low.
       if (is_streaming_preagg_) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute("Cannot perform aggregation at node with id $0."
-              " Failed to initialize hash table in preaggregation. The memory limit"
-              " is too low to execute the query.", id_));
-        state_->SetMemLimitExceeded();
-        return status;
+        int64_t alloc_size = PAGG_DEFAULT_HASH_TABLE_SZ * HashTable::BucketSize();
+        string details = Substitute("Cannot perform aggregation at node with id $0."
+            " Failed to initialize hash table in preaggregation. The memory limit"
+            " is too low to execute the query.", id_);
+        return mem_tracker()->MemLimitExceeded(state_, details, alloc_size);
       }
       RETURN_IF_ERROR(hash_partitions_[i]->Spill());
     }
@@ -1255,14 +1250,9 @@ Status PartitionedAggregationNode::NextPartition() {
       int64_t largest_partition = LargestSpilledPartition();
       DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
           "more rows than the input";
-      if (num_input_rows == largest_partition) {
-        Status status = Status::MemLimitExceeded();
-        status.AddDetail(Substitute("Cannot perform aggregation at node with id $0. "
-            "Repartitioning did not reduce the size of a spilled partition. "
-            "Repartitioning level $1. Number of rows $2.",
-            id_, partition->level + 1, num_input_rows));
-        state_->SetMemLimitExceeded();
-        return status;
+      if (UNLIKELY(num_input_rows == largest_partition)) {
+        return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_,
+            partition->level + 1, num_input_rows);
       }
       RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index 73976c2..300b9e0 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -158,6 +158,10 @@ class PartitionedAggregationNode : public ExecNode {
   /// TODO: we can revisit and try harder to explicitly detect skew.
   static const int MAX_PARTITION_DEPTH = 16;
 
+  /// Default initial number of buckets in a hash table.
+  /// TODO: rethink this ?
+  static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
+
   /// Codegen doesn't allow for automatic Status variables because then exception
   /// handling code is needed to destruct the Status, and our function call substitution
   /// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 8aa48fa..e45cbb6 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -633,9 +633,9 @@ Status PartitionedHashJoinNode::ConstructBuildSide(RuntimeState* state) {
 }
 
 Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level) {
-  if (level >= MAX_PARTITION_DEPTH) {
-    return state->SetMemLimitExceeded(ErrorMsg(
-        TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id_, MAX_PARTITION_DEPTH));
+  if (UNLIKELY(level >= MAX_PARTITION_DEPTH)) {
+    return Status(TErrorCode::PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH, id_,
+        MAX_PARTITION_DEPTH);
   }
 
   DCHECK(hash_partitions_.empty());
@@ -854,14 +854,9 @@ Status PartitionedHashJoinNode::PrepareNextPartition(RuntimeState* state) {
     int64_t largest_partition = LargestSpilledPartition();
     DCHECK_GE(num_input_rows, largest_partition) << "Cannot have a partition with "
         "more rows than the input";
-    if (num_input_rows == largest_partition) {
-      Status status = Status::MemLimitExceeded();
-      status.AddDetail(Substitute("Cannot perform hash join at node with id $0. "
-          "Repartitioning did not reduce the size of a spilled partition. "
-          "Repartitioning level $1. Number of rows $2.",
-          id_, input_partition_->level_ + 1, num_input_rows));
-      state->SetMemLimitExceeded();
-      return status;
+    if (UNLIKELY(num_input_rows == largest_partition)) {
+      return Status(TErrorCode::PARTITIONED_HASH_JOIN_REPARTITION_FAILS, id_,
+          input_partition_->level_ + 1, num_input_rows);
     }
   } else {
     DCHECK(hash_partitions_.empty());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 820369e..99ed75d 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -529,7 +529,9 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
   {
     lock_guard<mutex> l(status_lock_);
     if (status_.ok()) {
-      if (status.IsMemLimitExceeded()) runtime_state_->SetMemLimitExceeded();
+      // TODO: remove this once all locations which exceed query or process memory limit
+      // will log query memory usages with MemTracker::MemLimitExceeded().
+      if (status.IsMemLimitExceeded()) runtime_state_->LogMemLimitExceeded(NULL, 0);
       status_ = status;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index cfc76c1..f7c0248 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -241,13 +241,11 @@ class RuntimeState {
   /// This value and tracker are only used for error reporting.
   /// If 'msg' is non-NULL, it will be appended to query_status_ in addition to the
   /// generic "Memory limit exceeded" error.
+  /// Note that this interface is deprecated and MemTracker::LimitExceeded() should be
+  /// used and the error status should be returned.
   Status SetMemLimitExceeded(MemTracker* tracker = NULL,
       int64_t failed_allocation_size = 0, const ErrorMsg* msg = NULL);
 
-  Status SetMemLimitExceeded(const ErrorMsg& msg) {
-    return SetMemLimitExceeded(NULL, 0, &msg);
-  }
-
   /// Returns a non-OK status if query execution should stop (e.g., the query was cancelled
   /// or a mem limit was exceeded). Exec nodes should check this periodically so execution
   /// doesn't continue if the query terminates abnormally.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 4f97143..b534902 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -791,9 +791,10 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
         fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
     MemTracker* query_mem_tracker = coord_->query_mem_tracker();
     // Count the cached rows towards the mem limit.
-    if (!query_mem_tracker->TryConsume(delta_bytes)) {
-      return coord_->runtime_state()->SetMemLimitExceeded(
-          query_mem_tracker, delta_bytes);
+    if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
+      string details("Failed to allocate memory for result cache.");
+      return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details,
+          delta_bytes);
     }
     // Append all rows fetched from the coordinator into the cache.
     int num_rows_added = result_cache_->AddRows(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 5a8a033..671d27e 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -236,8 +236,15 @@ error_codes = (
   ("KUDU_NOT_SUPPORTED_ON_OS", 75, "Kudu is not supported on this operating system."),
 
   ("KUDU_NOT_ENABLED", 76, "Kudu features are disabled by the startup flag "
-   "--disable_kudu.")
+   "--disable_kudu."),
 
+  ("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 77, "Cannot perform hash join at node with "
+   "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
+   "level $1. Number of rows $2."),
+
+  ("PARTITIONED_AGG_REPARTITION_FAILS", 78,  "Cannot perform aggregation at node with "
+   "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
+   "level $1. Number of rows $2."),
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0243a21d/testdata/workloads/tpch/queries/tpch-outer-joins.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/tpch-outer-joins.test b/testdata/workloads/tpch/queries/tpch-outer-joins.test
index 1c255ab..efba5e8 100644
--- a/testdata/workloads/tpch/queries/tpch-outer-joins.test
+++ b/testdata/workloads/tpch/queries/tpch-outer-joins.test
@@ -30,7 +30,7 @@ RIGHT OUTER JOIN lineitem l ON o.o_orderkey =  if(l.l_orderkey % 2 = 0, 0, l.l_o
 ORDER BY l_receiptdate, l_orderkey, l_shipdate
 limit 10
 ---- CATCH
-Memory limit exceeded
+Repartitioning did not reduce the size of a spilled partition
 ====
 ---- QUERY
 # Regression test for IMPALA-2612. The following query will cause CastToChar