You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/04/15 01:37:26 UTC
[impala] branch master updated: IMPALA-10414: fix memory leak when canceling the retried query
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new dfc2f175b IMPALA-10414: fix memory leak when canceling the retried query
dfc2f175b is described below
commit dfc2f175bdd10106e6984fc150ede9891a03eeb0
Author: xqhe <he...@126.com>
AuthorDate: Thu Jul 29 17:50:14 2021 +0800
IMPALA-10414: fix memory leak when canceling the retried query
The query retry is launched in a separate thread. If launching the
query retry fails, this thread may not have finished by the time the
query is deleted from the given QueryDriverMap. In this case, the
resources held by the query retry thread are never released, so the
reference count of the QueryDriver (held via a shared_ptr) never drops
to 0 and the QueryDriver is never destroyed.
We need to wait until the query retry thread has finished executing
when deleting the query from the given QueryDriverMap.
Testing:
Modified test_query_retries.py to verify that no query memory is
leaked, by checking the /memz page of the debug web UI.
Change-Id: If804ca65da1794c819a6b2e6567ea7651ab5112f
Reviewed-on: http://gerrit.cloudera.org:8080/17735
Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Quanlong Huang <hu...@gmail.com>
---
be/src/runtime/query-driver.cc | 10 ++++++++++
be/src/service/client-request-state.cc | 8 ++++++--
tests/custom_cluster/test_query_retries.py | 21 +++++++++++++++++++++
3 files changed, 37 insertions(+), 2 deletions(-)
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 3200ad7a7..636cf527d 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -363,6 +363,9 @@ void QueryDriver::RetryQueryFromThread(
// Close the original query.
QueryHandle query_handle;
query_handle.SetHandle(query_driver, request_state);
+ // Do the work of close that needs to be done synchronously, otherwise we'll
+ // hit some illegal states in destroying the request_state.
+ RETURN_VOID_IF_ERROR(query_handle->Finalize(true, nullptr));
parent_server_->CloseClientRequestState(query_handle);
parent_server_->MarkSessionInactive(session);
}
@@ -430,6 +433,13 @@ Status QueryDriver::Finalize(
Status QueryDriver::Unregister(ImpalaServer::QueryDriverMap* query_driver_map) {
DCHECK(finalized_.Load());
+ // Wait until retry_query_thread_ finishes, otherwise the resources for this thread
+ // may not be released.
+ if (retry_query_thread_.get() != nullptr) {
+ retry_query_thread_->Join();
+ retry_query_thread_.reset();
+ }
+ DCHECK(retry_query_thread_.get() == nullptr);
const TUniqueId* query_id = nullptr;
const TUniqueId* retry_query_id = nullptr;
{
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index af68245ff..f39389b98 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1362,8 +1362,12 @@ Status ClientRequestState::Cancel(
|| retry_state() == RetryState::RETRYING);
}
- admission_control_client_->CancelAdmission();
- is_cancelled_ = true;
+ // To avoid recalling RemoteAdmissionControlClient::CancelAdmission() since it will
+ // send extra RPC.
+ if (!is_cancelled_) {
+ admission_control_client_->CancelAdmission();
+ is_cancelled_ = true;
+ }
} // Release lock_ before doing cancellation work.
// Cancel and close child queries before cancelling parent. 'lock_' should not be held
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 2d7efe914..3db569a0b 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -127,6 +127,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# state since it asserts that no queries are in flight.
self.client.close_query(handle)
self.__validate_web_ui_state()
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -175,6 +176,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# state since it asserts that no queries are in flight.
self.client.close_query(handle)
self.__validate_web_ui_state()
+ self.__validate_memz()
# Assert that the web ui shows all queries are complete.
completed_queries = self.cluster.get_first_impalad().service.get_completed_queries()
@@ -237,6 +239,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Validate the state of the Web UI.
self.__validate_web_ui_state()
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -287,6 +290,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# state since it asserts that no queries are in flight.
self.client.close_query(handle)
self.__validate_web_ui_state()
+ self.__validate_memz()
@SkipIfGCS.jira(reason="IMPALA-10562")
@SkipIfCOS.jira(reason="IMPALA-10562")
@@ -479,6 +483,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
self.__validate_web_ui_state()
+ self.__validate_memz()
@pytest.mark.execute_serially
def test_retry_fetched_rows(self):
@@ -722,6 +727,7 @@ class TestQueryRetries(CustomClusterTestSuite):
assert False
except Exception, e:
assert "Cancelled" in str(e)
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -751,6 +757,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.client.close_query(handle)
time.sleep(2)
assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -777,6 +784,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -803,6 +811,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -827,6 +836,7 @@ class TestQueryRetries(CustomClusterTestSuite):
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
+ self.__validate_memz()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
@@ -1118,6 +1128,17 @@ class TestQueryRetries(CustomClusterTestSuite):
return query_id_search.group(1)
return None
+ def __exist_queries_in_web_ui_memz(self):
+ memz_breakdown = self.cluster.get_first_impalad() \
+ .service.get_debug_webpage_json('memz')['detailed']
+ query = re.compile("Query\([0-9a-f]{16}:[0-9a-f]{16}")
+ return query.search(memz_breakdown)
+
+ def __validate_memz(self):
+ # Validate that all queries are released
+ self.assert_eventually(60, 0.1,
+ lambda: self.__exist_queries_in_web_ui_memz() is None)
+
# Tests that verify the query-retries are properly triggered by disk IO failure.
# Coordinator adds an executor node to its blacklist if that node reports query