You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2017/05/26 15:00:11 UTC

[3/3] incubator-impala git commit: IMPALA-4890/5143: Coordinator race involving TearDown()

IMPALA-4890/5143: Coordinator race involving TearDown()

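TearDown() both releases resources and destroys control
structures (such as the QueryState reference), yet it can be
called while a concurrent thread is still executing Exec() or
may later call GetNext(). The fix is to stop destroying the
control structures when resources are released: TearDown() is
replaced by ReleaseResources(), and the QueryState reference
is now released in the Coordinator destructor.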

With this change, resources are also released automatically
when query execution ends, i.e. when GetNext() reaches eos
or when the query is cancelled.

Change-Id: I457a6424a0255c137336c4bc01a6e7ed830d18c7
Reviewed-on: http://gerrit.cloudera.org:8080/6897
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/bad10da4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/bad10da4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/bad10da4

Branch: refs/heads/master
Commit: bad10da4a6eb13673aa44eb4d45d6ad87a2dd690
Parents: 5d59d85
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Sun May 14 21:32:41 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 26 13:45:42 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator.cc          | 55 +++++++++++++----------------
 be/src/runtime/coordinator.h           | 36 ++++++++-----------
 be/src/service/client-request-state.cc |  3 --
 3 files changed, 38 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 741da32..5b86690 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -108,7 +108,11 @@ Coordinator::Coordinator(
     query_events_(events) {}
 
 Coordinator::~Coordinator() {
-  DCHECK(torn_down_) << "TearDown() must be called before Coordinator is destroyed";
+  DCHECK(released_resources_)
+      << "ReleaseResources() must be called before Coordinator is destroyed";
+  if (query_state_ != nullptr) {
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+  }
 }
 
 Status Coordinator::Exec() {
@@ -416,8 +420,6 @@ Status Coordinator::FinishBackendStartup() {
       "Backend startup latencies", latencies.ToHumanReadable());
 
   if (!status.ok()) {
-    // TODO: do not allow cancellation via the debug page until Exec() has returned
-    //DCHECK(query_status_.ok()); // nobody should have been able to cancel
     query_status_ = status;
     CancelInternal();
   }
@@ -881,19 +883,14 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (*eos) {
     returned_all_results_ = true;
-    // Trigger tear-down of coordinator fragment by closing the consumer. Must do before
-    // WaitForBackendCompletion().
-    coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-
-    // Don't return final NULL until all instances have completed.  GetNext must wait for
-    // all instances to complete before ultimately signalling the end of execution via a
-    // NULL batch. After NULL is returned, the coordinator may tear down query state, and
-    // perform post-query finalization which might depend on the reports from all
-    // backends.
-    //
-    // TODO: Waiting should happen in TearDown() (and then we wouldn't need to call
-    // CloseConsumer() here). See IMPALA-4275 for details.
+    // release resources here, since we won't be fetching more result rows
+    {
+      lock_guard<mutex> l(lock_);
+      ReleaseResources();
+    }
+
+    // wait for all backends to complete before computing the summary
+    // TODO: relocate this so GetNext() won't have to wait for backends to complete?
     RETURN_IF_ERROR(WaitForBackendCompletion());
     // if the query completed successfully, compute the summary
     if (query_status_.ok()) ComputeQuerySummary();
@@ -912,12 +909,15 @@ void Coordinator::Cancel(const Status* cause) {
   // should explicitly pass Status::OK()). Fragment instances may be cancelled at the end
   // of a successful query. Need to clean up relationship between query_status_ here and
   // in QueryExecState. See IMPALA-4279.
-  query_status_ = (cause != NULL && !cause->ok()) ? *cause : Status::CANCELLED;
+  query_status_ = (cause != nullptr && !cause->ok()) ? *cause : Status::CANCELLED;
   CancelInternal();
 }
 
 void Coordinator::CancelInternal() {
   VLOG_QUERY << "Cancel() query_id=" << query_id();
+  // TODO: remove when restructuring cancellation, which should happen automatically
+  // as soon as the coordinator knows that the query is finished
+  DCHECK(!query_status_.ok());
 
   int num_cancelled = 0;
   for (BackendState* backend_state: backend_states_) {
@@ -931,6 +931,8 @@ void Coordinator::CancelInternal() {
 
   // Report the summary with whatever progress the query made before being cancelled.
   ComputeQuerySummary();
+
+  ReleaseResources();
 }
 
 Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& params) {
@@ -1076,11 +1078,9 @@ string Coordinator::GetErrorLog() {
   return PrintErrorMapToString(merged);
 }
 
-// TODO: call this as soon as it's clear that we won't reference the state
-// anymore, ie, in CancelInternal() and when GetNext() hits eos
-void Coordinator::TearDown() {
-  DCHECK(!torn_down_) << "Coordinator::TearDown() must not be called twice";
-  torn_down_ = true;
+void Coordinator::ReleaseResources() {
+  if (released_resources_) return;
+  released_resources_ = true;
   if (filter_routing_table_.size() > 0) {
     query_profile_->AddInfoString("Final filter table", FilterDebugString());
   }
@@ -1094,20 +1094,13 @@ void Coordinator::TearDown() {
   }
   // This may be NULL while executing UDFs.
   if (filter_mem_tracker_.get() != nullptr) {
+    // TODO: move this elsewhere, this isn't releasing resources (it's dismantling
+    // control structures)
     filter_mem_tracker_->UnregisterFromParent();
-    filter_mem_tracker_.reset();
   }
   // Need to protect against failed Prepare(), where root_sink() would not be set.
   if (coord_sink_ != nullptr) {
     coord_sink_->CloseConsumer();
-    coord_sink_ = nullptr;
-  }
-  coord_instance_ = nullptr;
-  if (query_state_ != nullptr) {
-    // Tear down the query state last - other members like 'filter_mem_tracker_'
-    // may reference objects with query lifetime, like the query MemTracker.
-    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
-    query_state_ = nullptr;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index bc635b1..0d772eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -75,7 +75,10 @@ class QueryState;
 /// Query coordinator: handles execution of fragment instances on remote nodes, given a
 /// TQueryExecRequest. As part of that, it handles all interactions with the executing
 /// backends; it is also responsible for implementing all client requests regarding the
-/// query, including cancellation.
+/// query, including cancellation. Once a query ends, either through cancellation or
+/// by returning eos, the coordinator releases resources. (Note that DML requests
+/// always end with cancellation, via ImpalaServer::UnregisterQuery()/
+/// ImpalaServer::CancelInternal()/ClientRequestState::Cancel().)
 ///
 /// The coordinator monitors the execution status of fragment instances and aborts the
 /// entire query if an error is reported by any of them.
@@ -84,10 +87,7 @@ class QueryState;
 /// rows are produced by a fragment instance that always executes on the same machine as
 /// the coordinator.
 ///
-/// Once a query has finished executing and all results have been returned either to the
-/// caller of GetNext() or a data sink, execution_completed() will return true. If the
-/// query is aborted, execution_completed should also be set to true. Coordinator is
-/// thread-safe, with the exception of GetNext().
+/// Thread-safe, with the exception of GetNext().
 //
 /// A typical sequence of calls for a single query (calls under the same numbered
 /// item can happen concurrently):
@@ -98,9 +98,6 @@ class QueryState;
 /// The implementation ensures that setting an overall error status and initiating
 /// cancellation of all fragment instances is atomic.
 ///
-/// TODO: remove TearDown() and replace with ReleaseResources(); TearDown() currently
-/// also disassembles the control structures (such as the local reference to the
-/// coordinator's FragmentInstanceState)
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
 /// TODO: clean up locking behavior; in particular, clarify dependency on lock_
@@ -143,7 +140,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// if any, as well as all plan fragments on remote nodes. Sets query_status_ to the
   /// given cause if non-NULL. Otherwise, sets query_status_ to Status::CANCELLED.
   /// Idempotent.
-  void Cancel(const Status* cause = NULL);
+  void Cancel(const Status* cause = nullptr);
 
   /// Updates execution status of a particular backend as well as Insert-related
   /// status (per_partition_status_ and files_to_move_). Also updates
@@ -151,13 +148,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   Status UpdateBackendExecStatus(const TReportExecStatusParams& params)
       WARN_UNUSED_RESULT;
 
-  /// Returns the query state.
-  /// Only valid to call after Exec() and before TearDown(). The returned
-  /// reference only remains valid until TearDown() is called.
-  QueryState* query_state() const {
-    DCHECK(!torn_down_);
-    return query_state_;
-  }
+  /// Only valid to call after Exec().
+  QueryState* query_state() const { return query_state_; }
 
   /// Only valid *after* calling Exec(). Return nullptr if the running query does not
   /// produce any rows.
@@ -207,10 +199,6 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// filter to fragment instances.
   void UpdateFilter(const TUpdateFilterParams& params);
 
-  /// Called once query execution is complete to tear down any remaining state.
-  /// TODO: change to ReleaseResources() and don't tear down control structures.
-  void TearDown();
-
  private:
   class BackendState;
   struct FilterTarget;
@@ -368,8 +356,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// safe to concurrently read from filter_routing_table_.
   bool filter_routing_table_complete_ = false;
 
-  /// True if and only if TearDown() has been called.
-  bool torn_down_ = false;
+  /// True if and only if ReleaseResources() has been called.
+  bool released_resources_ = false;
 
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
@@ -447,6 +435,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume.
   void InitFilterRoutingTable();
+
+  /// Releases filter resources, unregisters the filter mem tracker, and calls
+  /// CloseConsumer() on coord_sink_. Requires lock_ to be held. Idempotent.
+  void ReleaseResources();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/bad10da4/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 42e8de3..dcea8cb 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -446,8 +446,6 @@ Status ClientRequestState::ExecQueryOrDmlRequest(
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
-    // TODO: make schedule local to coordinator and move schedule_->Release() into
-    // Coordinator::TearDown()
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
         exec_request_.query_options, &summary_profile_, query_events_));
   }
@@ -585,7 +583,6 @@ void ClientRequestState::Done() {
                      << " because of error: " << status.GetDetail();
       }
     }
-    coord_->TearDown();
   }
 }