Posted to commits@impala.apache.org by mj...@apache.org on 2016/12/11 07:06:15 UTC

[3/4] incubator-impala git commit: IMPALA-4014: Introduce query-wide execution state.

IMPALA-4014: Introduce query-wide execution state.

This introduces a global structure to coordinate execution
of fragment instances on a backend for a single query.

New classes:
- QueryExecMgr: subsumes FragmentMgr
- QueryState
- FragmentInstanceState: replaces FragmentExecState
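
For orientation, here is a minimal sketch of how the new classes fit together,
based only on the calls Coordinator::Exec() and Coordinator::TearDown() make in
the diff below. The helper name GetCoordInstance is invented for illustration
and is not part of the patch.

#include "common/status.h"
#include "runtime/exec-env.h"
#include "runtime/query-exec-mgr.h"
#include "runtime/query-state.h"
#include "runtime/fragment-instance-state.h"

using namespace impala;

// Sketch only: look up the coordinator's fragment instance through the new
// query-wide state, the way Coordinator::Exec() does in this change.
Status GetCoordInstance(
    const TUniqueId& query_id, FragmentInstanceState** coord_instance) {
  // QueryExecMgr (which subsumes the old FragmentMgr) hands out a QueryState
  // reference for the query; it must be given back via ReleaseQueryState().
  QueryExecMgr* mgr = ExecEnv::GetInstance()->query_exec_mgr();
  QueryState* qs = mgr->GetQueryState(query_id);
  if (qs == nullptr) return Status("query is not executing on this backend");

  // QueryState tracks the FragmentInstanceState of every instance it runs;
  // the coordinator instance has the same id as the query.
  *coord_instance = qs->GetFInstanceState(query_id);
  if (*coord_instance == nullptr) {
    // The instance may have failed and unregistered itself even though it was
    // started successfully; drop the QueryState reference and report failure.
    mgr->ReleaseQueryState(qs);
    return Status("coordinator fragment instance failed");
  }
  // On success the caller keeps the QueryState reference and releases it when
  // done with the instance, as Coordinator::TearDown() does in the diff.
  return Status::OK();
}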

Change-Id: I962ae6b7cb7dc0d07fbb8f70317aeb01d88d400b
Reviewed-on: http://gerrit.cloudera.org:8080/4418
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4b2d76db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4b2d76db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4b2d76db

Branch: refs/heads/master
Commit: 4b2d76dbb523c3761a6f53983b635ce88bc67a0c
Parents: 48792eb
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Wed Oct 26 14:02:44 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sun Dec 11 02:29:28 2016 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   4 +-
 be/src/exec/catalog-op-executor.cc              |   2 +
 be/src/exec/data-source-scan-node.cc            |   2 +-
 be/src/exec/exchange-node.cc                    |   1 +
 be/src/exec/hash-table-test.cc                  |   7 +-
 be/src/exec/hdfs-scan-node-base.cc              |   4 +-
 be/src/exec/hdfs-scan-node.cc                   |   2 +-
 be/src/exec/union-node.cc                       |   2 +-
 be/src/exprs/expr-test.cc                       |   6 +-
 be/src/runtime/CMakeLists.txt                   |   3 +
 be/src/runtime/buffered-block-mgr-test.cc       |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |   3 +-
 be/src/runtime/coordinator.cc                   | 150 +++++++++--------
 be/src/runtime/coordinator.h                    |  39 +++--
 be/src/runtime/data-stream-test.cc              |  16 +-
 be/src/runtime/exec-env.cc                      |   6 +-
 be/src/runtime/exec-env.h                       |   6 +-
 be/src/runtime/fragment-instance-state.cc       | 165 +++++++++++++++++++
 be/src/runtime/fragment-instance-state.h        | 118 +++++++++++++
 be/src/runtime/plan-fragment-executor.cc        |  76 ++++-----
 be/src/runtime/plan-fragment-executor.h         |  16 +-
 be/src/runtime/query-exec-mgr.cc                | 165 +++++++++++++++++++
 be/src/runtime/query-exec-mgr.h                 |  78 +++++++++
 be/src/runtime/query-state.cc                   |  69 ++++++++
 be/src/runtime/query-state.h                    | 117 +++++++++++++
 be/src/runtime/runtime-filter-bank.cc           |   6 +-
 be/src/runtime/runtime-state.cc                 |  99 +++++++----
 be/src/runtime/runtime-state.h                  |  90 +++++-----
 be/src/runtime/test-env.cc                      |  51 +++---
 be/src/runtime/test-env.h                       |   7 +-
 be/src/runtime/thread-resource-mgr.h            |   2 +
 be/src/scheduling/query-schedule.cc             |   2 +-
 be/src/scheduling/request-pool-service.cc       |   2 +-
 be/src/scheduling/simple-scheduler.cc           |   3 +-
 be/src/service/CMakeLists.txt                   |   3 +-
 be/src/service/fe-support.cc                    |  10 +-
 be/src/service/fragment-exec-state.cc           | 145 ----------------
 be/src/service/fragment-exec-state.h            | 105 ------------
 be/src/service/fragment-mgr.cc                  | 154 -----------------
 be/src/service/fragment-mgr.h                   |  89 ----------
 be/src/service/impala-beeswax-server.cc         |  22 +--
 be/src/service/impala-hs2-server.cc             |  34 ++--
 be/src/service/impala-http-handler.cc           |   1 +
 be/src/service/impala-internal-service.cc       | 103 ++++++++++++
 be/src/service/impala-internal-service.h        |  54 ++----
 be/src/service/impala-server.cc                 |   9 +-
 be/src/service/impala-server.h                  |   6 +-
 be/src/service/query-exec-state.cc              |  25 +--
 be/src/service/query-exec-state.h               |   6 +-
 be/src/testutil/desc-tbl-builder.h              |   5 +
 be/src/udf/udf.cc                               |   4 +-
 be/src/util/container-util.h                    |   1 +
 be/src/util/thread.h                            |   5 +
 be/src/util/uid-util.h                          |  18 +-
 common/thrift/ImpalaInternalService.thrift      |   3 +-
 .../apache/impala/analysis/AnalysisContext.java |   2 +-
 .../org/apache/impala/analysis/Analyzer.java    |   2 +-
 .../impala/analysis/ColumnLineageGraph.java     |   6 +-
 .../org/apache/impala/analysis/SelectStmt.java  |   2 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   2 +-
 .../impala/planner/SingleNodePlanner.java       |   2 +-
 .../org/apache/impala/planner/UnionNode.java    |   2 +-
 .../org/apache/impala/service/Frontend.java     |  18 +-
 .../apache/impala/common/FrontendTestBase.java  |   2 +-
 .../org/apache/impala/planner/PlannerTest.java  |   6 +-
 .../apache/impala/planner/PlannerTestBase.java  |  16 +-
 .../org/apache/impala/service/FrontendTest.java |   4 +-
 .../org/apache/impala/testutil/TestUtils.java   |   4 +-
 68 files changed, 1296 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index 444df3a..cda183f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -73,8 +73,8 @@ class Planner {
 
   Status GeneratePlan(const string& stmt, TExecRequest* result) {
     TQueryCtx query_ctx;
-    query_ctx.request.stmt = stmt;
-    query_ctx.request.query_options = query_options_;
+    query_ctx.client_request.stmt = stmt;
+    query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
     ImpalaServer::PrepareQueryContext(&query_ctx);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/catalog-op-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/catalog-op-executor.cc b/be/src/exec/catalog-op-executor.cc
index 4d39e95..f3aed05 100644
--- a/be/src/exec/catalog-op-executor.cc
+++ b/be/src/exec/catalog-op-executor.cc
@@ -22,6 +22,8 @@
 #include "exec/incr-stats-util.h"
 #include "common/status.h"
 #include "runtime/lib-cache.h"
+#include "runtime/client-cache-types.h"
+#include "runtime/exec-env.h"
 #include "service/impala-server.h"
 #include "service/hs2-util.h"
 #include "util/string-parser.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 0faae94..3198a73 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -111,7 +111,7 @@ Status DataSourceScanNode::Open(RuntimeState* state) {
   params.__set_query_id(state->query_id());
   params.__set_table_name(tuple_desc_->table_desc()->name());
   params.__set_init_string(data_src_node_.init_string);
-  params.__set_authenticated_user_name(state->effective_user());
+  params.__set_authenticated_user_name(state->GetEffectiveUser());
   params.__set_row_schema(row_schema);
   params.__set_batch_size(FLAGS_data_source_batch_size);
   params.__set_predicates(data_src_node_.accepted_predicates);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 0bbd42b..0f86339 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -23,6 +23,7 @@
 #include "runtime/data-stream-recvr.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
+#include "runtime/exec-env.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/time.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index a539bb1..9f428f1 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -185,8 +185,8 @@ class HashTableTest : public testing::Test {
   bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
       scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
       int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_OK(test_env_->CreatePerQueryState(
-        next_query_id_++, max_num_blocks, block_size, &runtime_state_));
+    EXPECT_OK(test_env_->CreateQueryState(
+        next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
     MemTracker* client_tracker = pool_.Add(
         new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     BufferedBlockMgr::Client* client;
@@ -604,7 +604,8 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
   EXPECT_TRUE(
-      test_env_->CreatePerQueryState(0, 100, 8 * 1024 * 1024, &runtime_state_).ok());
+      test_env_->CreateQueryState(
+        0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_).ok());
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_,
       probe_expr_ctxs_, false /* !stores_nulls_ */,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index f7677eb..738ef36 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -235,7 +235,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
       // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702.
       LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
                  << " partition_id=" << split.partition_id
-                 << "\n" << PrintThrift(state->fragment_params());
+                 << "\n" << PrintThrift(state->instance_ctx());
       return Status("Query encountered invalid metadata, likely due to IMPALA-1702."
                     " Try rerunning the query.");
     }
@@ -367,7 +367,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
     HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
     DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
                                    << " partition_id=" << partition_id
-                                   << "\n" << PrintThrift(state->fragment_params());
+                                   << "\n" << PrintThrift(state->instance_ctx());
     partition_template_tuple_map_[partition_id] = InitTemplateTuple(
         partition_desc->partition_key_value_ctxs(), scan_node_pool_.get(), state);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 305fa28..03217e0 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -501,7 +501,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
   DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
                             << " partition_id=" << partition_id
-                            << "\n" << PrintThrift(runtime_state_->fragment_params());
+                            << "\n" << PrintThrift(runtime_state_->instance_ctx());
 
   // IMPALA-3798: Filtering before the scanner is created can cause hangs if a header
   // split is filtered out, for sequence-based file formats. If the scanner does not

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index f4d3b69..5457737 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -172,7 +172,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   }
 
   // Only evaluate the const expr lists by the first fragment instance.
-  if (state->fragment_ctx().per_fragment_instance_idx == 0) {
+  if (state->instance_ctx().per_fragment_instance_idx == 0) {
     // Evaluate and materialize the const expr lists exactly once.
     while (const_expr_list_idx_ < const_expr_lists_.size()) {
       MaterializeExprs(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index f439aa9..4185ea1 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -41,6 +41,7 @@
 #include "gen-cpp/hive_metastore_types.h"
 #include "rpc/thrift-client.h"
 #include "rpc/thrift-server.h"
+#include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/string-value.h"
@@ -52,6 +53,7 @@
 #include "util/debug-util.h"
 #include "util/string-parser.h"
 #include "util/test-info.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
 
 #include "common/names.h"
 
@@ -1025,7 +1027,7 @@ template <typename T> void TestSingleLiteralConstruction(
     const ColumnType& type, const T& value, const string& string_val) {
   ObjectPool pool;
   RowDescriptor desc;
-  RuntimeState state(TExecPlanFragmentParams(), NULL);
+  RuntimeState state{TQueryCtx()};
   MemTracker tracker;
 
   Expr* expr = pool.Add(new Literal(type, value));
@@ -1041,7 +1043,7 @@ TEST_F(ExprTest, NullLiteral) {
   for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
     NullLiteral expr(static_cast<PrimitiveType>(type));
     ExprContext ctx(&expr);
-    RuntimeState state(TExecPlanFragmentParams(), NULL);
+    RuntimeState state{TQueryCtx()};
     MemTracker tracker;
     EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
     EXPECT_OK(ctx.Open(&state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 6602f07..640ab39 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -38,6 +38,7 @@ add_library(Runtime
   disk-io-mgr-stress.cc
   exec-env.cc
   free-pool.cc
+  fragment-instance-state.cc
   hbase-table.cc
   hbase-table-factory.cc
   hdfs-fs-cache.cc
@@ -47,6 +48,8 @@ add_library(Runtime
   multi-precision.cc
   parallel-executor.cc
   plan-fragment-executor.cc
+  query-exec-mgr.cc
+  query-state.cc
   test-env.cc
   types.cc
   raw-value.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5e99c09..1828ff8 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -142,8 +142,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
   BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size,
       RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
     RuntimeState* state;
-    EXPECT_OK(test_env_->CreatePerQueryState(
-        query_id, max_buffers, block_size, &state, query_options));
+    EXPECT_OK(test_env_->CreateQueryState(
+        query_id, max_buffers, block_size, query_options, &state));
     if (query_state != NULL) *query_state = state;
     return state->block_mgr();
   }
@@ -558,8 +558,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
     thread_group workers;
     // Create a shared RuntimeState with no BufferedBlockMgr.
     RuntimeState* shared_state =
-        new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
-    shared_state->InitMemTrackers(TUniqueId(), NULL, -1);
+        new RuntimeState(TQueryCtx(), test_env_->exec_env());
+    shared_state->InitMemTrackers(NULL, -1);
 
     for (int i = 0; i < num_threads; ++i) {
       thread* t = new thread(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index e98c334..1d36064 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -101,7 +101,8 @@ class SimpleTupleStreamTest : public testing::Test {
   /// Setup a block manager with the provided settings and client with no reservation,
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
-    ASSERT_OK(test_env_->CreatePerQueryState(0, limit, block_size, &runtime_state_));
+    ASSERT_OK(
+        test_env_->CreateQueryState(0, limit, block_size, nullptr, &runtime_state_));
     MemTracker* client_tracker = pool_.Add(
         new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
     ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 651d349..0cfb340 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -58,9 +58,11 @@
 #include "runtime/parallel-executor.h"
 #include "runtime/plan-fragment-executor.h"
 #include "runtime/row-batch.h"
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/fragment-instance-state.h"
 #include "runtime/tuple-row.h"
 #include "scheduling/scheduler.h"
-#include "service/fragment-exec-state.h"
 #include "util/bloom-filter.h"
 #include "util/container-util.h"
 #include "util/counting-barrier.h"
@@ -123,9 +125,9 @@ struct DebugOptions {
 ///
 /// Concurrent accesses:
 /// - updates through UpdateFragmentExecStatus()
-class Coordinator::FragmentInstanceState {
+class Coordinator::InstanceState {
  public:
-  FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
+  InstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
     : exec_params_(params),
       total_split_size_(0),
       profile_(nullptr),
@@ -333,7 +335,7 @@ class Coordinator::FilterState {
 
 };
 
-void Coordinator::FragmentInstanceState::ComputeTotalSplitSize(
+void Coordinator::InstanceState::ComputeTotalSplitSize(
     const PerNodeScanRanges& per_node_scan_ranges) {
   total_split_size_ = 0;
 
@@ -345,7 +347,7 @@ void Coordinator::FragmentInstanceState::ComputeTotalSplitSize(
   }
 }
 
-int64_t Coordinator::FragmentInstanceState::UpdateNumScanRangesCompleted() {
+int64_t Coordinator::InstanceState::UpdateNumScanRangesCompleted() {
   int64_t total = 0;
   CounterMap& complete = aggregate_counters_.scan_ranges_complete_counters;
   for (CounterMap::iterator i = complete.begin(); i != complete.end(); ++i) {
@@ -382,6 +384,10 @@ Coordinator::~Coordinator() {
   query_mem_tracker_.reset();
 }
 
+PlanFragmentExecutor* Coordinator::executor() {
+  return coord_instance_->executor();
+}
+
 TExecNodePhase::type GetExecNodePhase(const string& key) {
   map<int, const char*>::const_iterator entry =
       _TExecNodePhase_VALUES_TO_NAMES.begin();
@@ -439,7 +445,7 @@ Status Coordinator::Exec() {
   if (needs_finalization_) finalize_params_ = request.finalize_params;
 
   VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
-             << " stmt=" << request.query_ctx.request.stmt;
+             << " stmt=" << request.query_ctx.client_request.stmt;
   stmt_type_ = request.stmt_type;
   query_id_ = schedule_.query_id();
   desc_tbl_ = request.desc_tbl;
@@ -457,7 +463,7 @@ Status Coordinator::Exec() {
   progress_.Init(str, schedule_.num_scan_ranges());
 
   // runtime filters not yet supported for mt execution
-  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+  bool is_mt_execution = request.query_ctx.client_request.query_options.mt_dop > 0;
   if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
 
   // to keep things simple, make async Cancel() calls wait until plan fragment
@@ -468,9 +474,9 @@ Status Coordinator::Exec() {
   // The coordinator may require a query mem tracker for result-caching, which tracks
   // memory via the query mem tracker.
   int64_t query_limit = -1;
-  if (query_ctx_.request.query_options.__isset.mem_limit
-      && query_ctx_.request.query_options.mem_limit > 0) {
-    query_limit = query_ctx_.request.query_options.mem_limit;
+  if (query_ctx_.client_request.query_options.__isset.mem_limit
+      && query_ctx_.client_request.query_options.mem_limit > 0) {
+    query_limit = query_ctx_.client_request.query_options.mem_limit;
   }
   MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
       schedule_.request_pool(), exec_env_->process_mem_tracker());
@@ -484,38 +490,42 @@ Status Coordinator::Exec() {
   InitExecSummary();
   StartFInstances();
 
-  // In the error case, it's safe to return and not to get root_sink_ here to close - if
+  // In the error case, it's safe to return and not to get coord_sink_ here to close - if
   // there was an error, but the coordinator fragment was successfully started, it should
   // cancel itself when it receives an error status after reporting its profile.
   RETURN_IF_ERROR(FinishInstanceStartup());
 
   // Grab executor and wait until Prepare() has finished so that runtime state etc. will
-  // be set up. Must do this here in order to get a reference to root_fragment_instance_
-  // so that root_sink_ remains valid throughout query lifetime.
+  // be set up. Must do this here in order to get a reference to coord_instance_
+  // so that coord_sink_ remains valid throughout query lifetime.
   if (schedule_.GetCoordFragment() != nullptr) {
-    // Coordinator fragment instance has same ID as query.
-    root_fragment_instance_ =
-        ExecEnv::GetInstance()->fragment_mgr()->GetFragmentExecState(query_id_);
-    // Fragment instance might have been failed and unregistered itself even though it was
-    // successfully started (e.g. Prepare() might have failed).
-    if (root_fragment_instance_.get() == nullptr) {
-      FragmentInstanceState* root_state = fragment_instance_states_[0];
-      DCHECK(root_state != nullptr);
-      lock_guard<mutex> instance_state_lock(*root_state->lock());
+    QueryState* qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id_);
+    if (qs != nullptr) coord_instance_ = qs->GetFInstanceState(query_id_);
+    if (coord_instance_ == nullptr) {
+      // Coordinator instance might have failed and unregistered itself even
+      // though it was successfully started (e.g. Prepare() might have failed).
+      if (qs != nullptr) {
+        ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
+        qs = nullptr;
+      }
+      InstanceState* coord_state = fragment_instance_states_[0];
+      DCHECK(coord_state != nullptr);
+      lock_guard<mutex> instance_state_lock(*coord_state->lock());
       // Try and return the fragment instance status if it was already set.
-      // TODO: Consider waiting for root_state->done() here.
-      RETURN_IF_ERROR(*root_state->status());
-      return Status(Substitute("Root fragment instance ($0) failed", PrintId(query_id_)));
+      // TODO: Consider waiting for coord_state->done() here.
+      RETURN_IF_ERROR(*coord_state->status());
+      return Status(
+          Substitute("Coordinator fragment instance ($0) failed", PrintId(query_id_)));
     }
-    executor_ = root_fragment_instance_->executor();
+
     // When WaitForPrepare() returns OK(), the executor's root sink will be set up. At
     // that point, the coordinator must be sure to call root_sink()->CloseConsumer(); the
     // fragment instance's executor will not complete until that point.
     // TODO: Consider moving this to Wait().
-    Status prepare_status = executor_->WaitForPrepare();
-    root_sink_ = executor_->root_sink();
+    Status prepare_status = executor()->WaitForPrepare();
+    coord_sink_ = executor()->root_sink();
     RETURN_IF_ERROR(prepare_status);
-    DCHECK(root_sink_ != nullptr);
+    DCHECK(coord_sink_ != nullptr);
   }
 
   PrintFragmentInstanceInfo();
@@ -523,7 +533,7 @@ Status Coordinator::Exec() {
 }
 
 void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) {
-  DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+  DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0);
   int num_hosts = fragment_params.instance_exec_params.size();
   DCHECK_GT(num_hosts, 0);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
@@ -614,8 +624,8 @@ void Coordinator::StartFInstances() {
     num_instances += fragment_params.instance_exec_params.size();
     for (const FInstanceExecParams& instance_params:
         fragment_params.instance_exec_params) {
-      FragmentInstanceState* exec_state = obj_pool()->Add(
-          new FragmentInstanceState(instance_params, obj_pool()));
+      InstanceState* exec_state = obj_pool()->Add(
+          new InstanceState(instance_params, obj_pool()));
       int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
       fragment_instance_states_[instance_state_idx] = exec_state;
 
@@ -638,7 +648,7 @@ Status Coordinator::FinishInstanceStartup() {
   const TMetricDef& def =
       MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
   HistogramMetric latencies(def, 20000, 3);
-  for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+  for (InstanceState* exec_state: fragment_instance_states_) {
     lock_guard<mutex> l(*exec_state->lock());
     // Preserve the first non-OK status, if there is one
     if (status.ok()) status = *exec_state->status();
@@ -859,7 +869,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
       HdfsPartitionDescriptor* part = hdfs_table->GetPartition(partition.second.id);
       DCHECK(part != NULL) << "table_id=" << hdfs_table->id()
                            << " partition_id=" << partition.second.id
-                           << "\n" <<  PrintThrift(runtime_state()->fragment_params());
+                           << "\n" <<  PrintThrift(runtime_state()->instance_ctx());
       part_path_ss << part->location();
     }
     const string& part_path = part_path_ss.str();
@@ -924,7 +934,8 @@ Status Coordinator::FinalizeSuccessfulInsert() {
           partition_create_ops.Add(CREATE_DIR, part_path);
         }
       }
-    } else if (!is_s3_path || !query_ctx_.request.query_options.s3_skip_insert_staging) {
+    } else if (!is_s3_path
+        || !query_ctx_.client_request.query_options.s3_skip_insert_staging) {
       // If the S3_SKIP_INSERT_STAGING query option is set, then the partition directories
       // would have already been created by the table sinks.
       if (FLAGS_insert_inherit_permissions && !is_s3_path) {
@@ -1071,8 +1082,8 @@ Status Coordinator::Wait() {
   has_called_wait_ = true;
 
   if (stmt_type_ == TStmtType::QUERY) {
-    DCHECK(executor_ != nullptr);
-    return UpdateStatus(executor_->WaitForOpen(), runtime_state()->fragment_instance_id(),
+    DCHECK(executor() != nullptr);
+    return UpdateStatus(executor()->WaitForOpen(), runtime_state()->fragment_instance_id(),
         FLAGS_hostname);
   }
 
@@ -1106,15 +1117,15 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 
   if (returned_all_results_) {
     // May be called after the first time we set *eos. Re-set *eos and return here;
-    // already torn-down root_sink_ so no more work to do.
+    // already torn-down coord_sink_ so no more work to do.
     *eos = true;
     return Status::OK();
   }
 
-  DCHECK(root_sink_ != nullptr)
+  DCHECK(coord_sink_ != nullptr)
       << "GetNext() called without result sink. Perhaps Prepare() failed and was not "
       << "checked?";
-  Status status = root_sink_->GetNext(runtime_state(), results, max_rows, eos);
+  Status status = coord_sink_->GetNext(runtime_state(), results, max_rows, eos);
 
   // if there was an error, we need to return the query's error status rather than
   // the status we just got back from the local executor (which may well be CANCELLED
@@ -1126,8 +1137,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
     returned_all_results_ = true;
     // Trigger tear-down of coordinator fragment by closing the consumer. Must do before
     // WaitForAllInstances().
-    root_sink_->CloseConsumer();
-    root_sink_ = nullptr;
+    coord_sink_->CloseConsumer();
+    coord_sink_ = nullptr;
 
     // Don't return final NULL until all instances have completed.  GetNext must wait for
     // all instances to complete before ultimately signalling the end of execution via a
@@ -1148,12 +1159,12 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
 }
 
 void Coordinator::PrintFragmentInstanceInfo() {
-  for (FragmentInstanceState* state: fragment_instance_states_) {
+  for (InstanceState* state: fragment_instance_states_) {
     SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned;
     acc(state->total_split_size());
   }
 
-  for (int id = (executor_ == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) {
+  for (int id = (executor() == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) {
     SummaryStats& acc = fragment_profiles_[id].bytes_assigned;
     double min = accumulators::min(acc);
     double max = accumulators::max(acc);
@@ -1168,7 +1179,7 @@ void Coordinator::PrintFragmentInstanceInfo() {
 
     if (VLOG_FILE_IS_ON) {
       VLOG_FILE << "Byte split for fragment " << id << " " << ss.str();
-      for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+      for (InstanceState* exec_state: fragment_instance_states_) {
         if (exec_state->fragment_idx() != id) continue;
         VLOG_FILE << "data volume for ipaddress " << exec_state << ": "
                   << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES);
@@ -1284,7 +1295,7 @@ void Coordinator::ExecRemoteFInstance(
     rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
   }
   int instance_state_idx = GetInstanceIdx(exec_params.instance_id);
-  FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
+  InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
   exec_state->ComputeTotalSplitSize(
       rpc_params.fragment_instance_ctx.per_node_scan_ranges);
   VLOG_FILE << "making rpc: ExecPlanFragment"
@@ -1357,7 +1368,7 @@ void Coordinator::CancelInternal() {
 
 void Coordinator::CancelFragmentInstances() {
   int num_cancelled = 0;
-  for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+  for (InstanceState* exec_state: fragment_instance_states_) {
     DCHECK(exec_state != nullptr);
 
     // lock each exec_state individually to synchronize correctly with
@@ -1433,7 +1444,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
         Substitute("Unknown fragment instance index $0 (max known: $1)",
             instance_state_idx, fragment_instance_states_.size() - 1));
   }
-  FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
+  InstanceState* exec_state = fragment_instance_states_[instance_state_idx];
 
   const TRuntimeProfileTree& cumulative_profile = params.profile;
   Status status(params.status);
@@ -1507,9 +1518,8 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
   if (VLOG_FILE_IS_ON) {
     stringstream s;
     exec_state->profile()->PrettyPrint(&s);
-    VLOG_FILE << "profile for query_id=" << query_id_
-              << " instance_id=" << exec_state->fragment_instance_id()
-               << "\n" << s.str();
+    VLOG_FILE << "profile for instance_id=" << exec_state->fragment_instance_id()
+              << "\n" << s.str();
   }
   // also print the cumulative profile
   // TODO: fix the coordinator/PlanFragmentExecutor, so this isn't needed
@@ -1533,14 +1543,14 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     lock_guard<mutex> l(lock_);
     exec_state->stopwatch()->Stop();
     DCHECK_GT(num_remaining_fragment_instances_, 0);
-    VLOG_QUERY << "Fragment instance completed: "
+    VLOG_QUERY << "Fragment instance completed:"
         << " id=" << PrintId(exec_state->fragment_instance_id())
         << " host=" << exec_state->impalad_address()
         << " remaining=" << num_remaining_fragment_instances_ - 1;
     if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) {
       // print host/port info for the first backend that's still in progress as a
       // debugging aid for backend deadlocks
-      for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+      for (InstanceState* exec_state: fragment_instance_states_) {
         lock_guard<mutex> l2(*exec_state->lock());
         if (!exec_state->done()) {
           VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
@@ -1567,7 +1577,7 @@ uint64_t Coordinator::GetLatestKuduInsertTimestamp() const {
 }
 
 RuntimeState* Coordinator::runtime_state() {
-  return executor_ == NULL ? NULL : executor_->runtime_state();
+  return executor() == NULL ? NULL : executor()->runtime_state();
 }
 
 MemTracker* Coordinator::query_mem_tracker() {
@@ -1595,7 +1605,7 @@ typedef struct {
   }
 } InstanceComparator;
 
-void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
+void Coordinator::UpdateAverageProfile(InstanceState* instance_state) {
   FragmentIdx fragment_idx = instance_state->fragment_idx();
   DCHECK_GE(fragment_idx, 0);
   DCHECK_LT(fragment_idx, fragment_profiles_.size());
@@ -1608,7 +1618,7 @@ void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
   data->root_profile->AddChild(instance_state->profile());
 }
 
-void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_state) {
+void Coordinator::ComputeFragmentSummaryStats(InstanceState* instance_state) {
   FragmentIdx fragment_idx = instance_state->fragment_idx();
   DCHECK_GE(fragment_idx, 0);
   DCHECK_LT(fragment_idx, fragment_profiles_.size());
@@ -1625,7 +1635,7 @@ void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_st
   data->root_profile->AddChild(instance_state->profile());
 }
 
-void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) {
+void Coordinator::UpdateExecSummary(const InstanceState& instance_state) {
   vector<RuntimeProfile*> children;
   instance_state.profile()->GetAllChildren(&children);
 
@@ -1667,12 +1677,12 @@ void Coordinator::ReportQuerySummary() {
 
   if (!fragment_instance_states_.empty()) {
     // Average all fragment instances for each fragment.
-    for (FragmentInstanceState* state: fragment_instance_states_) {
+    for (InstanceState* state: fragment_instance_states_) {
       state->profile()->ComputeTimeInProfile();
       UpdateAverageProfile(state);
       // Skip coordinator fragment, if one exists.
       // TODO: Can we remove the special casing here?
-      if (executor_ == nullptr || state->fragment_idx() != 0) {
+      if (coord_instance_ == nullptr || state->fragment_idx() != 0) {
         ComputeFragmentSummaryStats(state);
       }
       UpdateExecSummary(*state);
@@ -1680,7 +1690,7 @@ void Coordinator::ReportQuerySummary() {
 
     InstanceComparator comparator;
     // Per fragment instances have been collected, output summaries
-    for (int i = (executor_ != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
+    for (int i = (executor() != NULL ? 1 : 0); i < fragment_profiles_.size(); ++i) {
       fragment_profiles_[i].root_profile->SortChildren(comparator);
       SummaryStats& completion_times = fragment_profiles_[i].completion_times;
       SummaryStats& rates = fragment_profiles_[i].rates;
@@ -1719,7 +1729,7 @@ void Coordinator::ReportQuerySummary() {
     // Map from Impalad address to peak memory usage of this query
     typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
     PerNodePeakMemoryUsage per_node_peak_mem_usage;
-    for (FragmentInstanceState* state: fragment_instance_states_) {
+    for (InstanceState* state: fragment_instance_states_) {
       int64_t initial_usage = 0;
       int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
           state->impalad_address(), initial_usage);
@@ -1740,7 +1750,7 @@ void Coordinator::ReportQuerySummary() {
 
 string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
-  for (FragmentInstanceState* state: fragment_instance_states_) {
+  for (InstanceState* state: fragment_instance_states_) {
     lock_guard<mutex> l(*state->lock());
     if (state->error_log()->size() > 0)  MergeErrorMaps(&merged, *state->error_log());
   }
@@ -1760,7 +1770,7 @@ void Coordinator::SetExecPlanFragmentParams(
 
   // Remove filters that weren't selected during filter routing table construction.
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+    DCHECK(schedule_.request().query_ctx.client_request.query_options.mt_dop == 0);
     int instance_idx = GetInstanceIdx(params.instance_id);
     for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
       if (plan_node.__isset.runtime_filters) {
@@ -1893,6 +1903,8 @@ void DistributeFilters(shared_ptr<TPublishFilterParams> params,
 
 }
 
+// TODO: call this as soon as it's clear that we won't reference the state
+// anymore, ie, in CancelInternal() and when GetNext() hits eos
 void Coordinator::TearDown() {
   DCHECK(!torn_down_) << "Coordinator::TearDown() may not be called twice";
   torn_down_ = true;
@@ -1909,7 +1921,15 @@ void Coordinator::TearDown() {
   }
 
   // Need to protect against failed Prepare(), where root_sink() would not be set.
-  if (root_sink_ != nullptr) root_sink_->CloseConsumer();
+  if (coord_sink_ != nullptr) {
+    coord_sink_->CloseConsumer();
+    coord_sink_ = nullptr;
+  }
+  if (coord_instance_ != nullptr) {
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(
+        coord_instance_->query_state());
+    coord_instance_ = nullptr;
+  }
 }
 
 void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
@@ -1987,7 +2007,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
   rpc_params->filter_id = params.filter_id;
 
   for (int target_idx: target_fragment_instance_state_idxs) {
-    FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx];
+    InstanceState* fragment_inst = fragment_instance_states_[target_idx];
     DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx;
     exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params,
         fragment_inst->impalad_address(), fragment_inst->fragment_instance_id()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index d53f16c..e1334db 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -38,13 +38,11 @@
 #include "common/status.h"
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
-#include "runtime/runtime-state.h"
-#include "scheduling/simple-scheduler.h"
-#include "service/fragment-exec-state.h"
-#include "service/fragment-mgr.h"
 #include "util/histogram-metric.h"
 #include "util/progress-updater.h"
 #include "util/runtime-profile.h"
+#include "scheduling/query-schedule.h"
+#include "runtime/runtime-state.h"  // for PartitionStatusMap; TODO: disentangle
 
 namespace impala {
 
@@ -69,6 +67,9 @@ class RuntimeProfile;
 class TablePrinter;
 class TPlanFragment;
 class QueryResultSet;
+class MemTracker;
+class PlanRootSink;
+class FragmentInstanceState;
 
 struct DebugOptions;
 
@@ -212,7 +213,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   void TearDown();
 
  private:
-  class FragmentInstanceState;
+  class InstanceState;
   struct FilterTarget;
   class FilterState;
 
@@ -249,10 +250,10 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
     CounterMap scan_ranges_complete_counters;
   };
 
-  /// FragmentInstanceStates for all fragment instances, including that of the coordinator
+  /// InstanceStates for all fragment instances, including that of the coordinator
   /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
   /// StartFInstances().
-  std::vector<FragmentInstanceState*> fragment_instance_states_;
+  std::vector<InstanceState*> fragment_instance_states_;
 
   /// True if the query needs a post-execution step to tidy up
   bool needs_finalization_;
@@ -273,7 +274,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// time.
   /// Lock ordering is
   /// 1. lock_
-  /// 2. FragmentInstanceState::lock_
+  /// 2. InstanceState::lock_
   boost::mutex lock_;
 
   /// Overall status of the entire query; set to the first reported fragment error
@@ -294,13 +295,13 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Result rows are materialized by this fragment instance in its own thread. They are
   /// materialized into a QueryResultSet provided to the coordinator during GetNext().
   ///
-  /// Created during fragment instance start-up by FragmentExecState and set here in
-  /// Exec().  Keep a shared_ptr reference to the fragment state so that root_sink_ will
-  /// be valid for the lifetime of the coordinator. This is important if, for example, the
-  /// fragment instance is cancelled while the coordinator is calling GetNext().
-  std::shared_ptr<FragmentMgr::FragmentExecState> root_fragment_instance_;
-  PlanFragmentExecutor* executor_ = nullptr;
-  PlanRootSink* root_sink_ = nullptr;
+  /// Not owned by this class. Set in Exec(). Reset to nullptr (and the implied
+  /// reference of QueryState released) in TearDown().
+  FragmentInstanceState* coord_instance_ = nullptr;
+
+  /// Not owned by this class. Set in Exec(). Reset to nullptr in TearDown() or when
+  /// GetNext() hits eos.
+  PlanRootSink* coord_sink_ = nullptr;
 
   /// Query mem tracker for this coordinator initialized in Exec(). Only valid if there
   /// is no coordinator fragment (i.e. executor_ == NULL). If executor_ is not NULL,
@@ -314,6 +315,8 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   /// Returns a local object pool.
   ObjectPool* obj_pool() { return obj_pool_.get(); }
 
+  PlanFragmentExecutor* executor();
+
   // Sets the TDescriptorTable(s) for the current fragment.
   void SetExecPlanDescriptorTable(const TPlanFragment& fragment,
       TExecPlanFragmentParams* rpc_params);
@@ -501,11 +504,11 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   void InitExecSummary();
 
   /// Update fragment profile information from a fragment instance state.
-  void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state);
+  void UpdateAverageProfile(InstanceState* instance_state);
 
   /// Compute the summary stats (completion_time and rates)
   /// for an individual fragment_profile_ based on the specified instance state.
-  void ComputeFragmentSummaryStats(FragmentInstanceState* fragment_instance_state);
+  void ComputeFragmentSummaryStats(InstanceState* instance_state);
 
   /// Outputs aggregate query profile summary.  This is assumed to be called at the end of
   /// a query -- remote fragments' profiles must not be updated while this is running.
@@ -513,7 +516,7 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
 
   /// Populates the summary execution stats from the profile. Can only be called when the
   /// query is done.
-  void UpdateExecSummary(const FragmentInstanceState& instance_state);
+  void UpdateExecSummary(const InstanceState& instance_state);
 
   /// Determines what the permissions of directories created by INSERT statements should
   /// be if permission inheritance is enabled. Populates a map from all prefixes of

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index d53a146..c76afb5 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -28,6 +28,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/data-stream-mgr.h"
+#include "runtime/exec-env.h"
 #include "runtime/data-stream-sender.h"
 #include "runtime/data-stream-recvr.h"
 #include "runtime/descriptors.h"
@@ -112,10 +113,12 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
 
 class DataStreamTest : public testing::Test {
  protected:
-  DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) {
+  DataStreamTest()
+    : runtime_state_(TQueryCtx(), &exec_env_),
+      next_val_(0) {
     // Initialize Mem trackers for use by the data stream receiver.
     exec_env_.InitForFeTests();
-    runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
+    runtime_state_.InitMemTrackers(NULL, -1);
 
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
@@ -480,9 +483,9 @@ class DataStreamTest : public testing::Test {
 
   void Sender(int sender_num, int channel_buffer_size,
               TPartitionType::type partition_type) {
-    RuntimeState state(TExecPlanFragmentParams(), &exec_env_);
+    RuntimeState state(TQueryCtx(), &exec_env_);
     state.set_desc_tbl(desc_tbl_);
-    state.InitMemTrackers(TUniqueId(), NULL, -1);
+    state.InitMemTrackers(NULL, -1);
     VLOG_QUERY << "create sender " << sender_num;
     const TDataStreamSink& sink = GetSink(partition_type);
     DataStreamSender sender(
@@ -593,9 +596,8 @@ TEST_F(DataStreamTest, BasicTest) {
 //
 // TODO: Make lifecycle requirements more explicit.
 TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
-  scoped_ptr<RuntimeState> runtime_state(
-      new RuntimeState(TExecPlanFragmentParams(), &exec_env_));
-  runtime_state->InitMemTrackers(TUniqueId(), NULL, -1);
+  scoped_ptr<RuntimeState> runtime_state(new RuntimeState(TQueryCtx(), &exec_env_));
+  runtime_state->InitMemTrackers(NULL, -1);
 
   scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index db0d242..ad996e3 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,10 +36,10 @@
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/thread-resource-mgr.h"
+#include "runtime/query-exec-mgr.h"
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/simple-scheduler.h"
-#include "service/fragment-mgr.h"
 #include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/debug-util.h"
@@ -146,7 +146,7 @@ ExecEnv::ExecEnv()
     fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
-    fragment_mgr_(new FragmentMgr()),
+    query_exec_mgr_(new QueryExecMgr()),
     enable_webserver_(FLAGS_enable_webserver),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -199,7 +199,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
-    fragment_mgr_(new FragmentMgr()),
+    query_exec_mgr_(new QueryExecMgr()),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index d4cdf24..08ddd9f 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -31,7 +31,7 @@ namespace impala {
 class CallableThreadPool;
 class DataStreamMgr;
 class DiskIoMgr;
-class FragmentMgr;
+class QueryExecMgr;
 class Frontend;
 class HBaseTableFactory;
 class HdfsFsCache;
@@ -96,7 +96,7 @@ class ExecEnv {
   Frontend* frontend() { return frontend_.get(); }
   RequestPoolService* request_pool_service() { return request_pool_service_.get(); }
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
-  FragmentMgr* fragment_mgr() { return fragment_mgr_.get(); }
+  QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -138,7 +138,7 @@ class ExecEnv {
   boost::scoped_ptr<Frontend> frontend_;
   boost::scoped_ptr<CallableThreadPool> fragment_exec_thread_pool_;
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
-  boost::scoped_ptr<FragmentMgr> fragment_mgr_;
+  boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
 
   /// Not owned by this class
   ImpalaServer* impala_server_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
new file mode 100644
index 0000000..5dcd4c6
--- /dev/null
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/fragment-instance-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "runtime/exec-env.h"
+#include "runtime/backend-client.h"
+#include "runtime/runtime-filter-bank.h"
+#include "runtime/client-cache.h"
+#include "runtime/runtime-state.h"
+#include "runtime/query-state.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+
+using namespace impala;
+
+FragmentInstanceState::FragmentInstanceState(
+    QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+    const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl)
+  : query_state_(query_state),
+    fragment_ctx_(fragment_ctx),
+    instance_ctx_(instance_ctx),
+    desc_tbl_(desc_tbl),
+    executor_(
+        [this](const Status& status, RuntimeProfile* profile, bool done) {
+          ReportStatusCb(status, profile, done);
+        }) {
+}
+
+Status FragmentInstanceState::UpdateStatus(const Status& status) {
+  lock_guard<mutex> l(status_lock_);
+  if (!status.ok() && exec_status_.ok()) exec_status_ = status;
+  return exec_status_;
+}
+
+Status FragmentInstanceState::Cancel() {
+  lock_guard<mutex> l(status_lock_);
+  RETURN_IF_ERROR(exec_status_);
+  executor_.Cancel();
+  return Status::OK();
+}
+
+void FragmentInstanceState::Exec() {
+  Status status =
+      executor_.Prepare(query_state_, desc_tbl_, fragment_ctx_, instance_ctx_);
+  prepare_promise_.Set(status);
+  if (status.ok()) {
+    if (executor_.Open().ok()) {
+      executor_.Exec();
+    }
+  }
+  executor_.Close();
+}
+
+void FragmentInstanceState::ReportStatusCb(
+    const Status& status, RuntimeProfile* profile, bool done) {
+  DCHECK(status.ok() || done);  // if !status.ok() => done
+  Status exec_status = UpdateStatus(status);
+
+  Status coord_status;
+  ImpalaBackendConnection coord(
+      ExecEnv::GetInstance()->impalad_client_cache(), coord_address(), &coord_status);
+  if (!coord_status.ok()) {
+    stringstream s;
+    s << "Couldn't get a client for " << coord_address() <<"\tReason: "
+      << coord_status.GetDetail();
+    UpdateStatus(Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, s.str())));
+    return;
+  }
+
+  TReportExecStatusParams params;
+  params.protocol_version = ImpalaInternalServiceVersion::V1;
+  params.__set_query_id(query_state_->query_ctx().query_id);
+  params.__set_fragment_instance_id(instance_ctx_.fragment_instance_id);
+  exec_status.SetTStatus(&params);
+  params.__set_done(done);
+
+  if (profile != NULL) {
+    profile->ToThrift(&params.profile);
+    params.__isset.profile = true;
+  }
+
+  RuntimeState* runtime_state = executor_.runtime_state();
+  // If executor_ did not successfully prepare, runtime state may not have been set.
+  if (runtime_state != NULL) {
+    // Only send updates to insert status if fragment is finished, the coordinator
+    // waits until query execution is done to use them anyhow.
+    if (done) {
+      TInsertExecStatus insert_status;
+
+      if (runtime_state->hdfs_files_to_move()->size() > 0) {
+        insert_status.__set_files_to_move(*runtime_state->hdfs_files_to_move());
+      }
+      if (runtime_state->per_partition_status()->size() > 0) {
+        insert_status.__set_per_partition_status(*runtime_state->per_partition_status());
+      }
+
+      params.__set_insert_exec_status(insert_status);
+    }
+
+    // Send new errors to coordinator
+    runtime_state->GetUnreportedErrors(&(params.error_log));
+  }
+  params.__isset.error_log = (params.error_log.size() > 0);
+
+  TReportExecStatusResult res;
+  Status rpc_status;
+  bool retry_is_safe;
+  // Try to send the RPC 3 times before failing.
+  for (int i = 0; i < 3; ++i) {
+    rpc_status = coord.DoRpc(
+        &ImpalaBackendClient::ReportExecStatus, params, &res, &retry_is_safe);
+    if (rpc_status.ok()) {
+      rpc_status = Status(res.status);
+      break;
+    }
+    if (!retry_is_safe) break;
+    if (i < 2) SleepForMs(100);
+  }
+  if (!rpc_status.ok()) {
+    UpdateStatus(rpc_status);
+    executor_.Cancel();
+  }
+}
+
+void FragmentInstanceState::PublishFilter(
+    int32_t filter_id, const TBloomFilter& thrift_bloom_filter) {
+  VLOG_FILE << "PublishFilter(): instance_id=" << PrintId(instance_id())
+            << " filter_id=" << filter_id;
+  // Defensively protect against blocking forever in case there's some problem with
+  // Prepare().
+  static const int WAIT_MS = 30000;
+  bool timed_out = false;
+  // Wait until Prepare() is done, so we know that the filter bank is set up.
+  // TODO: get rid of concurrency in the setup phase as part of the per-query exec rpc
+  Status prepare_status = prepare_promise_.Get(WAIT_MS, &timed_out);
+  if (timed_out) {
+    LOG(ERROR) << "Unexpected timeout in PublishFilter()";
+    return;
+  }
+  if (!prepare_status.ok()) return;
+  executor_.runtime_state()->filter_bank()->PublishGlobalFilter(
+      filter_id, thrift_bloom_filter);
+}
+
+const TQueryCtx& FragmentInstanceState::query_ctx() const {
+  return query_state_->query_ctx();
+}
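
The status handling above follows a simple "first error wins" rule: UpdateStatus() only records a status if no error has been recorded yet, and always returns whatever is currently stored. A minimal standalone sketch of that idiom; Status and InstanceStatus here are illustrative stand-ins, not the Impala classes:

// Simplified sketch of the "first error wins" status merge.
#include <iostream>
#include <mutex>
#include <string>

struct Status {
  std::string msg;  // empty means OK
  bool ok() const { return msg.empty(); }
};

class InstanceStatus {
 public:
  // Records 'status' only if no error has been recorded yet; returns the
  // status that is stored after the update.
  Status Update(const Status& status) {
    std::lock_guard<std::mutex> l(lock_);
    if (!status.ok() && exec_status_.ok()) exec_status_ = status;
    return exec_status_;
  }

 private:
  std::mutex lock_;
  Status exec_status_;  // OK until the first error arrives
};

int main() {
  InstanceStatus s;
  s.Update(Status{});                                    // OK: nothing recorded
  s.Update(Status{"scan failed"});                       // first error sticks
  Status final_status = s.Update(Status{"cancelled"});   // later errors ignored
  std::cout << final_status.msg << std::endl;            // prints "scan failed"
  return 0;
}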

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
new file mode 100644
index 0000000..c938a31
--- /dev/null
+++ b/be/src/runtime/fragment-instance-state.h
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
+#define IMPALA_RUNTIME_FRAGMENT_INSTANCE_STATE_H
+
+#include "common/status.h"
+#include "util/promise.h"
+#include "runtime/plan-fragment-executor.h"
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+
+namespace impala {
+
+class TPlanFragmentCtx;
+class TPlanFragmentInstanceCtx;
+class TBloomFilter;
+class TUniqueId;
+class TNetworkAddress;
+class QueryState;
+class PlanFragmentExecutor;
+class RuntimeProfile;
+
+/// Collection of state specific to the execution of a particular fragment instance;
+/// it also manages that instance's execution, including set-up and tear-down.
+/// Tear-down happens automatically in the d'tor; it frees all memory allocated for
+/// this fragment instance and closes all data streams.
+///
+/// Aside from Cancel(), which may be called asynchronously, this class is not
+/// thread-safe.
+///
+/// TODO:
+/// - merge PlanFragmentExecutor into this class
+/// - move tear-down logic out of d'tor and into ReleaseResources() function
+/// - as part of the per-query exec rpc, get rid of concurrency during setup
+///   (and remove prepare_promise_)
+/// - move ReportStatusCb() logic into PFE::SendReport() and get rid of the callback
+class FragmentInstanceState {
+ public:
+  FragmentInstanceState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& instance_ctx, const TDescriptorTable& desc_tbl);
+
+  /// Frees up all resources allocated in Exec().
+  /// It is an error to delete a FragmentInstanceState before Exec() returns.
+  ~FragmentInstanceState() { }
+
+  /// Main loop of plan fragment execution. Blocks until execution finishes.
+  void Exec();
+
+  /// Returns the current execution status if there was an error; otherwise cancels
+  /// the fragment and returns OK.
+  Status Cancel();
+
+  /// Publishes filter with ID 'filter_id' to this fragment instance's filter bank.
+  void PublishFilter(int32_t filter_id, const TBloomFilter& thrift_bloom_filter);
+
+  QueryState* query_state() { return query_state_; }
+  PlanFragmentExecutor* executor() { return &executor_; }
+  const TQueryCtx& query_ctx() const;
+  const TPlanFragmentCtx& fragment_ctx() const { return fragment_ctx_; }
+  const TPlanFragmentInstanceCtx& instance_ctx() const { return instance_ctx_; }
+  const TUniqueId& query_id() const { return query_ctx().query_id; }
+  const TUniqueId& instance_id() const { return instance_ctx_.fragment_instance_id; }
+  const TNetworkAddress& coord_address() const { return query_ctx().coord_address; }
+
+ private:
+  QueryState* query_state_;
+  const TPlanFragmentCtx fragment_ctx_;
+  const TPlanFragmentInstanceCtx instance_ctx_;
+
+  /// instance-specific descriptor table
+  /// TODO: remove when switching to per-query exec rpc
+  const TDescriptorTable desc_tbl_;
+
+  PlanFragmentExecutor executor_;
+
+  /// protects exec_status_
+  boost::mutex status_lock_;
+
+  /// Set in ReportStatusCb(); if set to anything other than OK, execution has
+  /// terminated with an error.
+  Status exec_status_;
+
+  /// Barrier for the completion of executor_.Prepare().
+  Promise<Status> prepare_promise_;
+
+  /// Updates 'exec_status_' with 'status' if the former is not already an error.
+  /// Returns the value of 'exec_status_' after this method completes.
+  Status UpdateStatus(const Status& status);
+
+  /// Callback for executor; updates exec_status_ if 'status' indicates an error
+  /// or if there was a thrift error.
+  /// If not NULL, 'profile' is encoded as a Thrift structure and transmitted as part
+  /// of the reporting RPC. 'profile' may be NULL if a runtime profile has not been
+  /// created for this fragment (e.g. when the fragment has failed during preparation).
+  /// The executor must ensure that there is only one invocation at a time.
+  void ReportStatusCb(const Status& status, RuntimeProfile* profile, bool done);
+};
+
+}
+
+#endif
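
prepare_promise_ is the barrier that lets PublishFilter() wait for Prepare() to finish without blocking forever. A rough standalone sketch of such a set-once promise with a timed Get(); this only loosely mirrors util/promise.h, and the names are illustrative:

// Minimal set-once promise with a timed Get(), as used for the Prepare() barrier.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>

template <typename T>
class SimplePromise {
 public:
  // Publishes the value exactly once and wakes up all waiters.
  void Set(const T& value) {
    std::lock_guard<std::mutex> l(lock_);
    value_ = value;
    is_set_ = true;
    cv_.notify_all();
  }

  // Returns the value, or a default-constructed T if 'timeout_ms' elapses;
  // '*timed_out' reports which case occurred.
  T Get(int timeout_ms, bool* timed_out) {
    std::unique_lock<std::mutex> l(lock_);
    *timed_out = !cv_.wait_for(l, std::chrono::milliseconds(timeout_ms),
        [this]() { return is_set_; });
    return is_set_ ? value_ : T();
  }

 private:
  std::mutex lock_;
  std::condition_variable cv_;
  bool is_set_ = false;
  T value_{};
};

int main() {
  SimplePromise<int> prepare_status;
  bool timed_out = false;
  prepare_status.Get(10, &timed_out);   // nothing set yet -> times out
  std::cout << "timed_out=" << timed_out << std::endl;
  prepare_status.Set(0);
  int v = prepare_status.Get(10, &timed_out);
  std::cout << "value=" << v << " timed_out=" << timed_out << std::endl;
  return 0;
}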

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 3657882..464dbce 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -37,7 +37,9 @@
 #include "runtime/descriptors.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/row-batch.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
+#include "runtime/exec-env.h"
 #include "util/container-util.h"
 #include "runtime/runtime-state.h"
 #include "util/cpu-info.h"
@@ -69,9 +71,8 @@ const string EXEC_TIMER_NAME = "ExecTime";
 }
 
 PlanFragmentExecutor::PlanFragmentExecutor(
-    ExecEnv* exec_env, const ReportStatusCallback& report_status_cb)
-  : exec_env_(exec_env),
-    exec_tree_(NULL),
+    const ReportStatusCallback& report_status_cb)
+  : exec_tree_(NULL),
     report_status_cb_(report_status_cb),
     report_thread_active_(false),
     closed_(false),
@@ -92,8 +93,10 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
   DCHECK(!report_thread_active_);
 }
 
-Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
-  Status status = PrepareInternal(request);
+Status PlanFragmentExecutor::Prepare(
+    QueryState* query_state, const TDescriptorTable& desc_tbl,
+    const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
+  Status status = PrepareInternal(query_state, desc_tbl, fragment_ctx, instance_ctx);
   prepared_promise_.Set(status);
   if (!status.ok()) FragmentComplete(status);
   return status;
@@ -105,7 +108,9 @@ Status PlanFragmentExecutor::WaitForOpen() {
   return opened_promise_.Get();
 }
 
-Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& request) {
+Status PlanFragmentExecutor::PrepareInternal(
+    QueryState* qs, const TDescriptorTable& tdesc_tbl,
+    const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx) {
   lock_guard<mutex> l(prepare_lock_);
   DCHECK(!is_prepared_);
 
@@ -113,19 +118,17 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
   is_prepared_ = true;
 
   // TODO: Break this method up.
-  const TPlanFragmentInstanceCtx& fragment_instance_ctx = request.fragment_instance_ctx;
-  query_id_ = request.query_ctx.query_id;
+  query_id_ = qs->query_ctx().query_id;
 
-  VLOG_QUERY << "Prepare(): query_id=" << PrintId(query_id_) << " instance_id="
-             << PrintId(request.fragment_instance_ctx.fragment_instance_id);
-  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx);
-
-  DCHECK(request.__isset.fragment_ctx);
+  VLOG_QUERY << "Prepare(): instance_id="
+             << PrintId(instance_ctx.fragment_instance_id);
+  VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(instance_ctx);
 
   // Prepare() must not return before runtime_state_ is set if is_prepared_ was
   // set. Having runtime_state_.get() != NULL is a postcondition of this method in that
   // case. Do not call RETURN_IF_ERROR or explicitly return before this line.
-  runtime_state_.reset(new RuntimeState(request, exec_env_));
+  runtime_state_.reset(
+      new RuntimeState(qs, fragment_ctx, instance_ctx, ExecEnv::GetInstance()));
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
@@ -143,9 +146,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
                << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
   }
 
-  DCHECK(!fragment_instance_ctx.request_pool.empty());
-  runtime_state_->InitMemTrackers(
-      query_id_, &fragment_instance_ctx.request_pool, bytes_limit);
+  DCHECK(!instance_ctx.request_pool.empty());
+  runtime_state_->InitMemTrackers(&instance_ctx.request_pool, bytes_limit);
   RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
   runtime_state_->InitFilterBank();
 
@@ -167,27 +169,21 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
 
   // set up desc tbl
   DescriptorTbl* desc_tbl = NULL;
-  DCHECK(request.__isset.query_ctx);
-  DCHECK(request.query_ctx.__isset.desc_tbl);
-  RETURN_IF_ERROR(
-      DescriptorTbl::Create(obj_pool(), request.query_ctx.desc_tbl, &desc_tbl));
+  RETURN_IF_ERROR(DescriptorTbl::Create(obj_pool(), tdesc_tbl, &desc_tbl));
   runtime_state_->set_desc_tbl(desc_tbl);
-  VLOG_QUERY << "descriptor table for fragment="
-             << request.fragment_instance_ctx.fragment_instance_id
+  VLOG_QUERY << "descriptor table for fragment=" << instance_ctx.fragment_instance_id
              << "\n" << desc_tbl->DebugString();
 
   // set up plan
-  DCHECK(request.__isset.fragment_ctx);
   RETURN_IF_ERROR(ExecNode::CreateTree(
-      runtime_state_.get(), request.fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
+      runtime_state_.get(), fragment_ctx.fragment.plan, *desc_tbl, &exec_tree_));
   runtime_state_->set_fragment_root_id(exec_tree_->id());
 
-  if (fragment_instance_ctx.__isset.debug_node_id) {
-    DCHECK(fragment_instance_ctx.__isset.debug_action);
-    DCHECK(fragment_instance_ctx.__isset.debug_phase);
-    ExecNode::SetDebugOptions(fragment_instance_ctx.debug_node_id,
-        fragment_instance_ctx.debug_phase, fragment_instance_ctx.debug_action,
-        exec_tree_);
+  if (instance_ctx.__isset.debug_node_id) {
+    DCHECK(instance_ctx.__isset.debug_action);
+    DCHECK(instance_ctx.__isset.debug_phase);
+    ExecNode::SetDebugOptions(instance_ctx.debug_node_id, instance_ctx.debug_phase,
+        instance_ctx.debug_action, exec_tree_);
   }
 
   // set #senders of exchange nodes before calling Prepare()
@@ -195,8 +191,8 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
   exec_tree_->CollectNodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
   for (ExecNode* exch_node : exch_nodes) {
     DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
-    int num_senders = FindWithDefault(fragment_instance_ctx.per_exch_num_senders,
-        exch_node->id(), 0);
+    int num_senders =
+        FindWithDefault(instance_ctx.per_exch_num_senders, exch_node->id(), 0);
     DCHECK_GT(num_senders, 0);
     static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
   }
@@ -208,7 +204,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
   for (int i = 0; i < scan_nodes.size(); ++i) {
     ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
     const vector<TScanRangeParams>& scan_ranges = FindWithDefault(
-        fragment_instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
+        instance_ctx.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
     scan_node->SetScanRanges(scan_ranges);
   }
 
@@ -220,13 +216,13 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
     RETURN_IF_ERROR(exec_tree_->Prepare(state));
   }
 
-  PrintVolumeIds(fragment_instance_ctx.per_node_scan_ranges);
+  PrintVolumeIds(instance_ctx.per_node_scan_ranges);
 
-  DCHECK(request.fragment_ctx.fragment.__isset.output_sink);
+  DCHECK(fragment_ctx.fragment.__isset.output_sink);
   RETURN_IF_ERROR(
-      DataSink::CreateDataSink(obj_pool(), request.fragment_ctx.fragment.output_sink,
-          request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx,
-          exec_tree_->row_desc(), &sink_));
+      DataSink::CreateDataSink(obj_pool(), fragment_ctx.fragment.output_sink,
+          fragment_ctx.fragment.output_exprs, instance_ctx, exec_tree_->row_desc(),
+          &sink_));
   RETURN_IF_ERROR(
       sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker()));
 
@@ -235,7 +231,7 @@ Status PlanFragmentExecutor::PrepareInternal(const TExecPlanFragmentParams& requ
     profile()->AddChild(sink_profile);
   }
 
-  if (request.fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
+  if (fragment_ctx.fragment.output_sink.type == TDataSinkType::PLAN_ROOT_SINK) {
     root_sink_ = reinterpret_cast<PlanRootSink*>(sink_.get());
     // Release the thread token on the root fragment instance. This fragment spends most
     // of the time waiting and doing very little work. Holding on to the token causes

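Several of the lookups above (per_exch_num_senders, per_node_scan_ranges) use FindWithDefault() to fall back to a caller-supplied default when a node id has no entry. A self-contained sketch of that helper, assuming a plain std::map rather than the actual Thrift map types:

// Illustrative FindWithDefault(): mapped value if the key exists, else a default.
#include <iostream>
#include <map>

template <typename K, typename V>
const V& FindWithDefault(const std::map<K, V>& m, const K& key, const V& deflt) {
  auto it = m.find(key);
  return it == m.end() ? deflt : it->second;
}

int main() {
  std::map<int, int> per_exch_num_senders = {{2, 3}, {5, 1}};
  const int no_senders = 0;
  // Exchange node 2 has 3 senders; node 7 is absent and falls back to 0.
  std::cout << FindWithDefault(per_exch_num_senders, 2, no_senders) << "\n";
  std::cout << FindWithDefault(per_exch_num_senders, 7, no_senders) << "\n";
  return 0;
}
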
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h b/be/src/runtime/plan-fragment-executor.h
index f93dab4..2194e58 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -84,7 +84,7 @@ class PlanFragmentExecutor {
 
   /// report_status_cb, if !empty(), is used to report the accumulated profile
   /// information periodically during execution.
-  PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb);
+  PlanFragmentExecutor(const ReportStatusCallback& report_status_cb);
 
   /// It is an error to delete a PlanFragmentExecutor with a report callback before Exec()
   /// indicated that execution is finished, or to delete one that has not been Close()'d
@@ -103,11 +103,16 @@ class PlanFragmentExecutor {
   /// Status::CANCELLED;
   ///
   /// If Prepare() fails, it will invoke final status callback with the error status.
-  Status Prepare(const TExecPlanFragmentParams& request);
+  /// TODO: remove desc_tbl parameter once we do a per-query exec rpc (and we
+  /// have a single descriptor table to cover all fragment instances); at the moment
+  /// we need to pass the TDescriptorTable explicitly
+  Status Prepare(QueryState* query_state, const TDescriptorTable& desc_tbl,
+      const TPlanFragmentCtx& fragment_ctx, const TPlanFragmentInstanceCtx& instance_ctx);
 
   /// Opens the fragment plan and sink. Starts the profile reporting thread, if
   /// required.  Can be called only if Prepare() succeeded. If Open() fails it will
   /// invoke the final status callback with the error status.
+  /// TODO: is this needed? It's only ever called in conjunction with Exec() and Close()
   Status Open();
 
   /// Executes the fragment by repeatedly driving the sink with batches produced by the
@@ -153,7 +158,6 @@ class PlanFragmentExecutor {
   static const std::string PER_HOST_PEAK_MEM_COUNTER;
 
  private:
-  ExecEnv* exec_env_;  // not owned
   ExecNode* exec_tree_; // lives in runtime_state_->obj_pool()
   TUniqueId query_id_;
 
@@ -278,7 +282,10 @@ class PlanFragmentExecutor {
   Status ExecInternal();
 
   /// Performs all the logic of Prepare() and returns resulting status.
-  Status PrepareInternal(const TExecPlanFragmentParams& request);
+  /// TODO: remove desc_tbl parameter as part of per-query exec rpc
+  Status PrepareInternal(QueryState* qs, const TDescriptorTable& desc_tbl,
+      const TPlanFragmentCtx& fragment_ctx,
+      const TPlanFragmentInstanceCtx& instance_ctx);
 
   /// Releases the thread token for this fragment executor.
   void ReleaseThreadToken();
@@ -288,7 +295,6 @@ class PlanFragmentExecutor {
   void StopReportThread();
 
   /// Print stats about scan ranges for each volumeId in params to info log.
-  void PrintVolumeIds(const TPlanExecParams& params);
   void PrintVolumeIds(const PerNodeScanRanges& per_node_scan_ranges);
 
   const DescriptorTbl& desc_tbl() { return runtime_state_->desc_tbl(); }
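
The executor only knows about a ReportStatusCallback; FragmentInstanceState binds one of its member functions to it via a lambda (see its constructor earlier in this commit). A simplified, standalone sketch of that wiring, with stand-in types and a reduced callback signature:

// Illustrative callback wiring; Executor/InstanceState are stand-ins.
#include <functional>
#include <iostream>
#include <string>

using ReportCallback = std::function<void(const std::string& status, bool done)>;

class Executor {
 public:
  explicit Executor(ReportCallback cb) : report_cb_(std::move(cb)) {}
  void Exec() { report_cb_("OK", /*done=*/true); }  // pretend execution
 private:
  ReportCallback report_cb_;
};

class InstanceState {
 public:
  InstanceState()
    : executor_([this](const std::string& status, bool done) {
        ReportStatusCb(status, done);
      }) {}
  void Run() { executor_.Exec(); }
 private:
  void ReportStatusCb(const std::string& status, bool done) {
    std::cout << "report: status=" << status << " done=" << done << std::endl;
  }
  Executor executor_;
};

int main() {
  InstanceState instance;
  instance.Run();
  return 0;
}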

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
new file mode 100644
index 0000000..4a3742c
--- /dev/null
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/query-exec-mgr.h"
+
+#include <gperftools/malloc_extension.h>
+#include <gutil/strings/substitute.h>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "common/logging.h"
+#include "runtime/query-state.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "util/uid-util.h"
+#include "util/thread.h"
+#include "util/impalad-metrics.h"
+#include "util/debug-util.h"
+
+using boost::lock_guard;
+using namespace impala;
+
+// TODO: this logging should go into a per query log.
+DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory usage "
+    "every log_mem_usage_interval'th fragment completion.");
+
+Status QueryExecMgr::StartFInstance(const TExecPlanFragmentParams& params) {
+  TUniqueId instance_id = params.fragment_instance_ctx.fragment_instance_id;
+  VLOG_QUERY << "StartFInstance() instance_id=" << PrintId(instance_id)
+             << " coord=" << params.query_ctx.coord_address;
+
+  // Starting a new fragment instance creates a thread and consumes a non-trivial
+  // amount of memory. If we are already starved for memory, fail the instance as
+  // early as possible to avoid digging the hole deeper.
+  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  if (process_mem_tracker->LimitExceeded()) {
+    string msg = Substitute("Instance $0 of plan fragment $1 could not "
+        "start because the backend Impala daemon is over its memory limit",
+        PrintId(instance_id), params.fragment_ctx.fragment.display_name);
+    return process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
+  }
+
+  QueryState* qs = nullptr;
+  int refcnt;
+  {
+    lock_guard<mutex> l(qs_map_lock_);
+    TUniqueId query_id = params.query_ctx.query_id;
+    auto it = qs_map_.find(query_id);
+    if (it == qs_map_.end()) {
+      // register new QueryState
+      qs = new QueryState(params.query_ctx);
+      qs_map_.insert(make_pair(query_id, qs));
+      VLOG_QUERY << "new QueryState: query_id=" << query_id;
+    } else {
+      qs = it->second;
+    }
+    // decremented at the end of ExecFInstance()
+    refcnt = qs->refcnt_.Add(1);
+  }
+  DCHECK(qs != nullptr && qs->refcnt_.Load() > 0);
+  VLOG_QUERY << "QueryState: query_id=" << params.query_ctx.query_id
+             << " refcnt=" << refcnt;
+
+  DCHECK(params.__isset.fragment_ctx);
+  DCHECK(params.__isset.fragment_instance_ctx);
+  FragmentInstanceState* fis = qs->obj_pool()->Add(
+      new FragmentInstanceState(qs, params.fragment_ctx, params.fragment_instance_ctx,
+        params.query_ctx.desc_tbl));
+  // register instance before returning so that async Cancel() calls can
+  // find the instance
+  qs->RegisterFInstance(fis);
+  // start new thread to execute instance
+  Thread t("query-exec-mgr",
+      Substitute("exec-fragment-instance-$0", PrintId(instance_id)),
+      &QueryExecMgr::ExecFInstance, this, fis);
+  t.Detach();
+
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L);
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L);
+  return Status::OK();
+}
+
+void QueryExecMgr::ExecFInstance(FragmentInstanceState* fis) {
+  fis->Exec();
+
+  ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(-1L);
+  VLOG_QUERY << "Instance completed. instance_id=" << PrintId(fis->instance_id());
+
+#ifndef ADDRESS_SANITIZER
+  // tcmalloc and address sanitizer cannot be used together
+  if (FLAGS_log_mem_usage_interval > 0) {
+    uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
+    if (num_complete % FLAGS_log_mem_usage_interval == 0) {
+      char buf[2048];
+      // This outputs how much memory is currently being used by this impalad
+      MallocExtension::instance()->GetStats(buf, 2048);
+      LOG(INFO) << buf;
+    }
+  }
+#endif
+
+  // decrement refcount taken in StartFInstance()
+  ReleaseQueryState(fis->query_state());
+}
+
+QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
+  VLOG_QUERY << "GetQueryState(): query_id=" << PrintId(query_id);
+  lock_guard<mutex> l(qs_map_lock_);
+  auto it = qs_map_.find(query_id);
+  if (it == qs_map_.end()) return nullptr;
+  QueryState* qs = it->second;
+  int32_t cnt = qs->refcnt_.Add(1);
+  DCHECK_GT(cnt, 0);
+  return qs;
+}
+
+void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
+  DCHECK(qs != nullptr);
+  TUniqueId query_id;
+  {
+    int32_t cnt = qs->refcnt_.Add(-1);
+    VLOG_QUERY << "ReleaseQueryState(): query_id=" << PrintId(qs->query_id())
+               << " refcnt=" << cnt + 1;
+    DCHECK_GE(cnt, 0);
+    if (cnt > 0) return;
+    // don't reference anything from 'qs' beyond this point; 'qs' might get
+    // gc'd out from under us
+    query_id = qs->query_id();
+  }
+
+  {
+    // for now, gc right away
+    lock_guard<mutex> l(qs_map_lock_);
+    auto it = qs_map_.find(query_id);
+    // someone else might have gc'd the entry
+    if (it == qs_map_.end()) return;
+    qs = it->second;
+    DCHECK_EQ(qs->query_ctx().query_id, query_id);
+    int32_t cnt = qs->refcnt_.Load();
+    DCHECK_GE(cnt, 0);
+    // someone else might have increased the refcnt in the meantime
+    if (cnt > 0) return;
+    size_t num_erased = qs_map_.erase(qs->query_ctx().query_id);
+    DCHECK_EQ(num_erased, 1);
+  }
+  // TODO: send final status report during gc, but do this from a different thread
+  delete qs;
+}
+
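
The QueryState lifetime management above boils down to a refcounted get-or-create registry: the map lock only protects the map itself, while the refcount decides when an entry may be deleted. A standalone sketch of that pattern with illustrative Registry/Entry types (like the code above, the release path re-checks the refcount under the map lock, since another thread may have re-acquired the entry in the meantime):

// Illustrative refcounted registry; not the actual QueryExecMgr/QueryState.
#include <atomic>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

struct Entry {
  explicit Entry(std::string id) : id(std::move(id)) {}
  const std::string id;
  std::atomic<int> refcnt{0};
};

class Registry {
 public:
  // Returns the entry for 'id' with its refcount incremented, creating it on
  // first use (analogous to StartFInstance()/GetQueryState()).
  Entry* GetOrCreate(const std::string& id) {
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(id);
    Entry* e = (it == map_.end()) ? (map_[id] = new Entry(id)) : it->second;
    e->refcnt.fetch_add(1);
    return e;
  }

  // Drops one reference; the last release erases and deletes the entry, but
  // only after re-checking the refcount under the map lock.
  void Release(Entry* e) {
    const std::string id = e->id;  // don't touch 'e' after the decrement
    if (e->refcnt.fetch_sub(1) > 1) return;
    std::lock_guard<std::mutex> l(lock_);
    auto it = map_.find(id);
    if (it != map_.end() && it->second->refcnt.load() == 0) {
      delete it->second;
      map_.erase(it);
    }
  }

 private:
  std::mutex lock_;  // protects map_ only, not the refcounts
  std::unordered_map<std::string, Entry*> map_;
};

int main() {
  Registry reg;
  Entry* a = reg.GetOrCreate("query-1");
  Entry* b = reg.GetOrCreate("query-1");  // same entry, refcnt is now 2
  std::cout << (a == b) << " refcnt=" << a->refcnt.load() << std::endl;
  reg.Release(a);
  reg.Release(b);  // last release deletes the entry
  return 0;
}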

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
new file mode 100644
index 0000000..7b3fb84
--- /dev/null
+++ b/be/src/runtime/query-exec-mgr.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_QUERY_EXEC_MGR_H
+#define IMPALA_RUNTIME_QUERY_EXEC_MGR_H
+
+#include <boost/thread/mutex.hpp>
+#include <unordered_map>
+
+#include "common/status.h"
+#include "util/uid-util.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+class QueryState;
+class Thread;
+class TExecPlanFragmentParams;
+class TUniqueId;
+class FragmentInstanceState;
+
+/// A daemon-wide registry and manager of QueryStates. This is the central
+/// entry point for gaining refcounted access to a QueryState. It also initiates
+/// fragment instance execution.
+/// Thread-safe.
+///
+/// TODO: as part of IMPALA-2550 (per-query exec rpc)
+/// replace Start-/CancelFInstance() with StartQuery()/CancelQuery()
+class QueryExecMgr {
+ public:
+  /// Initiates execution of the fragment instance described by 'params' in a newly
+  /// created thread. Also creates a QueryState for that query, if none exists.
+  /// In both cases it increases the refcount prior to instance execution and decreases
+  /// it after execution finishes.
+  ///
+  /// Returns an error if there was some unrecoverable problem before the fragment
+  /// was started (like low memory). In that case, no QueryState is created or has its
+  /// refcount incremented. After this call returns, it is legal to call
+  /// FragmentInstanceState::Cancel() on this fragment instance, regardless of the
+  /// return value of this function.
+  Status StartFInstance(const TExecPlanFragmentParams& params);
+
+  /// If a QueryState for the given query exists, increments that refcount and returns
+  /// the QueryState, otherwise returns nullptr.
+  QueryState* GetQueryState(const TUniqueId& query_id);
+
+  /// Decrements the refcount for the given QueryState.
+  void ReleaseQueryState(QueryState* qs);
+
+ private:
+  /// protects qs_map_
+  boost::mutex qs_map_lock_;
+
+  /// map from query id to QueryState (owned by us)
+  std::unordered_map<TUniqueId, QueryState*> qs_map_;
+
+  /// Executes the given fragment instance to completion, then releases the
+  /// QueryState refcount taken in StartFInstance().
+  void ExecFInstance(FragmentInstanceState* fis);
+};
+
+}
+
+#endif
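
GetQueryState() and ReleaseQueryState() are meant to be paired; QueryState::ScopedRef (shown in query-state.cc below) does that pairing in RAII fashion so a reference cannot be leaked on an early return. A standalone sketch of the same idiom with illustrative stand-in types:

// Illustrative RAII reference holder; not the real ScopedRef/QueryExecMgr.
#include <iostream>
#include <string>

struct QueryStateLike { int refcnt = 0; };

class ExecMgrLike {
 public:
  QueryStateLike* Get(const std::string& /*query_id*/) {
    ++qs_.refcnt;   // pretend lookup; always returns the single entry
    return &qs_;
  }
  void Release(QueryStateLike* qs) { --qs->refcnt; }
 private:
  QueryStateLike qs_;
};

class ScopedRefLike {
 public:
  ScopedRefLike(ExecMgrLike* mgr, const std::string& query_id)
    : mgr_(mgr), qs_(mgr->Get(query_id)) {}
  ~ScopedRefLike() { if (qs_ != nullptr) mgr_->Release(qs_); }
  QueryStateLike* get() const { return qs_; }

  // non-copyable so a reference cannot be released twice
  ScopedRefLike(const ScopedRefLike&) = delete;
  ScopedRefLike& operator=(const ScopedRefLike&) = delete;

 private:
  ExecMgrLike* mgr_;
  QueryStateLike* qs_;
};

int main() {
  ExecMgrLike mgr;
  QueryStateLike* qs = mgr.Get("query-1");                         // refcnt == 1
  {
    ScopedRefLike ref(&mgr, "query-1");
    std::cout << "in scope: refcnt=" << qs->refcnt << std::endl;   // 2
  }  // ScopedRefLike's reference dropped here, even on early return
  std::cout << "after scope: refcnt=" << qs->refcnt << std::endl;  // 1
  mgr.Release(qs);
  return 0;
}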

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4b2d76db/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
new file mode 100644
index 0000000..2757750
--- /dev/null
+++ b/be/src/runtime/query-state.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include "runtime/query-state.h"
+
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+
+#include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/query-exec-mgr.h"
+
+using namespace impala;
+
+QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) {
+  DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr);
+  query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
+}
+
+QueryState::ScopedRef::~ScopedRef() {
+  if (query_state_ == nullptr) return;
+  ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state_);
+}
+
+QueryState::QueryState(const TQueryCtx& query_ctx)
+  : query_ctx_(query_ctx),
+    refcnt_(0) {
+  TQueryOptions& query_options = query_ctx_.client_request.query_options;
+  // max_errors does not indicate how many errors in total have been recorded, but rather
+  // how many are distinct. It is defined as the sum of the number of generic errors and
+  // the number of distinct other errors.
+  if (query_options.max_errors <= 0) {
+    // TODO: fix linker error and uncomment this
+    //query_options_.max_errors = FLAGS_max_errors;
+    query_options.max_errors = 100;
+  }
+  if (query_options.batch_size <= 0) {
+    query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
+  }
+}
+
+void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
+  VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
+  lock_guard<mutex> l(fis_map_lock_);
+  DCHECK_EQ(fis_map_.count(fis->instance_id()), 0);
+  fis_map_.insert(make_pair(fis->instance_id(), fis));
+}
+
+FragmentInstanceState* QueryState::GetFInstanceState(const TUniqueId& instance_id) {
+  VLOG_FILE << "GetFInstanceState(): instance_id=" << PrintId(instance_id);
+  lock_guard<mutex> l(fis_map_lock_);
+  auto it = fis_map_.find(instance_id);
+  return it != fis_map_.end() ? it->second : nullptr;
+}