You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/04/15 01:37:26 UTC

[impala] branch master updated: IMPALA-10414: fix memory leak when canceling the retried query

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc2f175b IMPALA-10414: fix memory leak when canceling the retried query
dfc2f175b is described below

commit dfc2f175bdd10106e6984fc150ede9891a03eeb0
Author: xqhe <he...@126.com>
AuthorDate: Thu Jul 29 17:50:14 2021 +0800

    IMPALA-10414: fix memory leak when canceling the retried query
    
    The query retry is launched in a separate thread. This thread may not
    have finished when the query is deleted from the given QueryDriverMap
    if the query retry failed to launch. In this case, the resources
    held by the query retry thread are not released, so the reference
    count of the QueryDriver (via the shared_ptr) never drops to 0 and
    it is never destroyed.
    
    We need to wait until the query retry thread has finished executing
    before deleting the query from the given QueryDriverMap.
    
    Testing:
    Modify test_query_retries.py to check for memory leaks by inspecting
    the memz page of the debug web UI.
    
    Change-Id: If804ca65da1794c819a6b2e6567ea7651ab5112f
    Reviewed-on: http://gerrit.cloudera.org:8080/17735
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
---
 be/src/runtime/query-driver.cc             | 10 ++++++++++
 be/src/service/client-request-state.cc     |  8 ++++++--
 tests/custom_cluster/test_query_retries.py | 21 +++++++++++++++++++++
 3 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 3200ad7a7..636cf527d 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -363,6 +363,9 @@ void QueryDriver::RetryQueryFromThread(
   // Close the original query.
   QueryHandle query_handle;
   query_handle.SetHandle(query_driver, request_state);
+  // Do the work of close that needs to be done synchronously, otherwise we'll
+  // hit some illegal states in destroying the request_state.
+  RETURN_VOID_IF_ERROR(query_handle->Finalize(true, nullptr));
   parent_server_->CloseClientRequestState(query_handle);
   parent_server_->MarkSessionInactive(session);
 }
@@ -430,6 +433,13 @@ Status QueryDriver::Finalize(
 
 Status QueryDriver::Unregister(ImpalaServer::QueryDriverMap* query_driver_map) {
   DCHECK(finalized_.Load());
+  // Wait until retry_query_thread_ finishes, otherwise the resources for this thread
+  // may not be released.
+  if (retry_query_thread_.get() != nullptr) {
+    retry_query_thread_->Join();
+    retry_query_thread_.reset();
+  }
+  DCHECK(retry_query_thread_.get() == nullptr);
   const TUniqueId* query_id = nullptr;
   const TUniqueId* retry_query_id = nullptr;
   {
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index af68245ff..f39389b98 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1362,8 +1362,12 @@ Status ClientRequestState::Cancel(
           || retry_state() == RetryState::RETRYING);
     }
 
-    admission_control_client_->CancelAdmission();
-    is_cancelled_ = true;
+    // To avoid recalling RemoteAdmissionControlClient::CancelAdmission() since it will
+    // send extra RPC.
+    if (!is_cancelled_) {
+      admission_control_client_->CancelAdmission();
+      is_cancelled_ = true;
+    }
   } // Release lock_ before doing cancellation work.
 
   // Cancel and close child queries before cancelling parent. 'lock_' should not be held
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 2d7efe914..3db569a0b 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -127,6 +127,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # state since it asserts that no queries are in flight.
     self.client.close_query(handle)
     self.__validate_web_ui_state()
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -175,6 +176,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # state since it asserts that no queries are in flight.
     self.client.close_query(handle)
     self.__validate_web_ui_state()
+    self.__validate_memz()
 
     # Assert that the web ui shows all queries are complete.
     completed_queries = self.cluster.get_first_impalad().service.get_completed_queries()
@@ -237,6 +239,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Validate the state of the Web UI.
     self.__validate_web_ui_state()
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -287,6 +290,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # state since it asserts that no queries are in flight.
     self.client.close_query(handle)
     self.__validate_web_ui_state()
+    self.__validate_memz()
 
   @SkipIfGCS.jira(reason="IMPALA-10562")
   @SkipIfCOS.jira(reason="IMPALA-10562")
@@ -479,6 +483,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # Validate the state of the web ui. The query must be closed before validating the
     # state since it asserts that no queries are in flight.
     self.__validate_web_ui_state()
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   def test_retry_fetched_rows(self):
@@ -722,6 +727,7 @@ class TestQueryRetries(CustomClusterTestSuite):
       assert False
     except Exception, e:
         assert "Cancelled" in str(e)
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -751,6 +757,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.client.close_query(handle)
     time.sleep(2)
     assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -777,6 +784,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.assert_eventually(60, 0.1,
         lambda: impala_service.get_num_in_flight_queries() == 0,
         lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -803,6 +811,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.assert_eventually(60, 0.1,
         lambda: impala_service.get_num_in_flight_queries() == 0,
         lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -827,6 +836,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     self.assert_eventually(60, 0.1,
         lambda: impala_service.get_num_in_flight_queries() == 0,
         lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+    self.__validate_memz()
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
@@ -1118,6 +1128,17 @@ class TestQueryRetries(CustomClusterTestSuite):
           return query_id_search.group(1)
     return None
 
+  def __exist_queries_in_web_ui_memz(self):
+    memz_breakdown = self.cluster.get_first_impalad() \
+      .service.get_debug_webpage_json('memz')['detailed']
+    query = re.compile("Query\([0-9a-f]{16}:[0-9a-f]{16}")
+    return query.search(memz_breakdown)
+
+  def __validate_memz(self):
+    # Validate that all queries are released
+    self.assert_eventually(60, 0.1,
+        lambda: self.__exist_queries_in_web_ui_memz() is None)
+
 
 # Tests that verify the query-retries are properly triggered by disk IO failure.
 # Coordinator adds an executor node to its blacklist if that node reports query