You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/12/15 23:01:08 UTC

[42/50] [abbrv] incubator-impala git commit: IMPALA-4654: KuduScanner must return when ReachedLimit()

IMPALA-4654: KuduScanner must return when ReachedLimit()

Fixes a bug in the KuduScanner where the scan node's limit
was not respected, so the scanner thread would continue
executing until the scan range was fully consumed. As a
result, completed queries could leave fragments running,
and those threads could consume significant CPU and
memory.

For example, the query 'select * from tpch_kudu.lineitem
limit 90', when run in the minicluster with lineitem
partitioned into 3 hash partitions, would leave a
scanner thread running for ~60 seconds. In real-world
scenarios this can cause unexpected resource consumption,
which could build up over time and lead to query failures
if such queries are submitted frequently.

The fix is to ensure KuduScanner::GetNext() returns with
eos=true when it finds ReachedLimit()=true. The helper
function DecodeRowsIntoRowBatch() was returning an
unnecessary and somewhat confusing flag, 'batch_done';
it was removed to make it clearer how the code in
GetNext() should behave.

Change-Id: Iaddd51111a1b2647995d68e6d37d0500b3a322de
Reviewed-on: http://gerrit.cloudera.org:8080/5493
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/652e7d56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/652e7d56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/652e7d56

Branch: refs/heads/hadoop-next
Commit: 652e7d56d9ac52a8c3d36ca4b04298d4b89897aa
Parents: b222d90
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Tue Dec 13 14:57:01 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Dec 14 23:24:47 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-scanner.cc   | 25 +++++++------------------
 be/src/exec/kudu-scanner.h    | 10 ++++++----
 tests/query_test/test_kudu.py | 12 ++++++------
 3 files changed, 19 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 55975e0..9ec5201 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -105,12 +105,11 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
     RETURN_IF_CANCELLED(state_);
 
     if (cur_kudu_batch_num_read_ < cur_kudu_batch_.NumRows()) {
-      bool batch_done;
-      RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple, &batch_done));
-      if (batch_done) break;
+      RETURN_IF_ERROR(DecodeRowsIntoRowBatch(row_batch, &tuple));
+      if (row_batch->AtCapacity()) break;
     }
 
-    if (scanner_->HasMoreRows()) {
+    if (scanner_->HasMoreRows() && !scan_node_->ReachedLimit()) {
       RETURN_IF_ERROR(GetNextScannerBatch());
       continue;
     }
@@ -161,26 +160,19 @@ void KuduScanner::CloseCurrentClientScanner() {
   scanner_.reset();
 }
 
-Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch, bool* batch_done) {
+Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) {
   int num_rows_remaining = cur_kudu_batch_.NumRows() - cur_kudu_batch_num_read_;
   int rows_to_add = std::min(row_batch->capacity() - row_batch->num_rows(),
       num_rows_remaining);
   cur_kudu_batch_num_read_ += rows_to_add;
   row_batch->CommitRows(rows_to_add);
-  // If we've reached the capacity, or the LIMIT for the scan, return.
-  if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
-    *batch_done = true;
-  }
   return Status::OK();
 }
 
-Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem,
-    bool* batch_done) {
-  *batch_done = false;
-
+Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) {
   // Short-circuit the count(*) case.
   if (scan_node_->tuple_desc_->slots().empty()) {
-    return HandleEmptyProjection(row_batch, batch_done);
+    return HandleEmptyProjection(row_batch);
   }
 
   // Iterate through the Kudu rows, evaluate conjuncts and deep-copy survivors into
@@ -205,10 +197,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     row->SetTuple(0, *tuple_mem);
     row_batch->CommitLastRow();
     // If we've reached the capacity, or the LIMIT for the scan, return.
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) {
-      *batch_done = true;
-      break;
-    }
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
     // Move to the next tuple in the tuple buffer.
     *tuple_mem = next_tuple(*tuple_mem);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/be/src/exec/kudu-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.h b/be/src/exec/kudu-scanner.h
index bf84b08..8c8c663 100644
--- a/be/src/exec/kudu-scanner.h
+++ b/be/src/exec/kudu-scanner.h
@@ -61,14 +61,16 @@ class KuduScanner {
  private:
   /// Handles the case where the projection is empty (e.g. count(*)).
   /// Does this by adding sets of rows to 'row_batch' instead of adding one-by-one.
-  Status HandleEmptyProjection(RowBatch* row_batch, bool* batch_done);
+  Status HandleEmptyProjection(RowBatch* row_batch);
 
   /// Decodes rows previously fetched from kudu, now in 'cur_rows_' into a RowBatch.
   ///  - 'batch' is the batch that will point to the new tuples.
   ///  - *tuple_mem should be the location to output tuples.
-  ///  - Sets 'batch_done' to true to indicate that the batch was filled to capacity or
-  ///    the limit was reached.
-  Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem, bool* batch_done);
+  /// Returns OK when one of the following conditions occur:
+  ///  - cur_kudu_batch_ is fully consumed
+  ///  - batch is full
+  ///  - scan_node_ limit has been reached
+  Status DecodeRowsIntoRowBatch(RowBatch* batch, Tuple** tuple_mem);
 
   /// Fetches the next batch of rows from the current kudu::client::KuduScanner.
   Status GetNextScannerBatch();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/652e7d56/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 17769bd..5b28120 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -617,11 +617,11 @@ class TestKuduMemLimits(KuduTestSuite):
           raise
         assert "Memory limit exceeded" in str(e)
 
-    # IMPALA-4645: Wait for fragments to complete; in some tests KuduScanNodes took some
-    # time to Close() after the query returned all rows. This is necessary to ensure
-    # these queries do not impact other tests.
-    # TODO: Scan nodes shouldn't take so long to shutdown; remove when this is
-    # fixed (IMPALA-4654).
+    # IMPALA-4654: Validate the fix for a bug where ReachedLimit() wasn't respected in
+    # the KuduScanner and the limit query above would result in a fragment running an
+    # additional minute. This ensures that the num fragments 'in flight' reaches 0 in
+    # less time than the bug took to reproduce (~60sec), while still allowing enough
+    # time that this test won't be flaky.
     verifiers = [ MetricVerifier(i.service) for i in ImpalaCluster().impalads ]
     for v in verifiers:
-      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=120)
+      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)