Posted to commits@impala.apache.org by he...@apache.org on 2016/10/28 20:02:39 UTC

[5/5] incubator-impala git commit: IMPALA-4348 / IMPALA-4333: Improve coordinator fragment cancellation

IMPALA-4348 / IMPALA-4333: Improve coordinator fragment cancellation

This patch fixes two problems:

1. If a fragment instance is cancelled (by the coordinator) concurrently
with the coordinator calling GetNext() on its PlanRootSink, GetNext()
may access a destroyed PlanRootSink object and crash (IMPALA-4333).

2. If ExecPlanFragment() times out but ultimately succeeds, the
coordinator never calls CloseConsumer() to release its side of the
PlanRootSink, and the fragment instance never finishes (IMPALA-4348).
A simplified sketch of the hand-off involved follows below.
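
To make these failure modes concrete, here is a heavily simplified,
hypothetical sketch of the single-producer / single-consumer hand-off
that PlanRootSink implements. Only the method names GetNext(), Send(),
Close() and CloseConsumer() mirror the real interface; the class, its
fields and the driver in main() are invented for illustration. The
point to notice is that the producer's Close() blocks until
CloseConsumer() is called, so a consumer that never calls it leaves the
producer stuck (problem 2), and a sink destroyed while the consumer is
still inside GetNext() leaves the consumer touching freed memory
(problem 1).

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Toy stand-in for PlanRootSink: one producer thread fills rows directly
// into a buffer supplied by one consumer thread.
class ToyRootSink {
 public:
  // Consumer: blocks until the producer fills 'out' or the stream ends.
  // Returns false once there is nothing more to consume.
  bool GetNext(std::vector<std::string>* out) {
    std::unique_lock<std::mutex> l(lock_);
    if (consumer_done_) return false;  // CloseConsumer() already called.
    results_ = out;
    sender_cv_.notify_all();           // Hand control to the producer.
    consumer_cv_.wait(
        l, [this] { return results_ == nullptr || eos_ || sender_done_; });
    results_ = nullptr;
    return !eos_;
  }

  // Producer: copies one row into the buffer the consumer is waiting on.
  void Send(const std::string& row) {
    std::unique_lock<std::mutex> l(lock_);
    sender_cv_.wait(l, [this] { return results_ != nullptr || consumer_done_; });
    if (consumer_done_) return;        // Consumer detached; drop the row.
    results_->push_back(row);
    results_ = nullptr;
    consumer_cv_.notify_all();
  }

  // Producer: signals end-of-stream, then blocks until the consumer detaches.
  // Only after Close() returns is it safe to destroy the sink.
  void Close() {
    std::unique_lock<std::mutex> l(lock_);
    eos_ = true;
    sender_done_ = true;
    consumer_cv_.notify_all();
    sender_cv_.wait(l, [this] { return consumer_done_; });
  }

  // Consumer: may race with GetNext(); wakes both sides so neither blocks.
  void CloseConsumer() {
    std::lock_guard<std::mutex> l(lock_);
    consumer_done_ = true;
    sender_cv_.notify_all();
    consumer_cv_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable sender_cv_, consumer_cv_;
  std::vector<std::string>* results_ = nullptr;  // Non-null while consumer waits.
  bool eos_ = false, sender_done_ = false, consumer_done_ = false;
};

int main() {
  ToyRootSink sink;
  std::thread producer([&sink] {
    for (int i = 0; i < 3; ++i) sink.Send("row-" + std::to_string(i));
    sink.Close();                      // Blocks until CloseConsumer() below.
  });
  std::vector<std::string> batch;
  while (sink.GetNext(&batch)) {}      // Accumulate rows until end-of-stream.
  for (const std::string& row : batch) std::cout << row << "\n";
  sink.CloseConsumer();                // Without this, Close() never returns.
  producer.join();
}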

The following changes address this:

* Make PlanFragmentExecutor::Cancel() call CloseConsumer() on its root
  sink. This ensures that, whatever the reason for cancellation, the
  fragment instance will eventually terminate. Without this, the
  coordinator may never call CloseConsumer() if it cannot access the
  root instance's FragmentExecState.

* Ensure that the PlanRootSink has a lifetime at least as long as the
  coordinator object by taking a shared_ptr reference to its parent
  FragmentExecState object. Even if the fragment instance finishes, the
  object will still be valid until the coordinator releases this
  reference. In the future, we expect this to be replaced by an explicit
  reference take / relinquish API. (A minimal sketch of this lifetime
  pattern follows the list.)

* Ensure the coordinator tries to cancel fragment instances if any
  attempt was made to start them, not just if the RPC succeeded. This
  covers the timeout case where the RPC was slow but ultimately
  successful (a sketch of this check follows the IMPALA-4383 note
  below).
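
The shared_ptr change in the second bullet is a standard ownership
pattern: a raw pointer into an object is only safe while someone holds
a strong reference to that object's owner. Below is a minimal,
self-contained sketch of the idea; ToyExecState, ToySink and
ToyCoordinator are invented stand-ins, not the real FragmentExecState /
PlanRootSink / Coordinator types.

#include <iostream>
#include <memory>

// Invented stand-ins for PlanRootSink and FragmentExecState.
struct ToySink {
  void CloseConsumer() { std::cout << "consumer closed\n"; }
};

struct ToyExecState {
  ToySink sink;  // Owned by the exec state, destroyed along with it.
};

class ToyCoordinator {
 public:
  // Taking a shared_ptr (rather than only the raw ToySink*) guarantees
  // that the exec state (and therefore sink_) outlives this coordinator,
  // even if the owner on the fragment side drops its reference when the
  // instance finishes or fails.
  void AttachRoot(std::shared_ptr<ToyExecState> exec_state) {
    root_exec_state_ = std::move(exec_state);
    sink_ = &root_exec_state_->sink;
  }

  void Teardown() {
    if (sink_ != nullptr) sink_->CloseConsumer();
    sink_ = nullptr;
    root_exec_state_.reset();  // Only now may the exec state be destroyed.
  }

 private:
  std::shared_ptr<ToyExecState> root_exec_state_;
  ToySink* sink_ = nullptr;
};

int main() {
  auto exec_state = std::make_shared<ToyExecState>();
  ToyCoordinator coord;
  coord.AttachRoot(exec_state);
  exec_state.reset();  // Simulate the fragment being unregistered early.
  coord.Teardown();    // Still safe: the coordinator kept the sink alive.
}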

This patch relies on a fix for IMPALA-4383, which ensures that fragment
instances will always try to send reports, and so will always detect an
error and cancel themselves.
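
The third bullet reduces to a small eligibility test: an instance is a
cancellation candidate whenever an ExecPlanFragment() attempt was
actually made, even if the coordinator never saw it succeed. The sketch
below captures that decision; the InstanceState struct and
ShouldCancel() helper are invented for illustration and do not exist in
the Impala sources.

#include <iostream>

// Invented per-instance bookkeeping, loosely modelled on what the
// coordinator tracks for each fragment instance.
struct InstanceState {
  bool rpc_sent = false;   // ExecPlanFragment() was attempted, even if it timed out.
  bool done = false;       // Instance already reported completion.
  bool status_ok = true;   // Last status the coordinator observed.
};

// Cancel anything that might still be running remotely. A bad status alone
// does NOT exempt an instance: a timed-out RPC may still have started it.
bool ShouldCancel(const InstanceState& s) {
  if (!s.rpc_sent) return false;  // Nothing was ever started.
  if (s.done) return false;       // Already finished; nothing to cancel.
  return true;                    // Includes the !status_ok case.
}

int main() {
  InstanceState timed_out;
  timed_out.rpc_sent = true;    // The RPC was sent, but...
  timed_out.status_ok = false;  // ...the coordinator saw it time out.
  std::cout << std::boolalpha << ShouldCancel(timed_out) << "\n";  // true
}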

Testing:

* Ran TestRPCTimeout.test_execplanfragment_timeout in a loop for 90
  minutes. Patch addresses hangs and failure modes that were observed
  previously.

* Added a manual sleep just before Coordinator::GetNext() calls
  root_sink_->GetNext(), and cancelled a query manually in that
  window. Confirmed that the query cancels successfully.

Change-Id: I5b4e25c1d658b3929182ba5e56b5c5e881dd394a
Reviewed-on: http://gerrit.cloudera.org:8080/4865
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3b8a0891
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3b8a0891
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3b8a0891

Branch: refs/heads/master
Commit: 3b8a0891d9eebec00fc2654bb4a705752edc6759
Parents: d82411f
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Oct 26 03:29:28 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Fri Oct 28 19:40:58 2016 +0000

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc            |  5 +--
 be/src/exec/plan-root-sink.h             | 25 +++++++-----
 be/src/runtime/coordinator.cc            | 57 +++++++++++++++++----------
 be/src/runtime/coordinator.h             |  7 +++-
 be/src/runtime/plan-fragment-executor.cc |  6 +++
 5 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index c728f4a..5f6cd75 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -152,16 +152,15 @@ void PlanRootSink::CloseConsumer() {
 Status PlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
   unique_lock<mutex> l(lock_);
-  DCHECK(!consumer_done_);
 
   results_ = results;
   num_rows_requested_ = num_results;
   sender_cv_.notify_all();
 
   while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.wait(l);
+
   *eos = eos_;
-  RETURN_IF_ERROR(state->CheckQueryState());
-  return Status::OK();
+  return state->CheckQueryState();
 }
 
 void PlanRootSink::GetRowValue(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index cc7c045..a9c47c3b 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -36,16 +36,18 @@ class ExprContext;
 ///
 /// The consumer calls GetNext() with a QueryResultSet and a requested fetch
 /// size. GetNext() shares these fields with Send(), and then signals Send() to begin
-/// populating the result set. GetNext() returns when either the sender has sent all of
-/// its rows, or the requested fetch size has been satisfied.
+/// populating the result set. GetNext() returns when a) the sender has sent all of its
+/// rows b) the requested fetch size has been satisfied or c) the sender calls Close().
 ///
 /// Send() fills in as many rows as are requested from the current batch. When the batch
 /// is exhausted - which may take several calls to GetNext() - control is returned to the
 /// sender to produce another row batch.
 ///
-/// Consumers must call CloseConsumer() when finished to allow the fragment to shut
-/// down. Senders must call Close() to signal to the consumer that no more batches will be
-/// produced.
+/// When the consumer is finished, CloseConsumer() must be called to allow the sender to
+/// exit Send(). Senders must call Close() to signal to the consumer that no more batches
+/// will be produced. CloseConsumer() may be called concurrently with GetNext(). Senders
+/// should ensure that the consumer is not blocked in GetNext() before destroying the
+/// PlanRootSink.
 ///
 /// The sink is thread safe up to a single producer and single consumer.
 ///
@@ -72,17 +74,20 @@ class PlanRootSink : public DataSink {
   virtual Status FlushFinal(RuntimeState* state);
 
   /// To be called by sender only. Signals to the consumer that no more batches will be
-  /// produced, then blocks until the consumer calls CloseConsumer().
+  /// produced, then blocks until someone calls CloseConsumer().
   virtual void Close(RuntimeState* state);
 
   /// Populates 'result_set' with up to 'num_rows' rows produced by the fragment instance
-  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume.
+  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume. If
+  /// CloseConsumer() is called concurrently, GetNext() will return and may not populate
+  /// 'result_set'. All subsequent calls after CloseConsumer() will do no work.
   Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos);
 
-  /// Signals to the producer that the sink will no longer be used. It's an error to call
-  /// GetNext() after this returns. May be called more than once; only the first call has
-  /// any effect.
+  /// Signals to the producer that the sink will no longer be used. GetNext() may be
+  /// safely called after this returns (it does nothing), but consumers should consider
+  /// that the PlanRootSink may be undergoing destruction. May be called more than once;
+  /// only the first call has any effect.
   void CloseConsumer();
 
   static const std::string NAME;
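
Given the relaxed contract described above (CloseConsumer() may race
with GetNext(), and calls after the first are no-ops), a consumer-side
fetch loop can be written so that CloseConsumer() runs on every exit
path. The sketch below shows one way to do that with a small RAII
guard; FakeSink and SinkGuard are invented stand-ins for illustration
and are not part of the Impala code, where the coordinator's teardown
path plays this role.

#include <iostream>
#include <vector>

// Minimal stand-in so the sketch compiles; the real consumer talks to
// PlanRootSink with a Status return and a QueryResultSet out-parameter.
struct FakeSink {
  int rows_left = 5;
  bool GetNext(std::vector<int>* out, int num_rows, bool* eos) {
    for (int i = 0; i < num_rows && rows_left > 0; ++i, --rows_left) {
      out->push_back(rows_left);
    }
    *eos = (rows_left == 0);
    return true;  // Stand-in for a Status; true means OK.
  }
  void CloseConsumer() { std::cout << "CloseConsumer()\n"; }
};

// RAII guard: whichever path the fetch loop exits through (eos, error,
// cancellation), the consumer side of the sink is released exactly once.
class SinkGuard {
 public:
  explicit SinkGuard(FakeSink* sink) : sink_(sink) {}
  ~SinkGuard() { if (sink_ != nullptr) sink_->CloseConsumer(); }
 private:
  FakeSink* sink_;
};

int main() {
  FakeSink sink;
  SinkGuard guard(&sink);
  bool eos = false;
  while (!eos) {
    std::vector<int> batch;
    if (!sink.GetNext(&batch, 2, &eos)) break;  // Error: guard still closes.
    std::cout << "fetched " << batch.size() << " rows\n";
  }
}  // ~SinkGuard runs here and calls CloseConsumer().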

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index f991e1c..7df0bf0 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -162,12 +162,14 @@ class Coordinator::FragmentInstanceState {
   }
 
   /// Called to set the initial status of the fragment instance after the
-  /// ExecRemoteFragment() RPC has returned.
-  void SetInitialStatus(const Status& status) {
+  /// ExecRemoteFragment() RPC has returned. If 'rpc_sent' is true,
+  /// CancelFragmentInstances() will include this instance in the set of potential
+  /// fragment instances to cancel.
+  void SetInitialStatus(const Status& status, bool rpc_sent) {
     DCHECK(!rpc_sent_);
+    rpc_sent_ = rpc_sent;
     status_ = status;
     if (!status_.ok()) return;
-    rpc_sent_ = true;
     stopwatch_.Start();
   }
 
@@ -244,7 +246,8 @@ class Coordinator::FragmentInstanceState {
   /// been initiated; either way, execution must not be cancelled.
   Status status_;
 
-  /// If true, ExecPlanFragment() rpc has been sent.
+  /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
+  /// successful.
   bool rpc_sent_;
 
   /// If true, execution terminated; do not cancel in that case.
@@ -502,17 +505,30 @@ Status Coordinator::Exec() {
     StartFragments();
   }
 
+  // In the error case, it's safe to return and not to get root_sink_ here to close - if
+  // there was an error, but the coordinator fragment was successfully started, it should
+  // cancel itself when it receives an error status after reporting its profile.
   RETURN_IF_ERROR(FinishInstanceStartup());
 
   // Grab executor and wait until Prepare() has finished so that runtime state etc. will
-  // be set up.
+  // be set up. Must do this here in order to get a reference to root_fragment_instance_
+  // so that root_sink_ remains valid throughout query lifetime.
   if (schedule_.GetCoordFragment() != nullptr) {
     // Coordinator fragment instance has same ID as query.
-    shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance =
+    root_fragment_instance_ =
         ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_);
-    DCHECK(root_fragment_instance.get() != nullptr);
-    executor_ = root_fragment_instance->executor();
-
+    // Fragment instance might have been failed and unregistered itself even though it was
+    // successfully started (e.g. Prepare() might have failed).
+    if (root_fragment_instance_.get() == nullptr) {
+      FragmentInstanceState* root_state = fragment_instance_states_[0];
+      DCHECK(root_state != nullptr);
+      lock_guard<mutex> instance_state_lock(*root_state->lock());
+      // Try and return the fragment instance status if it was already set.
+      // TODO: Consider waiting for root_state->done() here.
+      RETURN_IF_ERROR(*root_state->status());
+      return Status(Substitute("Root fragment instance ($0) failed", PrintId(query_id_)));
+    }
+    executor_ = root_fragment_instance_->executor();
     // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At
     // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the
     // fragment instance's executor will not complete until that point.
@@ -1417,7 +1433,7 @@ void Coordinator::MtExecRemoteFInstance(
   ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
       exec_state->impalad_address(), &client_connect_status);
   if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status);
+    exec_state->SetInitialStatus(client_connect_status, false);
     return;
   }
 
@@ -1432,7 +1448,7 @@ void Coordinator::MtExecRemoteFInstance(
     const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
         PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
@@ -1442,11 +1458,11 @@ void Coordinator::MtExecRemoteFInstance(
         PrintId(exec_state->fragment_instance_id()),
         exec_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
-  exec_state->SetInitialStatus(Status::OK());
+  exec_state->SetInitialStatus(Status::OK(), true);
   VLOG_FILE << "rpc succeeded: ExecPlanFragment"
       << " instance_id=" << PrintId(exec_state->fragment_instance_id());
 }
@@ -1480,7 +1496,7 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
   ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
       exec_state->impalad_address(), &client_connect_status);
   if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status);
+    exec_state->SetInitialStatus(client_connect_status, true);
     return;
   }
 
@@ -1497,7 +1513,7 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
         Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
           rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
@@ -1507,11 +1523,11 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_par
         Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
           exec_plan_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg));
+    exec_state->SetInitialStatus(Status(err_msg), true);
     return;
   }
 
-  exec_state->SetInitialStatus(Status::OK());
+  exec_state->SetInitialStatus(Status::OK(), true);
   return;
 }
 
@@ -1547,15 +1563,16 @@ void Coordinator::CancelFragmentInstances() {
     // to set its status)
     lock_guard<mutex> l(*exec_state->lock());
 
-    // no need to cancel if we already know it terminated w/ an error status
-    if (!exec_state->status()->ok()) continue;
-
     // Nothing to cancel if the exec rpc was not sent
     if (!exec_state->rpc_sent()) continue;
 
     // don't cancel if it already finished
     if (exec_state->done()) continue;
 
+    /// If the status is not OK, we still try to cancel - !OK status might mean
+    /// communication failure between fragment instance and coordinator, but fragment
+    /// instance might still be running.
+
     // set an error status to make sure we only cancel this once
     exec_state->set_status(Status::CANCELLED);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 066d532..176d89d 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -296,8 +296,11 @@ class Coordinator {
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Not owned by this class, created during fragment instance start-up by
-  /// FragmentExecState and set here in Exec().
+  /// Created during fragment instance start-up by FragmentExecState and set here in
+  /// Exec().  Keep a shared_ptr reference to the fragment state so that root_sink_ will
+  /// be valid for the lifetime of the coordinator. This is important if, for example, the
+  /// fragment instance is cancelled while the coordinator is calling GetNext().
+  std::shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance_;
   PlanFragmentExecutor* executor_ = nullptr;
   PlanRootSink* root_sink_ = nullptr;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3b8a0891/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index d079c36..c7b580e 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -504,6 +504,12 @@ void PlanFragmentExecutor::Cancel() {
     VLOG_QUERY << "Cancel() called before Prepare()";
     return;
   }
+
+  // Ensure that the sink is closed from both sides. Although in ordinary executions we
+  // rely on the consumer to do this, in error cases the consumer may not be able to send
+  // CloseConsumer() (see IMPALA-4348 for an example).
+  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
+
   DCHECK(runtime_state_ != NULL);
   VLOG_QUERY << "Cancel(): instance_id=" << runtime_state_->fragment_instance_id();
   runtime_state_->set_is_cancelled(true);