You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2016/10/16 15:56:02 UTC

[6/7] incubator-impala git commit: IMPALA-2905: Handle coordinator fragment lifecycle like all others

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index aba4a26..5269fe5 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -26,22 +26,23 @@
 #include "common/logging.h"
 #include "common/object-pool.h"
 #include "exec/data-sink.h"
-#include "exec/exec-node.h"
 #include "exec/exchange-node.h"
-#include "exec/scan-node.h"
-#include "exec/hdfs-scan-node.h"
+#include "exec/exec-node.h"
 #include "exec/hbase-table-scanner.h"
+#include "exec/hdfs-scan-node.h"
+#include "exec/plan-root-sink.h"
+#include "exec/scan-node.h"
 #include "exprs/expr.h"
-#include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
+#include "runtime/descriptors.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter-bank.h"
-#include "runtime/mem-tracker.h"
+#include "util/container-util.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
-#include "util/container-util.h"
-#include "util/parse-util.h"
 #include "util/mem-info.h"
+#include "util/parse-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 
@@ -60,28 +61,45 @@ namespace impala {
 
 const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
 
-PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
-    const ReportStatusCallback& report_status_cb) :
-    exec_env_(exec_env), plan_(NULL), report_status_cb_(report_status_cb),
-    report_thread_active_(false), done_(false), closed_(false),
-    has_thread_token_(false), is_prepared_(false), is_cancelled_(false),
-    average_thread_tokens_(NULL), mem_usage_sampled_counter_(NULL),
-    thread_usage_sampled_counter_(NULL) {
-}
+PlanFragmentExecutor::PlanFragmentExecutor(
+    ExecEnv* exec_env, const ReportStatusCallback& report_status_cb)
+  : exec_env_(exec_env),
+    exec_tree_(NULL),
+    report_status_cb_(report_status_cb),
+    report_thread_active_(false),
+    closed_(false),
+    has_thread_token_(false),
+    is_prepared_(false),
+    is_cancelled_(false),
+    average_thread_tokens_(NULL),
+    mem_usage_sampled_counter_(NULL),
+    thread_usage_sampled_counter_(NULL) {}
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
-  Close();
+  DCHECK(!is_prepared_ || closed_);
   // at this point, the report thread should have been stopped
   DCHECK(!report_thread_active_);
 }
 
 Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
+  Status status = PrepareInternal(request);
+  prepared_promise_.Set(status);
+  return status;
+}
+
+Status PlanFragmentExecutor::WaitForOpen() {
+  DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()";
+  RETURN_IF_ERROR(prepared_promise_.Get());
+  return opened_promise_.Get();
+}
+
+Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) {
   lock_guard<mutex> l(prepare_lock_);
   DCHECK(!is_prepared_);
 
   if (is_cancelled_) return Status::CANCELLED;
-
   is_prepared_ = true;
+
   // TODO: Break this method up.
   fragment_sw_.Start();
   const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
@@ -100,6 +118,10 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
+  timings_profile_ =
+      obj_pool()->Add(new RuntimeProfile(obj_pool(), "PlanFragmentExecutor"));
+  profile()->AddChild(timings_profile_);
+  SCOPED_TIMER(ADD_TIMER(timings_profile_, "PrepareTime"));
 
   // reservation or a query option.
   int64_t bytes_limit = -1;
@@ -145,22 +167,22 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
 
   // set up plan
   DCHECK(request.__isset.fragment_ctx);
-  RETURN_IF_ERROR(ExecNode::CreateTree(runtime_state_.get(),
-      request.fragment_ctx.fragment.plan, *desc_tbl, &plan_));
-  runtime_state_->set_fragment_root_id(plan_->id());
+  RETURN_IF_ERROR(ExecNode::CreateTree(
+      runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
+  runtime_state_->set_fragment_root_id(exec_tree_->id());
 
   if (fragment_instance_ctx.__isset.debug_node_id) {
     DCHECK(fragment_instance_ctx.__isset.debug_action);
     DCHECK(fragment_instance_ctx.__isset.debug_phase);
     ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id,
-        fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action, plan_);
+        fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action,
+        exec_tree_);
   }
 
   // set #senders of exchange nodes before calling Prepare()
   vector<ExecNode*> exch_nodes;
-  plan_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
-  for (ExecNode* exch_node: exch_nodes)
-  {
+  exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
+  for (ExecNode* exch_node : exch_nodes) {
     DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
     int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders,
         exch_node->id(), 0);
@@ -171,7 +193,7 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
   // set scan ranges
   vector<ExecNode*> scan_nodes;
   vector<TScanRangeParams> no_scan_ranges;
-  plan_->CollectScanNodes(&scan_nodes);
+  exec_tree_->CollectScanNodes(&scan_nodes);
   for (int i = 0; i < scan_nodes.size(); ++i) {
     ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
     const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
@@ -179,42 +201,47 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
     scan_node->SetScanRanges(scan_ranges);
   }
 
-  RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "PrepareTime");
+  RuntimeProfile::Counter* prepare_timer = ADD_TIMER(profile(), "ExecTreePrepareTime");
   {
     SCOPED_TIMER(prepare_timer);
-    RETURN_IF_ERROR(plan_->Prepare(runtime_state_.get()));
+    RETURN_IF_ERROR(exec_tree_->Prepare(runtime_state_.get()));
   }
 
   PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges);
 
-  // set up sink, if required
-  if (request.fragment_ctx.fragment.__isset.output_sink) {
-    RETURN_IF_ERROR(DataSink::CreateDataSink(
-        obj_pool(), request.fragment_ctx.fragment.output_sink,
-        request.fragment_ctx.fragment.output_exprs,
-        fragment_instance_ctx, row_desc(), &sink_));
-    sink_mem_tracker_.reset(new MemTracker(
-        -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
-    RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
-
-    RuntimeProfile* sink_profile = sink_->profile();
-    if (sink_profile != NULL) {
-      profile()->AddChild(sink_profile);
-    }
-  } else {
-    sink_.reset(NULL);
+  DCHECK(request.fragment_ctx.fragment.__isset.output_sink);
+  RETURN_IF_ERROR(
+      DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink,
+          request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx,
+          exec_tree_->row_desc(), &sink_));
+  sink_mem_tracker_.reset(
+      new MemTracker(-1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
+  RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
+
+  RuntimeProfile* sink_profile = sink_->profile();
+  if (sink_profile != NULL) {
+    profile()->AddChild(sink_profile);
+  }
+
+  if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
+    root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
+    // Release the thread token on the root fragment instance. This fragment spends most
+    // of the time waiting and doing very little work. Holding on to the token causes
+    // underutilization of the machine. If there are 12 queries on this node, that's 12
+    // tokens reserved for no reason.
+    ReleaseThreadToken();
   }
 
   // set up profile counters
-  profile()->AddChild(plan_->runtime_profile());
+  profile()->AddChild(exec_tree_->runtime_profile());
   rows_produced_counter_ =
       ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
   per_host_mem_usage_ =
       ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
 
-  row_batch_.reset(new RowBatch(plan_->row_desc(), runtime_state_->batch_size(),
-        runtime_state_->instance_mem_tracker()));
-  VLOG(2) << "plan_root=\n" << plan_->DebugString();
+  row_batch_.reset(new RowBatch(exec_tree_->row_desc(), runtime_state_->batch_size(),
+      runtime_state_->instance_mem_tracker()));
+  VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
   return Status::OK();
 }
 
@@ -251,12 +278,21 @@ void PlanFragmentExecutor::PrintVolumeIds(
 }
 
 Status PlanFragmentExecutor::Open() {
-  VLOG_QUERY << "Open(): instance_id="
-      << runtime_state_->fragment_instance_id();
+  SCOPED_TIMER(profile()->total_time_counter());
+  SCOPED_TIMER(ADD_TIMER(timings_profile_, "OpenTime"));
+  VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
+  Status status = OpenInternal();
+  UpdateStatus(status);
+  opened_promise_.Set(status);
+  return status;
+}
 
-  RETURN_IF_ERROR(runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
+Status PlanFragmentExecutor::OpenInternal() {
+  RETURN_IF_ERROR(
+      runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
 
-  // we need to start the profile-reporting thread before calling Open(), since it
+  // we need to start the profile-reporting thread before calling exec_tree_->Open(),
+  // since it
   // may block
   if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
     unique_lock<mutex> l(report_thread_lock_);
@@ -271,22 +307,25 @@ Status PlanFragmentExecutor::Open() {
 
   OptimizeLlvmModule();
 
-  Status status = OpenInternal();
-  if (sink_.get() != NULL) {
-    // We call Close() here rather than in OpenInternal() because we want to make sure
-    // that Close() gets called even if there was an error in OpenInternal().
-    // We also want to call sink_->Close() here rather than in PlanFragmentExecutor::Close
-    // because we do not want the sink_ to hold on to all its resources as we will never
-    // use it after this.
-    sink_->Close(runtime_state());
-    // If there's a sink and no error, OpenInternal() completed the fragment execution.
-    if (status.ok()) {
-      done_ = true;
-      FragmentComplete();
-    }
+  {
+    SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTreeOpenTime"));
+    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get()));
   }
+  return sink_->Open(runtime_state_.get());
+}
 
-  if (!status.ok() && !status.IsCancelled() && !status.IsMemLimitExceeded()) {
+Status PlanFragmentExecutor::Exec() {
+  SCOPED_TIMER(ADD_TIMER(timings_profile_, "ExecTime"));
+  {
+    lock_guard<mutex> l(status_lock_);
+    RETURN_IF_ERROR(status_);
+  }
+  Status status = ExecInternal();
+
+  // If there's no error, ExecInternal() completed the fragment instance's execution.
+  if (status.ok()) {
+    FragmentComplete();
+  } else if (!status.IsCancelled() && !status.IsMemLimitExceeded()) {
     // Log error message in addition to returning in Status. Queries that do not
     // fetch results (e.g. insert) may not receive the message directly and can
     // only retrieve the log.
@@ -296,21 +335,23 @@ Status PlanFragmentExecutor::Open() {
   return status;
 }
 
-Status PlanFragmentExecutor::OpenInternal() {
-  SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(plan_->Open(runtime_state_.get()));
-  if (sink_.get() == NULL) return Status::OK();
-
-  // If there is a sink, do all the work of driving it here, so that
-  // when this returns the query has actually finished
-  RETURN_IF_ERROR(sink_->Open(runtime_state_.get()));
-  while (!done_) {
+Status PlanFragmentExecutor::ExecInternal() {
+  RuntimeProfile::Counter* plan_exec_timer =
+      ADD_TIMER(timings_profile_, "ExecTreeExecTime");
+  bool exec_tree_complete = false;
+  do {
+    Status status;
     row_batch_->Reset();
-    RETURN_IF_ERROR(plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_));
-    if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::OpenInternal()");
+    {
+      SCOPED_TIMER(plan_exec_timer);
+      status = exec_tree_->GetNext(
+          runtime_state_.get(), row_batch_.get(), &exec_tree_complete);
+    }
+    if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()");
     COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
+    RETURN_IF_ERROR(status);
     RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get()));
-  }
+  } while (!exec_tree_complete);
 
   // Flush the sink *before* stopping the report thread. Flush may need to add some
   // important information to the last report that gets sent. (e.g. table sinks record the
@@ -376,13 +417,20 @@ void PlanFragmentExecutor::SendReport(bool done) {
     status = status_;
   }
 
+  // If status is not OK, we need to make sure that only one sender sends a 'done'
+  // response.
+  // TODO: Clean all this up - move 'done' reporting to Close()?
+  if (!done && !status.ok()) {
+    done = completed_report_sent_.CompareAndSwap(0, 1);
+  }
+
   // Update the counter for the peak per host mem usage.
   per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
 
   // This will send a report even if we are cancelled.  If the query completed correctly
   // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
   // be waiting for a final report and profile.
-  report_status_cb_(status, profile(), done || !status.ok());
+  report_status_cb_(status, profile(), done);
 }
 
 void PlanFragmentExecutor::StopReportThread() {
@@ -395,36 +443,6 @@ void PlanFragmentExecutor::StopReportThread() {
   report_thread_->Join();
 }
 
-Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
-  SCOPED_TIMER(profile()->total_time_counter());
-  VLOG_FILE << "GetNext(): instance_id=" << runtime_state_->fragment_instance_id();
-
-  Status status = Status::OK();
-  row_batch_->Reset();
-  // Loop until we've got a non-empty batch, hit an error or exhausted the input.
-  while (!done_) {
-    status = plan_->GetNext(runtime_state_.get(), row_batch_.get(), &done_);
-    if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::GetNext()");
-    if (!status.ok()) break;
-    if (row_batch_->num_rows() > 0) break;
-    row_batch_->Reset();
-  }
-  UpdateStatus(status);
-  COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
-
-  if (done_) {
-    VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
-        << " instance_id=" << PrintId(runtime_state_->fragment_instance_id());
-    FragmentComplete();
-    // Once all rows are returned, signal that we're done with an empty batch.
-    *batch = row_batch_->num_rows() == 0 ? NULL : row_batch_.get();
-    return status;
-  }
-
-  *batch = row_batch_.get();
-  return status;
-}
-
 void PlanFragmentExecutor::FragmentComplete() {
   // Check the atomic flag. If it is set, then a fragment complete report has already
   // been sent.
@@ -463,7 +481,7 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
 }
 
 void PlanFragmentExecutor::Cancel() {
-  VLOG_QUERY << "Cancelling plan fragment...";
+  VLOG_QUERY << "Cancelling fragment instance...";
   lock_guard<mutex> l(prepare_lock_);
   is_cancelled_ = true;
   if (!is_prepared_) {
@@ -476,18 +494,10 @@ void PlanFragmentExecutor::Cancel() {
   runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
 }
 
-const RowDescriptor& PlanFragmentExecutor::row_desc() {
-  return plan_->row_desc();
-}
-
 RuntimeProfile* PlanFragmentExecutor::profile() {
   return runtime_state_->runtime_profile();
 }
 
-bool PlanFragmentExecutor::ReachedLimit() {
-  return plan_->ReachedLimit();
-}
-
 void PlanFragmentExecutor::ReleaseThreadToken() {
   if (has_thread_token_) {
     has_thread_token_ = false;
@@ -500,19 +510,23 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
 
 void PlanFragmentExecutor::Close() {
   if (closed_) return;
+  if (!is_prepared_) return;
+  if (sink_.get() != nullptr) sink_->Close(runtime_state());
+
   row_batch_.reset();
   if (sink_mem_tracker_ != NULL) {
     sink_mem_tracker_->UnregisterFromParent();
     sink_mem_tracker_.reset();
   }
-  // Prepare may not have been called, which sets runtime_state_
-  if (runtime_state_.get() != NULL) {
-    if (plan_ != NULL) plan_->Close(runtime_state_.get());
-    runtime_state_->UnregisterReaderContexts();
-    exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
-    runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
-    runtime_state_->filter_bank()->Close();
-  }
+
+  // Prepare should always have been called, and so runtime_state_ should be set
+  DCHECK(prepared_promise_.IsSet());
+  if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
+  runtime_state_->UnregisterReaderContexts();
+  exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
+  runtime_state_->desc_tbl().ClosePartitionExprs(runtime_state_.get());
+  runtime_state_->filter_bank()->Close();
+
   if (mem_usage_sampled_counter_ != NULL) {
     PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
     mem_usage_sampled_counter_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index f4355ea..82d3001 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -23,9 +23,10 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/function.hpp>
 
-#include "common/status.h"
 #include "common/object-pool.h"
+#include "common/status.h"
 #include "runtime/runtime-state.h"
+#include "util/promise.h"
 #include "util/runtime-profile-counters.h"
 #include "util/thread.h"
 
@@ -33,6 +34,7 @@ namespace impala {
 
 class HdfsFsCache;
 class ExecNode;
+class PlanRootSink;
 class RowDescriptor;
 class RowBatch;
 class DataSink;
@@ -45,21 +47,28 @@ class TPlanFragment;
 class TPlanExecParams;
 
 /// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment,
-/// including setup and tear-down, both in the success and error case.
-/// Tear-down frees all memory allocated for this plan fragment and closes all data
-/// streams; it happens automatically in the d'tor.
-//
-/// The executor makes an aggregated profile for the entire fragment available,
-/// which includes profile information for the plan itself as well as the output
-/// sink, if any.
+/// including setup and tear-down, both in the success and error case. Tear-down, which
+/// happens in Close(), frees all memory allocated for this plan fragment and closes all
+/// data streams.
+///
+/// The lifecycle of a PlanFragmentExecutor is as follows:
+///     if (Prepare().ok()) {
+///       Open()
+///       Exec()
+///     }
+///     Close()
+///
+/// The executor makes an aggregated profile for the entire fragment available, which
+/// includes profile information for the plan itself as well as the output sink.
+///
 /// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
 /// execution status. The frequency of those reports is controlled by the flag
 /// status_report_interval; setting that flag to 0 disables periodic reporting altogether
-/// Regardless of the value of that flag, if a report callback is specified, it is
-/// invoked at least once at the end of execution with an overall status and profile
-/// (and 'done' indicator). The only exception is when execution is cancelled, in which
-/// case the callback is *not* invoked (the coordinator already knows that execution
-/// stopped, because it initiated the cancellation).
+/// Regardless of the value of that flag, if a report callback is specified, it is invoked
+/// at least once at the end of execution with an overall status and profile (and 'done'
+/// indicator). The only exception is when execution is cancelled, in which case the
+/// callback is *not* invoked (the coordinator already knows that execution stopped,
+/// because it initiated the cancellation).
 //
 /// Aside from Cancel(), which may be called asynchronously, this class is not
 /// thread-safe.
@@ -76,49 +85,37 @@ class PlanFragmentExecutor {
       ReportStatusCallback;
 
   /// report_status_cb, if !empty(), is used to report the accumulated profile
-  /// information periodically during execution (Open() or GetNext()).
+  /// information periodically during execution.
   PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb);
 
-  /// Closes the underlying plan fragment and frees up all resources allocated
-  /// in Open()/GetNext().
-  /// It is an error to delete a PlanFragmentExecutor with a report callback
-  /// before Open()/GetNext() (depending on whether the fragment has a sink)
-  /// indicated that execution is finished.
+  /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
+  /// has indicated that execution is finished, or to delete one that has not been
+  /// Close()'d if Prepare() has been called.
   ~PlanFragmentExecutor();
 
   /// Prepare for execution. Call this prior to Open().
   ///
-  /// runtime_state() and row_desc() will not be valid until Prepare() is
-  /// called. runtime_state() will always be valid after Prepare() returns, unless the
-  /// query was cancelled before Prepare() was called.  If request.query_options.mem_limit
-  /// > 0, it is used as an approximate limit on the number of bytes this query can
-  /// consume at runtime.  The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over
-  /// that limit.
+  /// runtime_state() will not be valid until Prepare() is called. runtime_state() will
+  /// always be valid after Prepare() returns, unless the query was cancelled before
+  /// Prepare() was called.  If request.query_options.mem_limit > 0, it is used as an
+  /// approximate limit on the number of bytes this query can consume at runtime.  The
+  /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
   ///
   /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
   /// Status::CANCELLED;
   Status Prepare(const TExecPlanFragmentParams& request);
 
-  /// Start execution. Call this prior to GetNext().
-  /// If this fragment has a sink, Open() will send all rows produced
-  /// by the fragment to that sink. Therefore, Open() may block until
-  /// all rows are produced (and a subsequent call to GetNext() will not return
-  /// any rows).
-  /// This also starts the status-reporting thread, if the interval flag
-  /// is > 0 and a callback was specified in the c'tor.
-  /// If this fragment has a sink, report_status_cb will have been called for the final
-  /// time when Open() returns, and the status-reporting thread will have been stopped.
+  /// Opens the fragment plan and sink. Starts the profile reporting thread, if required.
   Status Open();
 
-  /// Return results through 'batch'. Sets '*batch' to NULL if no more results.
-  /// '*batch' is owned by PlanFragmentExecutor and must not be deleted.
-  /// When *batch == NULL, GetNext() should not be called anymore. Also, report_status_cb
-  /// will have been called for the final time and the status-reporting thread
-  /// will have been stopped.
-  Status GetNext(RowBatch** batch);
+  /// Executes the fragment by repeatedly driving the sink with batches produced by the
+  /// exec node tree. report_status_cb will have been called for the final time when
+  /// Exec() returns, and the status-reporting thread will have been stopped.
+  Status Exec();
 
-  /// Closes the underlying plan fragment and frees up all resources allocated
-  /// in Open()/GetNext().
+  /// Closes the underlying plan fragment and frees up all resources allocated in
+  /// Prepare() and Open(). Must be called if Prepare() has been called - no matter
+  /// whether or not Prepare() succeeded.
   void Close();
 
   /// Initiate cancellation. If called concurrently with Prepare(), will wait for
@@ -131,25 +128,30 @@ class PlanFragmentExecutor {
   /// It is legal to call Cancel() if Prepare() returned an error.
   void Cancel();
 
-  /// Returns true if this query has a limit and it has been reached.
-  bool ReachedLimit();
-
-  /// Releases the thread token for this fragment executor.
-  void ReleaseThreadToken();
-
   /// call these only after Prepare()
   RuntimeState* runtime_state() { return runtime_state_.get(); }
-  const RowDescriptor& row_desc();
 
   /// Profile information for plan and output sink.
   RuntimeProfile* profile();
 
+  /// Blocks until Prepare() is completed.
+  Status WaitForPrepare() { return prepared_promise_.Get(); }
+
+  /// Blocks until exec tree and sink are both opened. It is an error to call this before
+  /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will
+  /// return that error without blocking.
+  Status WaitForOpen();
+
+  /// Returns this instance's sink if it is the root fragment instance, else nullptr.
+  /// Valid after Prepare() returns; may also be nullptr if Prepare() failed.
+  PlanRootSink* root_sink() { return root_sink_; }
+
   /// Name of the counter that is tracking per query, per host peak mem usage.
   static const std::string PER_HOST_PEAK_MEM_COUNTER;
 
  private:
   ExecEnv* exec_env_;  // not owned
-  ExecNode* plan_;  // lives in runtime_state_->obj_pool()
+  ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
   TUniqueId query_id_;
 
   /// profile reporting-related
@@ -166,9 +168,6 @@ class PlanFragmentExecutor {
   boost::condition_variable report_thread_started_cv_;
   bool report_thread_active_;  // true if we started the thread
 
-  /// true if plan_->GetNext() indicated that it's done
-  bool done_;
-
   /// true if Close() has been called
   bool closed_;
 
@@ -190,14 +189,20 @@ class PlanFragmentExecutor {
   /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
   /// the dtor of 'runtime_state_'.
   boost::scoped_ptr<RuntimeState> runtime_state_;
-  /// Output sink for rows sent to this fragment. May not be set, in which case rows are
-  /// returned via GetNext's row batch
-  /// Created in Prepare (if required), owned by this object.
+
+  /// Profile for timings for each stage of the plan fragment instance's lifecycle.
+  RuntimeProfile* timings_profile_;
+
+  /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this
+  /// object.
   boost::scoped_ptr<DataSink> sink_;
   boost::scoped_ptr<MemTracker> sink_mem_tracker_;
 
+  /// Set if this fragment instance is the root of the entire plan, so that a consumer can
+  /// pull results by calling root_sink_->GetNext(). Same object as sink_.
+  PlanRootSink* root_sink_ = nullptr;
+
   boost::scoped_ptr<RowBatch> row_batch_;
-  boost::scoped_ptr<TRowBatch> thrift_batch_;
 
   /// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between
   /// Prepare() and Cancel() to ensure mutual exclusion.
@@ -207,6 +212,12 @@ class PlanFragmentExecutor {
   /// error. If Cancel() was called before Prepare(), is_prepared_ will not be set.
   bool is_prepared_;
 
+  /// Set when Prepare() returns.
+  Promise<Status> prepared_promise_;
+
+  /// Set when OpenInternal() returns.
+  Promise<Status> opened_promise_;
+
   /// True if and only if Cancel() has been called.
   bool is_cancelled_;
 
@@ -267,21 +278,25 @@ class PlanFragmentExecutor {
   void FragmentComplete();
 
   /// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
-  /// Must be called between plan_->Prepare() and plan_->Open().
-  /// This is somewhat time consuming so we don't want it to do it in
-  /// PlanFragmentExecutor()::Prepare() to allow starting plan fragments more
-  /// quickly and in parallel (in a deep plan tree, the fragments are started
-  /// in level order).
+  /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().
   void OptimizeLlvmModule();
 
   /// Executes Open() logic and returns resulting status. Does not set status_.
-  /// If this plan fragment has no sink, OpenInternal() does nothing.
-  /// If this plan fragment has a sink and OpenInternal() returns without an
-  /// error condition, all rows will have been sent to the sink, the sink will
-  /// have been closed, a final report will have been sent and the report thread will
-  /// have been stopped. sink_ will be set to NULL after successful execution.
   Status OpenInternal();
 
+  /// Pulls row batches from exec_tree_ and pushes them to sink_ in a loop. Returns
+  /// OK if the input was exhausted and sent to the sink successfully, an error otherwise.
+  /// If ExecInternal() returns without an error condition, all rows will have been sent
+  /// to the sink, the sink will have been closed, a final report will have been sent and
+  /// the report thread will have been stopped.
+  Status ExecInternal();
+
+  /// Performs all the logic of Prepare() and returns resulting status.
+  Status PrepareInternal(const TExecPlanFragmentParams& request);
+
+  /// Releases the thread token for this fragment executor.
+  void ReleaseThreadToken();
+
   /// Stops report thread, if one is running. Blocks until report thread terminates.
   /// Idempotent.
   void StopReportThread();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 5ad84df..1eb36e3 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -198,36 +198,28 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
 }
 
 int QuerySchedule::GetNumFragmentInstances() const {
-  if (mt_fragment_exec_params_.empty()) return num_fragment_instances_;
   int result = 0;
-  for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) {
-    result += fragment_exec_params.instance_exec_params.size();
+  if (mt_fragment_exec_params_.empty()) {
+    DCHECK(!fragment_exec_params_.empty());
+    for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
+      result += fragment_exec_params.hosts.size();
+    }
+  } else {
+    for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) {
+      result += fragment_exec_params.instance_exec_params.size();
+    }
   }
   return result;
 }
 
-int QuerySchedule::GetNumRemoteFInstances() const {
-  bool has_coordinator_fragment = GetCoordFragment() != nullptr;
-  int result = GetNumFragmentInstances();
-  bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0;
-  if (is_mt_execution && has_coordinator_fragment) --result;
-  return result;
-}
-
-int QuerySchedule::GetTotalFInstances() const {
-  int result = GetNumRemoteFInstances();
-  return GetCoordFragment() != nullptr ? result + 1 : result;
-}
-
 const TPlanFragment* QuerySchedule::GetCoordFragment() const {
+  // Only have coordinator fragment for statements that return rows.
+  if (request_.stmt_type != TStmtType::QUERY) return nullptr;
   bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
   const TPlanFragment* fragment = is_mt_exec
       ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
-  if (fragment->partition.type == TPartitionType::UNPARTITIONED) {
+
     return fragment;
-  } else {
-    return nullptr;
-  }
 }
 
 void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 39ce268..77c9cd6 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -140,34 +140,9 @@ class QuerySchedule {
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
 
-  /// The following 4 functions need to be replaced once we stop special-casing
-  /// the coordinator instance in the coordinator.
-  /// The replacement is a single function int GetNumFInstances() (which includes
-  /// the coordinator instance).
-
-  /// TODO-MT: remove; this is actually only the number of remote instances
-  /// (from the coordinator's perspective)
-  void set_num_fragment_instances(int64_t num_fragment_instances) {
-    num_fragment_instances_ = num_fragment_instances;
-  }
-
-  /// Returns the number of fragment instances registered with this schedule.
-  /// MT: total number of fragment instances
-  /// ST: value set with set_num_fragment_instances(); excludes coord instance
-  /// (in effect the number of remote instances)
-  /// TODO-MT: get rid of special-casing of coordinator instance and always return the
-  /// total
+  /// Returns the total number of fragment instances.
   int GetNumFragmentInstances() const;
 
-  /// Returns the total number of fragment instances, incl. coordinator fragment.
-  /// TODO-MT: remove
-  int GetTotalFInstances() const;
-
-  /// Returns the number of remote fragment instances (excludes coordinator).
-  /// Works for both MT and ST.
-  /// TODO-MT: remove
-  int GetNumRemoteFInstances() const;
-
   /// Return the coordinator fragment, or nullptr if there isn't one.
   const TPlanFragment* GetCoordFragment() const;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 5b6303e..9b52d5a 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -663,11 +663,6 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
           CreateInstanceId(schedule->query_id(), num_fragment_instances));
     }
   }
-  if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
-    // the root fragment is executed directly by the coordinator
-    --num_fragment_instances;
-  }
-  schedule->set_num_fragment_instances(num_fragment_instances);
 
   // compute destinations and # senders per exchange node
   // (the root fragment doesn't have a destination)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 76e11d1..cc56c19 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -54,8 +54,10 @@ Status FragmentMgr::FragmentExecState::Prepare() {
 }
 
 void FragmentMgr::FragmentExecState::Exec() {
-  // Open() does the full execution, because all plan fragments have sinks
-  executor_.Open();
+  if (Prepare().ok()) {
+    executor_.Open();
+    executor_.Exec();
+  }
   executor_.Close();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.h b/be/src/service/fragment-exec-state.h
index 6cff7ce..c795cd8 100644
--- a/be/src/service/fragment-exec-state.h
+++ b/be/src/service/fragment-exec-state.h
@@ -47,9 +47,6 @@ class FragmentMgr::FragmentExecState {
   /// the fragment and returns OK.
   Status Cancel();
 
-  /// Call Prepare() and create and initialize data sink.
-  Status Prepare();
-
   /// Main loop of plan fragment execution. Blocks until execution finishes.
   void Exec();
 
@@ -67,6 +64,8 @@ class FragmentMgr::FragmentExecState {
   /// Publishes filter with ID 'filter_id' to this fragment's filter bank.
   void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
 
+  PlanFragmentExecutor* executor() { return &executor_; }
+
  private:
   TQueryCtx query_ctx_;
   TPlanFragmentInstanceCtx fragment_instance_ctx_;
@@ -98,6 +97,9 @@ class FragmentMgr::FragmentExecState {
   /// the reporting RPC. `profile` may be NULL if a runtime profile has not been created
   /// for this fragment (e.g. when the fragment has failed during preparation).
   void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
+
+  /// Call Prepare() and create and initialize data sink.
+  Status Prepare();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index 64e9a78..8e8fc05 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -54,9 +54,6 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
     return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
   }
 
-  // Remote fragments must always have a sink. Remove when IMPALA-2905 is resolved.
-  DCHECK(exec_params.fragment_ctx.fragment.__isset.output_sink);
-
   shared_ptr<FragmentExecState> exec_state(
       new FragmentExecState(exec_params, ExecEnv::GetInstance()));
 
@@ -64,6 +61,8 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
   // only happen after this RPC returns) can always find this fragment.
   {
     lock_guard<SpinLock> l(fragment_exec_state_map_lock_);
+    DCHECK(fragment_exec_state_map_.find(exec_state->fragment_instance_id())
+        == fragment_exec_state_map_.end());
     fragment_exec_state_map_.insert(
         make_pair(exec_state->fragment_instance_id(), exec_state));
   }
@@ -84,8 +83,7 @@ Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params)
 void FragmentMgr::FragmentThread(TUniqueId fragment_instance_id) {
   shared_ptr<FragmentExecState> exec_state = GetFragmentExecState(fragment_instance_id);
   if (exec_state.get() == NULL) return;
-  Status status = exec_state->Prepare();
-  if (status.ok()) exec_state->Exec();
+  exec_state->Exec();
 
   // We're done with this plan fragment
   {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 3daa36b..ee7f958 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -17,55 +17,19 @@
 
 #include "service/impala-server.h"
 
-#include <algorithm>
 #include <boost/algorithm/string/join.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/unordered_set.hpp>
-#include <jni.h>
-#include <thrift/protocol/TDebugProtocol.h>
-#include <gtest/gtest.h>
-#include <boost/bind.hpp>
-#include <boost/algorithm/string.hpp>
-#include <gperftools/heap-profiler.h>
-#include <gperftools/malloc_extension.h>
-
-#include "codegen/llvm-codegen.h"
+
 #include "common/logging.h"
-#include "common/version.h"
-#include "exec/exec-node.h"
-#include "exec/hdfs-table-sink.h"
-#include "exec/scan-node.h"
-#include "exprs/expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/client-cache.h"
-#include "runtime/descriptors.h"
-#include "runtime/data-stream-sender.h"
-#include "runtime/row-batch.h"
-#include "runtime/plan-fragment-executor.h"
-#include "runtime/hdfs-fs-cache.h"
+#include "gen-cpp/Frontend_types.h"
+#include "rpc/thrift-util.h"
 #include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
-#include "scheduling/simple-scheduler.h"
 #include "service/query-exec-state.h"
 #include "service/query-options.h"
-#include "util/container-util.h"
-#include "util/debug-util.h"
+#include "service/query-result-set.h"
 #include "util/impalad-metrics.h"
-#include "util/string-parser.h"
-#include "rpc/thrift-util.h"
-#include "rpc/thrift-server.h"
-#include "util/jni-util.h"
 #include "util/webserver.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/DataSinks_types.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaService.h"
-#include "gen-cpp/ImpalaService_types.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/Frontend_types.h"
 
 #include "common/names.h"
 
@@ -83,11 +47,17 @@ using namespace beeswax;
     }                                                           \
   } while (false)
 
+namespace {
+
+/// Ascii output precision for double/float
+constexpr int ASCII_PRECISION = 16;
+}
+
 namespace impala {
 
 // Ascii result set for Beeswax.
 // Beeswax returns rows in ascii, using "\t" as column delimiter.
-class ImpalaServer::AsciiQueryResultSet : public ImpalaServer::QueryResultSet {
+class AsciiQueryResultSet : public QueryResultSet {
  public:
   // Rows are added into rowset.
   AsciiQueryResultSet(const TResultSetMetadata& metadata, vector<string>* rowset)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index ee79b4b..de0e2f3 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -36,13 +36,14 @@
 #include "exprs/expr.h"
 #include "rpc/thrift-util.h"
 #include "runtime/raw-value.h"
+#include "service/hs2-util.h"
 #include "service/query-exec-state.h"
 #include "service/query-options.h"
+#include "service/query-result-set.h"
 #include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
 #include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
 #include "util/string-parser.h"
-#include "service/hs2-util.h"
 
 #include "common/names.h"
 
@@ -129,7 +130,7 @@ static TOperationState::type QueryStateToTOperationState(
 
 // Result set container for Hive protocol versions >= V6, where results are returned in
 // column-orientation.
-class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet {
+class HS2ColumnarResultSet : public QueryResultSet {
  public:
   HS2ColumnarResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
       : metadata_(metadata), result_set_(rowset), num_rows_(0) {
@@ -317,7 +318,7 @@ class ImpalaServer::HS2ColumnarResultSet : public ImpalaServer::QueryResultSet {
 };
 
 // TRow result set for HiveServer2
-class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSet {
+class HS2RowOrientedResultSet : public QueryResultSet {
  public:
   // Rows are added into rowset.
   HS2RowOrientedResultSet(const TResultSetMetadata& metadata, TRowSet* rowset = NULL)
@@ -393,16 +394,6 @@ class ImpalaServer::HS2RowOrientedResultSet : public ImpalaServer::QueryResultSe
   scoped_ptr<TRowSet> owned_result_set_;
 };
 
-ImpalaServer::QueryResultSet* ImpalaServer::CreateHS2ResultSet(
-    TProtocolVersion::type version, const TResultSetMetadata& metadata,
-    TRowSet* rowset) {
-  if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
-    return new HS2RowOrientedResultSet(metadata, rowset);
-  } else {
-    return new HS2ColumnarResultSet(metadata, rowset);
-  }
-}
-
 void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
   TUniqueId session_id;
@@ -482,6 +473,18 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
+namespace {
+
+QueryResultSet* CreateHS2ResultSet(
+    TProtocolVersion::type version, const TResultSetMetadata& metadata, TRowSet* rowset) {
+  if (version < TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V6) {
+    return new HS2RowOrientedResultSet(metadata, rowset);
+  } else {
+    return new HS2ColumnarResultSet(metadata, rowset);
+  }
+}
+}
+
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
     bool fetch_first, TFetchResultsResp* fetch_results) {
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -759,8 +762,9 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
 
   // Optionally enable result caching on the QueryExecState.
   if (cache_num_rows > 0) {
-    status = exec_state->SetResultCache(CreateHS2ResultSet(session->hs2_version,
-            *exec_state->result_metadata()), cache_num_rows);
+    status = exec_state->SetResultCache(
+        CreateHS2ResultSet(session->hs2_version, *exec_state->result_metadata(), nullptr),
+        cache_num_rows);
     if (!status.ok()) {
       UnregisterQuery(exec_state->query_id(), false, &status);
       HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-internal-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-internal-service.h b/be/src/service/impala-internal-service.h
index a238f65..af54c35 100644
--- a/be/src/service/impala-internal-service.h
+++ b/be/src/service/impala-internal-service.h
@@ -18,8 +18,6 @@
 #ifndef IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
 #define IMPALA_SERVICE_IMPALA_INTERNAL_SERVICE_H
 
-#include <boost/shared_ptr.hpp>
-
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "service/impala-server.h"
@@ -32,9 +30,12 @@ namespace impala {
 /// ImpalaInternalService service.
 class ImpalaInternalService : public ImpalaInternalServiceIf {
  public:
-  ImpalaInternalService(const boost::shared_ptr<ImpalaServer>& impala_server,
-      const boost::shared_ptr<FragmentMgr>& fragment_mgr)
-      : impala_server_(impala_server), fragment_mgr_(fragment_mgr) { }
+  ImpalaInternalService() {
+    impala_server_ = ExecEnv::GetInstance()->impala_server();
+    DCHECK(impala_server_ != nullptr);
+    fragment_mgr_ = ExecEnv::GetInstance()->fragment_mgr();
+    DCHECK(fragment_mgr_ != nullptr);
+  }
 
   virtual void ExecPlanFragment(TExecPlanFragmentResult& return_val,
       const TExecPlanFragmentParams& params) {
@@ -74,10 +75,10 @@ class ImpalaInternalService : public ImpalaInternalServiceIf {
 
  private:
   /// Manages fragment reporting and data transmission
-  boost::shared_ptr<ImpalaServer> impala_server_;
+  ImpalaServer* impala_server_;
 
   /// Manages fragment execution
-  boost::shared_ptr<FragmentMgr> fragment_mgr_;
+  FragmentMgr* fragment_mgr_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 7f9d862..bf83eec 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -206,7 +206,6 @@ const string HS2_SERVER_NAME = "hiveserver2-frontend";
 const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
 const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
 const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
-const int ImpalaServer::ASCII_PRECISION = 16; // print 16 digits for double/float
 
 const int MAX_NM_MISSED_HEARTBEATS = 5;
 
@@ -1866,9 +1865,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
   }
 
   if (be_port != 0 && be_server != NULL) {
-    boost::shared_ptr<FragmentMgr> fragment_mgr(new FragmentMgr());
-    boost::shared_ptr<ImpalaInternalService> thrift_if(
-        new ImpalaInternalService(handler, fragment_mgr));
+    boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
     boost::shared_ptr<TProcessor> be_processor(
         new ImpalaInternalServiceProcessor(thrift_if));
     boost::shared_ptr<TProcessorEventHandler> event_handler(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 2104c5e..53f3384 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -249,45 +249,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   boost::scoped_ptr<ImpalaHttpHandler> http_handler_;
 
-  /// Query result set stores converted rows returned by QueryExecState.fetchRows(). It
-  /// provides an interface to convert Impala rows to external API rows.
-  /// It is an abstract class. Subclass must implement AddOneRow().
-  class QueryResultSet {
-   public:
-    QueryResultSet() {}
-    virtual ~QueryResultSet() {}
-
-    /// Add the row (list of expr value) from a select query to this result set. When a row
-    /// comes from a select query, the row is in the form of expr values (void*). 'scales'
-    /// contains the values' scales (# of digits after decimal), with -1 indicating no
-    /// scale specified.
-    virtual Status AddOneRow(
-        const std::vector<void*>& row, const std::vector<int>& scales) = 0;
-
-    /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
-    /// operation, the row in the form of TResultRow.
-    virtual Status AddOneRow(const TResultRow& row) = 0;
-
-    /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result
-    /// set into this result set. Returns the number of rows added to this result set.
-    /// Returns 0 if the given range is out of bounds of the other result set.
-    virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0;
-
-    /// Returns the approximate size of this result set in bytes.
-    int64_t ByteSize() { return ByteSize(0, size()); }
-
-    /// Returns the approximate size of the given range of rows in bytes.
-    virtual int64_t ByteSize(int start_idx, int num_rows) = 0;
-
-    /// Returns the size of this result set in number of rows.
-    virtual size_t size() = 0;
-  };
-
-  /// Result set implementations for Beeswax and HS2
-  class AsciiQueryResultSet;
-  class HS2RowOrientedResultSet;
-  class HS2ColumnarResultSet;
-
   struct SessionState;
 
   /// Execution state of a query.
@@ -299,14 +260,6 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   static const char* SQLSTATE_GENERAL_ERROR;
   static const char* SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED;
 
-  /// Ascii output precision for double/float
-  static const int ASCII_PRECISION;
-
-  QueryResultSet* CreateHS2ResultSet(
-      apache::hive::service::cli::thrift::TProtocolVersion::type version,
-      const TResultSetMetadata& metadata,
-      apache::hive::service::cli::thrift::TRowSet* rowset = NULL);
-
   /// Return exec state for given query_id, or NULL if not found.
   /// If 'lock' is true, the returned exec state's lock() will be acquired before
   /// the query_exec_state_map_lock_ is released.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index d55ac54..1532ecf 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -19,14 +19,15 @@
 #include <limits>
 #include <gutil/strings/substitute.h>
 
-#include "exprs/expr.h"
 #include "exprs/expr-context.h"
+#include "exprs/expr.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
-#include "service/impala-server.h"
 #include "service/frontend.h"
+#include "service/impala-server.h"
 #include "service/query-options.h"
+#include "service/query-result-set.h"
 #include "util/debug-util.h"
 #include "util/impalad-metrics.h"
 #include "util/runtime-profile-counters.h"
@@ -191,6 +192,7 @@ Status ImpalaServer::QueryExecState::Exec(TExecRequest* exec_request) {
             exec_request_.set_query_option_request.value,
             &session_->default_query_options,
             &session_->set_query_options_mask));
+        SetResultSet({}, {});
       } else {
         // "SET" returns a table of all query options.
         map<string, string> config;
@@ -421,17 +423,10 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
   }
 
-  // If desc_tbl is not set, query has SELECT with no FROM. In that
-  // case, the query can only have a single fragment, and that fragment needs to be
-  // executed by the coordinator. This check confirms that.
-  // If desc_tbl is set, the query may or may not have a coordinator fragment.
   bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
   const TPlanFragment& fragment = is_mt_exec
       ? query_exec_request.mt_plan_exec_info[0].fragments[0]
       : query_exec_request.fragments[0];
-  bool has_coordinator_fragment =
-      fragment.partition.type == TPartitionType::UNPARTITIONED;
-  DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
 
   {
     lock_guard<mutex> l(lock_);
@@ -449,7 +444,7 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
   }
 
   coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
-  status = coord_->Exec(&output_expr_ctxs_);
+  status = coord_->Exec();
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));
@@ -538,12 +533,11 @@ void ImpalaServer::QueryExecState::Done() {
   query_events_->MarkEvent("Unregister query");
 
   if (coord_.get() != NULL) {
-    Expr::Close(output_expr_ctxs_, coord_->runtime_state());
     // Release any reserved resources.
     Status status = exec_env_->scheduler()->Release(schedule_.get());
     if (!status.ok()) {
       LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
-            << " because of error: " << status.GetDetail();
+                   << " because of error: " << status.GetDetail();
     }
     coord_->TearDown();
   }
@@ -626,7 +620,6 @@ Status ImpalaServer::QueryExecState::WaitInternal() {
 
   if (coord_.get() != NULL) {
     RETURN_IF_ERROR(coord_->Wait());
-    RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, coord_->runtime_state()));
     RETURN_IF_ERROR(UpdateCatalog());
   }
 
@@ -719,6 +712,10 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
     return Status::OK();
   }
 
+  if (coord_.get() == nullptr) {
+    return Status("Client tried to fetch rows on a query that produces no results.");
+  }
+
   int32_t num_rows_fetched_from_cache = 0;
   if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
     // Satisfy the fetch from the result cache if possible.
@@ -729,27 +726,7 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
     if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
   }
 
-  // List of expr values to hold evaluated rows from the query
-  vector<void*> result_row;
-  result_row.resize(output_expr_ctxs_.size());
-
-  // List of scales for floating point values in result_row
-  vector<int> scales;
-  scales.resize(result_row.size());
-
-  if (coord_ == NULL) {
-    // Query with LIMIT 0.
-    query_state_ = QueryState::FINISHED;
-    eos_ = true;
-    return Status::OK();
-  }
-
   query_state_ = QueryState::FINISHED;  // results will be ready after this call
-  // Fetch the next batch if we've returned the current batch entirely
-  if (current_batch_ == NULL || current_batch_row_ >= current_batch_->num_rows()) {
-    RETURN_IF_ERROR(FetchNextBatch());
-  }
-  if (current_batch_ == NULL) return Status::OK();
 
   // Maximum number of rows to be fetched from the coord.
   int32_t max_coord_rows = max_rows;
@@ -759,22 +736,26 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
   }
   {
     SCOPED_TIMER(row_materialization_timer_);
-    // Convert the available rows, limited by max_coord_rows
-    int available = current_batch_->num_rows() - current_batch_row_;
-    int fetched_count = available;
-    // max_coord_rows <= 0 means no limit
-    if (max_coord_rows > 0 && max_coord_rows < available) fetched_count = max_coord_rows;
-    for (int i = 0; i < fetched_count; ++i) {
-      TupleRow* row = current_batch_->GetRow(current_batch_row_);
-      RETURN_IF_ERROR(GetRowValue(row, &result_row, &scales));
-      RETURN_IF_ERROR(fetched_rows->AddOneRow(result_row, scales));
-      ++num_rows_fetched_;
-      ++current_batch_row_;
+    size_t before = fetched_rows->size();
+    // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
+    // (already held) ensures that we do not call coord_->GetNext() multiple times
+    // concurrently.
+    // TODO: Simplify this.
+    lock_.unlock();
+    Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
+    lock_.lock();
+    int num_fetched = fetched_rows->size() - before;
+    DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
+        "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
+    num_rows_fetched_ += num_fetched;
+
+    RETURN_IF_ERROR(status);
+    // Check if query status has changed during GetNext() call
+    if (!query_status_.ok()) {
+      eos_ = true;
+      return query_status_;
     }
   }
-  ExprContext::FreeLocalAllocations(output_expr_ctxs_);
-  // Check if there was an error evaluating a row value.
-  RETURN_IF_ERROR(coord_->runtime_state()->CheckQueryState());
 
   // Update the result cache if necessary.
   if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
@@ -833,16 +814,6 @@ Status ImpalaServer::QueryExecState::FetchRowsInternal(const int32_t max_rows,
   return Status::OK();
 }
 
-Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector<void*>* result,
-                                                 vector<int>* scales) {
-  DCHECK(result->size() >= output_expr_ctxs_.size());
-  for (int i = 0; i < output_expr_ctxs_.size(); ++i) {
-    (*result)[i] = output_expr_ctxs_[i]->GetValue(row);
-    (*scales)[i] = output_expr_ctxs_[i]->root()->output_scale();
-  }
-  return Status::OK();
-}
-
 Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) {
   Coordinator* coord;
   {
@@ -931,28 +902,6 @@ Status ImpalaServer::QueryExecState::UpdateCatalog() {
   return Status::OK();
 }
 
-Status ImpalaServer::QueryExecState::FetchNextBatch() {
-  DCHECK(!eos_);
-  DCHECK(coord_.get() != NULL);
-
-  // Temporarily release lock so calls to Cancel() are not blocked.  fetch_rows_lock_
-  // ensures that we do not call coord_->GetNext() multiple times concurrently.
-  lock_.unlock();
-  Status status = coord_->GetNext(&current_batch_, coord_->runtime_state());
-  lock_.lock();
-  if (!status.ok()) return status;
-
-  // Check if query status has changed during GetNext() call
-  if (!query_status_.ok()) {
-    current_batch_ = NULL;
-    return query_status_;
-  }
-
-  current_batch_row_ = 0;
-  eos_ = current_batch_ == NULL;
-  return Status::OK();
-}
-
 void ImpalaServer::QueryExecState::SetResultSet(const vector<string>& results) {
   request_result_set_.reset(new vector<TResultRow>);
   request_result_set_->resize(results.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index 0a763ff..54ee929 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -248,7 +248,7 @@ class ImpalaServer::QueryExecState {
   /// Resource assignment determined by scheduler. Owned by obj_pool_.
   boost::scoped_ptr<QuerySchedule> schedule_;
 
-  /// not set for ddl queries, or queries with "limit 0"
+  /// Not set for ddl queries.
   boost::scoped_ptr<Coordinator> coord_;
 
   /// Runs statements that query or modify the catalog via the CatalogService.
@@ -293,7 +293,7 @@ class ImpalaServer::QueryExecState {
   MonotonicStopWatch client_wait_sw_;
 
   RuntimeProfile::EventSequence* query_events_;
-  std::vector<ExprContext*> output_expr_ctxs_;
+
   bool is_cancelled_; // if true, Cancel() was called.
   bool eos_;  // if true, there are no more rows to return
   // We enforce the invariant that query_status_ is not OK iff query_state_
@@ -356,13 +356,6 @@ class ImpalaServer::QueryExecState {
   /// Caller needs to hold fetch_rows_lock_ and lock_.
   Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows);
 
-  /// Fetch the next row batch and store the results in current_batch_. Only called for
-  /// non-DDL / DML queries. current_batch_ is set to NULL if execution is complete or the
-  /// query was cancelled.
-  /// Caller needs to hold fetch_rows_lock_ and lock_. Blocks, during which time lock_ is
-  /// released.
-  Status FetchNextBatch();
-
   /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in
   /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
   /// result and scales must have been resized to the number of columns before call.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/service/query-result-set.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-result-set.h b/be/src/service/query-result-set.h
new file mode 100644
index 0000000..b444ca3
--- /dev/null
+++ b/be/src/service/query-result-set.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_QUERY_RESULT_SET_H
+#define IMPALA_SERVICE_QUERY_RESULT_SET_H
+
+#include "common/status.h"
+#include "gen-cpp/Data_types.h"
+
+#include <vector>
+
+namespace impala {
+
+/// Stores client-ready query result rows returned by
+/// QueryExecState::FetchRows(). Subclasses implement AddRows() / AddOneRow() to
+/// specialise how Impala's row batches are converted to client-API result
+/// representations.
+class QueryResultSet {
+ public:
+  QueryResultSet() {}
+  virtual ~QueryResultSet() {}
+
+  /// Add a single row to this result set. The row is a vector of pointers to values,
+  /// whose memory belongs to the caller. 'scales' contains the scales for decimal values
+  /// (# of digits after decimal), with -1 indicating that no scale was specified or that
+  /// the corresponding value is not a decimal.
+  virtual Status AddOneRow(
+      const std::vector<void*>& row, const std::vector<int>& scales) = 0;
+
+  /// Add the TResultRow to this result set. When a row comes from a DDL/metadata
+  /// operation, the row is in the form of TResultRow.
+  virtual Status AddOneRow(const TResultRow& row) = 0;
+
+  /// Copies rows in the range [start_idx, start_idx + num_rows) from the other result
+  /// set into this result set. Returns the number of rows added to this result set.
+  /// Returns 0 if the given range is out of bounds of the other result set.
+  virtual int AddRows(const QueryResultSet* other, int start_idx, int num_rows) = 0;
+
+  /// Returns the approximate size of this result set in bytes.
+  int64_t ByteSize() { return ByteSize(0, size()); }
+
+  /// Returns the approximate size of the given range of rows in bytes.
+  virtual int64_t ByteSize(int start_idx, int num_rows) = 0;
+
+  /// Returns the size of this result set in number of rows.
+  virtual size_t size() = 0;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index cf0e28e..b28f7fc 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -34,6 +34,7 @@
 
 DECLARE_string(ssl_server_certificate);
 DECLARE_string(ssl_private_key);
+DECLARE_int32(be_port);
 
 using namespace apache::thrift;
 using namespace impala;
@@ -43,6 +44,9 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
   for (int tries = 0; tries < 10; ++tries) {
     int backend_port = FindUnusedEphemeralPort();
     if (backend_port == -1) continue;
+    // This flag is read directly in several places to find the address of the local
+    // backend interface.
+    FLAGS_be_port = backend_port;
 
     int subscriber_port = FindUnusedEphemeralPort();
     if (subscriber_port == -1) continue;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/common/thrift/DataSinks.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 12a75b9..83c63b7 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -26,7 +26,8 @@ include "Partitions.thrift"
 enum TDataSinkType {
   DATA_STREAM_SINK,
   TABLE_SINK,
-  JOIN_BUILD_SINK
+  JOIN_BUILD_SINK,
+  PLAN_ROOT_SINK
 }
 
 enum TSinkAction {
@@ -87,10 +88,10 @@ struct TJoinBuildSink {
 
 // Union type of all table sinks.
 struct TTableSink {
-  1: required Types.TTableId  target_table_id
+  1: required Types.TTableId target_table_id
   2: required TTableSinkType type
   3: required TSinkAction action
-  4: optional THdfsTableSink  hdfs_table_sink
+  4: optional THdfsTableSink hdfs_table_sink
   5: optional TKuduTableSink kudu_table_sink
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
index b02bc73..392b961 100644
--- a/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/QueryStmt.java
@@ -25,6 +25,8 @@ import java.util.Set;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.TreeNode;
+import org.apache.impala.planner.DataSink;
+import org.apache.impala.planner.PlanRootSink;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Lists;
@@ -409,6 +411,10 @@ public abstract class QueryStmt extends StatementBase {
     resultExprs_ = Expr.substituteList(resultExprs_, smap, analyzer, true);
   }
 
+  public DataSink createDataSink() {
+    return new PlanRootSink();
+  }
+
   public ArrayList<OrderByElement> cloneOrderByElements() {
     if (orderByElements_ == null) return null;
     ArrayList<OrderByElement> result =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
new file mode 100644
index 0000000..a199f54
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.planner;
+
+import org.apache.impala.thrift.TDataSink;
+import org.apache.impala.thrift.TDataSinkType;
+import org.apache.impala.thrift.TExplainLevel;
+
+/**
+ * Sink for the root of a query plan that produces result rows. Allows coordination
+ * between the sender which produces those rows, and the consumer which sends them to the
+ * client, despite both executing concurrently.
+ */
+public class PlanRootSink extends DataSink {
+  // Explain output is the single line "PLAN-ROOT SINK"; other args are ignored.
+  public String getExplainString(String prefix, String detailPrefix,
+      TExplainLevel explainLevel) {
+    return String.format("%sPLAN-ROOT SINK\n", prefix);
+  }
+  // The Thrift form is built from the PLAN_ROOT_SINK type tag alone.
+  protected TDataSink toThrift() {
+    return new TDataSink(TDataSinkType.PLAN_ROOT_SINK);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 405eebe..ed4c677 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -149,6 +149,8 @@ public class Planner {
       } else if (ctx_.isDelete()) {
         // Set up delete sink for root fragment
         rootFragment.setSink(ctx_.getAnalysisResult().getDeleteStmt().createDataSink());
+      } else if (ctx_.isQuery()) {
+        rootFragment.setSink(ctx_.getAnalysisResult().getQueryStmt().createDataSink());
       }
       QueryStmt queryStmt = ctx_.getQueryStmt();
       queryStmt.substituteResultExprs(rootNodeSmap, ctx_.getRootAnalyzer());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 29cca13..3275a7a 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -90,6 +90,7 @@ public class PlannerContext {
   public boolean isInsertOrCtas() {
     return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
   }
+  public boolean isQuery() { return analysisResult_.isQueryStmt(); }
 
   public boolean hasSubplan() { return !subplans_.isEmpty(); }
   public SubplanNode getSubplan() { return subplans_.getFirst(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9f61397f/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
index 47bfb23..d7838f9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/aggregation.test
@@ -3,12 +3,16 @@ select count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tin
 avg(tinyint_col)
 from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*), count(tinyint_col), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*), count:merge(tinyint_col), min:merge(tinyint_col), max:merge(tinyint_col), sum:merge(tinyint_col), avg:merge(tinyint_col)
 |
@@ -26,6 +30,8 @@ avg(tinyint_col)
 from functional.alltypesagg
 group by 2, 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*), min(tinyint_col), max(tinyint_col), sum(tinyint_col), avg(tinyint_col)
 |  group by: bigint_col, tinyint_col
@@ -33,6 +39,8 @@ group by 2, 1
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |
 03:AGGREGATE [FINALIZE]
@@ -54,6 +62,8 @@ from functional.testtbl
 having count(id) > 0
 order by avg(zip) limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:TOP-N [LIMIT=10]
 |  order by: avg(zip) ASC
 |
@@ -64,6 +74,8 @@ order by avg(zip) limit 10
 00:SCAN HDFS [functional.testtbl]
    partitions=1/1 files=0 size=0B
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 02:TOP-N [LIMIT=10]
 |  order by: avg(zip) ASC
 |
@@ -85,6 +97,8 @@ from functional.alltypesagg
 group by int_col + int_col, int_col * int_col, int_col + int_col
 having (int_col * int_col) < 0 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  group by: int_col + int_col, int_col * int_col
 |  having: int_col * int_col < 0
@@ -93,6 +107,8 @@ having (int_col * int_col) < 0 limit 10
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 04:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -116,6 +132,8 @@ functional.alltypes t1 inner join functional.alltypestiny t2
 group by t1.tinyint_col, t2.smallint_col
 having count(t2.int_col) = count(t1.bigint_col)
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*), count(t2.int_col), count(t1.bigint_col)
 |  group by: t1.tinyint_col, t2.smallint_col
@@ -141,6 +159,8 @@ select 1 from
    group by int_col) t
 where t.x > 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: avg(bigint_col)
 |  group by: int_col
@@ -157,6 +177,8 @@ select count(*) from
    select * from functional.alltypessmall) t
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  limit: 10
@@ -169,6 +191,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
 |  limit: 10
@@ -194,6 +218,8 @@ select count(*) from
 group by t.bigint_col
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: bigint_col
@@ -207,6 +233,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -237,6 +265,8 @@ from
    select * from functional.alltypessmall) t
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  limit: 10
@@ -252,6 +282,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col)
 |  limit: 10
@@ -286,6 +318,8 @@ from
 group by t.bigint_col
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: t.bigint_col
@@ -302,6 +336,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -334,6 +370,8 @@ from
    select * from functional.alltypessmall) t
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col), count:merge(smallint_col)
 |  limit: 10
@@ -350,6 +388,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(int_col), count:merge(smallint_col)
 |  limit: 10
@@ -386,6 +426,8 @@ from
 group by t.bigint_col
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 04:AGGREGATE [FINALIZE]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
@@ -403,6 +445,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -438,6 +482,8 @@ from
 group by t.bigint_col
 limit 10
 ---- PLAN
+PLAN-ROOT SINK
+|
 05:AGGREGATE [FINALIZE]
 |  output: count(int_col), count:merge(smallint_col)
 |  group by: t.bigint_col
@@ -458,6 +504,8 @@ limit 10
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 10:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |
@@ -495,6 +543,8 @@ limit 10
 # test that aggregations are not placed below an unpartitioned exchange with a limit
 select count(*) from (select * from functional.alltypes limit 10) t
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -502,6 +552,8 @@ select count(*) from (select * from functional.alltypes limit 10) t
    partitions=24/24 files=24 size=478.45KB
    limit: 10
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -518,6 +570,8 @@ select count(*) from
    union all
    (select * from functional.alltypessmall) limit 10) t
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -530,6 +584,8 @@ select count(*) from
 01:SCAN HDFS [functional.alltypes]
    partitions=24/24 files=24 size=478.45KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
 |
@@ -555,6 +611,8 @@ select * from (
   limit 2) v
 limit 1
 ---- PLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: count(cnt)
 |  limit: 1
@@ -580,6 +638,8 @@ limit 1
    partitions=11/11 files=11 size=814.73KB
    runtime filters: RF000 -> t1.id
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: count(cnt)
 |  limit: 1
@@ -629,6 +689,8 @@ select * from
    group by 1, 2, 3, 4) v
 where v.a = v.b and v.b = v.c and v.c = v.d and v.a = v.c and v.a = v.d
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  group by: tinyint_col, smallint_col, int_col + int_col, coalesce(bigint_col, year)
 |  having: int_col + int_col = coalesce(bigint_col, year), smallint_col = int_col + int_col
@@ -643,6 +705,8 @@ select cnt from
    from functional.alltypestiny
    group by bool_col, x) v
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  group by: bool_col, CAST(NULL AS INT)
@@ -656,6 +720,8 @@ select cnt from
    from functional.alltypestiny
    group by bool_col, x) v
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(int_col)
 |  group by: bool_col, NULL
@@ -669,6 +735,8 @@ select cnt from
 # test simple group_concat with distinct
 select group_concat(distinct string_col) from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |
@@ -678,6 +746,8 @@ select group_concat(distinct string_col) from functional.alltypesagg
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col)
 |
@@ -702,6 +772,8 @@ select day, group_concat(distinct string_col)
 from (select * from functional.alltypesagg where id % 100 = day order by id limit 99999) a
 group by day
 ---- PLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
@@ -716,6 +788,8 @@ group by day
    partitions=11/11 files=11 size=814.73KB
    predicates: id % 100 = day
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 03:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col)
 |  group by: day
@@ -739,6 +813,8 @@ select count(distinct cast(timestamp_col as string)),
 group_concat(distinct cast(timestamp_col as string))
 from functional.alltypesagg group by year
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(CAST(timestamp_col AS STRING)), group_concat(CAST(timestamp_col AS STRING))
 |  group by: year
@@ -749,6 +825,8 @@ from functional.alltypesagg group by year
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE [FINALIZE]
@@ -769,6 +847,8 @@ from functional.alltypesagg group by year
 # test group_concat distinct with other non-distinct aggregate functions
  select group_concat(distinct string_col), count(*) from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), count:merge(*)
 |
@@ -779,6 +859,8 @@ from functional.alltypesagg group by year
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), count:merge(*)
 |
@@ -804,6 +886,8 @@ from functional.alltypesagg group by year
 select group_concat(distinct string_col, '-'), sum(int_col), count(distinct string_col)
 from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col, '-'), count(string_col), sum:merge(int_col)
 |
@@ -814,6 +898,8 @@ from functional.alltypesagg
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col, '-'), count:merge(string_col), sum:merge(int_col)
 |
@@ -841,6 +927,8 @@ select month, year, count(*), count(distinct date_string_col),
 group_concat(distinct date_string_col, '-') from functional.alltypesagg
 group by month, year
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: count(date_string_col), group_concat(date_string_col, '-'), count:merge(*)
 |  group by: month, year
@@ -852,6 +940,8 @@ group by month, year
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 05:EXCHANGE [UNPARTITIONED]
 |
 02:AGGREGATE [FINALIZE]
@@ -875,6 +965,8 @@ group by month, year
 select group_concat(distinct string_col), group_concat(distinct string_col, '-'),
 group_concat(distinct string_col, '---')  from functional.alltypesagg
 ---- PLAN
+PLAN-ROOT SINK
+|
 02:AGGREGATE [FINALIZE]
 |  output: group_concat(string_col), group_concat(string_col, '-'), group_concat(string_col, '---')
 |
@@ -884,6 +976,8 @@ group_concat(distinct string_col, '---')  from functional.alltypesagg
 00:SCAN HDFS [functional.alltypesagg]
    partitions=11/11 files=11 size=814.73KB
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:AGGREGATE [FINALIZE]
 |  output: group_concat:merge(string_col), group_concat:merge(string_col, '-'), group_concat:merge(string_col, '---')
 |
@@ -906,6 +1000,8 @@ group_concat(distinct string_col, '---')  from functional.alltypesagg
 # IMPALA-852: Aggregation only in the HAVING clause.
 select 1 from functional.alltypestiny having count(*) > 0
 ---- PLAN
+PLAN-ROOT SINK
+|
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
 |  having: count(*) > 0
@@ -923,6 +1019,8 @@ group by 1
 having count(*) < 150000
 limit 1000000
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 06:EXCHANGE [UNPARTITIONED]
 |  limit: 1000000
 |
@@ -957,6 +1055,8 @@ select col from (
 where col > 50
 limit 50
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 08:AGGREGATE [FINALIZE]
 |  output: count:merge(c_custkey)
 |  having: count(c_custkey) > 50
@@ -992,6 +1092,8 @@ select straight_join c_custkey, count(distinct c_custkey)
 from tpch_parquet.orders inner join [shuffle] tpch_parquet.customer on c_custkey = o_custkey
 group by 1
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 07:EXCHANGE [UNPARTITIONED]
 |
 04:AGGREGATE [FINALIZE]
@@ -1029,6 +1131,8 @@ group by 1, 2
 having count(*) > 10
 limit 10
 ---- DISTRIBUTEDPLAN
+PLAN-ROOT SINK
+|
 09:EXCHANGE [UNPARTITIONED]
 |  limit: 10
 |