You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by to...@apache.org on 2018/08/31 18:54:23 UTC

[2/3] impala git commit: IMPALA-7464: fix race when ExecFInstance() RPC times out

IMPALA-7464: fix race when ExecFInstance() RPC times out

The "exec resources" reference count on the QueryState expects that
it will transition from 0 -> non-zero -> 0 at most once.  The
reference count is taken on the coordinator side (sender of this
RPC) and also on the backend side (receiver of this RPC). Usually, the
lifetimes of those references overlap (the coordinator won't give up
the reference until the backend execution is complete or failed),
and so the assumption is not violated. However, when the RPC times
out, the receiver may run after the sender has given up its
reference (since the sender doesn't know the receiver is actually
still executing).

As it turns out, the coordinator doesn't really need to take a
reference given the current code (verified via code inspection), as
these resources are backend-only. So, stop taking the reference on
the coordinator side, and add some DCHECKs to document that (the
DCHECKs aren't particularly good at verifying it, however, since the
lifetimes generally will overlap).

Note that this patch can't be easily backported to older versions
without careful inspection since older versions of the code may have
relied on the reference count protecting things used by the
coordinator.

Testing:
- New test_rpc_timeout case that reproduced the problem 100%
- exhaustive build

Change-Id: If60d983e0e68b00e6557185db1f86757ab8b3f2d
Reviewed-on: http://gerrit.cloudera.org:8080/11339
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/fa0869d1
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/fa0869d1
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/fa0869d1

Branch: refs/heads/master
Commit: fa0869d1f20eb52ee2f649622b964abe51b6dc86
Parents: 5f4f01b
Author: Dan Hecht <dh...@cloudera.com>
Authored: Mon Aug 27 15:49:56 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 30 20:02:23 2018 +0000

----------------------------------------------------------------------
 .../benchmarks/process-wide-locks-benchmark.cc  |  4 +-
 be/src/runtime/coordinator.cc                   |  3 -
 be/src/runtime/query-exec-mgr.cc                |  6 +-
 be/src/runtime/query-state.cc                   | 33 +++++-----
 be/src/runtime/query-state.h                    | 63 +++++++++++++-------
 be/src/runtime/runtime-state.cc                 |  4 +-
 be/src/runtime/test-env.cc                      |  2 +-
 tests/custom_cluster/test_rpc_timeout.py        | 11 +++-
 8 files changed, 76 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
index ffe4268..465bb00 100644
--- a/be/src/benchmarks/process-wide-locks-benchmark.cc
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -86,7 +86,7 @@ void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
   QueryState *query_state;
   query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
   DCHECK(query_state != nullptr);
-  query_state->AcquireExecResourceRefcount();
+  query_state->AcquireBackendResourceRefcount();
 
   for (int i=0; i < num_accesses ; ++i) {
     QueryState* qs;
@@ -95,7 +95,7 @@ void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
     ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
   }
 
-  query_state->ReleaseExecResourceRefcount();
+  query_state->ReleaseBackendResourceRefcount();
   // This should drop the last reference count to the QueryState and destroy it.
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state);
   // Make sure that the query doesn't exist in the map any longer.

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 99f0be5..d944942 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -108,7 +108,6 @@ Status Coordinator::Exec() {
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
   query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx());
-  query_state_->AcquireExecResourceRefcount(); // Decremented in ReleaseExecResources().
   filter_mem_tracker_ = query_state_->obj_pool()->Add(new MemTracker(
       -1, "Runtime Filter (Coordinator)", query_state_->query_mem_tracker(), false));
 
@@ -782,8 +781,6 @@ void Coordinator::ReleaseExecResources() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_ != nullptr) filter_mem_tracker_->Close();
-  // Now that we've released our own resources, can release query-wide resources.
-  if (query_state_ != nullptr) query_state_->ReleaseExecResourceRefcount();
   // At this point some tracked memory may still be used in the coordinator for result
   // caching. The query MemTracker will be cleaned up later.
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 26ed811..4e1a340 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -50,7 +50,7 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
   Status status = qs->Init(params);
   if (!status.ok()) {
-    qs->ReleaseExecResourceRefcount(); // Release refcnt acquired in Init().
+    qs->ReleaseBackendResourceRefcount(); // Release refcnt acquired in Init().
     ReleaseQueryState(qs);
     return status;
   }
@@ -62,7 +62,7 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
           &QueryExecMgr::StartQueryHelper, this, qs, &t, true);
   if (!status.ok()) {
     // decrement refcount taken in QueryState::Init()
-    qs->ReleaseExecResourceRefcount();
+    qs->ReleaseBackendResourceRefcount();
     // decrement refcount taken in GetOrCreateQueryState()
     ReleaseQueryState(qs);
     return status;
@@ -140,7 +140,7 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) {
 #endif
 
   // decrement refcount taken in QueryState::Init();
-  qs->ReleaseExecResourceRefcount();
+  qs->ReleaseBackendResourceRefcount();
   // decrement refcount taken in StartQuery()
   ReleaseQueryState(qs);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index a020596..1dc41dc 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -57,7 +57,7 @@ QueryState::ScopedRef::~ScopedRef() {
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   : query_ctx_(query_ctx),
-    exec_resource_refcnt_(0),
+    backend_resource_refcnt_(0),
     refcnt_(0),
     is_cancelled_(0),
     query_spilled_(0) {
@@ -80,8 +80,8 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   InitMemTrackers();
 }
 
-void QueryState::ReleaseExecResources() {
-  DCHECK(!released_exec_resources_);
+void QueryState::ReleaseBackendResources() {
+  DCHECK(!released_backend_resources_);
   // Clean up temporary files.
   if (file_group_ != nullptr) file_group_->Close();
   // Release any remaining reservation.
@@ -94,13 +94,12 @@ void QueryState::ReleaseExecResources() {
   // At this point query execution should not be consuming any resources but some tracked
   // memory may still be used by the ClientRequestState for result caching. The query
   // MemTracker will be closed later when this QueryState is torn down.
-  released_exec_resources_ = true;
+  released_backend_resources_ = true;
 }
 
 QueryState::~QueryState() {
   DCHECK_EQ(refcnt_.Load(), 0);
-  DCHECK_EQ(exec_resource_refcnt_.Load(), 0);
-  DCHECK(released_exec_resources_);
+  DCHECK_EQ(backend_resource_refcnt_.Load(), 0);
   if (query_mem_tracker_ != nullptr) {
     // Disconnect the query MemTracker hierarchy from the global hierarchy. After this
     // point nothing must touch this query's MemTracker and all tracked memory associated
@@ -114,7 +113,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // Decremented in QueryExecMgr::StartQueryHelper() on success or by the caller of
   // Init() on failure. We need to do this before any returns because Init() always
   // returns a resource refcount to its caller.
-  AcquireExecResourceRefcount();
+  AcquireBackendResourceRefcount();
 
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
@@ -356,7 +355,7 @@ void QueryState::StartFInstances() {
   VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
           << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
   DCHECK_GT(refcnt_.Load(), 0);
-  DCHECK_GT(exec_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
+  DCHECK_GT(backend_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
 
   // set up desc tbl
   DCHECK(query_ctx().__isset.desc_tbl);
@@ -394,7 +393,7 @@ void QueryState::StartFInstances() {
 
     // start new thread to execute instance
     refcnt_.Add(1); // decremented in ExecFInstance()
-    AcquireExecResourceRefcount(); // decremented in ExecFInstance()
+    AcquireBackendResourceRefcount(); // decremented in ExecFInstance()
 
     // Add the fragment instance ID to the 'fis_map_'. Has to happen before the thread is
     // spawned or we may race with users of 'fis_map_'.
@@ -422,7 +421,7 @@ void QueryState::StartFInstances() {
       // Undo refcnt increments done immediately prior to Thread::Create(). The
       // reference counts were both greater than zero before the increments, so
       // neither of these decrements will free any structures.
-      ReleaseExecResourceRefcount();
+      ReleaseBackendResourceRefcount();
       ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
       break;
     }
@@ -464,15 +463,15 @@ void QueryState::StartFInstances() {
       << BackendExecStateToString(backend_exec_state_);
 }
 
-void QueryState::AcquireExecResourceRefcount() {
-  DCHECK(!released_exec_resources_);
-  exec_resource_refcnt_.Add(1);
+void QueryState::AcquireBackendResourceRefcount() {
+  DCHECK(!released_backend_resources_);
+  backend_resource_refcnt_.Add(1);
 }
 
-void QueryState::ReleaseExecResourceRefcount() {
-  int32_t new_val = exec_resource_refcnt_.Add(-1);
+void QueryState::ReleaseBackendResourceRefcount() {
+  int32_t new_val = backend_resource_refcnt_.Add(-1);
   DCHECK_GE(new_val, 0);
-  if (new_val == 0) ReleaseExecResources();
+  if (new_val == 0) ReleaseBackendResources();
 }
 
 void QueryState::ExecFInstance(FragmentInstanceState* fis) {
@@ -495,7 +494,7 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) {
   // initiate cancellation if nobody has done so yet
   if (!status.ok()) Cancel();
   // decrement refcount taken in StartFInstances()
-  ReleaseExecResourceRefcount();
+  ReleaseBackendResourceRefcount();
   // decrement refcount taken in StartFInstances()
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index 148b06f..5cb499a 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -124,21 +124,41 @@ class QueryState {
   }
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
 
-  // the following getters are only valid after Init()
-  ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
-  InitialReservations* initial_reservations() const { return initial_reservations_; }
+  /// The following getters are only valid after Init().
   ScannerMemLimiter* scanner_mem_limiter() const { return scanner_mem_limiter_; }
-  TmpFileMgr::FileGroup* file_group() const { return file_group_; }
   const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
 
-  // the following getters are only valid after StartFInstances()
-  const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
+  /// The following getters are only valid after Init() and should be called only from
+  /// the backend execution (i.e. not the coordinator side, since they require holding
+  /// a backend resource refcnt).
+  ReservationTracker* buffer_reservation() const {
+    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
+    return buffer_reservation_;
+  }
+  InitialReservations* initial_reservations() const {
+    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
+    return initial_reservations_;
+  }
+  TmpFileMgr::FileGroup* file_group() const {
+    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
+    return file_group_;
+  }
+
+  /// The following getters are only valid after StartFInstances().
   int64_t fragment_events_start_time() const { return fragment_events_start_time_; }
 
-  /// Sets up state required for fragment execution: memory reservations, etc. Fails
-  /// if resources could not be acquired. Acquires a resource refcount and returns it
-  /// to the caller on both success and failure. The caller must release it by calling
-  /// ReleaseExecResourceRefcount().
+  /// The following getters are only valid after StartFInstances() and should be called
+  /// only from the backend execution (i.e. not the coordinator side, since they require
+  /// holding a backend resource refcnt).
+  const DescriptorTbl& desc_tbl() const {
+    DCHECK_GT(backend_resource_refcnt_.Load(), 0);
+    return *desc_tbl_;
+  }
+
+  /// Sets up state required for fragment execution: memory reservations, etc. Fails if
+  /// resources could not be acquired. Acquires a backend resource refcount and returns
+  /// it to the caller on both success and failure. The caller must release it by
+  /// calling ReleaseBackendResourceRefcount().
   ///
   /// Uses few cycles and never blocks. Not idempotent, not thread-safe.
   /// The remaining public functions must be called only after Init().
@@ -162,14 +182,14 @@ class QueryState {
 
   /// Increment the resource refcount. Must be decremented before the query state
   /// reference is released. A refcount should be held by a fragment or other entity
-  /// for as long as it is consuming query execution resources (e.g. memory).
-  void AcquireExecResourceRefcount();
+  /// for as long as it is consuming query backend execution resources (e.g. memory).
+  void AcquireBackendResourceRefcount();
 
   /// Decrement the execution resource refcount and release resources if it goes to zero.
   /// All resource refcounts must be released before query state references are released.
   /// Should be called by the owner of the refcount after it is done consuming query
   /// execution resources.
-  void ReleaseExecResourceRefcount();
+  void ReleaseBackendResourceRefcount();
 
   /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
   /// status must be an error. If fis is given, the content will depend on whether
@@ -315,10 +335,10 @@ class QueryState {
   /// this daemon. Owned by 'obj_pool_'. Set in Init().
   ScannerMemLimiter* scanner_mem_limiter_ = nullptr;
 
-  /// Number of active fragment instances and coordinators for this query that may consume
-  /// resources for query execution (i.e. threads, memory) on the Impala daemon.
-  /// Query-wide execution resources for this query are released once this goes to zero.
-  AtomicInt32 exec_resource_refcnt_;
+  /// Number of active fragment instances for this query that may consume resources for
+  /// query backend execution (i.e. threads, memory) on the Impala daemon.  Query-wide
+  /// backend execution resources for this query are released once this goes to zero.
+  AtomicInt32 backend_resource_refcnt_;
 
   /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is
   /// enabled. Set in Prepare().
@@ -359,7 +379,7 @@ class QueryState {
   AtomicInt32 is_cancelled_;
 
   /// True if and only if ReleaseExecResources() has been called.
-  bool released_exec_resources_ = false;
+  bool released_backend_resources_ = false;
 
   /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1
   /// when the query first starts to spill. Required to correctly maintain the
@@ -384,9 +404,10 @@ class QueryState {
   /// Called from Init() to set up buffer reservations and the file group.
   Status InitBufferPoolState() WARN_UNUSED_RESULT;
 
-  /// Releases resources used for query execution. Guaranteed to be called only once.
-  /// Must be called before destroying the QueryState. Not idempotent and not thread-safe.
-  void ReleaseExecResources();
+  /// Releases resources used for query backend execution. Guaranteed to be called only
+  /// once. Must be called before destroying the QueryState. Not idempotent and not
+  /// thread-safe.
+  void ReleaseBackendResources();
 
   /// Same behavior as ReportExecStatus().
   /// Cancel on error only if instances_started is true.

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index dedb5f5..3e5a169 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -93,7 +93,7 @@ RuntimeState::RuntimeState(
     profile_(RuntimeProfile::Create(obj_pool(), "<unnamed>")) {
   // We may use execution resources while evaluating exprs, etc. Decremented in
   // ReleaseResources() to release resources.
-  local_query_state_->AcquireExecResourceRefcount();
+  local_query_state_->AcquireBackendResourceRefcount();
   if (query_ctx().request_pool.empty()) {
     const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
   }
@@ -279,7 +279,7 @@ void RuntimeState::ReleaseResources() {
   instance_mem_tracker_->Close();
 
   if (local_query_state_.get() != nullptr) {
-    local_query_state_->ReleaseExecResourceRefcount();
+    local_query_state_->ReleaseBackendResourceRefcount();
   }
   released_resources_ = true;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 0d6c6f0..486a230 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -86,7 +86,7 @@ void TestEnv::TearDownQueries() {
   for (RuntimeState* runtime_state : runtime_states_) runtime_state->ReleaseResources();
   runtime_states_.clear();
   for (QueryState* query_state : query_states_) {
-    query_state->ReleaseExecResourceRefcount();
+    query_state->ReleaseBackendResourceRefcount(); // Acquired by QueryState::Init()
     exec_env_->query_exec_mgr()->ReleaseQueryState(query_state);
   }
   query_states_.clear();

http://git-wip-us.apache.org/repos/asf/impala/blob/fa0869d1/tests/custom_cluster/test_rpc_timeout.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_rpc_timeout.py b/tests/custom_cluster/test_rpc_timeout.py
index 4ea5351..419bec0 100644
--- a/tests/custom_cluster/test_rpc_timeout.py
+++ b/tests/custom_cluster/test_rpc_timeout.py
@@ -78,9 +78,18 @@ class TestRPCTimeout(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
+      " --fault_injection_rpc_delay_ms=1000 --fault_injection_rpc_type=1"
+      " --datastream_sender_timeout_ms=30000")
+  def test_execqueryfinstances_race(self, vector):
+    """ Test for IMPALA-7464, where the rpc times out while the rpc handler continues to
+        run simultaneously."""
+    self.execute_query_verify_metrics(self.TEST_QUERY)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--backend_client_rpc_timeout_ms=1000"
       " --fault_injection_rpc_delay_ms=3000 --fault_injection_rpc_type=1"
       " --datastream_sender_timeout_ms=30000")
-  def test_execplanfragment_timeout(self, vector):
+  def test_execqueryfinstances_timeout(self, vector):
     for i in range(3):
       ex= self.execute_query_expect_failure(self.client, self.TEST_QUERY)
       assert "RPC recv timed out" in str(ex)