You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/21 17:47:35 UTC

(impala) 02/02: IMPALA-12633: Remove DCHECK for slow SetQueryInflight

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6c6142ba2e602d0c37543c81dfcd0652455e6368
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Thu Dec 14 17:18:02 2023 -0800

    IMPALA-12633: Remove DCHECK for slow SetQueryInflight
    
    Removes the DCHECK that the original query is inflight before trying to
    close it during a query retry. SetQueryInflight is a separate operation
    the server performs after a query has started executing async, and it's
    possible for the query to fail and retry before the server calls
    SetQueryInflight. When that happens, we still need to perform cleanup
    or the original request_state is never closed and we hit a different
    DCHECK: "BlockOnWait() needs to be called!"
    
    Adds handling to CloseClientRequestState for when we close a
    ClientRequestState but the query is retrying with a new state: the
    original query ID is recorded in prestopped_queries, which ensures that
    we bypass most of SetQueryInflight in case CloseClientRequestState was
    called first.
    
    Updates the message from DCHECK in ClientRequestState's destructor to
    reflect that wait_thread_ is only reset in Finalize.
    
    Adds a debug action and test where just the original query is delayed
    during the SetQueryInflight call.
    
    Change-Id: Ic17a5e12d9db61cb19306270174518a8dfd281a7
    Reviewed-on: http://gerrit.cloudera.org:8080/20799
    Reviewed-by: Riza Suminto <ri...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Reviewed-by: Quanlong Huang <hu...@gmail.com>
    Tested-by: Michael Smith <mi...@cloudera.com>
---
 be/src/runtime/query-driver.cc             | 20 +++++++++++++++--
 be/src/service/client-request-state.cc     |  2 +-
 be/src/service/impala-server.cc            | 36 +++++++++++++++++++++++++++---
 be/src/service/impala-server.h             | 21 ++++++++++++-----
 tests/custom_cluster/test_query_retries.py | 31 +++++++++++++++++++++++++
 5 files changed, 99 insertions(+), 11 deletions(-)

diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 6bc677dea..0a285639b 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -372,8 +372,17 @@ void QueryDriver::RetryQueryFromThread(
   QueryHandle query_handle;
   query_handle.SetHandle(query_driver, request_state);
   // Do the work of close that needs to be done synchronously, otherwise we'll
-  // hit some illegal states in destroying the request_state.
-  DCHECK(query_handle->is_inflight());
+  // hit some illegal states in destroying the request_state. It's possible (but rare)
+  // that the query is not marked inflight yet; what happens in that case is:
+  //   1. RegisterQuery (adds the original ID to query_driver_map_), then Exec.
+  //   2. Query fails and retries with a new query ID (this function). The retried query
+  //      won't also retry on failure, so this problem doesn't recurse.
+  //   3. We Finalize the query and CloseClientRequestState; the original ID is not in
+  //      inflight_queries yet, so it is added to prestopped_queries for later.
+  //   4. At some point later SetQueryInflight executes; it will remove the original ID
+  //      from prestopped_queries and skip adding it to inflight_queries.
+  //   5. When the query is closed, UnregisterQuery calls query_driver->Unregister, which
+  //      removes both the original and retry query ID from query_driver_map_.
   query_handle->Finalize(nullptr);
   parent_server_->CloseClientRequestState(query_handle);
   parent_server_->MarkSessionInactive(session);
@@ -396,6 +405,13 @@ void QueryDriver::CreateRetriedClientRequestState(ClientRequestState* request_st
     VLOG_QUERY << "Unset SPOOL_ALL_RESULTS_FOR_RETRIES when retrying query "
         << PrintId(client_request_state_->query_id());
   }
+  if (UNLIKELY(query_ctx.client_request.query_options.__isset.debug_action)) {
+    // We need to be able to test actions that don't reproduce in the retried query. If we
+    // later need to target retried queries, we can add a separate query option for that.
+    query_ctx.client_request.query_options.__set_debug_action("");
+    VLOG_QUERY << "Unset DEBUG_ACTION when retrying query "
+        << PrintId(client_request_state_->query_id());
+  }
   parent_server_->PrepareQueryContext(&query_ctx);
   retry_exec_request_->query_exec_request.__set_query_ctx(query_ctx);
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 80ebe0326..aa6da5170 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -176,7 +176,7 @@ ClientRequestState::ClientRequestState(const TQueryCtx& query_ctx, Frontend* fro
 }
 
 ClientRequestState::~ClientRequestState() {
-  DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
+  DCHECK(wait_thread_.get() == NULL) << "Finalize() needs to be called!";
   DCHECK(!track_rpcs_);  // Should get set to false in Finalize()
   DCHECK(pending_rpcs_.empty()); // Should get cleared in Finalize()
   UnRegisterRemainingRPCs(); // Avoid memory leaks if Finalize() didn't get called
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6e89c78d6..21b778302 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1461,6 +1461,7 @@ Status ImpalaServer::RegisterQuery(const TUniqueId& query_id,
 
 Status ImpalaServer::SetQueryInflight(
     shared_ptr<SessionState> session_state, const QueryHandle& query_handle) {
+  DebugActionNoFail(query_handle->query_options(), "SET_QUERY_INFLIGHT");
   const TUniqueId& query_id = query_handle->query_id();
   lock_guard<mutex> l(session_state->lock);
   // The session wasn't expired at the time it was checked out and it isn't allowed to
@@ -1472,9 +1473,21 @@ Status ImpalaServer::SetQueryInflight(
     VLOG(1) << "Session closed: cannot set " << PrintId(query_id) << " in-flight";
     return Status::Expected("Session closed");
   }
-  // Add query to the set that will be unregistered if sesssion is closed.
-  session_state->inflight_queries.insert(query_id);
+
+  // Acknowledge the query by incrementing total_queries.
   ++session_state->total_queries;
+  // If the query was already closed - only possible by query retry logic - skip
+  // scheduling it to be unregistered with the session and adding timeout checks.
+  if (session_state->prestopped_queries.erase(query_id) > 0) {
+    VLOG_QUERY << "Query " << PrintId(query_id) << " closed, skipping in-flight.";
+    return Status::OK();
+  }
+  // Add query to the set that will be unregistered if session is closed.
+  auto inflight_it = session_state->inflight_queries.insert(query_id);
+  if (UNLIKELY(!inflight_it.second)) {
+    LOG(WARNING) << "Query " << PrintId(query_id) << " is already in-flight.";
+    DCHECK(false) << "SetQueryInflight called twice for query_id=" << PrintId(query_id);
+  }
 
   // If the query has a timeout or time limit, schedule checks.
   int32_t idle_timeout_s = query_handle->query_options().query_timeout_s;
@@ -1604,7 +1617,24 @@ void ImpalaServer::CloseClientRequestState(const QueryHandle& query_handle) {
 
   {
     lock_guard<mutex> l(query_handle->session()->lock);
-    query_handle->session()->inflight_queries.erase(query_handle->query_id());
+    if (query_handle->session()->inflight_queries.erase(query_handle->query_id()) == 0
+        && query_handle->IsSetRetriedId()) {
+      // Closing a ClientRequestState but the query is retrying with a new state and the
+      // original ID was not yet inflight: record it so SetQueryInflight skips adding it
+      // later. Other scenarios are not tracked because they occur when the query started
+      // and then errored before SetQueryInflight, which is then never expected to run.
+      auto prestopped_it =
+          query_handle->session()->prestopped_queries.insert(query_handle->query_id());
+      if (UNLIKELY(!prestopped_it.second)) {
+        LOG(WARNING) << "Query " << PrintId(query_handle->query_id())
+                     << " closed again before in-flight.";
+        DCHECK(false) << "CloseClientRequestState called twice for query_id="
+                      << PrintId(query_handle->query_id());
+      } else {
+        VLOG_QUERY << "Query " << PrintId(query_handle->query_id())
+                   << " closed before in-flight.";
+      }
+    }
   }
 
   if (query_handle->GetCoordinator() != nullptr) {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 02bdd4581..de686a965 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -636,6 +636,12 @@ class ImpalaServer : public ImpalaServiceIf,
     /// that need to be closed if the session closes or expires.
     boost::unordered_set<TUniqueId> inflight_queries;
 
+    /// Queries are added to inflight_queries via SetQueryInflight. Execution is async
+    /// and can start before SetQueryInflight is called. If a query is retried before
+    /// SetQueryInflight is called, the original may be cleaned up before it is added to
+    /// inflight_queries. In that case we add it to prestopped_queries instead.
+    std::set<TUniqueId> prestopped_queries;
+
     /// Total number of queries run as part of this session.
     int64_t total_queries;
 
@@ -755,13 +761,18 @@ class ImpalaServer : public ImpalaServiceIf,
       QueryHandle* query_handle) WARN_UNUSED_RESULT;
 
   /// Adds the query to the set of in-flight queries for the session. The query remains
-  /// in-flight until the query is unregistered.  Until a query is in-flight, an attempt
-  /// to cancel or close the query by the user will return an error status.  If the
-  /// session is closed before a query is in-flight, then the query cancellation is
-  /// deferred until after the issuing path has completed initializing the query.  Once
+  /// in-flight until the query is unregistered. Until a query is in-flight, an attempt
+  /// to cancel or close the query by the user will return an error status. A query that
+  /// retries may close the original query state before SetQueryInflight is called; in
+  /// that case it adds the query to prestopped_queries, which will bypass unnecessary
+  /// setup when SetQueryInflight is later called on it.
+  ///
+  /// If the session is closed before a query is in-flight, then the query cancellation
+  /// is deferred until after the issuing path has completed initializing the query. Once
   /// a query is in-flight, it can be cancelled/closed asynchronously by the user
   /// (e.g. via an RPC) and the session close path can close (cancel and unregister) it.
-  /// The query must have already been registered using RegisterQuery().  The caller
+  ///
+  /// The query must have already been registered using RegisterQuery(). The caller
   /// must have checked out the session state.
   Status SetQueryInflight(std::shared_ptr<SessionState> session_state,
       const QueryHandle& query_handle) WARN_UNUSED_RESULT;
diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py
index 56e852970..c6fe3e74d 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -760,6 +760,37 @@ class TestQueryRetries(CustomClusterTestSuite):
     assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
     self.__validate_memz()
 
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      statestored_args="--statestore_heartbeat_frequency_ms=60000")
+  def test_retrying_query_before_inflight(self):
+    """Trigger a query retry, and delay setting the original query inflight as that may
+    happen after the query is retried. Validate that the query succeeds. Set a very long
+    statestore heartbeat interval (60s) so that killed impalads are not removed from
+    the cluster membership."""
+
+    # Kill an impalad, and run a query. The query should be retried.
+    self.cluster.impalads[1].kill()
+    query = self._count_query
+    handle = self.execute_query_async(query,
+        query_options={'retry_failed_queries': 'true',
+                       'debug_action': 'SET_QUERY_INFLIGHT:SLEEP@1000'})
+    self.__wait_until_retry_state(handle, 'RETRIED')
+
+    # SetQueryInflight will have completed by the time execute_query_async returns,
+    # because Impala only acknowledges that the query has started after it finishes.
+    page = self.cluster.get_first_impalad().service.get_debug_webpage_json('sessions')
+    for session in page['sessions']:
+      # Every session should have one completed query: either test setup, or the original
+      # query that's being retried.
+      assert session['inflight_queries'] < session['total_queries']
+
+    self.client.close_query(handle)
+    # If original query state closure is skipped, the coordinator will crash on a DCHECK.
+    time.sleep(2)
+    assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
+    self.__validate_memz()
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
       impalad_args="--debug_actions=RETRY_DELAY_GET_QUERY_DRIVER:SLEEP@2000",