Posted to commits@impala.apache.org by tm...@apache.org on 2018/05/29 19:14:10 UTC

[07/15] impala git commit: IMPALA-7011: Simplify PlanRootSink control logic

IMPALA-7011: Simplify PlanRootSink control logic

1) The eos_ and sender_done_ bits really encode three possible states
   that the sender can be in. Make this explicit using an enum with
   three values.

2) The purpose of CloseConsumer() has changed over time and we can clean
   this up now:

 a) Originally, it looks like CloseConsumer() was used to unblock the sender
   when the consumer finished before eos, but also to keep the sink alive
   long enough for the coordinator. This is no longer necessary now that the
   control structures are owned by the QueryState, whose lifetime is
   controlled by a reference count taken by the coordinator. So we no longer
   need the coordinator to tell the sink that it is done calling it, and we
   no longer need the consumer_done_ state.

 b) Later on, CloseConsumer() was used as a cancellation mechanism.
   We need to keep this around (or use timeouts on the condvars) to kick
   both the consumer and producer on cancellation. But let's make the
   cancellation logic similar to the exec nodes and other sinks by
   driving the cancellation with the RuntimeState's cancellation
   flag. Now that CloseConsumer() is only about cancellation, rename it
   to Cancel() (later we may promote it to DataSink and implement it in
   the data stream sender as well). The resulting control flow is
   sketched below.
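
To illustrate 1) and 2b) outside the diff, here is a rough, self-contained
sketch of the resulting control logic. It substitutes std::mutex and
std::condition_variable for Impala's ConditionVariable and a plain bool for
RuntimeState::is_cancelled(); the class and member names other than
SenderState are invented for the sketch, and the results_/num_rows_requested_
handshake is elided.

// Simplified sketch only -- not the actual PlanRootSink. Standard-library
// primitives stand in for Impala's ConditionVariable and RuntimeState.
#include <condition_variable>
#include <mutex>

class RootSinkSketch {
 public:
  // Mirrors the new SenderState enum: ROWS_PENDING is the only non-terminal
  // state; EOS means all rows were passed to Send(); CLOSED_NOT_EOS means the
  // sink was closed before eos (e.g. the fragment instance hit an error).
  enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };

  // Sender side (inside Send()): wait until the consumer has supplied a result
  // set to fill, or the query was cancelled. Returns false on cancellation,
  // analogous to RETURN_IF_CANCELLED(state).
  bool WaitForConsumerRequest() {
    std::unique_lock<std::mutex> l(lock_);
    sender_cv_.wait(l, [this] { return consumer_request_pending_ || cancelled_; });
    return !cancelled_;
  }

  // Sender side: FlushFinal() analogue -- all rows have been sent.
  void FlushFinal() {
    std::lock_guard<std::mutex> l(lock_);
    sender_state_ = SenderState::EOS;
    consumer_cv_.notify_all();
  }

  // Sender side: Close() analogue. FlushFinal() may not have run if the
  // fragment instance failed before sending all rows.
  void Close() {
    std::lock_guard<std::mutex> l(lock_);
    if (sender_state_ == SenderState::ROWS_PENDING) {
      sender_state_ = SenderState::CLOSED_NOT_EOS;
    }
    consumer_cv_.notify_all();
  }

  // Consumer side (inside GetNext()): wait while the sender is still producing
  // rows and has not yet finished the current request. Returns true at eos.
  bool WaitForRows() {
    std::unique_lock<std::mutex> l(lock_);
    consumer_cv_.wait(l, [this] {
      return sender_state_ != SenderState::ROWS_PENDING ||
          !consumer_request_pending_ || cancelled_;
    });
    return sender_state_ == SenderState::EOS;
  }

  // Cancel() analogue: set the flag and wake both sides so they can re-check
  // it. In the real patch the flag lives on the RuntimeState and is set by
  // FragmentInstanceState::Cancel() before PlanRootSink::Cancel() is called.
  void Cancel() {
    {
      std::lock_guard<std::mutex> l(lock_);
      cancelled_ = true;
    }
    sender_cv_.notify_all();
    consumer_cv_.notify_all();
  }

 private:
  std::mutex lock_;
  std::condition_variable sender_cv_;
  std::condition_variable consumer_cv_;
  bool cancelled_ = false;
  bool consumer_request_pending_ = false;  // stand-in for results_ != nullptr
  SenderState sender_state_ = SenderState::ROWS_PENDING;
};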

Testing:
- Exhaustive
- Minicluster concurrent_select.py stress

Change-Id: Ifc75617a253fd43a6122baa4b4dc7aeb1dbe633f
Reviewed-on: http://gerrit.cloudera.org:8080/10449
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3b8a9648
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3b8a9648
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3b8a9648

Branch: refs/heads/2.x
Commit: 3b8a96485e36336c8e823479559cd81edc2a231d
Parents: fd27b17
Author: Dan Hecht <dh...@cloudera.com>
Authored: Thu May 17 17:03:54 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri May 25 23:17:16 2018 +0000

----------------------------------------------------------------------
 be/src/exec/plan-root-sink.cc             | 39 +++++++-------
 be/src/exec/plan-root-sink.h              | 72 +++++++++++++-------------
 be/src/runtime/coordinator.cc             | 11 +---
 be/src/runtime/fragment-instance-state.cc |  6 +--
 4 files changed, 60 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 836a376..a64dbb9 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -72,11 +72,10 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   // written clients may not cope correctly with them. See IMPALA-4335.
   while (current_batch_row < batch->num_rows()) {
     unique_lock<mutex> l(lock_);
-    while (results_ == nullptr && !consumer_done_) sender_cv_.Wait(l);
-    if (consumer_done_ || batch == nullptr) {
-      eos_ = true;
-      return Status::OK();
-    }
+    // Wait until the consumer gives us a result set to fill in, or the fragment
+    // instance has been cancelled.
+    while (results_ == nullptr && !state->is_cancelled()) sender_cv_.Wait(l);
+    RETURN_IF_CANCELLED(state);
 
     // Otherwise the consumer is ready. Fill out the rows.
     DCHECK(results_ != nullptr);
@@ -107,29 +106,26 @@ Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
 
 Status PlanRootSink::FlushFinal(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  sender_done_ = true;
-  eos_ = true;
+  sender_state_ = SenderState::EOS;
   consumer_cv_.NotifyAll();
   return Status::OK();
 }
 
 void PlanRootSink::Close(RuntimeState* state) {
   unique_lock<mutex> l(lock_);
-  // No guarantee that FlushFinal() has been called, so need to mark sender_done_ here as
-  // well.
-  // TODO: shouldn't this also set eos to true? do we need both eos and sender_done_?
-  sender_done_ = true;
+  // FlushFinal() won't have been called when the fragment instance encounters an error
+  // before sending all rows.
+  if (sender_state_ == SenderState::ROWS_PENDING) {
+    sender_state_ = SenderState::CLOSED_NOT_EOS;
+  }
   consumer_cv_.NotifyAll();
-  // Wait for consumer to be done, in case sender tries to tear-down this sink while the
-  // sender is still reading from it.
-  while (!consumer_done_) sender_cv_.Wait(l);
   DataSink::Close(state);
 }
 
-void PlanRootSink::CloseConsumer() {
-  unique_lock<mutex> l(lock_);
-  consumer_done_ = true;
+void PlanRootSink::Cancel(RuntimeState* state) {
+  DCHECK(state->is_cancelled());
   sender_cv_.NotifyAll();
+  consumer_cv_.NotifyAll();
 }
 
 Status PlanRootSink::GetNext(
@@ -140,9 +136,14 @@ Status PlanRootSink::GetNext(
   num_rows_requested_ = num_results;
   sender_cv_.NotifyAll();
 
-  while (!eos_ && results_ != nullptr && !sender_done_) consumer_cv_.Wait(l);
+  // Wait while the sender is still producing rows and hasn't filled in the current
+  // result set.
+  while (sender_state_ == SenderState::ROWS_PENDING && results_ != nullptr &&
+      !state->is_cancelled()) {
+    consumer_cv_.Wait(l);
+  }
 
-  *eos = eos_;
+  *eos = sender_state_ == SenderState::EOS;
   return state->GetQueryStatus();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 87ab3ef..1d64b21 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -36,19 +36,25 @@ class ScalarExprEvaluator;
 /// The consumer calls GetNext() with a QueryResultSet and a requested fetch
 /// size. GetNext() shares these fields with Send(), and then signals Send() to begin
 /// populating the result set. GetNext() returns when a) the sender has sent all of its
-/// rows b) the requested fetch size has been satisfied or c) the sender calls Close().
+/// rows b) the requested fetch size has been satisfied or c) the sender fragment
+/// instance was cancelled.
 ///
-/// Send() fills in as many rows as are requested from the current batch. When the batch
-/// is exhausted - which may take several calls to GetNext() - control is returned to the
-/// sender to produce another row batch.
+/// The sender uses Send() to fill in as many rows as are requested from the current
+/// batch. When the batch is exhausted - which may take several calls to GetNext() -
+/// Send() returns so that the fragment instance can produce another row batch.
 ///
-/// When the consumer is finished, CloseConsumer() must be called to allow the sender to
-/// exit Send(). Senders must call Close() to signal to the consumer that no more batches
-/// will be produced. CloseConsumer() may be called concurrently with GetNext(). Senders
-/// should ensure that the consumer is not blocked in GetNext() before destroying the
-/// PlanRootSink.
+/// FlushFinal() should be called by the sender to signal it has finished calling
+/// Send() for all rows. Close() should be called by the sender to release resources.
 ///
-/// The sink is thread safe up to a single producer and single consumer.
+/// When the fragment instance is cancelled, Cancel() is called to unblock both the
+/// sender and consumer. Cancel() may be called concurrently with Send(), GetNext() and
+/// Close().
+///
+/// The sink is thread safe up to a single sender and single consumer.
+///
+/// Lifetime: The sink is owned by the QueryState and has the same lifetime as
+/// QueryState. The QueryState references taken by the fragment instance and the
+/// Coordinator ensure that it outlives any calls to Send() and GetNext(), respectively.
 ///
 /// TODO: The consumer drives the sender in lock-step with GetNext() calls, forcing a
 /// context-switch on every invocation. Measure the impact of this, and consider moving to
@@ -62,25 +68,23 @@ class PlanRootSink : public DataSink {
   /// consumer has consumed 'batch' by calling GetNext().
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Sets eos and notifies consumer.
+  /// Indicates eos and notifies consumer.
   virtual Status FlushFinal(RuntimeState* state);
 
-  /// To be called by sender only. Signals to the consumer that no more batches will be
-  /// produced, then blocks until someone calls CloseConsumer().
+  /// To be called by the sender only. Releases resources and unblocks the consumer.
   virtual void Close(RuntimeState* state);
 
-  /// Populates 'result_set' with up to 'num_rows' rows produced by the fragment instance
-  /// that calls Send(). *eos is set to 'true' when there are no more rows to consume. If
-  /// CloseConsumer() is called concurrently, GetNext() will return and may not populate
-  /// 'result_set'. All subsequent calls after CloseConsumer() will do no work.
+  /// To be called by the consumer only. Populates 'result_set' with up to 'num_rows'
+  /// rows produced by the fragment instance that calls Send(). *eos is set to 'true'
+  /// when there are no more rows to consume. If Cancel() or Close() are called
+  /// concurrently, GetNext() will return and may not populate 'result_set'. All
+  /// subsequent calls after Cancel() or Close() are no-ops.
   Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos);
 
-  /// Signals to the producer that the sink will no longer be used. GetNext() may be
-  /// safely called after this returns (it does nothing), but consumers should consider
-  /// that the PlanRootSink may be undergoing destruction. May be called more than once;
-  /// only the first call has any effect.
-  void CloseConsumer();
+  /// Unblocks both the consumer and sender so they can check the cancellation flag in
+  /// the RuntimeState. The cancellation flag should be set prior to calling this.
+  void Cancel(RuntimeState* state);
 
   static const std::string NAME;
 
@@ -90,21 +94,22 @@ class PlanRootSink : public DataSink {
 
   /// Waited on by the sender only. Signalled when the consumer has written results_ and
   /// num_rows_requested_, and so the sender may begin satisfying that request for rows
-  /// from its current batch. Also signalled when CloseConsumer() is called, to unblock
-  /// the sender.
+  /// from its current batch. Also signalled when Cancel() is called, to unblock the
+  /// sender.
   ConditionVariable sender_cv_;
 
   /// Waited on by the consumer only. Signalled when the sender has finished serving a
-  /// request for rows. Also signalled by Close() and FlushFinal() to signal to the
-  /// consumer that no more rows are coming.
+  /// request for rows. Also signalled by FlushFinal(), Close() and Cancel() to unblock
+  /// the consumer.
   ConditionVariable consumer_cv_;
 
-  /// Signals to producer that the consumer is done, and the sink may be torn down.
-  bool consumer_done_ = false;
-
-  /// Signals to consumer that the sender is done, and that there are no more row batches
-  /// to consume.
-  bool sender_done_ = false;
+  /// State of the sender:
+  /// - ROWS_PENDING: the sender is still producing rows; the only non-terminal state
+  /// - EOS: the sender has passed all rows to Send()
+  /// - CLOSED_NOT_EOS: the sender (i.e. sink) was closed before all rows were passed to
+  ///   Send()
+  enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
+  SenderState sender_state_ = SenderState::ROWS_PENDING;
 
   /// The current result set passed to GetNext(), to fill in Send(). Not owned by this
   /// sink. Reset to nullptr after Send() completes the request to signal to the consumer
@@ -114,9 +119,6 @@ class PlanRootSink : public DataSink {
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
 
-  /// Set to true in Send() and FlushFinal() when the Sink() has finished producing rows.
-  bool eos_ = false;
-
   /// Writes a single row into 'result' and 'scales' by evaluating
   /// output_expr_evals_ over 'row'.
   void GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales);

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 042605d..995d747 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -148,14 +148,8 @@ Status Coordinator::Exec() {
       DCHECK(!prepare_status.ok());
       return UpdateExecState(prepare_status, nullptr, FLAGS_hostname);
     }
-
-    // When GetFInstanceState() returns the coordinator instance, the Prepare phase
-    // is done and the FragmentInstanceState's root sink will be set up. At that point,
-    // the coordinator must be sure to call root_sink()->CloseConsumer(); the
-    // fragment instance's executor will not complete until that point.
-    // TODO: what does this mean?
-    // TODO: Consider moving this to Wait().
-    // TODO: clarify need for synchronization on this event
+    // When GetFInstanceState() returns the coordinator instance, the Prepare phase is
+    // done and the FragmentInstanceState's root sink will be set up.
     DCHECK(coord_instance_->IsPrepared() && coord_instance_->WaitForPrepare().ok());
     coord_sink_ = coord_instance_->root_sink();
     DCHECK(coord_sink_ != nullptr);
@@ -527,7 +521,6 @@ void Coordinator::HandleExecStateTransition(
       exec_rpcs_complete_barrier_->pending() <= 0) << "exec rpcs not completed";
 
   query_events_->MarkEvent(exec_state_to_event.at(new_state));
-  if (coord_sink_ != nullptr) coord_sink_->CloseConsumer();
   // This thread won the race to transitioning into a terminal state - terminate
   // execution and release resources.
   ReleaseExecResources();

http://git-wip-us.apache.org/repos/asf/impala/blob/3b8a9648/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index a14bf31..c61cb81 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -103,13 +103,9 @@ void FragmentInstanceState::Cancel() {
   // being cancelled.
   discard_result(WaitForPrepare());
 
-  // Ensure that the sink is closed from both sides. Although in ordinary executions we
-  // rely on the consumer to do this, in error cases the consumer may not be able to send
-  // CloseConsumer() (see IMPALA-4348 for an example).
-  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
-
   DCHECK(runtime_state_ != nullptr);
   runtime_state_->set_is_cancelled();
+  if (root_sink_ != nullptr) root_sink_->Cancel(runtime_state_);
   runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
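
For reference, a minimal sketch of how a consumer might drive the GetNext()
contract documented in plan-root-sink.h. FetchAllResults and the fetch size
are hypothetical, and the snippet assumes Impala's Status, RETURN_IF_ERROR and
QueryResultSet types rather than being standalone code; in practice the
coordinator owns this loop via coord_sink_.

// Hypothetical consumer-side fetch loop against the GetNext() contract above.
// 'sink', 'state' and 'result_set' are assumed to be wired up elsewhere.
Status FetchAllResults(PlanRootSink* sink, RuntimeState* state,
    QueryResultSet* result_set) {
  const int BATCH_SIZE = 1024;  // placeholder fetch size
  bool eos = false;
  while (!eos) {
    // Returns once up to BATCH_SIZE rows are in 'result_set', the sender has
    // reached eos, or the fragment instance was cancelled.
    RETURN_IF_ERROR(sink->GetNext(state, result_set, BATCH_SIZE, &eos));
    // ... hand the rows to the client and reset 'result_set' here ...
  }
  return Status::OK();
}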