You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:01 UTC

[07/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
deleted file mode 100644
index df5b0d2..0000000
--- a/be/src/runtime/plan-fragment-executor.cc
+++ /dev/null
@@ -1,518 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/plan-fragment-executor.h"
-
-#include <thrift/protocol/TDebugProtocol.h>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-#include <boost/unordered_map.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "codegen/llvm-codegen.h"
-#include "common/logging.h"
-#include "common/object-pool.h"
-#include "exec/data-sink.h"
-#include "exec/exchange-node.h"
-#include "exec/exec-node.h"
-#include "exec/hbase-table-scanner.h"
-#include "exec/hdfs-scan-node.h"
-#include "exec/plan-root-sink.h"
-#include "exec/scan-node.h"
-#include "exprs/expr.h"
-#include "runtime/data-stream-mgr.h"
-#include "runtime/descriptors.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/query-state.h"
-#include "runtime/runtime-filter-bank.h"
-#include "runtime/exec-env.h"
-#include "util/container-util.h"
-#include "runtime/runtime-state.h"
-#include "util/cpu-info.h"
-#include "util/debug-util.h"
-#include "util/mem-info.h"
-#include "util/parse-util.h"
-#include "util/periodic-counter-updater.h"
-#include "util/pretty-printer.h"
-
-DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch");
-DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
-
-#include "common/names.h"
-
-namespace posix_time = boost::posix_time;
-using boost::get_system_time;
-using boost::system_time;
-using namespace apache::thrift;
-using namespace strings;
-
-namespace impala {
-
-const string PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER = "PerHostPeakMemUsage";
-
-namespace {
-const string OPEN_TIMER_NAME = "OpenTime";
-const string PREPARE_TIMER_NAME = "PrepareTime";
-const string EXEC_TIMER_NAME = "ExecTime";
-}
-
-PlanFragmentExecutor::PlanFragmentExecutor(
-    const ReportStatusCallback& report_status_cb)
-  : exec_tree_(NULL),
-    report_status_cb_(report_status_cb),
-    report_thread_active_(false),
-    closed_(false),
-    has_thread_token_(false),
-    timings_profile_(NULL),
-    root_sink_(NULL),
-    is_prepared_(false),
-    is_cancelled_(false),
-    per_host_mem_usage_(NULL),
-    rows_produced_counter_(NULL),
-    average_thread_tokens_(NULL),
-    mem_usage_sampled_counter_(NULL),
-    thread_usage_sampled_counter_(NULL) {}
-
-PlanFragmentExecutor::~PlanFragmentExecutor() {
-  DCHECK(!is_prepared_ || closed_);
-  // at this point, the report thread should have been stopped
-  DCHECK(!report_thread_active_);
-}
-
-Status PlanFragmentExecutor::Prepare(
-    QueryState* query_state, const TDescriptorTable& desc_tbl,
-    const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
-  Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx);
-  prepared_promise_.Set(status);
-  if (!status.ok()) FragmentComplete(status);
-  return status;
-}
-
-Status PlanFragmentExecutor::WaitForOpen() {
-  DCHECK(prepared_promise_.IsSet()) << "Prepare() must complete before WaitForOpen()";
-  RETURN_IF_ERROR(prepared_promise_.Get());
-  return opened_promise_.Get();
-}
-
-Status PlanFragmentExecutor::PrepareInternal(
-    QueryState* qs, const TDescriptorTable& tdesc_tbl,
-    const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
-  lock_guard<mutex> l(prepare_lock_);
-  DCHECK(!is_prepared_);
-
-  if (is_cancelled_) return Status::CANCELLED;
-  is_prepared_ = true;
-
-  // TODO: Break this method up.
-  query_id_ = qs->query_ctx().query_id;
-
-  VLOG_QUERY << "Prepare(): instance_id="
-             << PrintId(instance_ctx.fragment_instance_id);
-  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx);
-
-  // Prepare() must not return before runtime_state_ is set if is_prepared_ was
-  // set. Having runtime_state_.get() != NULL is a postcondition of this method in that
-  // case. Do not call RETURN_IF_ERROR or explicitly return before this line.
-  runtime_state_.reset(
-      new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance()));
-
-  // total_time_counter() is in the runtime_state_ so start it up now.
-  SCOPED_TIMER(profile()->total_time_counter());
-  timings_profile_ = obj_pool()->Add(
-      new RuntimeProfile(obj_pool(), "Fragment Instance Lifecycle Timings"));
-  profile()->AddChild(timings_profile_);
-  SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME));
-
-  // Memory limit in bytes from the 'mem_limit' query option; stays -1 if unset or <= 0.
-  int64_t bytes_limit = -1;
-  if (runtime_state_->query_options().__isset.mem_limit &&
-      runtime_state_->query_options().mem_limit > 0) {
-    bytes_limit = runtime_state_->query_options().mem_limit;
-    VLOG_QUERY << "Using query memory limit from query options: "
-               << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
-  }
-
-  DCHECK(!instance_ctx.request_pool.empty());
-  RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
-  runtime_state_->InitFilterBank();
-
-  // Reserve one main thread from the pool
-  runtime_state_->resource_pool()->AcquireThreadToken();
-  has_thread_token_ = true;
-
-  average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
-          runtime_state_->resource_pool()));
-  mem_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("MemoryUsage",
-      TUnit::BYTES,
-      bind<int64_t>(mem_fn(&MemTracker::consumption),
-          runtime_state_->instance_mem_tracker()));
-  thread_usage_sampled_counter_ = profile()->AddTimeSeriesCounter("ThreadUsage",
-      TUnit::UNIT,
-      bind<int64_t>(mem_fn(&ThreadResourceMgr::ResourcePool::num_threads),
-          runtime_state_->resource_pool()));
-
-  // set up desc tbl
-  DescriptorTbl* desc_tbl = NULL;
-  RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl));
-  runtime_state_->set_desc_tbl(desc_tbl);
-  VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id
-             << "\n" << desc_tbl->DebugString();
-
-  // set up plan
-  RETURN_IF_ERROR(ExecNode::CreateTree(
-      runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
-  runtime_state_->set_fragment_root_id(exec_tree_->id());
-
-  if (instance_ctx.__isset.debug_node_id) {
-    DCHECK(instance_ctx.__isset.debug_action);
-    DCHECK(instance_ctx.__isset.debug_phase);
-    ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase,
-        instance_ctx.debug_action, exec_tree_);
-  }
-
-  // set #senders of exchange nodes before calling Prepare()
-  vector<ExecNode*> exch_nodes;
-  exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
-  for (ExecNode* exch_node : exch_nodes) {
-    DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
-    int num_senders =
-        FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0);
-    DCHECK_GT(num_senders, 0);
-    static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
-  }
-
-  // set scan ranges
-  vector<ExecNode*> scan_nodes;
-  vector<TScanRangeParams> no_scan_ranges;
-  exec_tree_->CollectScanNodes(&scan_nodes);
-  for (int i = 0; i < scan_nodes.size(); ++i) {
-    ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
-    const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
-        instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
-    scan_node->SetScanRanges(scan_ranges);
-  }
-
-  RuntimeState* state = runtime_state();
-  RuntimeProfile::Counter* prepare_timer =
-      ADD_CHILD_TIMER(timings_profile_, "ExecTreePrepareTime", PREPARE_TIMER_NAME);
-  {
-    SCOPED_TIMER(prepare_timer);
-    RETURN_IF_ERROR(exec_tree_->Prepare(state));
-  }
-
-  PrintVolumeIds(instance_ctx.per_node_scan_ranges);
-
-  DCHECK(fragment_ctx.fragment.__isset.output_sink);
-  RETURN_IF_ERROR(
-      DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink,
-          fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(),
-          &sink_));
-  RETURN_IF_ERROR(
-      sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker()));
-
-  RuntimeProfile* sink_profile = sink_->profile();
-  if (sink_profile != NULL) {
-    profile()->AddChild(sink_profile);
-  }
-
-  if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
-    root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
-    // Release the thread token on the root fragment instance. This fragment spends most
-    // of the time waiting and doing very little work. Holding on to the token causes
-    // underutilization of the machine. If there are 12 queries on this node, that's 12
-    // tokens reserved for no reason.
-    ReleaseThreadToken();
-  }
-
-  if (state->ShouldCodegen()) {
-    RETURN_IF_ERROR(state->CreateCodegen());
-    exec_tree_->Codegen(state);
-    // It shouldn't be fatal to fail codegen. However, until IMPALA-4233 is fixed,
-    // ScalarFnCall has no fall back to interpretation when codegen fails so propagates
-    // the error status for now.
-    RETURN_IF_ERROR(state->CodegenScalarFns());
-  }
-
-  // set up profile counters
-  profile()->AddChild(exec_tree_->runtime_profile());
-  rows_produced_counter_ =
-      ADD_COUNTER(profile(), "RowsProduced", TUnit::UNIT);
-  per_host_mem_usage_ =
-      ADD_COUNTER(profile(), PER_HOST_PEAK_MEM_COUNTER, TUnit::BYTES);
-
-  row_batch_.reset(new RowBatch(exec_tree_->row_desc(), state->batch_size(),
-      state->instance_mem_tracker()));
-  VLOG(2) << "plan_root=\n" << exec_tree_->DebugString();
-  return Status::OK();
-}
-
-Status PlanFragmentExecutor::OptimizeLlvmModule() {
-  if (!runtime_state_->ShouldCodegen()) return Status::OK();
-  LlvmCodeGen* codegen = runtime_state_->codegen();
-  DCHECK(codegen != NULL);
-  return codegen->FinalizeModule();
-}
-
-void PlanFragmentExecutor::PrintVolumeIds(
-    const PerNodeScanRanges& per_node_scan_ranges) {
-  if (per_node_scan_ranges.empty()) return;
-
-  HdfsScanNode::PerVolumnStats per_volume_stats;
-  for (const PerNodeScanRanges::value_type& entry: per_node_scan_ranges) {
-    HdfsScanNode::UpdateHdfsSplitStats(entry.second, &per_volume_stats);
-  }
-
-  stringstream str;
-
-  HdfsScanNode::PrintHdfsSplitStats(per_volume_stats, &str);
-  profile()->AddInfoString(HdfsScanNode::HDFS_SPLIT_STATS_DESC, str.str());
-  VLOG_FILE
-      << "Hdfs split stats (<volume id>:<# splits>/<split lengths>) for query="
-      << query_id_ << ":\n" << str.str();
-}
-
-Status PlanFragmentExecutor::Open() {
-  DCHECK(prepared_promise_.IsSet() && prepared_promise_.Get().ok());
-  Status status;
-  {
-    SCOPED_TIMER(profile()->total_time_counter());
-    SCOPED_TIMER(ADD_TIMER(timings_profile_, OPEN_TIMER_NAME));
-    VLOG_QUERY << "Open(): instance_id=" << runtime_state_->fragment_instance_id();
-    status = OpenInternal();
-  }
-  if (!status.ok()) FragmentComplete(status);
-  opened_promise_.Set(status);
-  return status;
-}
-
-Status PlanFragmentExecutor::OpenInternal() {
-  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  RETURN_IF_ERROR(
-      runtime_state_->desc_tbl().PrepareAndOpenPartitionExprs(runtime_state_.get()));
-
-  // We need to start the profile-reporting thread before calling exec_tree_->Open(),
-  // since it may block.
-  if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
-    unique_lock<mutex> l(report_thread_lock_);
-    report_thread_.reset(
-        new Thread("plan-fragment-executor", "report-profile",
-            &PlanFragmentExecutor::ReportProfileThread, this));
-    // Make sure the thread started up, otherwise ReportProfileThread() might get into
-    // a race with StopReportThread().
-    while (!report_thread_active_) report_thread_started_cv_.wait(l);
-  }
-
-  RETURN_IF_ERROR(OptimizeLlvmModule());
-
-  {
-    SCOPED_TIMER(ADD_CHILD_TIMER(timings_profile_, "ExecTreeOpenTime", OPEN_TIMER_NAME));
-    RETURN_IF_ERROR(exec_tree_->Open(runtime_state_.get()));
-  }
-  return sink_->Open(runtime_state_.get());
-}
-
-Status PlanFragmentExecutor::Exec() {
-  DCHECK(opened_promise_.IsSet() && opened_promise_.Get().ok());
-  Status status;
-  {
-    // Must go out of scope before FragmentComplete(), otherwise counter will not be
-    // updated by time final profile is sent.
-    SCOPED_TIMER(profile()->total_time_counter());
-    SCOPED_TIMER(ADD_TIMER(timings_profile_, EXEC_TIMER_NAME));
-    status = ExecInternal();
-  }
-  FragmentComplete(status);
-  return status;
-}
-
-Status PlanFragmentExecutor::ExecInternal() {
-  RuntimeProfile::Counter* plan_exec_timer =
-      ADD_CHILD_TIMER(timings_profile_, "ExecTreeExecTime", EXEC_TIMER_NAME);
-  SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
-  bool exec_tree_complete = false;
-  do {
-    Status status;
-    row_batch_->Reset();
-    {
-      SCOPED_TIMER(plan_exec_timer);
-      status = exec_tree_->GetNext(
-          runtime_state_.get(), row_batch_.get(), &exec_tree_complete);
-    }
-    if (VLOG_ROW_IS_ON) row_batch_->VLogRows("PlanFragmentExecutor::ExecInternal()");
-    COUNTER_ADD(rows_produced_counter_, row_batch_->num_rows());
-    RETURN_IF_ERROR(status);
-    RETURN_IF_ERROR(sink_->Send(runtime_state(), row_batch_.get()));
-  } while (!exec_tree_complete);
-
-  // Flush the sink *before* stopping the report thread. Flush may need to add some
-  // important information to the last report that gets sent. (e.g. table sinks record the
-  // files they have written to in this method)
-  RETURN_IF_ERROR(sink_->FlushFinal(runtime_state()));
-  return Status::OK();
-}
-
-void PlanFragmentExecutor::ReportProfileThread() {
-  VLOG_FILE << "ReportProfileThread(): instance_id="
-            << runtime_state_->fragment_instance_id();
-  DCHECK(!report_status_cb_.empty());
-  unique_lock<mutex> l(report_thread_lock_);
-  // tell Open() that we started
-  report_thread_active_ = true;
-  report_thread_started_cv_.notify_one();
-
-  // Jitter the reporting time of remote fragments by a random amount between
-  // 0 and the report_interval.  This way, the coordinator doesn't get all the
-  // updates at once so it's better for contention as well as smoother progress
-  // reporting.
-  int report_fragment_offset = rand() % FLAGS_status_report_interval;
-  system_time timeout = get_system_time()
-      + posix_time::seconds(report_fragment_offset);
-  // We don't want to wait longer than it takes to run the entire fragment.
-  stop_report_thread_cv_.timed_wait(l, timeout);
-
-  while (report_thread_active_) {
-    system_time timeout = get_system_time()
-        + posix_time::seconds(FLAGS_status_report_interval);
-
-    // timed_wait can return because the timeout occurred or the condition variable
-    // was signaled.  We can't rely on its return value to distinguish between the
-    // two cases (e.g. there is a race here where the wait timed out but before grabbing
-    // the lock, the condition variable was signaled).  Instead, we will use an external
-    // flag, report_thread_active_, to coordinate this.
-    stop_report_thread_cv_.timed_wait(l, timeout);
-
-    if (VLOG_FILE_IS_ON) {
-      VLOG_FILE << "Reporting " << (!report_thread_active_ ? "final " : " ")
-          << "profile for instance " << runtime_state_->fragment_instance_id();
-      stringstream ss;
-      profile()->PrettyPrint(&ss);
-      VLOG_FILE << ss.str();
-    }
-
-    if (!report_thread_active_) break;
-    SendReport(false, Status::OK());
-  }
-
-  VLOG_FILE << "exiting reporting thread: instance_id="
-      << runtime_state_->fragment_instance_id();
-}
-
-void PlanFragmentExecutor::SendReport(bool done, const Status& status) {
-  DCHECK(status.ok() || done);
-  if (report_status_cb_.empty()) return;
-
-  // Update the counter for the peak per host mem usage.
-  if (per_host_mem_usage_ != nullptr) {
-    per_host_mem_usage_->Set(runtime_state()->query_mem_tracker()->peak_consumption());
-  }
-
-  // This will send a report even if we are cancelled.  If the query completed correctly
-  // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
-  // be waiting for a final report and profile.
-  RuntimeProfile* prof = is_prepared_ ? profile() : nullptr;
-  report_status_cb_(status, prof, done);
-}
-
-void PlanFragmentExecutor::StopReportThread() {
-  if (!report_thread_active_) return;
-  {
-    lock_guard<mutex> l(report_thread_lock_);
-    report_thread_active_ = false;
-  }
-  stop_report_thread_cv_.notify_one();
-  report_thread_->Join();
-}
-
-void PlanFragmentExecutor::FragmentComplete(const Status& status) {
-  ReleaseThreadToken();
-  StopReportThread();
-  // It's safe to send final report now that the reporting thread is stopped.
-  SendReport(true, status);
-}
-
-void PlanFragmentExecutor::Cancel() {
-  VLOG_QUERY << "Cancelling fragment instance...";
-  lock_guard<mutex> l(prepare_lock_);
-  is_cancelled_ = true;
-  if (!is_prepared_) {
-    VLOG_QUERY << "Cancel() called before Prepare()";
-    return;
-  }
-
-  // Ensure that the sink is closed from both sides. Although in ordinary executions we
-  // rely on the consumer to do this, in error cases the consumer may not be able to send
-  // CloseConsumer() (see IMPALA-4348 for an example).
-  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
-
-  DCHECK(runtime_state_ != NULL);
-  VLOG_QUERY << "Cancel(): instance_id=" << runtime_state_->fragment_instance_id();
-  runtime_state_->set_is_cancelled(true);
-  runtime_state_->stream_mgr()->Cancel(runtime_state_->fragment_instance_id());
-}
-
-RuntimeProfile* PlanFragmentExecutor::profile() {
-  return runtime_state_->runtime_profile();
-}
-
-void PlanFragmentExecutor::ReleaseThreadToken() {
-  if (has_thread_token_) {
-    has_thread_token_ = false;
-    runtime_state_->resource_pool()->ReleaseThreadToken(true);
-    PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_);
-    PeriodicCounterUpdater::StopTimeSeriesCounter(
-        thread_usage_sampled_counter_);
-  }
-}
-
-void PlanFragmentExecutor::Close() {
-  DCHECK(!has_thread_token_);
-  DCHECK(!report_thread_active_);
-
-  if (closed_) return;
-  if (!is_prepared_) return;
-  if (sink_.get() != nullptr) sink_->Close(runtime_state());
-
-  row_batch_.reset();
-
-  // Prepare should always have been called, and so runtime_state_ should be set
-  DCHECK(prepared_promise_.IsSet());
-  if (exec_tree_ != NULL) exec_tree_->Close(runtime_state_.get());
-  if (mem_usage_sampled_counter_ != NULL) {
-    // This counter references runtime_state_->instance_mem_tracker() so must be
-    // stopped before calling ReleaseResources().
-    PeriodicCounterUpdater::StopTimeSeriesCounter(mem_usage_sampled_counter_);
-    mem_usage_sampled_counter_ = NULL;
-  }
-  // Sanity timer checks
-#ifndef NDEBUG
-  int64_t total_time = profile()->total_time_counter()->value();
-  int64_t other_time = 0;
-  for (auto& name: {PREPARE_TIMER_NAME, OPEN_TIMER_NAME, EXEC_TIMER_NAME}) {
-    RuntimeProfile::Counter* counter = timings_profile_->GetCounter(name);
-    if (counter != nullptr) other_time += counter->value();
-  }
-  // TODO: IMPALA-4631: Occasionally we see other_time = total_time + 1 for some reason
-  // we don't yet understand, so add 1 to total_time to avoid DCHECKing in that case.
-  DCHECK_LE(other_time, total_time + 1);
-#endif
-  runtime_state_->ReleaseResources();
-
-  closed_ = true;
-}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
deleted file mode 100644
index 2194e58..0000000
--- a/be/src/runtime/plan-fragment-executor.h
+++ /dev/null
@@ -1,305 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_SERVICE_PLAN_EXECUTOR_H
-#define IMPALA_SERVICE_PLAN_EXECUTOR_H
-
-#include <vector>
-#include <boost/scoped_ptr.hpp>
-#include <boost/function.hpp>
-
-#include "common/object-pool.h"
-#include "common/status.h"
-#include "runtime/runtime-state.h"
-#include "util/promise.h"
-#include "util/runtime-profile-counters.h"
-#include "util/thread.h"
-
-namespace impala {
-
-class HdfsFsCache;
-class ExecNode;
-class PlanRootSink;
-class RowDescriptor;
-class RowBatch;
-class DataSink;
-class DataStreamMgr;
-class RuntimeProfile;
-class RuntimeState;
-class TRowBatch;
-class TPlanExecRequest;
-class TPlanFragment;
-class TPlanExecParams;
-
-/// PlanFragmentExecutor handles all aspects of the execution of a single plan fragment,
-/// including setup and tear-down, both in the success and error case. Tear-down, which
-/// happens in Close(), frees all memory allocated for this plan fragment and closes all
-/// data streams.
-///
-/// The lifecycle of a PlanFragmentExecutor is as follows:
-///     if (Prepare().ok()) {
-///       Open()
-///       Exec()
-///     }
-///     Close()
-///
-/// The executor makes an aggregated profile for the entire fragment available, which
-/// includes profile information for the plan itself as well as the output sink.
-///
-/// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
-/// execution profile. The frequency of those reports is controlled by the flag
-/// status_report_interval; setting that flag to 0 disables periodic reporting altogether.
-/// Regardless of the value of that flag, if a report callback is specified, it is invoked
-/// at least once at the end of execution with an overall status and profile (and 'done'
-/// indicator).
-///
-/// Aside from Cancel(), which may be called asynchronously, this class is not
-/// thread-safe.
-class PlanFragmentExecutor {
- public:
-  /// Callback to report execution status of plan fragment.
-  /// 'profile' is the cumulative profile, 'done' indicates whether the execution
-  /// is done or still continuing.
-  /// Note: this does not take a const RuntimeProfile&, because it might need to call
-  /// functions like PrettyPrint() or ToThrift(), neither of which is const
-  /// because they take locks.
-  typedef boost::function<
-      void (const Status& status, RuntimeProfile* profile, bool done)>
-      ReportStatusCallback;
-
-  /// report_status_cb, if !empty(), is used to report the accumulated profile
-  /// information periodically during execution.
-  PlanFragmentExecutor(const ReportStatusCallback& report_status_cb);
-
-  /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
-  /// indicated that execution is finished, or to delete one that has not been Close()'d
-  /// if Prepare() has been called.
-  ~PlanFragmentExecutor();
-
-  /// Prepare for execution. Call this prior to Open().
-  ///
-  /// runtime_state() will not be valid until Prepare() is called. runtime_state() will
-  /// always be valid after Prepare() returns, unless the query was cancelled before
-  /// Prepare() was called.  If request.query_options.mem_limit > 0, it is used as an
-  /// approximate limit on the number of bytes this query can consume at runtime.  The
-  /// query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
-  ///
-  /// If Cancel() is called before Prepare(), Prepare() is a no-op and returns
-  /// Status::CANCELLED;
-  ///
-  /// If Prepare() fails, it will invoke final status callback with the error status.
-  /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we
-  /// have a single descriptor table to cover all fragment instances); at the moment
-  /// we need to pass the TDescriptorTable explicitly
-  Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl,
-      const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx);
-
-  /// Opens the fragment plan and sink. Starts the profile reporting thread, if
-  /// required.  Can be called only if Prepare() succeeded. If Open() fails it will
-  /// invoke the final status callback with the error status.
-  /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close()
-  Status Open();
-
-  /// Executes the fragment by repeatedly driving the sink with batches produced by the
-  /// exec node tree. report_status_cb will have been called for the final time when
-  /// Exec() returns, and the status-reporting thread will have been stopped. Can be
-  /// called only if Open() succeeded.
-  Status Exec();
-
-  /// Closes the underlying plan fragment and frees up all resources allocated in
-  /// Prepare() and Open(). Must be called if Prepare() has been called - no matter
-  /// whether or not Prepare() succeeded.
-  void Close();
-
-  /// Initiate cancellation. If called concurrently with Prepare(), will wait for
-  /// Prepare() to finish in order to properly tear down Prepare()'d state.
-  ///
-  /// Cancel() may be called more than once. Calls after the first will have no
-  /// effect. Duplicate calls to Cancel() are not serialised, and may safely execute
-  /// concurrently.
-  ///
-  /// It is legal to call Cancel() if Prepare() returned an error.
-  void Cancel();
-
-  /// call these only after Prepare()
-  RuntimeState* runtime_state() { return runtime_state_.get(); }
-
-  /// Profile information for plan and output sink.
-  RuntimeProfile* profile();
-
-  /// Blocks until Prepare() is completed.
-  Status WaitForPrepare() { return prepared_promise_.Get(); }
-
-  /// Blocks until exec tree and sink are both opened. It is an error to call this before
-  /// Prepare() has completed. If Prepare() returned an error, WaitForOpen() will
-  /// return that error without blocking.
-  Status WaitForOpen();
-
-  /// Returns fragment instance's sink if this is the root fragment instance. Valid after
-  /// Prepare() returns; if Prepare() fails may be nullptr.
-  PlanRootSink* root_sink() { return root_sink_; }
-
-  /// Name of the counter that is tracking per query, per host peak mem usage.
-  static const std::string PER_HOST_PEAK_MEM_COUNTER;
-
- private:
-  ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
-  TUniqueId query_id_;
-
-  /// profile reporting-related
-  ReportStatusCallback report_status_cb_;
-  boost::scoped_ptr<Thread> report_thread_;
-  boost::mutex report_thread_lock_;
-
-  /// Indicates that profile reporting thread should stop.
-  /// Tied to report_thread_lock_.
-  boost::condition_variable stop_report_thread_cv_;
-
-  /// Indicates that profile reporting thread started.
-  /// Tied to report_thread_lock_.
-  boost::condition_variable report_thread_started_cv_;
-
-  /// When the report thread starts, it sets 'report_thread_active_' to true and signals
-  /// 'report_thread_started_cv_'. The report thread is shut down by setting
-  /// 'report_thread_active_' to false and signalling 'stop_report_thread_cv_'. Protected
-  /// by 'report_thread_lock_'.
-  bool report_thread_active_;
-
-  /// true if Close() has been called
-  bool closed_;
-
-  /// true if this fragment has not returned the thread token to the thread resource mgr
-  bool has_thread_token_;
-
-  /// 'runtime_state_' has to be before 'sink_' as 'sink_' relies on the object pool of
-  /// 'runtime_state_'. This means 'sink_' is destroyed first so any implicit connections
-  /// (e.g. mem_trackers_) from 'runtime_state_' to 'sink_' need to be severed prior to
-  /// the dtor of 'runtime_state_'.
-  boost::scoped_ptr<RuntimeState> runtime_state_;
-
-  /// Profile for timings for each stage of the plan fragment instance's lifecycle.
-  RuntimeProfile* timings_profile_;
-
-  /// Output sink for rows sent to this fragment. Created in Prepare(), owned by this
-  /// object.
-  boost::scoped_ptr<DataSink> sink_;
-
-  /// Set if this fragment instance is the root of the entire plan, so that a consumer can
-  /// pull results by calling root_sink_->GetNext(). Same object as sink_.
-  PlanRootSink* root_sink_ = nullptr;
-
-  boost::scoped_ptr<RowBatch> row_batch_;
-
-  /// Protects is_prepared_ and is_cancelled_, and is also used to coordinate between
-  /// Prepare() and Cancel() to ensure mutual exclusion.
-  boost::mutex prepare_lock_;
-
-  /// True if Prepare() has been called and done some work - even if it returned an
-  /// error. If Cancel() was called before Prepare(), is_prepared_ will not be set.
-  bool is_prepared_;
-
-  /// Set when Prepare() returns.
-  Promise<Status> prepared_promise_;
-
-  /// Set when OpenInternal() returns.
-  Promise<Status> opened_promise_;
-
-  /// True if and only if Cancel() has been called.
-  bool is_cancelled_;
-
-  /// A counter for the per query, per host peak mem usage. Note that this is not the
-  /// max of the peak memory of all fragments running on a host since it needs to take
-  /// into account when they are running concurrently. All fragments for a single query
-  /// on a single host will have the same value for this counter.
-  RuntimeProfile::Counter* per_host_mem_usage_;
-
-  /// Number of rows returned by this fragment
-  RuntimeProfile::Counter* rows_produced_counter_;
-
-  /// Average number of thread tokens for the duration of the plan fragment execution.
-  /// Fragments that do a lot of cpu work (non-coordinator fragment) will have at
-  /// least 1 token.  Fragments that contain a hdfs scan node will have 1+ tokens
-  /// depending on system load.  Other nodes (e.g. hash join node) can also reserve
-  /// additional tokens.
-  /// This is a measure of how much CPU resources this fragment used during the course
-  /// of the execution.
-  RuntimeProfile::Counter* average_thread_tokens_;
-
-  /// Sampled memory usage at even time intervals.
-  RuntimeProfile::TimeSeriesCounter* mem_usage_sampled_counter_;
-
-  /// Sampled thread usage (tokens) at even time intervals.
-  RuntimeProfile::TimeSeriesCounter* thread_usage_sampled_counter_;
-
-  ObjectPool* obj_pool() { return runtime_state_->obj_pool(); }
-
-  /// typedef for TPlanFragmentInstanceCtx.per_node_scan_ranges
-  typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
-
-  /// Main loop of profile reporting thread.
-  /// Exits when notified on stop_report_thread_cv_ and report_thread_active_ is set to
-  /// false. This will not send the final report.
-  void ReportProfileThread();
-
-  /// Invokes the report callback. If 'done' is true, sends the final report with
-  /// 'status' and the profile. This type of report is sent once and only by the
-  /// instance execution thread.  Otherwise, a profile-only report is sent, which the
-  /// ReportProfileThread() thread will do periodically.
-  void SendReport(bool done, const Status& status);
-
-  /// Called when the fragment execution is complete to finalize counters and send
-  /// the final status report.  Must be called only once.
-  void FragmentComplete(const Status& status);
-
-  /// Optimizes the code-generated functions in runtime_state_->llvm_codegen().
-  /// Must be called after exec_tree_->Prepare() and before exec_tree_->Open().
-  /// Returns error if LLVM optimization or compilation fails.
-  Status OptimizeLlvmModule();
-
-  /// Executes Open() logic and returns resulting status. Does not set opened_promise_.
-  Status OpenInternal();
-
-  /// Pulls row batches from fragment instance and pushes them to sink_ in a loop. Returns
-  /// OK if the input was exhausted and sent to the sink successfully, an error otherwise.
-  /// If ExecInternal() returns without an error condition, all rows will have been sent
-  /// to the sink and the sink will have been flushed. Stopping the report thread and
-  /// sending the final report are left to the caller, Exec(), via FragmentComplete().
-  Status ExecInternal();
-
-  /// Performs all the logic of Prepare() and returns resulting status.
-  /// TODO: remove desc_tbl parameter as part of per-query exec rpc
-  Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl,
-      const TPlanFragmentCtx& fragment_ctx,
-      const TPlanFragmentInstanceCtx& instance_ctx);
-
-  /// Releases the thread token for this fragment executor.
-  void ReleaseThreadToken();
-
-  /// Stops report thread, if one is running. Blocks until report thread terminates.
-  /// Idempotent.
-  void StopReportThread();
-
-  /// Print stats about scan ranges for each volumeId in params to info log.
-  void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges);
-
-  const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); }
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 8b72ed9..6057b52 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -41,48 +41,51 @@ using namespace impala;
 DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
     "every log_mem_usage_interval'th fragment completion.");
 
-Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
-  TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id;
-  VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id)
+Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
+  TUniqueId query_id = params.query_ctx.query_id;
+  VLOG_QUERY << "StartQueryFInstances() query_id=" << PrintId(query_id)
              << " coord=" << params.query_ctx.coord_address;
 
   bool dummy;
-  QueryState* qs = GetOrCreateQueryState(
-      params.query_ctx, params.fragment_instance_ctx.request_pool, &dummy);
-  DCHECK(params.__isset.fragment_ctx);
-  DCHECK(params.__isset.fragment_instance_ctx);
-  Status status = qs->Prepare();
+  QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
+  Status status = qs->Init(params);
   if (!status.ok()) {
     ReleaseQueryState(qs);
     return status;
   }
-
-  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
-      qs, params.fragment_ctx, params.fragment_instance_ctx, params.query_ctx.desc_tbl));
-  // register instance before returning so that async Cancel() calls can
-  // find the instance
-  qs->RegisterFInstance(fis);
-  // start new thread to execute instance
+  // avoid blocking the rpc handler thread for too long by starting a new thread for
+  // query startup (which takes ownership of the QueryState reference)
   Thread t("query-exec-mgr",
-      Substitute("exec-fragment-instance-$0", PrintId(instance_id)),
-      &QueryExecMgr::ExecFInstance, this, fis);
+      Substitute("start-query-finstances-$0", PrintId(query_id)),
+      &QueryExecMgr::StartQueryHelper, this, qs);
   t.Detach();
-
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
   return Status::OK();
 }
 
-QueryState* QueryExecMgr::CreateQueryState(
-    const TQueryCtx& query_ctx, const string& request_pool) {
+QueryState* QueryExecMgr::CreateQueryState(const TQueryCtx& query_ctx) {
   bool created;
-  QueryState* qs = GetOrCreateQueryState(query_ctx, request_pool, &created);
+  QueryState* qs = GetOrCreateQueryState(query_ctx, &created);
   DCHECK(created);
   return qs;
 }
 
+QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
+  QueryState* qs = nullptr;
+  int refcnt;
+  {
+    lock_guard<mutex> l(qs_map_lock_);
+    auto it = qs_map_.find(query_id);
+    if (it == qs_map_.end()) return nullptr;
+    qs = it->second;
+    refcnt = qs->refcnt_.Add(1);
+  }
+  DCHECK(qs != nullptr && refcnt > 0);
+  VLOG_QUERY << "QueryState: query_id=" << query_id << " refcnt=" << refcnt;
+  return qs;
+}
+
 QueryState* QueryExecMgr::GetOrCreateQueryState(
-    const TQueryCtx& query_ctx, const string& request_pool, bool* created) {
+    const TQueryCtx& query_ctx, bool* created) {
   QueryState* qs = nullptr;
   int refcnt;
   {
@@ -90,30 +93,26 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
     auto it = qs_map_.find(query_ctx.query_id);
     if (it == qs_map_.end()) {
       // register new QueryState
-      qs = new QueryState(query_ctx, request_pool);
+      qs = new QueryState(query_ctx);
       qs_map_.insert(make_pair(query_ctx.query_id, qs));
-      VLOG_QUERY << "new QueryState: query_id=" << query_ctx.query_id;
       *created = true;
     } else {
       qs = it->second;
       *created = false;
     }
-    // decremented at the end of ExecFInstance()
+    // decremented by ReleaseQueryState()
     refcnt = qs->refcnt_.Add(1);
   }
-  DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
-  VLOG_QUERY << "QueryState: query_id=" << query_ctx.query_id << " refcnt=" << refcnt;
+  DCHECK(qs != nullptr && refcnt > 0);
   return qs;
 }
 
-void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
-  fis->Exec();
 
-  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
-  VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id());
+void QueryExecMgr::StartQueryHelper(QueryState* qs) {
+  qs->StartFInstances();
 
 #ifndef ADDRESS_SANITIZER
-  // tcmalloc and address sanitizer can not be used together
+  // tcmalloc and address sanitizer cannot be used together
   if (FLAGS_log_mem_usage_interval > 0) {
     uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
     if (num_complete % FLAGS_log_mem_usage_interval == 0) {
@@ -125,19 +124,8 @@ void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
   }
 #endif
 
-  // decrement refcount taken in StartFInstance()
-  ReleaseQueryState(fis->query_state());
-}
-
-QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
-  VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id);
-  lock_guard<mutex> l(qs_map_lock_);
-  auto it = qs_map_.find(query_id);
-  if (it == qs_map_.end()) return nullptr;
-  QueryState* qs = it->second;
-  int32_t cnt = qs->refcnt_.Add(1);
-  DCHECK_GT(cnt, 0);
-  return qs;
+  // decrement refcount taken in StartQuery()
+  ReleaseQueryState(qs);
 }
 
 void QueryExecMgr::ReleaseQueryState(QueryState* qs) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 92e756f..8a0c884 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -37,30 +37,24 @@ class FragmentInstanceState;
 
 /// A daemon-wide registry and manager of QueryStates. This is the central
 /// entry point for gaining refcounted access to a QueryState. It also initiates
-/// fragment instance execution.
+/// query execution.
 /// Thread-safe.
-///
-/// TODO: as part of Impala-2550 (per-query exec rpc)
-/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery()
 class QueryExecMgr {
  public:
-  /// Initiates execution of this fragment instance in a newly created thread.
-  /// Also creates a QueryState for this query, if none exists.
-  /// In both cases it increases the refcount prior to instance execution and decreases
-  /// it after execution finishes.
+  /// Creates QueryState if it doesn't exist and initiates execution of all fragment
+  /// instances for this query. All fragment instances hold a reference to their
+  /// QueryState for the duration of their execution.
   ///
-  /// Returns an error if there was some unrecoverable problem before the fragment
-  /// was started (like low memory). In that case, no QueryState is created or has its
-  /// refcount incremented. After this call returns, it is legal to call
-  /// FragmentInstanceState::Cancel() on this fragment instance, regardless of the
-  /// return value of this function.
-  Status StartFInstance(const TExecPlanFragmentParams& params);
+  /// Returns an error if there was some unrecoverable problem before any instance
+  /// was started (like low memory). In that case, no QueryState is created.
+  /// After this function returns, it is legal to call QueryState::Cancel(), regardless of
+  /// the return value of this function.
+  Status StartQuery(const TExecQueryFInstancesParams& params);
 
   /// Creates a QueryState for the given query with the provided parameters. Only valid
   /// to call if the QueryState does not already exist. The caller must call
   /// ReleaseQueryState() with the returned QueryState to decrement the refcount.
-  QueryState* CreateQueryState(
-      const TQueryCtx& query_ctx, const std::string& request_pool);
+  QueryState* CreateQueryState(const TQueryCtx& query_ctx);
 
   /// If a QueryState for the given query exists, increments that refcount and returns
   /// the QueryState, otherwise returns nullptr.
@@ -78,11 +72,11 @@ class QueryExecMgr {
 
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.
-  QueryState* GetOrCreateQueryState(
-      const TQueryCtx& query_ctx, const std::string& request_pool, bool* created);
+  /// Increments the refcount.
+  QueryState* GetOrCreateQueryState(const TQueryCtx& query_ctx, bool* created);
 
-  /// Execute instance.
-  void ExecFInstance(FragmentInstanceState* fis);
+  /// Execute instances and decrement refcount (acquire ownership of qs).
+  void StartQueryHelper(QueryState* qs);
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2cc4818..1ae0f36 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -21,20 +21,27 @@
 #include <boost/thread/locks.hpp>
 #include <kudu/client/client.h>
 
+#include "exprs/expr.h"
 #include "exec/kudu-util.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/backend-client.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
+#include "runtime/runtime-state.h"
 #include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/thread.h"
 
 #include "common/names.h"
 
 using boost::algorithm::join;
 using namespace impala;
 
+#define RETRY_SLEEP_MS 100
+
 struct QueryState::KuduClientPtr {
   kudu::client::sp::shared_ptr<kudu::client::KuduClient> kudu_client;
 };
@@ -49,14 +56,17 @@ QueryState::ScopedRef::~ScopedRef() {
   ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
 }
 
-QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
+QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool)
   : query_ctx_(query_ctx),
     refcnt_(0),
-    prepared_(false),
-    released_resources_(false),
-    buffer_reservation_(nullptr),
-    file_group_(nullptr) {
-  TQueryOptions& query_options = query_ctx_.client_request.query_options;
+    is_cancelled_(0) {
+  if (query_ctx_.request_pool.empty()) {
+    // fix up pool name for tests
+    DCHECK(!request_pool.empty());
+    const_cast<TQueryCtx&>(query_ctx_).request_pool = request_pool;
+  }
+  TQueryOptions& query_options =
+      const_cast<TQueryOptions&>(query_ctx_.client_request.query_options);
   // max_errors does not indicate how many errors in total have been recorded, but rather
   // how many are distinct. It is defined as the sum of the number of generic errors and
   // the number of distinct other errors.
@@ -66,59 +76,56 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
   if (query_options.batch_size <= 0) {
     query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
   }
-  InitMemTrackers(pool);
+  InitMemTrackers();
 }
 
 void QueryState::ReleaseResources() {
+  DCHECK(!released_resources_);
   // Clean up temporary files.
   if (file_group_ != nullptr) file_group_->Close();
   // Release any remaining reservation.
   if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   // Avoid dangling reference from the parent of 'query_mem_tracker_'.
   if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
+  if (desc_tbl_ != nullptr) desc_tbl_->ReleaseResources();
   released_resources_ = true;
 }
 
 QueryState::~QueryState() {
   DCHECK(released_resources_);
+  DCHECK_EQ(refcnt_.Load(), 0);
 }
 
-Status QueryState::Prepare() {
-  lock_guard<SpinLock> l(prepare_lock_);
-  if (prepared_) {
-    DCHECK(prepare_status_.ok());
-    return Status::OK();
-  }
-  RETURN_IF_ERROR(prepare_status_);
-
-  Status status;
+Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
   ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
   if (process_mem_tracker->LimitExceeded()) {
-    string msg = Substitute("Query $0 could not start because the backend Impala daemon "
-                            "is over its memory limit",
-        PrintId(query_id()));
-    status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
-    goto error;
+    string msg = Substitute(
+        "Query $0 could not start because the backend Impala daemon "
+        "is over its memory limit", PrintId(query_id()));
+    RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0));
   }
   // Do buffer-pool-related setup if running in a backend test that explicitly created
   // the pool.
-  if (exec_env->buffer_pool() != nullptr) {
-    status = InitBufferPoolState();
-    if (!status.ok()) goto error;
-  }
-  prepared_ = true;
-  return Status::OK();
+  if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState());
 
-error:
-  prepare_status_ = status;
-  return status;
+  // don't copy query_ctx, it's large and we already did that in the c'tor
+  rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx);
+  TExecQueryFInstancesParams& non_const_params =
+      const_cast<TExecQueryFInstancesParams&>(rpc_params);
+  rpc_params_.fragment_ctxs.swap(non_const_params.fragment_ctxs);
+  rpc_params_.__isset.fragment_ctxs = true;
+  rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs);
+  rpc_params_.__isset.fragment_instance_ctxs = true;
+
+  return Status::OK();
 }
 
-void QueryState::InitMemTrackers(const std::string& pool) {
+void QueryState::InitMemTrackers() {
+  const string& pool = query_ctx_.request_pool;
   int64_t bytes_limit = -1;
   if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
     bytes_limit = query_options().mem_limit;
@@ -160,23 +167,194 @@ Status QueryState::InitBufferPoolState() {
   return Status::OK();
 }
 
-void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
-  VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
-  lock_guard<SpinLock> l(fis_map_lock_);
-  DCHECK_EQ(fis_map_.count(fis->instance_id()), 0);
-  fis_map_.insert(make_pair(fis->instance_id(), fis));
-}
-
 FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
   VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
-  lock_guard<SpinLock> l(fis_map_lock_);
+  if (!instances_prepared_promise_.Get().ok()) return nullptr;
   auto it = fis_map_.find(instance_id);
   return it != fis_map_.end() ? it->second : nullptr;
 }
 
-Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresses,
-                                 kudu::client::KuduClient** client) {
-  std::string master_addr_concat = join(master_addresses, ",");
+void QueryState::ReportExecStatus(bool done, const Status& status,
+    FragmentInstanceState* fis) {
+  ReportExecStatusAux(done, status, fis, true);
+}
+
+void QueryState::ReportExecStatusAux(bool done, const Status& status,
+    FragmentInstanceState* fis, bool instances_started) {
+  // if we're reporting an error, we're done
+  DCHECK(status.ok() || done);
+  // if this is not for a specific fragment instance, we're reporting an error
+  DCHECK(fis != nullptr || !status.ok());
+  DCHECK(fis == nullptr || fis->IsPrepared());
+
+  // This will send a report even if we are cancelled.  If the query completed correctly
+  // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
+  // be waiting for a final report and profile.
+
+  Status coord_status;
+  ImpalaBackendConnection coord(ExecEnv::GetInstance()->impalad_client_cache(),
+      query_ctx().coord_address, &coord_status);
+  if (!coord_status.ok()) {
+    // TODO: this might flood the log
+    LOG(WARNING) << "Couldn't get a client for " << query_ctx().coord_address
+        <<"\tReason: " << coord_status.GetDetail();
+    if (instances_started) Cancel();
+    return;
+  }
+
+  TReportExecStatusParams params;
+  params.protocol_version = ImpalaInternalServiceVersion::V1;
+  params.__set_query_id(query_ctx().query_id);
+  DCHECK(rpc_params().__isset.coord_state_idx);
+  params.__set_coord_state_idx(rpc_params().coord_state_idx);
+
+  if (fis != nullptr) {
+    // create status for 'fis'
+    params.instance_exec_status.emplace_back();
+    params.__isset.instance_exec_status = true;
+    TFragmentInstanceExecStatus& instance_status = params.instance_exec_status.back();
+    instance_status.__set_fragment_instance_id(fis->instance_id());
+    status.SetTStatus(&instance_status);
+    instance_status.__set_done(done);
+
+    if (fis->profile() != nullptr) {
+      fis->profile()->ToThrift(&instance_status.profile);
+      instance_status.__isset.profile = true;
+    }
+
+    // Only send updates to insert status if fragment is finished, the coordinator
+    // waits until query execution is done to use them anyhow.
+    if (done) {
+      TInsertExecStatus insert_status;
+      if (fis->runtime_state()->hdfs_files_to_move()->size() > 0) {
+        insert_status.__set_files_to_move(*fis->runtime_state()->hdfs_files_to_move());
+      }
+      if (fis->runtime_state()->per_partition_status()->size() > 0) {
+        insert_status.__set_per_partition_status(
+            *fis->runtime_state()->per_partition_status());
+      }
+      params.__set_insert_exec_status(insert_status);
+    }
+
+    // Send new errors to coordinator
+    fis->runtime_state()->GetUnreportedErrors(&params.error_log);
+    params.__isset.error_log = (params.error_log.size() > 0);
+  }
+
+  TReportExecStatusResult res;
+  Status rpc_status;
+  bool retry_is_safe;
+  // Try to send the RPC 3 times before failing.
+  for (int i = 0; i < 3; ++i) {
+    rpc_status = coord.DoRpc(
+        &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
+    if (rpc_status.ok()) break;
+    if (!retry_is_safe) break;
+    if (i < 2) SleepForMs(RETRY_SLEEP_MS);
+  }
+  Status result_status(res.status);
+  if ((!rpc_status.ok() || !result_status.ok()) && instances_started) {
+    // TODO: should we try to keep rpc_status for the final report? (but the final
+    // report, following this Cancel(), may not succeed anyway.)
+    // TODO: not keeping an error status here means that all instances might
+    // abort with CANCELLED status, despite there being an error
+    Cancel();
+  }
+}
+
+Status QueryState::WaitForPrepare() {
+  return instances_prepared_promise_.Get();
+}
+
+void QueryState::StartFInstances() {
+  VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
+      << " #instances=" << rpc_params_.fragment_instance_ctxs.size();
+  DCHECK_GT(refcnt_.Load(), 0);
+
+  // set up desc tbl
+  DCHECK(query_ctx().__isset.desc_tbl);
+  Status status = DescriptorTbl::Create(
+      &obj_pool_, query_ctx().desc_tbl, query_mem_tracker_, &desc_tbl_);
+  if (!status.ok()) {
+    instances_prepared_promise_.Set(status);
+    ReportExecStatusAux(true, status, nullptr, false);
+    return;
+  }
+  VLOG_QUERY << "descriptor table for query=" << PrintId(query_id())
+             << "\n" << desc_tbl_->DebugString();
+
+  DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
+  TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
+  int fragment_ctx_idx = 0;
+  for (const TPlanFragmentInstanceCtx& instance_ctx: rpc_params_.fragment_instance_ctxs) {
+    // determine corresponding TPlanFragmentCtx
+    if (fragment_ctx->fragment.idx != instance_ctx.fragment_idx) {
+      ++fragment_ctx_idx;
+      DCHECK_LT(fragment_ctx_idx, rpc_params_.fragment_ctxs.size());
+      fragment_ctx = &rpc_params_.fragment_ctxs[fragment_ctx_idx];
+      // we expect fragment and instance contexts to follow the same order
+      DCHECK_EQ(fragment_ctx->fragment.idx, instance_ctx.fragment_idx);
+    }
+    FragmentInstanceState* fis = obj_pool_.Add(
+        new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
+    fis_map_.emplace(fis->instance_id(), fis);
+
+    // update fragment_map_
+    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
+    fis_list.push_back(fis);
+
+    // start new thread to execute instance
+    refcnt_.Add(1);  // decremented in ExecFInstance()
+    Thread t("query-state",
+        Substitute(
+          "exec-query-finstance-$0", PrintId(instance_ctx.fragment_instance_id)),
+        &QueryState::ExecFInstance, this, fis);
+    t.Detach();
+  }
+
+  // don't return until every instance is prepared and record the first non-OK
+  // (non-CANCELLED if available) status
+  Status prepare_status;
+  for (auto entry: fis_map_) {
+    Status instance_status = entry.second->WaitForPrepare();
+    // don't wipe out an error in one instance with the resulting CANCELLED from
+    // the remaining instances
+    if (!instance_status.ok() && (prepare_status.ok() || prepare_status.IsCancelled())) {
+      prepare_status = instance_status;
+    }
+  }
+  instances_prepared_promise_.Set(prepare_status);
+}
+
+void QueryState::ExecFInstance(FragmentInstanceState* fis) {
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
+  VLOG_QUERY << "Executing instance. instance_id=" << PrintId(fis->instance_id())
+      << " fragment_idx=" << fis->instance_ctx().fragment_idx
+      << " per_fragment_instance_idx=" << fis->instance_ctx().per_fragment_instance_idx
+      << " coord_state_idx=" << rpc_params().coord_state_idx
+      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value();
+  Status status = fis->Exec();
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
+  VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id())
+      << " #in-flight=" << ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->value()
+      << " status=" << status;
+  // initiate cancellation if nobody has done so yet
+  if (!status.ok()) Cancel();
+  // decrement refcount taken in StartFInstances()
+  ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+}
+
+void QueryState::Cancel() {
+  VLOG_QUERY << "Cancel: query_id=" << query_id();
+  (void) instances_prepared_promise_.Get();
+  if (!is_cancelled_.CompareAndSwap(0, 1)) return;
+  for (auto entry: fis_map_) entry.second->Cancel();
+}
+
+Status QueryState::GetKuduClient(
+    const vector<string>& master_addresses, kudu::client::KuduClient** client) {
+  string master_addr_concat = join(master_addresses, ",");
   lock_guard<SpinLock> l(kudu_client_map_lock_);
   auto kudu_client_map_it = kudu_client_map_.find(master_addr_concat);
   if (kudu_client_map_it == kudu_client_map_.end()) {
@@ -191,3 +369,12 @@ Status QueryState::GetKuduClient(const std::vector<std::string>& master_addresse
   }
   return Status::OK();
 }
+
+void QueryState::PublishFilter(int32_t filter_id, int fragment_idx,
+    const TBloomFilter& thrift_bloom_filter) {
+  if (!instances_prepared_promise_.Get().ok()) return;
+  DCHECK_EQ(fragment_map_.count(fragment_idx), 1);
+  for (FragmentInstanceState* fis: fragment_map_[fragment_idx]) {
+    fis->PublishFilter(filter_id, thrift_bloom_filter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index d7ce10f..5660a49 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -29,6 +29,7 @@
 #include "runtime/tmp-file-mgr.h"
 #include "util/spinlock.h"
 #include "util/uid-util.h"
+#include "util/promise.h"
 
 namespace kudu { namespace client { class KuduClient; } }
 
@@ -44,17 +45,35 @@ class ReservationTracker;
 /// instances; in contrast, fragment instance-specific state is collected in
 /// FragmentInstanceState.
 ///
-/// The lifetime of an instance of this class is dictated by a reference count.
-/// Any thread that executes on behalf of a query, and accesses any of its state,
-/// must obtain a reference to the corresponding QueryState and hold it for at least the
+/// The lifetime of a QueryState is dictated by a reference count. Any thread that
+/// executes on behalf of a query, and accesses any of its state, must obtain a
+/// reference to the corresponding QueryState and hold it for at least the
 /// duration of that access. The reference is obtained and released via
 /// QueryExecMgr::Get-/ReleaseQueryState() or via QueryState::ScopedRef (the latter
 /// for references limited to the scope of a single function or block).
-/// As long as the reference count is greater than 0, all query state (contained
-/// either in this class or accessible through this class, such as the
-/// FragmentInstanceStates) is guaranteed to be alive.
+/// As long as the reference count is greater than 0, all of a query's control
+/// structures (contained either in this class or accessible through this class, such
+/// as the FragmentInstanceStates) are guaranteed to be alive.
+///
+/// When any fragment instance execution returns with an error status, all
+/// fragment instances are automatically cancelled.
+///
+/// Status reporting: all instances currently report their status independently.
+/// Each instance sends at least one final status report with its overall execution
+/// status, so if any of the instances encountered an error, that error will be reported.
 ///
 /// Thread-safe, unless noted otherwise.
+///
+/// TODO:
+/// - set up kudu clients in Init(), remove related locking
+/// - release resources (those referenced directly or indirectly by the query result
+///   set) automatically when all instances have finished execution
+///   (either by returning all rows or by being cancelled), rather than waiting for an
+///   explicit call to ReleaseResources()
+/// - when ReportExecStatus() encounters an error, query execution at this node
+///   gets aborted, but it's possible for the coordinator not to find out about that;
+///   fix the coordinator to periodically ping the backends (should the coordinator
+///   simply poll for the status reports?)
 class QueryState {
  public:
   /// Use this class to obtain a QueryState for the duration of a function/block,
@@ -84,89 +103,126 @@ class QueryState {
   /// a shared pool for all objects that have query lifetime
   ObjectPool* obj_pool() { return &obj_pool_; }
 
-  /// This TQueryCtx was copied from the first fragment instance which led to the
-  /// creation of this QueryState. For all subsequently arriving fragment instances the
-  /// desc_tbl in this context will be incorrect, therefore query_ctx().desc_tbl should
-  /// not be used. This restriction will go away with the switch to a per-query exec
-  /// rpc.
   const TQueryCtx& query_ctx() const { return query_ctx_; }
-
-  const TUniqueId& query_id() const { return query_ctx_.query_id; }
-
+  const TUniqueId& query_id() const { return query_ctx().query_id; }
   const TQueryOptions& query_options() const {
     return query_ctx_.client_request.query_options;
   }
-
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
+
+  // the following getters are only valid after Init()
   ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
   TmpFileMgr::FileGroup* file_group() const { return file_group_; }
+  const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; }
+
+  // the following getters are only valid after StartFInstances()
+  const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
-  /// if resources could not be acquired. Safe to call concurrently and idempotent:
-  /// the first thread to call this does the setup work.
-  Status Prepare();
+  /// if resources could not be acquired. Uses few cycles and never blocks.
+  /// Not idempotent, not thread-safe.
+  /// The remaining public functions must be called only after Init().
+  Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT;
+
+  /// Performs the runtime-intensive parts of initial setup and starts all fragment
+  /// instances belonging to this query. Each instance receives its own execution
+  /// thread. Blocks until all fragment instances have finished their Prepare phase.
+  /// Not idempotent, not thread-safe.
+  void StartFInstances();
+
+  /// Return overall status of Prepare phases of fragment instances. A failure
+  /// in any instance's Prepare will cause this function to return an error status.
+  /// Blocks until all fragment instances have finished their Prepare phase.
+  Status WaitForPrepare();
+
+  /// Blocks until all fragment instances have finished their Prepare phase.
+  FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
 
-  /// Registers a new FInstanceState.
-  void RegisterFInstance(FragmentInstanceState* fis);
+  /// Blocks until all fragment instances have finished their Prepare phase.
+  void PublishFilter(int32_t filter_id, int fragment_idx,
+      const TBloomFilter& thrift_bloom_filter);
 
-  /// Returns the instance state or nullptr if the instance id has not previously
-  /// been registered. The returned FIS is valid for the duration of the QueryState.
-  FragmentInstanceState* GetFInstanceState(const TUniqueId& instance_id);
+  /// Cancels all actively executing fragment instances. Blocks until all fragment
+  /// instances have finished their Prepare phase. Idempotent.
+  void Cancel();
 
   /// Called once the query is complete to release any resources.
-  /// Must be called before destroying the QueryState.
+  /// Must be called only once and before destroying the QueryState.
+  /// Not idempotent, not thread-safe.
   void ReleaseResources();
 
   /// Gets a KuduClient for this list of master addresses. It will lookup and share
   /// an existing KuduClient if possible. Otherwise, it will create a new KuduClient
   /// internally and return a pointer to it. All KuduClients accessed through this
   /// interface are owned by the QueryState. Thread safe.
-  Status GetKuduClient(const std::vector<std::string>& master_addrs,
-                       kudu::client::KuduClient** client);
+  Status GetKuduClient(
+      const std::vector<std::string>& master_addrs, kudu::client::KuduClient** client)
+      WARN_UNUSED_RESULT;
+
+  /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
+  /// status must be an error. If fis is given, expects that fis finished its Prepare
+  /// phase; it then sends a report for that instance, including its profile.
+  /// If there is an error during the rpc, initiates cancellation.
+  void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
 
   ~QueryState();
 
  private:
   friend class QueryExecMgr;
 
+  /// test execution
+  friend class RuntimeState;
+
   static const int DEFAULT_BATCH_SIZE = 1024;
 
-  TQueryCtx query_ctx_;
+  /// set in c'tor
+  const TQueryCtx query_ctx_;
 
-  ObjectPool obj_pool_;
-  AtomicInt32 refcnt_;
+  /// the top-level MemTracker for this query (owned by obj_pool_), created in c'tor
+  MemTracker* query_mem_tracker_ = nullptr;
 
-  /// Held for duration of Prepare(). Protects 'prepared_',
-  /// 'prepare_status_' and the members initialized in Prepare().
-  SpinLock prepare_lock_;
+  /// set in Init(); rpc_params_.query_ctx is *not* set to avoid duplication
+  /// with query_ctx_
+  /// TODO: find a way not to have to copy this
+  TExecQueryFInstancesParams rpc_params_;
 
-  /// Non-OK if Prepare() failed the first time it was called.
-  /// All subsequent calls to Prepare() return this status.
-  Status prepare_status_;
+  /// Buffer reservation for this query (owned by obj_pool_)
+  /// Only non-null in backend tests that explicitly enabled the new buffer pool
+  /// Set in Init().
+  /// TODO: this will always be non-null once IMPALA-3200 is done
+  ReservationTracker* buffer_reservation_ = nullptr;
 
-  /// True if Prepare() executed and finished successfully.
-  bool prepared_;
+  /// Temporary files for this query (owned by obj_pool_)
+  /// Only non-null in backend tests that explicitly enabled the new buffer pool
+  /// Set in Init().
+  /// TODO: this will always be non-null once IMPALA-3200 is done
+  TmpFileMgr::FileGroup* file_group_ = nullptr;
 
-  /// True if and only if ReleaseResources() has been called.
-  bool released_resources_;
+  /// created in StartFInstances(), owned by obj_pool_
+  DescriptorTbl* desc_tbl_ = nullptr;
 
-  SpinLock fis_map_lock_; // protects fis_map_
+  /// Barrier for the completion of the Prepare phases of all fragment instances,
+  /// set in StartFInstances().
+  Promise<Status> instances_prepared_promise_;
 
-  /// map from instance id to its state (owned by obj_pool_)
+  /// map from instance id to its state (owned by obj_pool_), populated in
+  /// StartFInstances(); not valid to read from until instances_prepared_promise_
+  /// is set
   std::unordered_map<TUniqueId, FragmentInstanceState*> fis_map_;
 
-  /// The top-level MemTracker for this query (owned by obj_pool_).
-  MemTracker* query_mem_tracker_;
+  /// map from fragment index to its instances (owned by obj_pool_), populated in
+  /// StartFInstances()
+  std::unordered_map<int, std::vector<FragmentInstanceState*>> fragment_map_;
 
-  /// Buffer reservation for this query (owned by obj_pool_)
-  /// Only non-null in backend tests that explicitly enabled the new buffer pool
-  /// TODO: this will always be non-null once IMPALA-3200 is done
-  ReservationTracker* buffer_reservation_;
+  ObjectPool obj_pool_;
+  AtomicInt32 refcnt_;
 
-  /// Temporary files for this query (owned by obj_pool_)
-  /// Only non-null in backend tests the explicitly enabled the new buffer pool
-  /// TODO: this will always be non-null once IMPALA-3200 is done
-  TmpFileMgr::FileGroup* file_group_;
+  /// set to 1 when any fragment instance fails or when Cancel() is called; used to
+  /// initiate cancellation exactly once
+  AtomicInt32 is_cancelled_;
+
+  /// True if and only if ReleaseResources() has been called.
+  bool released_resources_ = false;
 
   SpinLock kudu_client_map_lock_; // protects kudu_client_map_
 
@@ -184,16 +240,25 @@ class QueryState {
   /// that the master address lists be identical in order to share a KuduClient.
   KuduClientMap kudu_client_map_;
 
-  /// Create QueryState w/ copy of query_ctx and refcnt of 0.
-  /// The query is associated with the resource pool named 'pool'
-  QueryState(const TQueryCtx& query_ctx, const std::string& pool);
+  /// Create QueryState w/ refcnt of 0.
+  /// The query is associated with the resource pool query_ctx.request_pool or
+  /// 'request_pool', if the former is not set (needed for tests).
+  QueryState(const TQueryCtx& query_ctx, const std::string& request_pool = "");
+
+  /// Execute the fragment instance and decrement the refcnt when done.
+  void ExecFInstance(FragmentInstanceState* fis);
 
   /// Called from Prepare() to initialize MemTrackers.
-  void InitMemTrackers(const std::string& pool);
+  void InitMemTrackers();
 
-  /// Called from PrepareForExecution() to setup buffer reservations and the
+  /// Called from Prepare() to set up buffer reservations and the
   /// file group. Fails if required resources are not available.
-  Status InitBufferPoolState();
+  Status InitBufferPoolState() WARN_UNUSED_RESULT;
+
+  /// Same behavior as ReportExecStatus().
+  /// Cancel on error only if instances_started is true.
+  void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis,
+      bool instances_started);
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 3e2dc6d..2a47cac 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -151,9 +151,10 @@ void RuntimeFilterBank::UpdateFilterFromLocal(int32_t filter_id,
 
   if (has_remote_target
       && state_->query_options().runtime_filter_mode == TRuntimeFilterMode::GLOBAL) {
+    params.__set_filter_id(filter_id);
+    params.__set_query_id(state_->query_id());
     BloomFilter::ToThrift(bloom_filter, &params.bloom_filter);
-    params.filter_id = filter_id;
-    params.query_id = state_->query_id();
+    params.__isset.bloom_filter = true;
 
     ExecEnv::GetInstance()->rpc_pool()->Offer(bind<void>(
         SendFilterToCoordinator, state_->query_ctx().coord_address, params,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 62d0737..c5e2f59 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -74,16 +74,13 @@ namespace impala {
 
 RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env)
-  : desc_tbl_(nullptr),
-    obj_pool_(new ObjectPool()),
-    query_state_(query_state),
+  : query_state_(query_state),
     fragment_ctx_(&fragment_ctx),
     instance_ctx_(&instance_ctx),
     now_(new TimestampValue(query_state->query_ctx().now_string.c_str(),
         query_state->query_ctx().now_string.size())),
     exec_env_(exec_env),
-    profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
-    query_mem_tracker_(query_state_->query_mem_tracker()),
+    profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
     instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
@@ -91,20 +88,21 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
 }
 
 RuntimeState::RuntimeState(
-    const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool)
-  : obj_pool_(new ObjectPool()),
-    query_state_(nullptr),
+    const TQueryCtx& qctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl)
+  : query_state_(new QueryState(qctx, "test-pool")),
     fragment_ctx_(nullptr),
     instance_ctx_(nullptr),
-    local_query_ctx_(query_ctx),
-    now_(new TimestampValue(query_ctx.now_string.c_str(), query_ctx.now_string.size())),
+    local_query_state_(query_state_),
+    now_(new TimestampValue(qctx.now_string.c_str(), qctx.now_string.size())),
     exec_env_(exec_env),
-    profile_(obj_pool_.get(), "<unnamed>"),
-    query_mem_tracker_(MemTracker::CreateQueryMemTracker(
-        query_id(), query_options(), request_pool, obj_pool_.get())),
+    profile_(obj_pool(), "<unnamed>"),
     instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
+  if (query_ctx().request_pool.empty()) {
+    const_cast<TQueryCtx&>(query_ctx()).request_pool = "test-pool";
+  }
+  if (desc_tbl != nullptr) query_state_->desc_tbl_ = desc_tbl;
   Init();
 }
 
@@ -125,10 +123,10 @@ void RuntimeState::Init() {
   total_network_receive_timer_ = ADD_TIMER(runtime_profile(), "TotalNetworkReceiveTime");
 
   instance_mem_tracker_.reset(new MemTracker(
-      runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_));
+      runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker()));
 
   if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
-    instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker);
+    instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker);
     instance_buffer_reservation_->InitChildTracker(&profile_,
         query_state_->buffer_reservation(), instance_mem_tracker_.get(),
         numeric_limits<int64_t>::max());
@@ -143,7 +141,7 @@ Status RuntimeState::CreateBlockMgr() {
   DCHECK(block_mgr_.get() == NULL);
 
   // Compute the max memory the block mgr will use.
-  int64_t block_mgr_limit = query_mem_tracker_->lowest_limit();
+  int64_t block_mgr_limit = query_mem_tracker()->lowest_limit();
   if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max();
   block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION),
       block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING);
@@ -266,7 +264,6 @@ void RuntimeState::UnregisterReaderContexts() {
 
 void RuntimeState::ReleaseResources() {
   UnregisterReaderContexts();
-  if (desc_tbl_ != nullptr) desc_tbl_->ClosePartitionExprs(this);
   if (filter_bank_ != nullptr) filter_bank_->Close();
   if (resource_pool_ != nullptr) {
     exec_env_->thread_mgr()->UnregisterPool(resource_pool_);
@@ -277,17 +274,22 @@ void RuntimeState::ReleaseResources() {
   // Release the reservation, which should be unused at the point.
   if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
 
-  // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so
+  // 'query_mem_tracker()' must be valid as long as 'instance_mem_tracker_' is so
   // delete 'instance_mem_tracker_' first.
   // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so
   // break the link between 'instance_mem_tracker_' and its parent before
   // 'instance_mem_tracker_' and its children are destroyed.
   instance_mem_tracker_->UnregisterFromParent();
+  if (instance_mem_tracker_->consumption() != 0) {
+    LOG(WARNING) << "Query " << query_id() << " may have leaked memory." << endl
+                 << instance_mem_tracker_->LogUsage();
+  }
   instance_mem_tracker_.reset();
 
-  // If this RuntimeState owns 'query_mem_tracker_' it must deregister it.
-  if (query_state_ == nullptr) query_mem_tracker_->UnregisterFromParent();
-  query_mem_tracker_ = nullptr;
+  if (local_query_state_.get() != nullptr) {
+    // if we created this QueryState, we must call ReleaseResources()
+    local_query_state_->ReleaseResources();
+  }
 }
 
 const std::string& RuntimeState::GetEffectiveUser() const {
@@ -314,14 +316,28 @@ HBaseTableFactory* RuntimeState::htable_factory() {
   return exec_env_->htable_factory();
 }
 
+ObjectPool* RuntimeState::obj_pool() const {
+  DCHECK(query_state_ != nullptr);
+  return query_state_->obj_pool();
+}
+
 const TQueryCtx& RuntimeState::query_ctx() const {
-  return query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
+  DCHECK(query_state_ != nullptr);
+  return query_state_->query_ctx();
+}
+
+const DescriptorTbl& RuntimeState::desc_tbl() const {
+  DCHECK(query_state_ != nullptr);
+  return query_state_->desc_tbl();
 }
 
 const TQueryOptions& RuntimeState::query_options() const {
-  const TQueryCtx& query_ctx =
-      query_state_ != nullptr ? query_state_->query_ctx() : local_query_ctx_;
-  return query_ctx.client_request.query_options;
+  return query_ctx().client_request.query_options;
+}
+
+MemTracker* RuntimeState::query_mem_tracker() {
+  DCHECK(query_state_ != nullptr);
+  return query_state_->query_mem_tracker();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index db9948f..d70459f 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -81,9 +81,10 @@ class RuntimeState {
   RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
       const TPlanFragmentInstanceCtx& instance_ctx, ExecEnv* exec_env);
 
-  /// RuntimeState for executing expr in fe-support.
+  /// RuntimeState for test execution and fe-support.cc. Creates its own QueryState and
+  /// installs desc_tbl, if set. If query_ctx.request_pool isn't set, sets it to "test-pool".
   RuntimeState(
-      const TQueryCtx& query_ctx, ExecEnv* exec_env, const std::string& request_pool);
+      const TQueryCtx& query_ctx, ExecEnv* exec_env, DescriptorTbl* desc_tbl = nullptr);
 
   /// Empty d'tor to avoid issues with scoped_ptr.
   ~RuntimeState();
@@ -95,9 +96,9 @@ class RuntimeState {
   Status CreateBlockMgr();
 
   QueryState* query_state() const { return query_state_; }
-  ObjectPool* obj_pool() const { return obj_pool_.get(); }
-  const DescriptorTbl& desc_tbl() const { return *desc_tbl_; }
-  void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; }
+  /// Return the query's ObjectPool
+  ObjectPool* obj_pool() const;
+  const DescriptorTbl& desc_tbl() const;
   const TQueryOptions& query_options() const;
   int batch_size() const { return query_options().batch_size; }
   bool abort_on_error() const { return query_options().abort_on_error; }
@@ -128,7 +129,7 @@ class RuntimeState {
   CatalogServiceClientCache* catalogd_client_cache();
   DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
-  MemTracker* query_mem_tracker() { return query_mem_tracker_; }
+  MemTracker* query_mem_tracker();  // reference to the query_state_'s memtracker
   ReservationTracker* instance_buffer_reservation() {
     return instance_buffer_reservation_;
   }
@@ -253,7 +254,7 @@ class RuntimeState {
   Status LogOrReturnError(const ErrorMsg& message);
 
   bool is_cancelled() const { return is_cancelled_; }
-  void set_is_cancelled(bool v) { is_cancelled_ = v; }
+  void set_is_cancelled() { is_cancelled_ = true; }
 
   RuntimeProfile::Counter* total_storage_wait_timer() {
     return total_storage_wait_timer_;
@@ -320,9 +321,6 @@ class RuntimeState {
     block_mgr_ = block_mgr;
   }
 
-  DescriptorTbl* desc_tbl_ = nullptr;
-  boost::scoped_ptr<ObjectPool> obj_pool_;
-
   /// Lock protecting error_log_
   SpinLock error_log_lock_;
 
@@ -330,13 +328,12 @@ class RuntimeState {
   ErrorLogMap error_log_;
 
   /// Global QueryState and original thrift descriptors for this fragment instance.
-  /// Not set by the (const TQueryCtx&) c'tor.
   QueryState* const query_state_;
   const TPlanFragmentCtx* const fragment_ctx_;
   const TPlanFragmentInstanceCtx* const instance_ctx_;
 
-  /// Provides query ctx if query_state_ == nullptr.
-  TQueryCtx local_query_ctx_;
+  /// only populated by the (const TQueryCtx&, ExecEnv*, DescriptorTbl*) c'tor
+  boost::scoped_ptr<QueryState> local_query_state_;
 
   /// Provides instance id if instance_ctx_ == nullptr
   TUniqueId no_instance_id_;
@@ -377,16 +374,12 @@ class RuntimeState {
   /// Total CPU utilization for all threads in this plan fragment.
   RuntimeProfile::ThreadCounters* total_thread_statistics_;
 
-  /// Reference to the query MemTracker, owned by 'query_state_' if that is non-NULL
-  /// or stored in 'obj_pool_' otherwise.
-  MemTracker* query_mem_tracker_;
-
   /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
   /// Buffer reservation for this fragment instance - a child of the query buffer
   /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
-  /// was created by a backend test. Owned by 'obj_pool_'.
+  /// was created by a backend test. Owned by obj_pool().
   ReservationTracker* instance_buffer_reservation_;
 
   /// if true, execution should stop with a CANCELLED status

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 026b2ee..37b4363 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -18,17 +18,15 @@
 #include "runtime/test-env.h"
 
 #include <limits>
+#include <memory>
 
 #include "runtime/buffered-block-mgr.h"
 #include "runtime/query-exec-mgr.h"
 #include "runtime/tmp-file-mgr.h"
+#include "runtime/query-state.h"
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
-
 #include "gutil/strings/substitute.h"
-
-#include <memory>
-
 #include "common/names.h"
 
 using boost::scoped_ptr;
@@ -40,8 +38,8 @@ scoped_ptr<MetricGroup> TestEnv::static_metrics_;
 
 TestEnv::TestEnv()
   : have_tmp_file_mgr_args_(false),
-    buffer_pool_min_buffer_len_(1024),
-    buffer_pool_capacity_(0) {}
+    buffer_pool_min_buffer_len_(-1),
+    buffer_pool_capacity_(-1) {}
 
 Status TestEnv::Init() {
   if (static_metrics_ == NULL) {
@@ -61,7 +59,9 @@ Status TestEnv::Init() {
   } else {
     RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
   }
-  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  if (buffer_pool_min_buffer_len_ != -1 && buffer_pool_capacity_ != -1) {
+    exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  }
   return Status::OK();
 }
 
@@ -80,7 +80,7 @@ void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
   TearDownQueries();
-  exec_env_->disk_io_mgr_.reset();
+  // tear down exec env state to avoid leaks
   exec_env_.reset();
 }
 
@@ -113,13 +113,23 @@ Status TestEnv::CreateQueryState(
   if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
   query_ctx.query_id.hi = 0;
   query_ctx.query_id.lo = query_id;
+  query_ctx.request_pool = "test-pool";
 
   // CreateQueryState() enforces the invariant that 'query_id' must be unique.
-  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx, "test-pool");
+  QueryState* qs = exec_env_->query_exec_mgr()->CreateQueryState(query_ctx);
   query_states_.push_back(qs);
-  RETURN_IF_ERROR(qs->Prepare());
-  FragmentInstanceState* fis = qs->obj_pool()->Add(new FragmentInstanceState(
-      qs, TPlanFragmentCtx(), TPlanFragmentInstanceCtx(), TDescriptorTable()));
+  // make sure to initialize data structures unrelated to the TExecQueryFInstancesParams
+  // param
+  TExecQueryFInstancesParams rpc_params;
+  // create dummy -Ctx fields, we need them for FragmentInstance-/RuntimeState
+  rpc_params.__set_coord_state_idx(0);
+  rpc_params.__set_query_ctx(TQueryCtx());
+  rpc_params.__set_fragment_ctxs(vector<TPlanFragmentCtx>({TPlanFragmentCtx()}));
+  rpc_params.__set_fragment_instance_ctxs(
+      vector<TPlanFragmentInstanceCtx>({TPlanFragmentInstanceCtx()}));
+  RETURN_IF_ERROR(qs->Init(rpc_params));
+  FragmentInstanceState* fis = qs->obj_pool()->Add(
+      new FragmentInstanceState(qs, qs->rpc_params().fragment_ctxs[0], qs->rpc_params().fragment_instance_ctxs[0]));
   RuntimeState* rs = qs->obj_pool()->Add(
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 30e9309..f314452 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -22,11 +22,12 @@
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
-#include "runtime/query-state.h"
 #include "runtime/runtime-state.h"
 
 namespace impala {
 
+class QueryState;
+
 /// Helper testing class that creates an environment with runtime memory management
 /// similar to the one used by the Impala runtime. Only one TestEnv can be active at a
 /// time, because it modifies the global ExecEnv singleton.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index bb44145..a035c7d 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -220,14 +220,6 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
   return fragment_exec_params.fragment;
 }
 
-int QuerySchedule::GetNumFragmentInstances() const {
-  int result = 0;
-  for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
-    result += fragment_exec_params.instance_exec_params.size();
-  }
-  return result;
-}
-
 const TPlanFragment* QuerySchedule::GetCoordFragment() const {
   // Only have coordinator fragment for statements that return rows.
   if (request_.stmt_type != TStmtType::QUERY) return nullptr;
@@ -262,4 +254,12 @@ vector<int> FragmentExecParams::GetInstanceIdxs() const {
   return result;
 }
 
+int QuerySchedule::GetNumFragmentInstances() const {
+  int total = 0;
+  for (const FragmentExecParams& p: fragment_exec_params_) {
+    total += p.instance_exec_params.size();
+  }
+  return total;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 47848e8..8a985c7 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -132,9 +132,6 @@ class QuerySchedule {
   /// Helper methods used by scheduler to populate this QuerySchedule.
   void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
 
-  /// Returns the total number of fragment instances.
-  int GetNumFragmentInstances() const;
-
   /// Return the coordinator fragment, or nullptr if there isn't one.
   const TPlanFragment* GetCoordFragment() const;
 
@@ -148,6 +145,9 @@ class QuerySchedule {
     return plan_node_to_fragment_idx_[id];
   }
 
+  /// Return the total number of instances across all fragments.
+  int GetNumFragmentInstances() const;
+
   /// Returns next instance id. Instance ids are consecutive numbers generated from
   /// the query id.
   /// If the query contains a coordinator fragment instance, the generated instance
@@ -213,7 +213,7 @@ class QuerySchedule {
   // (TPlanFragment.idx)
   std::vector<FragmentExecParams> fragment_exec_params_;
 
-  /// The set of hosts that the query will run on excluding the coordinator.
+  /// The set of hosts that the query will run on including the coordinator.
   boost::unordered_set<TNetworkAddress> unique_hosts_;
 
   /// Total number of scan ranges of this query.