You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:04 UTC

[10/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc

IMPALA-2550: Switch to per-query exec rpc

Coordinator:
- FragmentInstanceState -> BackendState, which in turn records
  FragmentInstanceStats

QueryState:
- does query-wide setup in a separate thread (which also launches
  the instance exec threads)
- has a query-wide 'prepared' state at which point all static setup
  is done and all FragmentInstanceStates are accessible

Also renamed QueryExecState to ClientRequestState.

Simplified handling of execution status (in FragmentInstanceState):
- status only transmitted via ReportExecStatus rpc
- in particular, it's not returned anymore from the Cancel rpc

FragmentInstanceState (FIS): Fixed bugs related to partially-prepared state (in Close() and ReleaseThreadToken())

Change-Id: I20769e420711737b6b385c744cef4851cee3facd
Reviewed-on: http://gerrit.cloudera.org:8080/6535
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/368115cd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/368115cd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/368115cd

Branch: refs/heads/master
Commit: 368115cdae98344e5c826f099582a60aa536951d
Parents: 12f3ecc
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Mon Oct 31 14:42:51 2016 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue May 9 04:04:50 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |    2 +-
 be/src/codegen/codegen-anyval.h                 |    2 +
 be/src/common/status.cc                         |    9 +
 be/src/common/status.h                          |    4 +
 be/src/exec/data-sink.cc                        |   36 +-
 be/src/exec/data-sink.h                         |   25 +-
 be/src/exec/exec-node.cc                        |   15 +-
 be/src/exec/exec-node.h                         |    6 +-
 be/src/exec/hdfs-avro-table-writer.cc           |    2 +
 be/src/exec/hdfs-avro-table-writer.h            |    3 +
 be/src/exec/hdfs-parquet-table-writer.cc        |    1 +
 be/src/exec/hdfs-scan-node-base.cc              |    8 +-
 be/src/exec/hdfs-scan-node-base.h               |    8 +-
 be/src/exec/hdfs-table-sink.cc                  |    1 +
 be/src/exec/hdfs-table-sink.h                   |    2 +-
 be/src/exec/hdfs-table-writer.cc                |    4 +
 be/src/exec/hdfs-table-writer.h                 |   16 +-
 be/src/exprs/expr-test.cc                       |    4 +-
 be/src/runtime/CMakeLists.txt                   |    3 +-
 be/src/runtime/buffered-block-mgr-test.cc       |    4 +-
 be/src/runtime/buffered-tuple-stream-v2-test.cc |    1 +
 be/src/runtime/bufferpool/buffer-pool-test.cc   |    1 +
 be/src/runtime/coordinator-backend-state.cc     |  524 ++++++
 be/src/runtime/coordinator-backend-state.h      |  272 +++
 be/src/runtime/coordinator-filter-state.h       |  123 ++
 be/src/runtime/coordinator.cc                   | 1613 +++++-------------
 be/src/runtime/coordinator.h                    |  378 ++--
 be/src/runtime/data-stream-test.cc              |   35 +-
 be/src/runtime/debug-options.cc                 |   83 +
 be/src/runtime/debug-options.h                  |   58 +
 be/src/runtime/descriptors.cc                   |   45 +-
 be/src/runtime/descriptors.h                    |   15 +-
 be/src/runtime/exec-env.cc                      |    5 +-
 be/src/runtime/exec-env.h                       |    6 +-
 be/src/runtime/fragment-instance-state.cc       |  500 ++++--
 be/src/runtime/fragment-instance-state.h        |  242 ++-
 be/src/runtime/mem-tracker.cc                   |    4 +-
 be/src/runtime/mem-tracker.h                    |    2 +-
 be/src/runtime/plan-fragment-executor.cc        |  518 ------
 be/src/runtime/plan-fragment-executor.h         |  305 ----
 be/src/runtime/query-exec-mgr.cc                |   82 +-
 be/src/runtime/query-exec-mgr.h                 |   34 +-
 be/src/runtime/query-state.cc                   |  271 ++-
 be/src/runtime/query-state.h                    |  179 +-
 be/src/runtime/runtime-filter-bank.cc           |    5 +-
 be/src/runtime/runtime-state.cc                 |   66 +-
 be/src/runtime/runtime-state.h                  |   29 +-
 be/src/runtime/test-env.cc                      |   34 +-
 be/src/runtime/test-env.h                       |    3 +-
 be/src/scheduling/query-schedule.cc             |   16 +-
 be/src/scheduling/query-schedule.h              |    8 +-
 be/src/service/CMakeLists.txt                   |    2 +-
 be/src/service/child-query.cc                   |    9 +-
 be/src/service/child-query.h                    |   22 +-
 be/src/service/client-request-state.cc          | 1085 ++++++++++++
 be/src/service/client-request-state.h           |  413 +++++
 be/src/service/fe-support.cc                    |    5 +-
 be/src/service/impala-beeswax-server.cc         |  121 +-
 be/src/service/impala-hs2-server.cc             |  143 +-
 be/src/service/impala-http-handler.cc           |   39 +-
 be/src/service/impala-internal-service.cc       |   60 +-
 be/src/service/impala-internal-service.h        |    8 +-
 be/src/service/impala-server.cc                 |  314 ++--
 be/src/service/impala-server.h                  |  334 ++--
 be/src/service/query-exec-state.cc              | 1084 ------------
 be/src/service/query-exec-state.h               |  408 -----
 be/src/testutil/desc-tbl-builder.cc             |    2 +-
 be/src/testutil/fault-injection-util.h          |    4 +-
 be/src/util/error-util-test.cc                  |    8 +-
 be/src/util/error-util.cc                       |    6 +-
 be/src/util/error-util.h                        |   10 +-
 be/src/util/uid-util.h                          |    4 +
 common/thrift/ExecStats.thrift                  |    1 +
 common/thrift/ImpalaInternalService.thrift      |  220 ++-
 tests/common/test_result_verifier.py            |   12 +-
 75 files changed, 5014 insertions(+), 4912 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index c6d075d..c9fd80f 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -74,7 +74,7 @@ class Planner {
     query_ctx.client_request.query_options = query_options_;
     query_ctx.__set_session(session_state_);
     ImpalaServer::PrepareQueryContext(&query_ctx);
-    runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_, ""));
+    runtime_state_.reset(new RuntimeState(query_ctx, &exec_env_));
 
     return frontend_.GetExecRequest(query_ctx, result);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/codegen/codegen-anyval.h
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-anyval.h b/be/src/codegen/codegen-anyval.h
index 696f542..063004a 100644
--- a/be/src/codegen/codegen-anyval.h
+++ b/be/src/codegen/codegen-anyval.h
@@ -259,6 +259,8 @@ class CodegenAnyVal {
     : type_(INVALID_TYPE), value_(nullptr), name_(nullptr),
       codegen_(nullptr), builder_(nullptr) {}
 
+  LlvmCodeGen* codegen() const { return codegen_; }
+
  private:
   ColumnType type_;
   llvm::Value* value_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/common/status.cc
----------------------------------------------------------------------
diff --git a/be/src/common/status.cc b/be/src/common/status.cc
index e3b821c..1adc654 100644
--- a/be/src/common/status.cc
+++ b/be/src/common/status.cc
@@ -17,10 +17,13 @@
 
 #include <boost/algorithm/string/join.hpp>
 
+#include <ostream>
+
 #include "common/status.h"
 #include "util/debug-util.h"
 
 #include "common/names.h"
+#include "gen-cpp/ErrorCodes_types.h"
 
 namespace impala {
 
@@ -233,4 +236,10 @@ void Status::CopyMessageFrom(const Status& status) noexcept {
   msg_ = status.msg_ == NULL ? NULL : new ErrorMsg(*status.msg_);
 }
 
+ostream& operator<<(ostream& os, const Status& status) {
+  os << _TErrorCode_VALUES_TO_NAMES.at(status.code());
+  if (!status.ok()) os << ": " << status.GetDetail();
+  return os;
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index a3e44e0..9fa303b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_COMMON_STATUS_H
 #define IMPALA_COMMON_STATUS_H
 
+#include <iosfwd>
 #include <string>
 #include <vector>
 
@@ -260,6 +261,9 @@ class Status {
   ErrorMsg* msg_;
 };
 
+/// for debugging
+std::ostream& operator<<(std::ostream& os, const Status& status);
+
 /// some generally useful macros
 #define RETURN_IF_ERROR(stmt)                          \
   do {                                                 \

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 967eda3..dad6247 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -48,11 +48,12 @@ DataSink::~DataSink() {
   DCHECK(closed_);
 }
 
-Status DataSink::CreateDataSink(ObjectPool* pool,
-    const TDataSink& thrift_sink, const vector<TExpr>& output_exprs,
+Status DataSink::Create(ObjectPool* pool,
+    const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    const RowDescriptor& row_desc, scoped_ptr<DataSink>* sink) {
-  DataSink* tmp_sink = NULL;
+    const RowDescriptor& row_desc, DataSink** sink) {
+  const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
+  const vector<TExpr>& output_exprs = fragment_ctx.fragment.output_exprs;
   switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK:
       if (!thrift_sink.__isset.stream_sink) {
@@ -60,52 +61,45 @@ Status DataSink::CreateDataSink(ObjectPool* pool,
       }
 
       // TODO: figure out good buffer size based on size of output row
-      tmp_sink = new DataStreamSender(pool,
-          fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
-          fragment_instance_ctx.destinations, 16 * 1024);
-      sink->reset(tmp_sink);
+      *sink = pool->Add(
+          new DataStreamSender(pool,
+            fragment_instance_ctx.sender_id, row_desc, thrift_sink.stream_sink,
+            fragment_ctx.destinations, 16 * 1024));
       break;
 
     case TDataSinkType::TABLE_SINK:
       if (!thrift_sink.__isset.table_sink) return Status("Missing table sink.");
       switch (thrift_sink.table_sink.type) {
         case TTableSinkType::HDFS:
-          tmp_sink = new HdfsTableSink(row_desc, output_exprs, thrift_sink);
-          sink->reset(tmp_sink);
+          *sink = pool->Add(new HdfsTableSink(row_desc, output_exprs, thrift_sink));
           break;
         case TTableSinkType::HBASE:
-          tmp_sink = new HBaseTableSink(row_desc, output_exprs, thrift_sink);
-          sink->reset(tmp_sink);
+          *sink = pool->Add(new HBaseTableSink(row_desc, output_exprs, thrift_sink));
           break;
         case TTableSinkType::KUDU:
           RETURN_IF_ERROR(CheckKuduAvailability());
-          tmp_sink = new KuduTableSink(row_desc, output_exprs, thrift_sink);
-          sink->reset(tmp_sink);
+          *sink = pool->Add(new KuduTableSink(row_desc, output_exprs, thrift_sink));
           break;
         default:
           stringstream error_msg;
           const char* str = "Unknown table sink";
           map<int, const char*>::const_iterator i =
               _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
-          if (i != _TTableSinkType_VALUES_TO_NAMES.end()) {
-            str = i->second;
-          }
+          if (i != _TTableSinkType_VALUES_TO_NAMES.end()) str = i->second;
           error_msg << str << " not implemented.";
           return Status(error_msg.str());
       }
 
       break;
     case TDataSinkType::PLAN_ROOT_SINK:
-      sink->reset(new PlanRootSink(row_desc, output_exprs, thrift_sink));
+      *sink = pool->Add(new PlanRootSink(row_desc, output_exprs, thrift_sink));
       break;
     default:
       stringstream error_msg;
       map<int, const char*>::const_iterator i =
           _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
       const char* str = "Unknown data sink type ";
-      if (i != _TDataSinkType_VALUES_TO_NAMES.end()) {
-        str = i->second;
-      }
+      if (i != _TDataSinkType_VALUES_TO_NAMES.end()) str = i->second;
       error_msg << str << " not implemented.";
       return Status(error_msg.str());
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 0fc8eca..f20c40b 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -23,22 +23,21 @@
 #include <vector>
 
 #include "common/status.h"
-#include "runtime/runtime-state.h"
-#include "util/runtime-profile.h"
-#include "gen-cpp/DataSinks_types.h"
+#include "runtime/runtime-state.h"  // for PartitionStatusMap
+#include "runtime/mem-tracker.h"
 #include "gen-cpp/Exprs_types.h"
 
 namespace impala {
 
-class MemTracker;
 class ObjectPool;
 class RowBatch;
 class RuntimeProfile;
-class RuntimeState;
+class RowDescriptor;
+class TDataSink;
 class TPlanExecRequest;
 class TPlanExecParams;
 class TPlanFragmentInstanceCtx;
-class RowDescriptor;
+class TInsertStats;
 
 /// A data sink is an abstract interface for data sinks that consume RowBatches. E.g.
 /// a sink may write a HDFS table, send data across the network, or build hash tables
@@ -58,8 +57,8 @@ class DataSink {
   virtual std::string GetName() = 0;
 
   /// Setup. Call before Send(), Open(), or Close() during the prepare phase of the query
-  /// fragment. Creates a MemTracker for the sink that is a child of 'parent_mem_tracker'.
-  /// Subclasses must call DataSink::Prepare().
+  /// fragment. Creates a MemTracker (in obj_pool) for the sink that is a child of
+  /// 'parent_mem_tracker'. Subclasses must call DataSink::Prepare().
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
   /// Call before Send() to open the sink.
@@ -78,12 +77,12 @@ class DataSink {
   /// Must be idempotent.
   virtual void Close(RuntimeState* state);
 
-  /// Creates a new data sink from thrift_sink. A pointer to the
-  /// new sink is written to *sink, and is owned by the caller.
-  static Status CreateDataSink(ObjectPool* pool,
-    const TDataSink& thrift_sink, const std::vector<TExpr>& output_exprs,
+  /// Creates a new data sink, allocated in pool and returned through *sink, from
+  /// thrift_sink.
+  static Status Create(ObjectPool* pool,
+    const TPlanFragmentCtx& fragment_ctx,
     const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-    const RowDescriptor& row_desc, boost::scoped_ptr<DataSink>* sink);
+    const RowDescriptor& row_desc, DataSink** sink);
 
   /// Merges one update to the DML stats for a partition. dst_stats will have the
   /// combined stats of src_stats and dst_stats after this method returns.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 02d4bf6..4c06ece 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -372,16 +372,17 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
   return Status::OK();
 }
 
-void ExecNode::SetDebugOptions(
-    int node_id, TExecNodePhase::type phase, TDebugAction::type action,
-    ExecNode* root) {
-  if (root->id_ == node_id) {
-    root->debug_phase_ = phase;
-    root->debug_action_ = action;
+void ExecNode::SetDebugOptions(const TDebugOptions& debug_options, ExecNode* root) {
+  DCHECK(debug_options.__isset.node_id);
+  DCHECK(debug_options.__isset.phase);
+  DCHECK(debug_options.__isset.action);
+  if (root->id_ == debug_options.node_id) {
+    root->debug_phase_ = debug_options.phase;
+    root->debug_action_ = debug_options.action;
     return;
   }
   for (int i = 0; i < root->children_.size(); ++i) {
-    SetDebugOptions(node_id, phase, action, root->children_[i]);
+    SetDebugOptions(debug_options, root->children_[i]);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index aa0c379..c769be3 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -43,6 +43,7 @@ class TupleRow;
 class DataSink;
 class MemTracker;
 class SubplanNode;
+class TDebugOptions;
 
 /// Superclass of all executor nodes.
 /// All subclasses need to make sure to check RuntimeState::is_cancelled()
@@ -133,9 +134,8 @@ class ExecNode {
   static Status CreateTree(RuntimeState* state, const TPlan& plan,
       const DescriptorTbl& descs, ExecNode** root) WARN_UNUSED_RESULT;
 
-  /// Set debug action for node with given id in 'tree'
-  static void SetDebugOptions(
-      int node_id, TExecNodePhase::type phase, TDebugAction::type action, ExecNode* tree);
+  /// Set debug action in 'tree' according to debug_options.
+  static void SetDebugOptions(const TDebugOptions& debug_options, ExecNode* tree);
 
   /// Collect all nodes of given 'node_type' that are part of this subtree, and return in
   /// 'nodes'.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-avro-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc
index ec0ee08..00a51dd 100644
--- a/be/src/exec/hdfs-avro-table-writer.cc
+++ b/be/src/exec/hdfs-avro-table-writer.cc
@@ -24,6 +24,7 @@
 #include <gutil/strings/substitute.h>
 
 #include "exec/exec-node.h"
+#include "exec/hdfs-table-sink.h"
 #include "util/compress.h"
 #include "util/hdfs-util.h"
 #include "util/uid-util.h"
@@ -35,6 +36,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/types.h"
 #include "util/runtime-profile-counters.h"
 #include "write-stream.inline.h"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-avro-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h
index 38e820e..01a79f7 100644
--- a/be/src/exec/hdfs-avro-table-writer.h
+++ b/be/src/exec/hdfs-avro-table-writer.h
@@ -22,13 +22,16 @@
 #include <sstream>
 #include <string>
 
+#include "common/status.h"
 #include "exec/hdfs-table-writer.h"
+#include "runtime/mem-pool.h"
 #include "util/codec.h"
 #include "exec/write-stream.h"
 
 namespace impala {
 
 class Expr;
+struct ColumnType;
 class TupleDescriptor;
 class TupleRow;
 class RuntimeState;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 5c2d24c..ea0059d 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -18,6 +18,7 @@
 #include "exec/hdfs-parquet-table-writer.h"
 
 #include "common/version.h"
+#include "exec/hdfs-table-sink.h"
 #include "exec/parquet-column-stats.inline.h"
 #include "exprs/expr-context.h"
 #include "exprs/expr.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b22e67c..e4ba44d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -312,7 +312,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
   ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
 
   // Add per volume stats to the runtime profile
-  PerVolumnStats per_volume_stats;
+  PerVolumeStats per_volume_stats;
   stringstream str;
   UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats);
   PrintHdfsSplitStats(per_volume_stats, &str);
@@ -808,7 +808,7 @@ void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const
 
 void HdfsScanNodeBase::UpdateHdfsSplitStats(
     const vector<TScanRangeParams>& scan_range_params_list,
-    PerVolumnStats* per_volume_stats) {
+    PerVolumeStats* per_volume_stats) {
   pair<int, int64_t> init_value(0, 0);
   for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
     const TScanRange& scan_range = scan_range_params.scan_range;
@@ -821,9 +821,9 @@ void HdfsScanNodeBase::UpdateHdfsSplitStats(
   }
 }
 
-void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
+void HdfsScanNodeBase::PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats,
     stringstream* ss) {
-  for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
+  for (PerVolumeStats::const_iterator i = per_volume_stats.begin();
        i != per_volume_stats.end(); ++i) {
      (*ss) << i->first << ":" << i->second.first << "/"
          << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3d97e2e..3453945 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -268,16 +268,18 @@ class HdfsScanNodeBase : public ScanNode {
   }
 
   /// map from volume id to <number of split, per volume split lengths>
-  typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats;
+  /// TODO: move this into some global .h, no need to include this file just for this
+  /// typedef
+  typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumeStats;
 
   /// Update the per volume stats with the given scan range params list
   static void UpdateHdfsSplitStats(
       const std::vector<TScanRangeParams>& scan_range_params_list,
-      PerVolumnStats* per_volume_stats);
+      PerVolumeStats* per_volume_stats);
 
   /// Output the per_volume_stats to stringstream. The output format is a list of:
   /// <volume id>:<# splits>/<per volume split lengths>
-  static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
+  static void PrintHdfsSplitStats(const PerVolumeStats& per_volume_stats,
       std::stringstream* ss);
 
   /// Description string for the per volume stats output.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index ef983e2..16833c1 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "exec/hdfs-table-sink.h"
+#include "exec/hdfs-table-writer.h"
 #include "exec/hdfs-text-table-writer.h"
 #include "exec/hdfs-sequence-table-writer.h"
 #include "exec/hdfs-avro-table-writer.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index d452f3d..4ccd2fe 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -26,6 +26,7 @@
 /// needed for scoped_ptr to work on ObjectPool
 #include "common/object-pool.h"
 #include "exec/data-sink.h"
+#include "exec/hdfs-table-writer.h"
 #include "runtime/descriptors.h"
 
 namespace impala {
@@ -34,7 +35,6 @@ class Expr;
 class TupleDescriptor;
 class TupleRow;
 class RuntimeState;
-class HdfsTableWriter;
 class MemTracker;
 
 /// Records the temporary and final Hdfs file name, the opened temporary Hdfs file, and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc
index 48349e3..b84915a 100644
--- a/be/src/exec/hdfs-table-writer.cc
+++ b/be/src/exec/hdfs-table-writer.cc
@@ -17,8 +17,12 @@
 
 #include "exec/hdfs-table-writer.h"
 
+#include <sstream>
+
 #include "common/names.h"
 #include "runtime/mem-tracker.h"
+#include "exec/hdfs-table-sink.h"
+#include "util/hdfs-util.h"
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exec/hdfs-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h
index 86fe56c..cad304f 100644
--- a/be/src/exec/hdfs-table-writer.h
+++ b/be/src/exec/hdfs-table-writer.h
@@ -19,16 +19,22 @@
 #ifndef IMPALA_EXEC_HDFS_TABLE_WRITER_H
 #define IMPALA_EXEC_HDFS_TABLE_WRITER_H
 
+#include <vector>
 #include <hdfs.h>
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
 
-#include "runtime/descriptors.h"
-#include "exec/hdfs-table-sink.h"
-#include "util/hdfs-util.h"
+#include "common/status.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
 
 namespace impala {
 
+class RuntimeState;
+class OutputPartition;
+class ExprContext;
+class RowBatch;
+class HdfsPartitionDescriptor;
+class HdfsTableDescriptor;
+class HdfsTableSink;
+
 /// Pure virtual class for writing to hdfs table partition files.
 /// Subclasses implement the code needed to write to a specific file type.
 /// A subclass needs to implement functions to format and add rows to the file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 0de6833..a775e7b 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -1057,7 +1057,7 @@ void TestSingleLiteralConstruction(
     const ColumnType& type, const T& value, const string& string_val) {
   ObjectPool pool;
   RowDescriptor desc;
-  RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"};
+  RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
   MemTracker tracker;
 
   Expr* expr = pool.Add(new Literal(type, value));
@@ -1074,7 +1074,7 @@ TEST_F(ExprTest, NullLiteral) {
   for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
     NullLiteral expr(static_cast<PrimitiveType>(type));
     ExprContext ctx(&expr);
-    RuntimeState state{TQueryCtx(), ExecEnv::GetInstance(), "test-pool"};
+    RuntimeState state(TQueryCtx(), ExecEnv::GetInstance());
     MemTracker tracker;
     EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
     EXPECT_OK(ctx.Open(&state));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 773192d..2de0f2e 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -29,9 +29,11 @@ add_library(Runtime
   buffered-tuple-stream-v2.cc
   client-cache.cc
   coordinator.cc
+  coordinator-backend-state.cc
   data-stream-mgr.cc
   data-stream-sender.cc
   data-stream-recvr.cc
+  debug-options.cc
   descriptors.cc
   disk-io-mgr.cc
   disk-io-mgr-reader-context.cc
@@ -48,7 +50,6 @@ add_library(Runtime
   mem-pool.cc
   multi-precision.cc
   parallel-executor.cc
-  plan-fragment-executor.cc
   query-exec-mgr.cc
   query-state.cc
   test-env.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 65fd168..b8b90ee 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -573,7 +573,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
     const int num_threads = 8;
     thread_group workers;
     // Create a shared RuntimeState with no BufferedBlockMgr.
-    RuntimeState shared_state(TQueryCtx(), test_env_->exec_env(), "test-pool");
+    RuntimeState shared_state(TQueryCtx(), test_env_->exec_env());
 
     for (int i = 0; i < num_threads; ++i) {
       thread* t = new thread(
@@ -978,7 +978,7 @@ TEST_F(BufferedBlockMgrTest, WriteCompleteWithCancelledRuntimeState) {
 
   // Cancel the runtime state and re-pin the blocks while writes are in flight to check
   // that WriteComplete() handles the case ok.
-  state->set_is_cancelled(true);
+  state->set_is_cancelled();
   PinBlocks(blocks);
 
   WaitForWrites(block_mgr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index da75212..1ae9181 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -26,6 +26,7 @@
 #include "codegen/llvm-codegen.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/query-state.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/collection-value.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index dae7bcd..6b9177e 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -34,6 +34,7 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/test-env.h"
+#include "runtime/query-state.h"
 #include "service/fe-support.h"
 #include "testutil/cpu-util.h"
 #include "testutil/death-test-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
new file mode 100644
index 0000000..cd3a741
--- /dev/null
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -0,0 +1,524 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/coordinator-backend-state.h"
+
+#include <sstream>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/lock_guard.hpp>
+#include <boost/accumulators/accumulators.hpp>
+
+#include "common/object-pool.h"
+#include "exec/exec-node.h"
+#include "exec/scan-node.h"
+#include "scheduling/query-schedule.h"
+#include "runtime/exec-env.h"
+#include "runtime/fragment-instance-state.h"
+#include "runtime/debug-options.h"
+#include "runtime/client-cache.h"
+#include "runtime/client-cache-types.h"
+#include "runtime/backend-client.h"
+#include "runtime/coordinator-filter-state.h"
+#include "util/error-util.h"
+#include "util/uid-util.h"
+#include "util/network-util.h"
+#include "util/counting-barrier.h"
+#include "util/progress-updater.h"
+#include "gen-cpp/Types_types.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/ImpalaInternalService_constants.h"
+#include "common/names.h"
+
+using namespace impala;
+namespace accumulators = boost::accumulators;
+
+// Constructs the per-backend state. The instance-dependent fields (host_, the
+// instance stats map, the fragment set and the real instance count) are filled in
+// by Init().
+Coordinator::BackendState::BackendState(
+    const TUniqueId& query_id, int state_idx, TRuntimeFilterMode::type filter_mode)
+  : query_id_(query_id),
+    state_idx_(state_idx),
+    filter_mode_(filter_mode),
+    // Init() sets the real count; give it a defined value so IsDoneInternal() never
+    // reads an indeterminate int if it runs before Init()
+    num_remaining_instances_(0),
+    rpc_latency_(0),
+    rpc_sent_(false),
+    peak_consumption_(0L) {
+}
+
+// Records the instance params for this backend and creates one InstanceStats per
+// instance in obj_pool; the InstanceStats c'tor installs the instance profile as a
+// child of the corresponding FragmentStats' root profile.
+// instance_params_list must be non-empty, all of its entries must refer to the same
+// host, and all instances of a fragment must be contiguous in it.
+void Coordinator::BackendState::Init(
+    const vector<const FInstanceExecParams*>& instance_params_list,
+    const vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool) {
+  // host_ is taken from the first entry below
+  DCHECK(!instance_params_list.empty());
+  instance_params_list_ = instance_params_list;
+  host_ = instance_params_list_[0]->host;
+  num_remaining_instances_ = instance_params_list.size();
+
+  // populate instance_stats_map_ and install instance
+  // profiles as child profiles in fragment_stats' profile
+  int prev_fragment_idx = -1;
+  for (const FInstanceExecParams* instance_params: instance_params_list) {
+    DCHECK_EQ(host_, instance_params->host);  // all hosts must be the same
+    int fragment_idx = instance_params->fragment().idx;
+    DCHECK_LT(fragment_idx, fragment_stats.size());
+    if (fragment_idx != prev_fragment_idx) {
+      // All instances of a fragment are contiguous, so a fragment idx that starts a
+      // new run must not have been seen before. prev_fragment_idx has to be updated
+      // on the very first run as well, otherwise this check never fires.
+      DCHECK_EQ(fragments_.count(fragment_idx), 0);
+      prev_fragment_idx = fragment_idx;
+    }
+    fragments_.insert(fragment_idx);
+
+    instance_stats_map_.emplace(
+        GetInstanceIdx(instance_params->instance_id),
+        obj_pool->Add(
+          new InstanceStats(*instance_params, fragment_stats[fragment_idx], obj_pool)));
+  }
+}
+
+// Fills rpc_params with one TPlanFragmentInstanceCtx per entry of
+// instance_params_list_ and one TPlanFragmentCtx per run of instances that share a
+// fragment idx. debug_options are attached to the instances they select. Unless
+// runtime filters are off, filters that are missing from filter_routing_table, or
+// for which the instance is not a source, are removed from the hash join nodes of
+// the fragment's plan.
+void Coordinator::BackendState::SetRpcParams(
+    const DebugOptions& debug_options, const FilterRoutingTable& filter_routing_table,
+    TExecQueryFInstancesParams* rpc_params) {
+  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
+  rpc_params->__set_coord_state_idx(state_idx_);
+
+  // set fragment_ctxs and fragment_instance_ctxs
+  const int num_instances = instance_params_list_.size();
+  rpc_params->fragment_instance_ctxs.resize(num_instances);
+  for (int pos = 0; pos < num_instances; ++pos) {
+    const FInstanceExecParams& exec_params = *instance_params_list_[pos];
+    TPlanFragmentInstanceCtx& inst_ctx = rpc_params->fragment_instance_ctxs[pos];
+    const int frag_idx = exec_params.fragment_exec_params.fragment.idx;
+
+    // start a new TPlanFragmentCtx whenever the fragment idx changes
+    const bool starts_new_fragment = rpc_params->fragment_ctxs.empty()
+        || rpc_params->fragment_ctxs.back().fragment.idx != frag_idx;
+    if (starts_new_fragment) {
+      rpc_params->fragment_ctxs.emplace_back();
+      TPlanFragmentCtx& new_ctx = rpc_params->fragment_ctxs.back();
+      new_ctx.__set_fragment(exec_params.fragment_exec_params.fragment);
+      new_ctx.__set_destinations(exec_params.fragment_exec_params.destinations);
+    }
+
+    inst_ctx.fragment_idx = frag_idx;
+    inst_ctx.fragment_instance_id = exec_params.instance_id;
+    inst_ctx.per_fragment_instance_idx = exec_params.per_fragment_instance_idx;
+    inst_ctx.__set_per_node_scan_ranges(exec_params.per_node_scan_ranges);
+    inst_ctx.__set_per_exch_num_senders(
+        exec_params.fragment_exec_params.per_exch_num_senders);
+    inst_ctx.__set_sender_id(exec_params.sender_id);
+
+    // debug options apply if a node is selected and the instance idx is either a
+    // wildcard (-1) or matches this instance
+    if (debug_options.node_id() != -1) {
+      const int target_idx = debug_options.instance_idx();
+      if (target_idx == -1 || target_idx == GetInstanceIdx(exec_params.instance_id)) {
+        inst_ctx.__set_debug_options(debug_options.ToThrift());
+      }
+    }
+
+    if (filter_mode_ == TRuntimeFilterMode::OFF) continue;
+
+    // Remove filters that weren't selected during filter routing table construction.
+    // TODO: do this more efficiently, we're looping over the entire plan for each
+    // instance separately
+    DCHECK_EQ(rpc_params->query_ctx.client_request.query_options.mt_dop, 0);
+    const int inst_idx = GetInstanceIdx(exec_params.instance_id);
+    for (TPlanNode& node: rpc_params->fragment_ctxs.back().fragment.plan.nodes) {
+      if (!node.__isset.hash_join_node || !node.__isset.runtime_filters) continue;
+
+      vector<TRuntimeFilterDesc> kept_filters;
+      for (const TRuntimeFilterDesc& filter_desc: node.runtime_filters) {
+        FilterRoutingTable::const_iterator routing_entry =
+            filter_routing_table.find(filter_desc.filter_id);
+        // filter was dropped in Coordinator::InitFilterRoutingTable()
+        if (routing_entry == filter_routing_table.end()) continue;
+        const FilterState& filter_state = routing_entry->second;
+        const bool is_source = filter_state.src_fragment_instance_idxs().find(inst_idx)
+            != filter_state.src_fragment_instance_idxs().end();
+        if (!is_source) {
+          DCHECK(filter_desc.is_broadcast_join);
+          continue;
+        }
+        // We don't need a target-side check here, because a filter is either sent to
+        // all its targets or none, and the none case is handled by checking if the
+        // filter is in the routing table.
+        kept_filters.push_back(filter_desc);
+      }
+      node.__set_runtime_filters(kept_filters);
+    }
+  }
+}
+
+// Builds the ExecQueryFInstances params and sends the rpc to this backend while
+// holding lock_. The outcome is recorded in status_, rpc_sent_ and rpc_latency_; on
+// success the per-instance stopwatches are started. exec_complete_barrier is
+// notified when this function exits, on every path.
+void Coordinator::BackendState::Exec(
+    const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+    const FilterRoutingTable& filter_routing_table,
+    CountingBarrier* exec_complete_barrier) {
+  NotifyBarrierOnExit notifier(exec_complete_barrier);
+  TExecQueryFInstancesParams rpc_params;
+  rpc_params.__set_query_ctx(query_ctx);
+  SetRpcParams(debug_options, filter_routing_table, &rpc_params);
+  VLOG_FILE << "making rpc: ExecQueryFInstances"
+      << " host=" << impalad_address() << " query_id=" << PrintId(query_id_);
+
+  // guard against concurrent UpdateBackendExecStatus() that may arrive after RPC returns
+  lock_guard<mutex> l(lock_);
+  const int64_t rpc_start_ms = MonotonicMillis();
+
+  // a failure to obtain a connection is reported through status_ directly
+  ImpalaBackendConnection backend_client(
+      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status_);
+  if (!status_.ok()) return;
+
+  TExecQueryFInstancesResult thrift_result;
+  Status rpc_status = backend_client.DoRpc(
+      &ImpalaBackendClient::ExecQueryFInstances, rpc_params, &thrift_result);
+  rpc_sent_ = true;
+  rpc_latency_ = MonotonicMillis() - rpc_start_ms;
+
+  // The rpc can fail at the transport level or return an error status; both cases
+  // are reported with the same message format and only differ in the detail text.
+  bool failed = false;
+  string failure_detail;
+  if (!rpc_status.ok()) {
+    failed = true;
+    failure_detail = rpc_status.msg().msg();
+  } else {
+    Status exec_status = Status(thrift_result.status);
+    if (!exec_status.ok()) {
+      failed = true;
+      failure_detail = exec_status.msg().GetFullMessageDetails();
+    }
+  }
+
+  if (failed) {
+    const string err_msg = Substitute("ExecQueryFInstances rpc query_id=$0 failed: $1",
+        PrintId(query_id_), failure_detail);
+    VLOG_QUERY << err_msg;
+    status_ = Status(err_msg);
+    return;
+  }
+
+  for (const auto& idx_and_stats: instance_stats_map_) {
+    idx_and_stats.second->stopwatch_.Start();
+  }
+  VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
+}
+
+Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) {
+  lock_guard<mutex> l(lock_);
+  if (!status_.ok() && failed_instance_id != nullptr) {
+    *failed_instance_id = failed_instance_id_;
+  }
+  return status_;
+}
+
+// Returns peak_consumption_, read under lock_.
+int64_t Coordinator::BackendState::GetPeakConsumption() {
+  lock_guard<mutex> l(lock_);
+  const int64_t peak = peak_consumption_;
+  return peak;
+}
+
+// Merges error_log_ into 'merged' under lock_; does nothing if error_log_ is empty.
+void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
+  lock_guard<mutex> l(lock_);
+  if (error_log_.empty()) return;
+  MergeErrorMaps(error_log_, merged);
+}
+
+// Locking wrapper around IsDoneInternal().
+bool Coordinator::BackendState::IsDone() {
+  lock_guard<mutex> l(lock_);
+  const bool done = IsDoneInternal();
+  return done;
+}
+
+inline bool Coordinator::BackendState::IsDoneInternal() const {
+  return num_remaining_instances_ == 0 || !status_.ok();
+}
+
+// Applies a status report from this backend while holding exec_summary->lock and
+// lock_ (acquired in that order). For every instance status in the report:
+// - an OK status updates the instance's stats, exec_summary, scan_range_progress
+//   and peak_consumption_
+// - an error status replaces status_ if that is still OK or CANCELLED, and records
+//   the reporting instance in failed_instance_id_
+// - a status marked 'done' decrements num_remaining_instances_
+// The report's error log is merged into error_log_. *done is set to the result of
+// IsDoneInternal().
+void Coordinator::BackendState::ApplyExecStatusReport(
+    const TReportExecStatusParams& backend_exec_status, ExecSummary* exec_summary,
+    ProgressUpdater* scan_range_progress, bool* done) {
+  lock_guard<SpinLock> l1(exec_summary->lock);
+  lock_guard<mutex> l2(lock_);
+  for (const TFragmentInstanceExecStatus& instance_exec_status:
+      backend_exec_status.instance_exec_status) {
+    Status instance_status(instance_exec_status.status);
+    if (instance_status.ok()) {
+      int instance_idx = GetInstanceIdx(instance_exec_status.fragment_instance_id);
+      // Look the instance up with find() rather than operator[]: the latter inserts
+      // a null entry for an unknown idx, which would then be dereferenced in builds
+      // where DCHECKs are compiled out.
+      auto stats_it = instance_stats_map_.find(instance_idx);
+      DCHECK(stats_it != instance_stats_map_.end());
+      if (stats_it == instance_stats_map_.end()) continue;
+      InstanceStats* instance_stats = stats_it->second;
+      DCHECK_EQ(instance_stats->exec_params_.instance_id,
+          instance_exec_status.fragment_instance_id);
+      instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
+      if (instance_stats->peak_mem_counter_ != nullptr) {
+        // protect against out-of-order status updates
+        peak_consumption_ =
+            max(peak_consumption_, instance_stats->peak_mem_counter_->value());
+      }
+    } else {
+      // if a query is aborted due to an error encountered by a single fragment instance,
+      // all other fragment instances will report a cancelled status; make sure not
+      // to mask the original error status
+      if (status_.ok() || status_.IsCancelled()) {
+        status_ = instance_status;
+        failed_instance_id_ = instance_exec_status.fragment_instance_id;
+      }
+    }
+    DCHECK_GT(num_remaining_instances_, 0);
+    if (instance_exec_status.done) --num_remaining_instances_;
+
+    // TODO: clean up the ReportQuerySummary() mess
+    // We can't update this backend's profile if ReportQuerySummary() is running,
+    // because it depends on all profiles not changing during its execution (when it
+    // calls SortChildren()). ReportQuerySummary() only gets called after
+    // WaitForBackendCompletion() returns or at the end of CancelFragmentInstances().
+    // WaitForBackendCompletion() only returns after all backends have completed (in
+    // which case we wouldn't be in this function), or when there's an error, in which
+    // case CancelFragmentInstances() is called. CancelFragmentInstances sets all
+    // exec_state's statuses to cancelled.
+    // TODO: We're losing this profile information. Call ReportQuerySummary only after
+    // all backends have completed.
+  }
+
+  // Log messages aggregated by type
+  if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size() > 0) {
+    // Append the log messages from each update with the global state of the query
+    // execution
+    MergeErrorMaps(backend_exec_status.error_log, &error_log_);
+    VLOG_FILE << "host=" << host_ << " error log: " << PrintErrorMapToString(error_log_);
+  }
+
+  *done = IsDoneInternal();
+  // TODO: keep backend-wide stopwatch?
+}
+
+// For every instance on this backend, adds its elapsed stopwatch time and, if that
+// time is positive, its scan rate (total split size divided by the elapsed time in
+// seconds) to the FragmentStats of its fragment, and folds the instance profile
+// into the fragment's averaged profile. Runs under lock_.
+void Coordinator::BackendState::UpdateExecStats(
+    const vector<FragmentStats*>& fragment_stats) {
+  lock_guard<mutex> l(lock_);
+  for (const auto& idx_and_stats: instance_stats_map_) {
+    const InstanceStats& stats = *idx_and_stats.second;
+    const int frag_idx = stats.exec_params_.fragment().idx;
+    DCHECK_LT(frag_idx, fragment_stats.size());
+    FragmentStats* frag_stats = fragment_stats[frag_idx];
+
+    const int64_t elapsed_ns = stats.stopwatch_.ElapsedTime();
+    frag_stats->completion_times_(elapsed_ns);
+    if (elapsed_ns > 0) {
+      // ns -> s; skipped for a zero elapsed time to avoid dividing by zero
+      const double elapsed_secs = elapsed_ns / 1000.0 / 1000.0 / 1000.0;
+      frag_stats->rates_(stats.total_split_size_ / elapsed_secs);
+    }
+    frag_stats->avg_profile_->UpdateAverage(stats.profile_);
+  }
+}
+
+// Sends a CancelQueryFInstances rpc to this backend while holding lock_.
+// Returns false without sending anything if the exec rpc was never sent, if
+// execution is already done (see IsDoneInternal()), or if no connection to the
+// backend could be obtained; returns true once the rpc has been attempted, whether
+// or not it succeeded. An rpc failure is merged into status_.
+bool Coordinator::BackendState::Cancel() {
+  lock_guard<mutex> l(lock_);
+
+  // Nothing to cancel if the exec rpc was not sent
+  if (!rpc_sent_) return false;
+
+  // don't cancel if it already finished (for any reason)
+  // NOTE: IsDoneInternal() is also true whenever status_ is an error, so no cancel
+  // rpc is sent once an error has been recorded, even if that error only reflects a
+  // communication failure and instances might still be running on the backend.
+  if (IsDoneInternal()) return false;
+
+  // set an error status to make sure we only cancel this once
+  if (status_.ok()) status_ = Status::CANCELLED;
+
+  Status status;
+  ImpalaBackendConnection backend_client(
+      ExecEnv::GetInstance()->impalad_client_cache(), impalad_address(), &status);
+  if (!status.ok()) return false;
+  TCancelQueryFInstancesParams params;
+  params.protocol_version = ImpalaInternalServiceVersion::V1;
+  params.__set_query_id(query_id_);
+  TCancelQueryFInstancesResult dummy;
+  VLOG_QUERY << "sending CancelQueryFInstances rpc for query_id="
+             << query_id_ << " backend=" << impalad_address();
+  Status rpc_status;
+  // Try to send the RPC 3 times before failing.
+  // Initialized so that the loop condition never reads an indeterminate value if
+  // DoRpc() returns without assigning it; 'false' means "do not retry".
+  bool retry_is_safe = false;
+  for (int i = 0; i < 3; ++i) {
+    rpc_status = backend_client.DoRpc(
+        &ImpalaBackendClient::CancelQueryFInstances, params, &dummy, &retry_is_safe);
+    if (rpc_status.ok() || !retry_is_safe) break;
+  }
+  if (!rpc_status.ok()) {
+    status_.MergeStatus(rpc_status);
+    stringstream msg;
+    msg << "CancelQueryFInstances rpc query_id=" << query_id_
+        << " failed: " << rpc_status.msg().msg();
+    status_.AddDetail(msg.str());
+  }
+  // cancellation was attempted, regardless of the rpc's outcome
+  return true;
+}
+
+// Sends a PublishFilter rpc with a copy of *rpc_params to this backend, but only if
+// one of the fragments in fragments_ is the destination fragment. Connection
+// failures and the rpc's result are ignored.
+void Coordinator::BackendState::PublishFilter(
+    shared_ptr<TPublishFilterParams> rpc_params) {
+  DCHECK_EQ(rpc_params->dst_query_id, query_id_);
+  const bool runs_dst_fragment = fragments_.count(rpc_params->dst_fragment_idx) != 0;
+  if (!runs_dst_fragment) return;
+
+  Status conn_status;
+  ImpalaBackendConnection backend_client(
+      ExecEnv::GetInstance()->impalad_client_cache(), host_, &conn_status);
+  if (!conn_status.ok()) return;
+
+  // Make a local copy of the shared 'master' set of parameters
+  TPublishFilterParams params_copy(*rpc_params);
+  params_copy.__set_bloom_filter(rpc_params->bloom_filter);
+  TPublishFilterResult rpc_result;
+  backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, params_copy, &rpc_result);
+  // TODO: switch back to the following once we fix the lifecycle
+  // problems of Coordinator
+  //std::cref(fragment_inst->impalad_address()),
+  //std::cref(fragment_inst->fragment_instance_id())));
+}
+
+// Creates the instance's profile in obj_pool, adds it as a child of fragment_stats'
+// root profile, and records the total size of the instance's hdfs file splits, both
+// in total_split_size_ and in fragment_stats->bytes_assigned().
+Coordinator::BackendState::InstanceStats::InstanceStats(
+    const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
+    ObjectPool* obj_pool)
+  : exec_params_(exec_params),
+    profile_(nullptr),
+    profile_created_(false),
+    total_split_size_(0),
+    total_ranges_complete_(0),
+    // only assigned in InitCounters(); must not be indeterminate before that
+    peak_mem_counter_(nullptr) {
+  const string& profile_name = Substitute("Instance $0 (host=$1)",
+      PrintId(exec_params.instance_id), lexical_cast<string>(exec_params.host));
+  profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
+  fragment_stats->root_profile()->AddChild(profile_);
+
+  // add total split size to fragment_stats->bytes_assigned()
+  for (const PerNodeScanRanges::value_type& entry: exec_params_.per_node_scan_ranges) {
+    for (const TScanRangeParams& scan_range_params: entry.second) {
+      // scan ranges that are not hdfs file splits don't contribute
+      if (!scan_range_params.scan_range.__isset.hdfs_file_split) continue;
+      total_split_size_ += scan_range_params.scan_range.hdfs_file_split.length;
+    }
+  }
+  (*fragment_stats->bytes_assigned())(total_split_size_);
+}
+
+// Collects the SCAN_RANGES_COMPLETE_COUNTER of every exec node profile below
+// profile_ into scan_ranges_complete_counters_, and looks up peak_mem_counter_ in
+// profile_ itself.
+void Coordinator::BackendState::InstanceStats::InitCounters() {
+  vector<RuntimeProfile*> descendants;
+  profile_->GetAllChildren(&descendants);
+  for (RuntimeProfile* node_profile: descendants) {
+    const PlanNodeId node_id = ExecNode::GetNodeIdFromProfile(node_profile);
+    // This profile is not for an exec node.
+    const bool is_exec_node =
+        node_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
+    if (!is_exec_node) continue;
+
+    RuntimeProfile::Counter* ranges_done =
+        node_profile->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
+    if (ranges_done == nullptr) continue;
+    scan_ranges_complete_counters_.push_back(ranges_done);
+  }
+
+  peak_mem_counter_ =
+      profile_->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
+}
+
+// Applies an OK status report for this instance:
+// - stops the stopwatch if the instance is done
+// - updates profile_ from the reported profile; the first update also initializes
+//   the cached counters (see InitCounters())
+// - copies RowsReturned, PeakMemoryUsage and local time of every exec node profile
+//   into this instance's TExecStats slot in exec_summary
+// - advances scan_range_progress by the number of newly completed scan ranges
+void Coordinator::BackendState::InstanceStats::Update(
+    const TFragmentInstanceExecStatus& exec_status,
+    ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
+  DCHECK(Status(exec_status.status).ok());
+  if (exec_status.done) stopwatch_.Stop();
+  profile_->Update(exec_status.profile);
+  if (!profile_created_) {
+    profile_created_ = true;
+    InitCounters();
+  }
+  profile_->ComputeTimeInProfile();
+
+  // update exec_summary
+  // TODO: why do this every time we get an updated instance profile?
+  vector<RuntimeProfile*> children;
+  profile_->GetAllChildren(&children);
+
+  TExecSummary& thrift_exec_summary = exec_summary->thrift_exec_summary;
+  for (RuntimeProfile* child: children) {
+    int node_id = ExecNode::GetNodeIdFromProfile(child);
+    // This profile is not for an exec node; same test as in InitCounters().
+    if (node_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) continue;
+
+    // TODO: create plan_node_id_to_summary_map_
+    TPlanNodeExecSummary& node_exec_summary =
+        thrift_exec_summary.nodes[exec_summary->node_id_to_idx_map[node_id]];
+    int per_fragment_instance_idx = exec_params_.per_fragment_instance_idx;
+    DCHECK_LT(per_fragment_instance_idx, node_exec_summary.exec_stats.size())
+        << " node_id=" << node_id << " instance_id=" << PrintId(exec_params_.instance_id)
+        << " fragment_idx=" << exec_params_.fragment().idx;
+    TExecStats& instance_stats =
+        node_exec_summary.exec_stats[per_fragment_instance_idx];
+
+    RuntimeProfile::Counter* rows_counter = child->GetCounter("RowsReturned");
+    RuntimeProfile::Counter* mem_counter = child->GetCounter("PeakMemoryUsage");
+    if (rows_counter != nullptr) instance_stats.__set_cardinality(rows_counter->value());
+    if (mem_counter != nullptr) instance_stats.__set_memory_used(mem_counter->value());
+    instance_stats.__set_latency_ns(child->local_time());
+    // TODO: track interesting per-node metrics
+    node_exec_summary.__isset.exec_stats = true;
+  }
+
+  // determine newly-completed scan ranges and update scan_range_progress
+  int64_t total = 0;
+  for (RuntimeProfile::Counter* c: scan_ranges_complete_counters_) total += c->value();
+  int64_t delta = total - total_ranges_complete_;
+  total_ranges_complete_ = total;
+  scan_range_progress->Update(delta);
+}
+
+// Creates the averaged profile and the root profile for one fragment in obj_pool
+// and records the fragment's instance count.
+Coordinator::FragmentStats::FragmentStats(const string& avg_profile_name,
+    const string& root_profile_name, int num_instances, ObjectPool* obj_pool)
+  : avg_profile_(obj_pool->Add(new RuntimeProfile(obj_pool, avg_profile_name, true))),
+    root_profile_(obj_pool->Add(new RuntimeProfile(obj_pool, root_profile_name))),
+    num_instances_(num_instances) {}
+
+// Adds a "split sizes" info string to the averaged profile with the min, max,
+// mean and standard deviation of bytes_assigned_.
+void Coordinator::FragmentStats::AddSplitStats() {
+  const double min_bytes = accumulators::min(bytes_assigned_);
+  const double max_bytes = accumulators::max(bytes_assigned_);
+  const double mean_bytes = accumulators::mean(bytes_assigned_);
+  const double stddev_bytes = sqrt(accumulators::variance(bytes_assigned_));
+
+  stringstream label;
+  label << " min: " << PrettyPrinter::Print(min_bytes, TUnit::BYTES);
+  label << ", max: " << PrettyPrinter::Print(max_bytes, TUnit::BYTES);
+  label << ", avg: " << PrettyPrinter::Print(mean_bytes, TUnit::BYTES);
+  label << ", stddev: " << PrettyPrinter::Print(stddev_bytes, TUnit::BYTES);
+  avg_profile_->AddInfoString("split sizes", label.str());
+}
+
+// Orders (profile, flag) pairs by the value of the profile's total time counter,
+// largest first.
+struct InstanceComparator {
+  typedef pair<RuntimeProfile*, bool> Profile;
+  bool operator()(const Profile& a, const Profile& b) const {
+    // Reverse ordering: we want the longest first
+    const auto total_a = a.first->total_time_counter()->value();
+    const auto total_b = b.first->total_time_counter()->value();
+    return total_a > total_b;
+  }
+};
+
+// Sorts the instance profiles below the root profile by descending total time and
+// adds "completion times", "execution rates" and "num instances" info strings to
+// the averaged profile.
+void Coordinator::FragmentStats::AddExecStats() {
+  root_profile_->SortChildren(InstanceComparator());
+
+  // summary of completion_times_
+  const auto min_time = accumulators::min(completion_times_);
+  const auto max_time = accumulators::max(completion_times_);
+  const auto mean_time = accumulators::mean(completion_times_);
+  const auto stddev_time = sqrt(accumulators::variance(completion_times_));
+  stringstream times_label;
+  times_label << "min:" << PrettyPrinter::Print(min_time, TUnit::TIME_NS);
+  times_label << "  max:" << PrettyPrinter::Print(max_time, TUnit::TIME_NS);
+  times_label << "  mean: " << PrettyPrinter::Print(mean_time, TUnit::TIME_NS);
+  times_label << "  stddev:" << PrettyPrinter::Print(stddev_time, TUnit::TIME_NS);
+
+  // summary of rates_
+  const auto min_rate = accumulators::min(rates_);
+  const auto max_rate = accumulators::max(rates_);
+  const auto mean_rate = accumulators::mean(rates_);
+  const auto stddev_rate = sqrt(accumulators::variance(rates_));
+  stringstream rates_label;
+  rates_label << "min:" << PrettyPrinter::Print(min_rate, TUnit::BYTES_PER_SECOND);
+  rates_label << "  max:" << PrettyPrinter::Print(max_rate, TUnit::BYTES_PER_SECOND);
+  rates_label << "  mean:" << PrettyPrinter::Print(mean_rate, TUnit::BYTES_PER_SECOND);
+  rates_label << "  stddev:"
+      << PrettyPrinter::Print(stddev_rate, TUnit::BYTES_PER_SECOND);
+
+  // why plural?
+  avg_profile_->AddInfoString("completion times", times_label.str());
+  // why plural?
+  avg_profile_->AddInfoString("execution rates", rates_label.str());
+  avg_profile_->AddInfoString("num instances", lexical_cast<string>(num_instances_));
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
new file mode 100644
index 0000000..b4f9fea
--- /dev/null
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_COORDINATOR_BACKEND_STATE_H
+#define IMPALA_RUNTIME_COORDINATOR_BACKEND_STATE_H
+
+#include <memory>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <boost/thread/mutex.hpp>
+
+#include "runtime/coordinator.h"
+#include "util/progress-updater.h"
+#include "util/stopwatch.h"
+#include "util/runtime-profile.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+class ProgressUpdater;
+class FInstanceExecParams;
+class ObjectPool;
+class DebugOptions;
+class CountingBarrier;
+class TUniqueId;
+class TQueryCtx;
+class TReportExecStatusParams;
+class ExecSummary;
+
+/// This class manages all aspects of the execution of all fragment instances of a
+/// single query on a particular backend.
+/// Thread-safe unless pointed out otherwise.
+class Coordinator::BackendState {
+ public:
+  BackendState(const TUniqueId& query_id, int state_idx,
+      TRuntimeFilterMode::type filter_mode);
+
+  /// Creates InstanceStats for all entries in instance_params_list in obj_pool
+  /// and installs the instance profiles as children of the corresponding FragmentStats'
+  /// root profile.
+  /// Separated from c'tor to simplify future handling of out-of-mem errors.
+  void Init(const std::vector<const FInstanceExecParams*>& instance_params_list,
+      const std::vector<FragmentStats*>& fragment_stats, ObjectPool* obj_pool);
+
+  /// Starts query execution at this backend by issuing an ExecQueryFInstances rpc and
+  /// notifies on rpc_complete_barrier when the rpc completes. Success/failure is
+  /// communicated through GetStatus(). Uses filter_routing_table to remove filters
+  /// that weren't selected during its construction.
+  /// The debug_options are applied to the appropriate TPlanFragmentInstanceCtxs, based
+  /// on their node_id/instance_idx.
+  void Exec(const TQueryCtx& query_ctx, const DebugOptions& debug_options,
+      const FilterRoutingTable& filter_routing_table,
+      CountingBarrier* rpc_complete_barrier);
+
+  /// Update overall execution status, including the instances' exec status/profiles
+  /// and the error log. Updates the fragment instances' TExecStats in exec_summary
+  /// (exec_summary->nodes.exec_stats) and updates scan_range_progress, and sets
+  /// done to true if all fragment instances completed, regardless of status.
+  /// If any instance reports an error, the overall execution status becomes the first
+  /// reported error status and 'done' is set to true.
+  void ApplyExecStatusReport(const TReportExecStatusParams& backend_exec_status,
+      ExecSummary* exec_summary, ProgressUpdater* scan_range_progress, bool* done);
+
+  /// Update completion_times, rates, and avg_profile for all fragment_stats.
+  void UpdateExecStats(const std::vector<FragmentStats*>& fragment_stats);
+
+  /// Make a PublishFilter rpc with given params if this backend has instances of the
+  /// fragment with idx == rpc_params->dst_fragment_idx, otherwise do nothing.
+  /// This takes a shared_ptr by value because we cannot guarantee that the originating
+  /// coordinator won't be destroyed while this executes.
+  /// TODO: switch to references when we fix the lifecycle problems of coordinators.
+  void PublishFilter(std::shared_ptr<TPublishFilterParams> rpc_params);
+
+  /// Cancel execution at this backend if anything is running. Returns true
+  /// if cancellation was attempted, false otherwise.
+  bool Cancel();
+
+  /// Return the overall execution status. For an error status, also return the id
+  /// of the instance that caused that status, if failed_instance_id != nullptr.
+  Status GetStatus(TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
+
+  /// Return peak memory consumption.
+  int64_t GetPeakConsumption();
+
+  /// Merge the accumulated error log into 'merged'.
+  void MergeErrorLog(ErrorLogMap* merged);
+
+  const TNetworkAddress& impalad_address() const { return host_; }
+  int state_idx() const { return state_idx_; }
+
+  /// only valid after Exec()
+  int64_t rpc_latency() const { return rpc_latency_; }
+
+  /// Return true if execution at this backend is done.
+  bool IsDone();
+
+ private:
+  /// Execution stats for a single fragment instance.
+  /// Not thread-safe.
+  class InstanceStats {
+   public:
+    InstanceStats(const FInstanceExecParams& exec_params, FragmentStats* fragment_stats,
+        ObjectPool* obj_pool);
+
+    /// Update 'this' with exec_status, the fragment instances' TExecStats in
+    /// exec_summary, and 'scan_range_progress' with the number of
+    /// newly completed scan ranges.
+    void Update(const TFragmentInstanceExecStatus& exec_status,
+        ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
+
+    int per_fragment_instance_idx() const {
+      return exec_params_.per_fragment_instance_idx;
+    }
+
+   private:
+    friend class BackendState;
+
+    /// query lifetime
+    const FInstanceExecParams& exec_params_;
+
+    /// owned by coordinator object pool provided in the c'tor; created in the c'tor
+    /// and populated in Update()
+    RuntimeProfile* profile_;
+
+    /// true after the first call to profile->Update()
+    bool profile_created_;
+
+    /// cumulative size of all splits; set in c'tor
+    int64_t total_split_size_;
+
+    /// wall clock timer for this instance
+    MonotonicStopWatch stopwatch_;
+
+    /// total scan ranges complete across all scan nodes
+    int64_t total_ranges_complete_;
+
+    /// SCAN_RANGES_COMPLETE_COUNTERs in profile_
+    std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
+
+    /// PER_HOST_PEAK_MEM_COUNTER; set in InitCounters()
+    RuntimeProfile::Counter* peak_mem_counter_;
+
+    /// Extract scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
+    void InitCounters();
+  };
+
+  const TUniqueId query_id_;
+  const int state_idx_;  /// index of 'this' in Coordinator::backend_states_
+  const TRuntimeFilterMode::type filter_mode_;
+
+  /// all instances of a particular fragment are contiguous in this vector;
+  /// query lifetime
+  std::vector<const FInstanceExecParams*> instance_params_list_;
+
+  /// map from instance idx to InstanceStats, the latter live in the obj_pool parameter
+  /// of Init()
+  std::unordered_map<int, InstanceStats*> instance_stats_map_;
+
+  /// indices of fragments executing on this backend, populated in Init()
+  std::unordered_set<int> fragments_;
+
+  TNetworkAddress host_;
+
+  /// protects fields below
+  /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_
+  boost::mutex lock_;
+
+  /// number of in-flight instances
+  int num_remaining_instances_;
+
+  /// If the status indicates an error status, execution has either been aborted by the
+  /// executing impalad (which then reported the error) or cancellation has been
+  /// initiated; either way, execution must not be cancelled.
+  Status status_;
+
+  /// Id of the first fragment instance that reports an error status.
+  /// Invalid if no fragment instance has reported an error status.
+  TUniqueId failed_instance_id_;
+
+  /// Errors reported by the fragment instances running on this backend.
+  ErrorLogMap error_log_;
+
+  /// Time, in ms, that it took to execute the ExecQueryFInstances rpc.
+  int64_t rpc_latency_;
+
+  /// If true, the ExecQueryFInstances rpc has been sent - even if it was not
+  /// determined to be successful.
+  bool rpc_sent_;
+
+  /// peak memory used for this query (value of that node's query memtracker's
+  /// peak_consumption())
+  int64_t peak_consumption_;
+
+  /// Fill in rpc_params based on state. Uses filter_routing_table to remove filters
+  /// that weren't selected during its construction.
+  void SetRpcParams(const DebugOptions& debug_options,
+      const FilterRoutingTable& filter_routing_table,
+      TExecQueryFInstancesParams* rpc_params);
+
+  /// Return true if execution at this backend is done. Doesn't acquire lock.
+  bool IsDoneInternal() const;
+};
+
+/// Execution statistics that are kept once per fragment.
+class Coordinator::FragmentStats {
+ public:
+  /// boost accumulator that tracks min, max, mean and variance of int64_t samples
+  using SummaryStats = boost::accumulators::accumulator_set<int64_t,
+      boost::accumulators::features<
+      boost::accumulators::tag::min,
+      boost::accumulators::tag::max,
+      boost::accumulators::tag::mean,
+      boost::accumulators::tag::variance>
+  >;
+
+  /// Creates the avg and root profiles in obj_pool.
+  FragmentStats(const std::string& avg_profile_name,
+      const std::string& root_profile_name,
+      int num_instances, ObjectPool* obj_pool);
+
+  RuntimeProfile* avg_profile() { return avg_profile_; }
+  RuntimeProfile* root_profile() { return root_profile_; }
+  SummaryStats* bytes_assigned() { return &bytes_assigned_; }
+
+  /// Computes stats for 'bytes_assigned' and adds them as an info string to the avg
+  /// profile.
+  void AddSplitStats();
+
+  /// Adds summary strings with execution stats to the avg profile.
+  void AddExecStats();
+
+ private:
+  friend class BackendState;
+
+  /// Averaged profile for this fragment; lives in obj_pool.
+  /// Its counters are averages (type AveragedCounter) of the counters in the
+  /// fragment instance profiles. The individual instance profiles are stored and
+  /// displayed as children of root_profile_ instead.
+  RuntimeProfile* avg_profile_;
+
+  /// Root profile of all instance profiles of this fragment; lives in obj_pool.
+  RuntimeProfile* root_profile_;
+
+  /// Number of instances running this fragment.
+  int num_instances_;
+
+  /// Bytes assigned to the instances of this fragment.
+  SummaryStats bytes_assigned_;
+
+  /// Completion times of the instances of this fragment.
+  SummaryStats completion_times_;
+
+  /// Execution rates of the instances of this fragment.
+  SummaryStats rates_;
+};
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/runtime/coordinator-filter-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-filter-state.h b/be/src/runtime/coordinator-filter-state.h
new file mode 100644
index 0000000..61dece9
--- /dev/null
+++ b/be/src/runtime/coordinator-filter-state.h
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#include <memory>
+#include <vector>
+#include <boost/unordered_set.hpp>
+
+#include "runtime/coordinator.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/PlanNodes_types.h"
+#include "gen-cpp/Types_types.h"
+
+namespace impala {
+
+class MemTracker;
+
+/// One target of a runtime filter: node id and flags copied from a
+/// TRuntimeFilterTargetDesc, plus a caller-supplied fragment index.
+struct Coordinator::FilterTarget {
+  FilterTarget(const TRuntimeFilterTargetDesc& desc, int f_idx)
+    : node_id(desc.node_id), is_local(desc.is_local_target),
+      is_bound_by_partition_columns(desc.is_bound_by_partition_columns),
+      fragment_idx(f_idx) {}
+
+  TPlanNodeId node_id;
+  bool is_local;
+  bool is_bound_by_partition_columns;
+  int fragment_idx;
+};
+
+/// Aggregation state kept for a single runtime filter as updates for it are received.
+///
+/// Broadcast join filters: the first update received is published right away; later
+/// updates are ignored because they carry the same filter.
+/// Partitioned join filters: updates are aggregated in 'bloom_filter', which is
+/// published once 'pending_count' reaches 0, unless the filter was disabled first.
+///
+/// A filter gets disabled when an always_true update is received, when an OOM is hit,
+/// when filter aggregation is complete, or when the query is complete. Updates that
+/// arrive for an already-disabled filter are ignored.
+class Coordinator::FilterState {
+ public:
+  /// Starts out enabled, with a pending count of 0, zeroed times and no bloom filter.
+  FilterState(const TRuntimeFilterDesc& desc, const TPlanNodeId& src)
+    : desc_(desc), src_(src) { }
+
+  /// Aggregates partitioned join filter updates and adjusts memory consumption.
+  /// An always_true update or an OOM causes the filter to be disabled.
+  void ApplyUpdate(const TUpdateFilterParams& params, Coordinator* coord);
+
+  /// Disables this filter. Once disabled, a filter consumes no memory.
+  void Disable(MemTracker* tracker);
+
+  /// Read-only accessors.
+  const TRuntimeFilterDesc& desc() const { return desc_; }
+  const TPlanNodeId& src() const { return src_; }
+  const std::vector<FilterTarget>& targets() const { return targets_; }
+  const boost::unordered_set<int>& src_fragment_instance_idxs() const {
+    return src_fragment_instance_idxs_;
+  }
+  int pending_count() const { return pending_count_; }
+  int64_t first_arrival_time() const { return first_arrival_time_; }
+  int64_t completion_time() const { return completion_time_; }
+  bool disabled() const { return disabled_; }
+
+  /// Accessors that expose mutable state.
+  std::vector<FilterTarget>* targets() { return &targets_; }
+  boost::unordered_set<int>* src_fragment_instance_idxs() {
+    return &src_fragment_instance_idxs_;
+  }
+  TBloomFilter* bloom_filter() { return bloom_filter_.get(); }
+  void set_pending_count(int pending_count) { pending_count_ = pending_count; }
+
+ private:
+  /// The specification of the runtime filter.
+  TRuntimeFilterDesc desc_;
+
+  TPlanNodeId src_;
+  std::vector<FilterTarget> targets_;
+
+  /// Source fragment instance indices (as returned by GetInstanceIdx()).
+  boost::unordered_set<int> src_fragment_instance_idxs_;
+
+  /// How many more backends must be heard from before the filter is complete.
+  int pending_count_ = 0;
+
+  /// Bloom filter aggregated from all source plan nodes, for broadcast to all
+  /// destination plan fragment instances. Held by this object so it can be deallocated
+  /// once finished with. Only set for partitioned joins (broadcast joins need no
+  /// aggregation). To avoid memory spikes, filters are moved rather than copied into
+  /// the output structure: the incoming filter for a broadcast join, and this member
+  /// for a partitioned join.
+  std::unique_ptr<TBloomFilter> bloom_filter_;
+
+  /// Time at which the first local filter arrived.
+  int64_t first_arrival_time_ = 0L;
+
+  /// Time at which all local filters had arrived.
+  int64_t completion_time_ = 0L;
+
+  /// Set once the filter is permanently disabled for this query.
+  bool disabled_ = false;
+
+  /// TODO: Add a per-object lock so that we can avoid holding the global filter_lock_
+  /// for every filter update.
+};
+
+}