You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/05/19 01:36:53 UTC

[impala] 03/05: IMPALA-10704: Fix retried query id not being unregistered when retry fails

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d111443e8f9692a3eac734e565a5afc41980a0ba
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon May 17 17:04:54 2021 +0800

    IMPALA-10704: Fix retried query id not being unregistered when retry fails
    
    When a query retry fails in RetryQueryFromThread(), the retried query id
    may not be unregistered if the failure happens before we store the
    retry_request_state. In this case, QueryDriver::Unregister() has no way
    to get the retried query id, so it is never deleted. Since the retried
    query id has already been registered in RetryQueryFromThread(), it must
    be deleted later. The result is a leak in the query driver map: the
    leaked queries stay in the map and are shown as in-flight queries.
    
    test_retry_query_result_cacheing_failed and
    test_retry_query_set_query_in_flight_failed (added in IMPALA-10413)
    assert that one query is in flight at the end, which is only satisfied
    because of the leak. Instead, they should verify that no queries are
    running at the end.
    
    This patch adds a new field to QueryDriver that remembers the registered
    retry query id. It serves as a fallback for looking up that id when the
    query retry fails before we store the ClientRequestState of the retried
    query (i.e. while retried_client_request_state_ is still null).
    
    Tests:
     - Run test_retry_query_result_cacheing_failed and
       test_retry_query_set_query_in_flight_failed 100 times.
    
    Change-Id: I074526799d68041a425b2379e74f8d8b45ce892a
    Reviewed-on: http://gerrit.cloudera.org:8080/17465
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/query-driver.cc             |  8 ++++++++
 be/src/runtime/query-driver.h              |  5 +++++
 tests/custom_cluster/test_query_retries.py | 25 +++++++++++++++++++++++--
 3 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 8a1a4bf..db8c3bd 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -30,7 +30,10 @@
 #include "common/thread-debug-info.h"
 
 DECLARE_string(debug_actions);
+
 namespace impala {
+// A 0 unique id, which indicates that one has not been set.
+const TUniqueId ZERO_UNIQUE_ID;
 
 QueryDriver::QueryDriver(ImpalaServer* parent_server) : parent_server_(parent_server) {}
 
@@ -263,6 +266,9 @@ void QueryDriver::RetryQueryFromThread(
     HandleRetryFailure(&status, &error_msg, request_state, retry_query_id);
     return;
   }
+  // The retried query id is now registered. Remember it so we can still delete it if we
+  // fail to retry.
+  registered_retry_query_id_ = retry_query_id;
 
   // Transfer the blacklisted_executor_addresses from the original query to the query to
   // be retried.
@@ -429,6 +435,8 @@ Status QueryDriver::Unregister(ImpalaServer::QueryDriverMap* query_driver_map) {
     query_id = &client_request_state_->query_id();
     if (retried_client_request_state_ != nullptr) {
       retry_query_id = &retried_client_request_state_->query_id();
+    } else if (registered_retry_query_id_ != ZERO_UNIQUE_ID) {
+      retry_query_id = &registered_retry_query_id_;
     }
   }
   RETURN_IF_ERROR(query_driver_map->Delete(*query_id));
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index 188b4da..3ca375a 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -259,6 +259,11 @@ class QueryDriver {
   /// control service RPC threads.
   std::unique_ptr<Thread> retry_query_thread_;
 
+  /// The retry query id that has been registered. 0 if no retry or the retry fails before
+  /// registering the retry query id. Used to delete the retry query id in the
+  /// query_driver_map.
+  TUniqueId registered_retry_query_id_;
+
   /// True if a thread has called Finalize(). Threads calling Finalize() do a
   /// compare-and-swap on this so that only one thread can proceed.
   AtomicBool finalized_{false};
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 1b6c3b9..5a6e8a4 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -752,13 +752,24 @@ class TestQueryRetries(CustomClusterTestSuite):
   def test_retry_query_result_cacheing_failed(self):
     """Test setting up results cacheing failed."""
 
+    # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
     query = "select count(*) from tpch_parquet.lineitem"
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
     self.hs2_client.execute_async(query)
+    # The number of in-flight queries is 0 at the beginning, then 1 when the original
+    # query is submitted. It's 2 when the retried query is registered. Although the retry
+    # will immediately fail due to the debug action and the 2 queries are unregistered,
+    # the number can't come back to 0 immediately. The reason is the registered queries
+    # are cleaned up by a backend thread. Sleep a while to make sure these finish.
+    time.sleep(2)
+    # TODO(IMPALA-10705): Verify the retry failure.
+    # No queries running at the end.
+    impala_service = self.cluster.get_first_impalad().service
     self.assert_eventually(60, 0.1,
-        lambda: self.cluster.get_first_impalad().service.get_num_in_flight_queries() == 1)
+        lambda: impala_service.get_num_in_flight_queries() == 0,
+        lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -771,8 +782,18 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.cluster.impalads[1].kill()
     query = "select count(*) from tpch_parquet.lineitem"
     self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
+    # The number of in-flight queries is 0 at the beginning, then 1 when the original
+    # query is submitted. It's 2 when the retried query is registered. Although the retry
+    # will immediately fail due to the debug action and the 2 queries are unregistered,
+    # the number can't come back to 0 immediately. The reason is the registered queries
+    # are cleaned up by a backend thread. Sleep a while to make sure these finish.
+    time.sleep(2)
+    # TODO(IMPALA-10705): Verify the retry failure.
+    # No queries running at the end.
+    impala_service = self.cluster.get_first_impalad().service
     self.assert_eventually(60, 0.1,
-        lambda: self.cluster.get_first_impalad().service.get_num_in_flight_queries() == 1)
+        lambda: impala_service.get_num_in_flight_queries() == 0,
+        lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(