You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/23 15:41:11 UTC

incubator-impala git commit: IMPALA-5823: fix SET_DENY_RESERVATION_PROBABILITY

Repository: incubator-impala
Updated Branches:
  refs/heads/master 679ebc1ac -> d8bc570b6


IMPALA-5823: fix SET_DENY_RESERVATION_PROBABILITY

Sometimes the client is not open when the debug action fires at the
start of Open() or Prepare(). In that case we should set the
probability when the client is opened later.

This caused one of the large row tests to start failing with a "failed
to repartition" error in the aggregation. The error is a false positive
caused by two distinct keys hashing to the same partition. Removing the
check allows the query to succeed because the keys hash to different
partitions in the next round of repartitioning.

If we repeatedly get unlucky and have collisions, the query will still
fail when it reaches MAX_PARTITION_DEPTH.

Testing:
Ran TestSpilling in a loop for a couple of hours, including the
exhaustive-only tests.

Change-Id: Ib26b697544d6c2312a8e1fe91b0cf8c0917e5603
Reviewed-on: http://gerrit.cloudera.org:8080/7771
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d8bc570b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d8bc570b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d8bc570b

Branch: refs/heads/master
Commit: d8bc570b67930dd136a7a10fc05dfa4b995c65fa
Parents: 679ebc1
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 21 16:47:50 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Aug 23 07:18:33 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                    | 38 +++++++++++++++++-------
 be/src/exec/exec-node.h                     |  5 ++++
 be/src/exec/partitioned-aggregation-node.cc | 26 ----------------
 be/src/exec/partitioned-aggregation-node.h  |  5 ----
 common/thrift/generate_error_codes.py       |  4 +--
 5 files changed, 34 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index afd7262..b326d94 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -244,6 +244,13 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
   VLOG_FILE << id_ << " claiming reservation " << resource_profile_.min_reservation;
   state->query_state()->initial_reservations()->Claim(
       &buffer_pool_client_, resource_profile_.min_reservation);
+  if (debug_action_ == TDebugAction::SET_DENY_RESERVATION_PROBABILITY &&
+      (debug_phase_ == TExecNodePhase::PREPARE || debug_phase_ == TExecNodePhase::OPEN)) {
+    // We may not have been able to enable the debug action at the start of Prepare() or
+    // Open() because the client is not registered then. Do it now to be sure that it is
+    // effective.
+    RETURN_IF_ERROR(EnableDenyReservationDebugAction());
+  }
   return Status::OK();
 }
 
@@ -251,6 +258,22 @@ Status ExecNode::ReleaseUnusedReservation() {
   return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation);
 }
 
+Status ExecNode::EnableDenyReservationDebugAction() {
+  DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+  DCHECK(buffer_pool_client_.is_registered());
+  // Parse [0.0, 1.0] probability.
+  StringParser::ParseResult parse_result;
+  double probability = StringParser::StringToFloat<double>(
+      debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
+  if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
+      || probability > 1.0) {
+    return Status(Substitute(
+        "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
+  }
+  buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+  return Status::OK();
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {
@@ -463,17 +486,12 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
     return mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED");
   } else {
     DCHECK_EQ(debug_action_, TDebugAction::SET_DENY_RESERVATION_PROBABILITY);
+    // We can only enable the debug action now if the buffer pool client is registered.
+    // If the buffer pool client is not registered at this point (e.g. if phase is
+    // PREPARE or OPEN), then we will enable the debug action at the time when the
+    // client is registered.
     if (buffer_pool_client_.is_registered()) {
-      // Parse [0.0, 1.0] probability.
-      StringParser::ParseResult parse_result;
-      double probability = StringParser::StringToFloat<double>(
-          debug_action_param_.c_str(), debug_action_param_.size(), &parse_result);
-      if (parse_result != StringParser::PARSE_SUCCESS || probability < 0.0
-          || probability > 1.0) {
-        return Status(Substitute(
-            "Invalid SET_DENY_RESERVATION_PROBABILITY param: '$0'", debug_action_param_));
-      }
-      buffer_pool_client_.SetDebugDenyIncreaseReservation(probability);
+      RETURN_IF_ERROR(EnableDenyReservationDebugAction());
     }
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 04470f2..2f3f714 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -244,6 +244,11 @@ class ExecNode {
   /// fails.
   Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
 
+  /// Enable the increase reservation denial probability on 'buffer_pool_client_' based on
+  /// the 'debug_action_' set on this node. Returns an error if 'debug_action_param_' is
+  /// invalid.
+  Status EnableDenyReservationDebugAction();
+
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation
   /// paths. Preceding row batches can contain ptrs to memory in subsequent row batches

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index b1d54a6..214810f 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1190,21 +1190,6 @@ Status PartitionedAggregationNode::CheckAndResizeHashPartitions(
   return Status::OK();
 }
 
-int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
-  DCHECK(!is_streaming_preagg_);
-  int64_t max_rows = 0;
-  for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (partition == nullptr || partition->is_closed || !partition->is_spilled()) {
-      continue;
-    }
-    int64_t rows = partition->aggregated_row_stream->num_rows()
-        + partition->unaggregated_row_stream->num_rows();
-    if (rows > max_rows) max_rows = rows;
-  }
-  return max_rows;
-}
-
 Status PartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
@@ -1336,17 +1321,6 @@ Status PartitionedAggregationNode::RepartitionSpilledPartition() {
   // spilled_partitions_/aggregated_partitions_.
   int64_t num_input_rows = partition->aggregated_row_stream->num_rows()
       + partition->unaggregated_row_stream->num_rows();
-
-  // Check if there was any reduction in the size of partitions after repartitioning.
-  int64_t largest_partition = LargestSpilledPartition();
-  DCHECK_GE(num_input_rows, largest_partition) << "Partition had more rows than input";
-  if (UNLIKELY(num_input_rows == largest_partition)) {
-    stringstream ss;
-    DebugString(2, &ss);
-    return Status(TErrorCode::PARTITIONED_AGG_REPARTITION_FAILS, id_,
-        partition->level + 1, num_input_rows, buffer_pool_client_.DebugString(),
-        ss.str());
-  }
   RETURN_IF_ERROR(MoveHashPartitions(num_input_rows));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h
index fa8674c..ade223b 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -639,11 +639,6 @@ class PartitionedAggregationNode : public ExecNode {
   Status CheckAndResizeHashPartitions(
       bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
 
-  /// Iterates over all the partitions in hash_partitions_ and returns the number of rows
-  /// of the largest spilled partition (in terms of number of aggregated and unaggregated
-  /// rows).
-  int64_t LargestSpilledPartition() const;
-
   /// Prepares the next partition to return results from. On return, this function
   /// initializes output_iterator_ and output_partition_. This either removes
   /// a partition from aggregated_partitions_ (and is done) or removes the next

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d8bc570b/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 32a54ca..1d3b7c6 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -240,9 +240,7 @@ error_codes = (
    "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
    "level $1. Number of rows $2:\\n$3\\n$4"),
 
-  ("PARTITIONED_AGG_REPARTITION_FAILS", 77,  "Cannot perform aggregation at node with "
-   "id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
-   "level $1. Number of rows $2:\\n$3\\n$4"),
+  ("UNUSED_77", 77,  "Not in use."),
 
   ("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"),