You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/09/19 19:29:48 UTC

[impala] branch master updated: IMPALA-8924, IMPALA-8934: Result spooling failpoint tests, fix DCHECKs

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 391942d  IMPALA-8924, IMPALA-8934: Result spooling failpoint tests, fix DCHECKs
391942d is described below

commit 391942d79dec18b086a85e29a75a873e74955518
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Tue Sep 10 13:31:01 2019 -0700

    IMPALA-8924, IMPALA-8934: Result spooling failpoint tests, fix DCHECKs
    
    Adds several "failpoint" tests to test_result_spooling.py. These tests
    use debug_actions spread throughout buffered-plan-root-sink.cc to
    trigger failures while result spooling is running. The tests validate
    that all queries gracefully fail and do not cause any impalad crashes.
    
    Fixed a few bugs that came up when adding these tests, as well as the
    crash reported in IMPALA-8924 (which is now covered by the failpoint
    tests added in this patch).
    
    The first bug fixed was a DCHECK in SpillableRowBatchQueue::IsEmpty()
    where the method was being called after the queue had been closed. The
    fix is to only call IsEmpty() if IsOpen() returns true.
    
    The second bug was an issue in the cancellation path where
    BufferedPlanRootSink::GetNext would enter an infinite loop if the query
    was cancelled and then GetNext was called. The fix is to check the
    cancellation state in the outer while loop.
    
    Testing:
    * Added new tests to test_result_spooling.py
    * Ran core tests
    
    Change-Id: Ib96f797bc8a5ba8baf9fb28abd1f645345bbe932
    Reviewed-on: http://gerrit.cloudera.org:8080/14214
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/buffered-plan-root-sink.cc   | 31 ++++++++++++-----
 be/src/exec/buffered-plan-root-sink.h    | 21 ++++++++++--
 tests/common/impala_test_suite.py        |  5 +--
 tests/query_test/test_result_spooling.py | 58 ++++++++++++++++++++++++++++++++
 tests/util/failpoints_util.py            | 36 ++++++++++++++++++++
 5 files changed, 137 insertions(+), 14 deletions(-)

diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 620bb02..9fbb82c 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -44,9 +44,10 @@ Status BufferedPlanRootSink::Prepare(
 }
 
 Status BufferedPlanRootSink::Open(RuntimeState* state) {
+  // Debug action before Open is called.
+  RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_OPEN"));
   RETURN_IF_ERROR(DataSink::Open(state));
-  current_batch_ =
-      make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
+  current_batch_ = make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
   batch_queue_.reset(new SpillableRowBatchQueue(name_,
       state->query_options().max_spilled_result_spooling_mem, state, mem_tracker(),
       profile(), row_desc_, resource_profile_, debug_options_));
@@ -79,6 +80,9 @@ Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
     }
     RETURN_IF_CANCELLED(state);
 
+    // Debug action before AddBatch is called.
+    RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_ADD_BATCH"));
+
     // Add the batch to the queue and then notify the consumer that rows are available.
     RETURN_IF_ERROR(batch_queue_->AddBatch(batch));
     rows_sent_counter_->Add(batch->num_rows());
@@ -91,6 +95,8 @@ Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
 
 Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
   SCOPED_TIMER(profile()->total_time_counter());
+  // Debug action before FlushFinal is called.
+  RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_FLUSH_FINAL"));
   DCHECK(!closed_);
   unique_lock<mutex> l(lock_);
   sender_state_ = SenderState::EOS;
@@ -155,18 +161,20 @@ Status BufferedPlanRootSink::GetNext(
     // Track the number of rows read from the queue and the number of rows to read.
     int num_rows_read = 0;
     // If 'num_results' <= 0 then by default fetch FETCH_NUM_BATCHES batches.
-    int num_rows_to_read =
+    const int num_rows_to_read =
         num_results <= 0 ? FETCH_NUM_BATCHES * state->batch_size() : num_results;
 
     // True if the consumer timed out waiting for the producer to send rows or if the
     // consumer timed out while materializing rows, false otherwise.
     bool timed_out = false;
 
-    // Read from the queue until all requested rows have been read, or eos is hit.
-    while (!*eos && num_rows_read < num_rows_to_read && !timed_out) {
+    // Read from the queue until the query is cancelled or the sink is closed, eos is
+    // hit, all requested rows have been read, or the timeout has been hit.
+    while (!IsCancelledOrClosed(state) && !*eos && num_rows_read < num_rows_to_read
+        && !timed_out) {
       // Wait for the queue to have rows in it.
-      while (IsQueueEmpty() && sender_state_ == SenderState::ROWS_PENDING
-          && !state->is_cancelled() && !timed_out) {
+      while (!IsCancelledOrClosed(state) && IsQueueEmpty(state)
+          && sender_state_ == SenderState::ROWS_PENDING && !timed_out) {
         // Wait fetch_rows_timeout_us_ - row_batches_get_wait_timer_ microseconds for
         // rows to become available before returning to the client. Subtracting
         // wait_timeout_timer ensures the client only ever waits up to
@@ -182,9 +190,11 @@ Status BufferedPlanRootSink::GetNext(
       // eos and then return. The queue could be empty if the sink was closed while
       // waiting for rows to become available, or if the sink was closed before the
       // current call to GetNext.
-      if (!state->is_cancelled() && !IsQueueEmpty()) {
+      if (!IsCancelledOrClosed(state) && !IsQueueEmpty(state)) {
         // If current_batch_ is empty, then read directly from the queue.
         if (current_batch_row_ == 0) {
+          // Debug action before GetBatch is called.
+          RETURN_IF_ERROR(DebugAction(state->query_options(), "BPRS_BEFORE_GET_BATCH"));
           RETURN_IF_ERROR(batch_queue_->GetBatch(current_batch_.get()));
 
           // After reading a RowBatch from the queue, it now has additional capacity,
@@ -219,8 +229,11 @@ Status BufferedPlanRootSink::GetNext(
       }
       timed_out = timed_out
           || wait_timeout_timer.ElapsedTime() >= PlanRootSink::fetch_rows_timeout_us();
-      *eos = IsQueueEmpty() && sender_state_ == SenderState::EOS;
+      // If we have read all rows, then break out of the while loop.
+      *eos = IsGetNextEos(state);
     }
+    // If the query was cancelled while reading rows, update eos and return.
+    *eos = IsGetNextEos(state);
     if (*eos) consumer_eos_.NotifyOne();
   }
   return state->GetQueryStatus();
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index a8f4b70..8261e95 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -140,11 +140,26 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// 'GetNext'. If 'current_batch_' is nullptr, the value of 'current_batch_row_' is 0.
   int current_batch_row_ = 0;
 
-  /// Returns true if the 'queue' is empty (not the 'batch_queue_'). 'queue' refers to
+  /// Returns true if the 'queue' (not the 'batch_queue_') is empty. 'queue' refers to
   /// the logical queue of RowBatches and thus includes any RowBatch that
-  /// 'current_batch_' points to. Must be called while holding 'lock_'.
-  bool IsQueueEmpty() const {
+  /// 'current_batch_' points to. Must be called while holding 'lock_'. Cannot be called
+  /// once the query has been cancelled or closed.
+  bool IsQueueEmpty(RuntimeState* state) const {
+    DCHECK(!IsCancelledOrClosed(state));
     return batch_queue_->IsEmpty() && current_batch_row_ == 0;
   }
+
+  /// Sets the value of eos inside GetNext. eos is set to true if the queue is closed or
+  /// empty and the producer has set sender_state_ to EOS.
+  bool IsGetNextEos(RuntimeState* state) const {
+    return (IsCancelledOrClosed(state) || IsQueueEmpty(state))
+        && sender_state_ == SenderState::EOS;
+  }
+
+  /// Returns true if the query has been cancelled or if the PlanRootSink has been
+  /// closed, returns false otherwise.
+  bool IsCancelledOrClosed(RuntimeState* state) const {
+    return state->is_cancelled() || closed_;
+  }
 };
 }
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 03ca651..1060faf 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -760,13 +760,14 @@ class ImpalaTestSuite(BaseTestSuite):
     assert result.success
     return result
 
+  @classmethod
   @execute_wrapper
-  def execute_query_expect_failure(self, impalad_client, query, query_options=None,
+  def execute_query_expect_failure(cls, impalad_client, query, query_options=None,
       user=None):
     """Executes a query and asserts if the query succeeds"""
     result = None
     try:
-      result = self.__execute_query(impalad_client, query, query_options, user)
+      result = cls.__execute_query(impalad_client, query, query_options, user)
     except Exception, e:
       return e
 
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 7ac93ac..6932e82 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -25,6 +25,7 @@ from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.cancel_util import cancel_query_and_validate_state
+from tests.util.failpoints_util import execute_query_expect_debug_action_failure
 
 
 class TestResultSpooling(ImpalaTestSuite):
@@ -179,6 +180,19 @@ class TestResultSpooling(ImpalaTestSuite):
     finally:
       self.client.close_query(handle)
 
+  def test_exec_tree_failpoint(self, vector):
+    """Inject a failure during exec tree execution. The GETNEXT:DELAY is necessary to
+    ensure the client issues a fetch request before the MEM_LIMIT_EXCEEDED exception is
+    thrown. Unlike the tests in TestResultSpoolingFailpoints this test injects a fail
+    during the execution of the exec tree, rather than in the result spooling code."""
+    vector.get_value('exec_option')['batch_size'] = 10
+    vector.get_value('exec_option')['debug_action'] = \
+        '4:GETNEXT:MEM_LIMIT_EXCEEDED|0:GETNEXT:DELAY'
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    query = "select 1 from functional.alltypessmall a join functional.alltypessmall b " \
+        "on a.id = b.id"
+    execute_query_expect_debug_action_failure(self, query, vector)
+
   def __validate_query(self, query, exec_options):
     """Compares the results of the given query with and without result spooling
     enabled."""
@@ -344,3 +358,47 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
           "Unexpected status code from cancel request: {0}".format(cancel_result)
     finally:
       if handle: self.client.close_query(handle)
+
+
+class TestResultSpoolingFailpoints(ImpalaTestSuite):
+  """Test result spooling failure handling. Uses debug actions to inject failures at
+  various points of result spooling execution (e.g. the when results are actually getting
+  spooled)."""
+
+  _debug_actions = [
+      # Inject a failure in BufferedPlanRootSink::Open.
+      'BPRS_BEFORE_OPEN:FAIL',
+      # Inject a failure immediately before BufferedPlanRootSink::Send adds a batch to
+      # the queue. The probability ensures that the error is thrown on a random
+      # RowBatch.
+      'BPRS_BEFORE_ADD_BATCH:FAIL@1.0',
+      # Inject a failure in BufferedPlanRootSink::FlushFinal.
+      'BPRS_BEFORE_FLUSH_FINAL:FAIL',
+      # Inject a failure immediately before the BufferedPlanRootSink::GetNext reads a
+      # batch from the queue. The probability ensures that the error is thrown on a
+      # random RowBatch.
+      'BPRS_BEFORE_GET_BATCH:FAIL@1.0']
+
+  _query = "select * from functional.alltypes"
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestResultSpoolingFailpoints, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('debug_action',
+        *cls._debug_actions))
+
+    # Result spooling should be independent of file format, so only testing for
+    # table_format=parquet/none in order to avoid a test dimension explosion.
+    cls.ImpalaTestMatrix.add_constraint(lambda v:
+        v.get_value('table_format').file_format == 'parquet' and
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_failpoints(self, vector):
+    vector.get_value('exec_option')['batch_size'] = 10
+    vector.get_value('exec_option')['debug_action'] = vector.get_value('debug_action')
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+    execute_query_expect_debug_action_failure(self, self._query, vector)
diff --git a/tests/util/failpoints_util.py b/tests/util/failpoints_util.py
new file mode 100644
index 0000000..2970567
--- /dev/null
+++ b/tests/util/failpoints_util.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+
+
+def execute_query_expect_debug_action_failure(impala_test_suite, query, vector):
+  """Executes the given query with the configured debug_action and asserts that the
+  query fails. Removes the debug_action from the exec options, re-runs the query, and
+  assert that it succeeds."""
+  assert 'debug_action' in vector.get_value('exec_option')
+  # Run the query with the given debug_action and assert that the query fails.
+  # execute_query_expect_failure either returns the client exception thrown when executing
+  # the query, or the result of the query if it failed but did the client did not throw an
+  # exception. Either way, log the result.
+  LOG.debug(ImpalaTestSuite.execute_query_expect_failure(
+      impala_test_suite.client, query, vector.get_value('exec_option')))
+
+  # Assert that the query can be run without the debug_action.
+  del vector.get_value('exec_option')['debug_action']
+  result = impala_test_suite.execute_query(query, vector.get_value('exec_option'))
+  assert result.success, "Failed to run {0} without debug action".format(query)