You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/06 00:31:15 UTC

[1/4] incubator-impala git commit: IMPALA-3229: Don't assume that AUX exists just because of shell env

Repository: incubator-impala
Updated Branches:
  refs/heads/master 3be113cb9 -> 08636ace0


IMPALA-3229: Don't assume that AUX exists just because of shell env

This caused breakage of the custom cluster tests:
bin/impala-config.sh sets IMPALA_AUX_TEST_HOME to
$IMPALA_HOME/../Impala-auxiliary-tests unless it is already set, but
this directory is not guaranteed to exist.
tests/run-custom-cluster-tests.sh was expecting that if
IMPALA_AUX_TEST_HOME is set, then that directory must exist.

While I'm changing tests/run-custom-cluster-tests.sh, add some quotes to
protect against paths with spaces in them.

Change-Id: Ieffb6d52c5f95d3257de8c47a039ccb0d45f0867
Reviewed-on: http://gerrit.cloudera.org:8080/4563
Reviewed-by: Jim Apple <jb...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5d9f5be6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5d9f5be6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5d9f5be6

Branch: refs/heads/master
Commit: 5d9f5be6e398a4014856db8d1608d2089345c22a
Parents: 3be113c
Author: Jim Apple <jb...@cloudera.com>
Authored: Thu Sep 29 09:15:57 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 5 22:55:36 2016 +0000

----------------------------------------------------------------------
 tests/run-custom-cluster-tests.sh | 29 ++++++++++++++++-------------
 1 file changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5d9f5be6/tests/run-custom-cluster-tests.sh
----------------------------------------------------------------------
diff --git a/tests/run-custom-cluster-tests.sh b/tests/run-custom-cluster-tests.sh
index 2b25cdc..adfc02a 100755
--- a/tests/run-custom-cluster-tests.sh
+++ b/tests/run-custom-cluster-tests.sh
@@ -27,18 +27,21 @@ trap 'echo Error in $0 at line $LINENO: $(cd "'$PWD'" && awk "NR == $LINENO" $0)
 # TODO: Combine with run-process-failure-tests.sh
 export HEAPCHECK=
 
-AUX_CUSTOM_DIR=""
-if [ -n ${IMPALA_AUX_TEST_HOME} ]; then
-    AUX_CUSTOM_DIR=${IMPALA_AUX_TEST_HOME}/tests/aux_custom_cluster_tests/
-fi
-
-export LOG_DIR=${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR}
-RESULTS_DIR=${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR}/results
-mkdir -p ${RESULTS_DIR}
+export LOG_DIR="${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR}"
+RESULTS_DIR="${IMPALA_CUSTOM_CLUSTER_TEST_LOGS_DIR}/results"
+mkdir -p "${RESULTS_DIR}"
 
 # KERBEROS TODO We'll want to pass kerberos status in here.
-cd ${IMPALA_HOME}/tests
-. ${IMPALA_HOME}/bin/set-classpath.sh &> /dev/null
-impala-py.test custom_cluster/ authorization/ ${AUX_CUSTOM_DIR} \
-    --junitxml="${RESULTS_DIR}/TEST-impala-custom-cluster.xml" \
-    --resultlog="${RESULTS_DIR}/TEST-impala-custom-cluster.log" "$@"
+cd "${IMPALA_HOME}/tests"
+. "${IMPALA_HOME}/bin/set-classpath.sh" &> /dev/null
+
+AUX_CUSTOM_DIR="${IMPALA_AUX_TEST_HOME}/tests/aux_custom_cluster_tests/"
+ARGS=(custom_cluster/ authorization/)
+if [[ -d "${AUX_CUSTOM_DIR}" ]]
+then
+  ARGS+=("${AUX_CUSTOM_DIR}")
+fi
+ARGS+=('--junitxml="${RESULTS_DIR}/TEST-impala-custom-cluster.xml"')
+ARGS+=('--resultlog="${RESULTS_DIR}/TEST-impala-custom-cluster.log"')
+ARGS+=("$@")
+impala-py.test "${ARGS[@]}"


[4/4] incubator-impala git commit: IMPALA-4239: fix buffer pool test failures in release build

Posted by ta...@apache.org.
IMPALA-4239: fix buffer pool test failures in release build

Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug
builds it executes the code in a forked process, so it has no visible
side-effects, but in release builds it executes the code as normal. This
makes it difficult to write death tests that work in both debug and
release builds. To avoid this problem, update our wrapper macro to omit
the code in release builds (where we can't actually test DCHECKs
anyway).

Change-Id: Ia560e702ecac2d29dc72f444645d5a91743c95e3
Reviewed-on: http://gerrit.cloudera.org:8080/4596
Reviewed-by: Alex Behm <al...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/08636ace
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/08636ace
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/08636ace

Branch: refs/heads/master
Commit: 08636ace0f1f569a4256b598685dd8e5fdda0d87
Parents: a9b9933
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Oct 3 08:57:50 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Oct 6 00:23:30 2016 +0000

----------------------------------------------------------------------
 be/src/testutil/death-test-util.h | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/08636ace/be/src/testutil/death-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/death-test-util.h b/be/src/testutil/death-test-util.h
index d84b374..8522b61 100644
--- a/be/src/testutil/death-test-util.h
+++ b/be/src/testutil/death-test-util.h
@@ -24,11 +24,21 @@
 
 // Wrapper around gtest's ASSERT_DEBUG_DEATH that prevents coredumps and minidumps
 // being generated as the result of the death test.
+#ifndef NDEBUG
 #define IMPALA_ASSERT_DEBUG_DEATH(fn, msg)    \
   do {                                        \
     ScopedCoredumpDisabler disable_coredumps; \
     ASSERT_DEBUG_DEATH(fn, msg);              \
   } while (false);
+#else
+// Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug builds it
+// executes the code in a forked process, so it has no visible side-effects, but in
+// release builds it executes the code as normal. This makes it difficult to write
+// death tests that work in both debug and release builds. To avoid this problem, update
+// our wrapper macro to simply omit the death test expression in release builds, where we
+// can't actually test DCHECKs anyway.
+#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg)
+#endif
 
 namespace impala {
 


[2/4] incubator-impala git commit: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index b5745c4..5ad84df 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -52,25 +52,90 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     query_events_(query_events),
     num_fragment_instances_(0),
     num_scan_ranges_(0),
+    next_instance_id_(query_id),
     is_admitted_(false) {
   fragment_exec_params_.resize(request.fragments.size());
-  // Build two maps to map node ids to their fragments as well as to the offset in their
-  // fragment's plan's nodes list.
-  for (int i = 0; i < request.fragments.size(); ++i) {
-    int node_idx = 0;
-    for (const TPlanNode& node: request.fragments[i].plan.nodes) {
-      if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
-        plan_node_to_fragment_idx_.resize(node.node_id + 1);
-        plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+
+  if (is_mt_execution) {
+    /// TODO-MT: remove else branch and move MtInit() logic here
+    MtInit();
+  } else {
+    // Build two maps to map node ids to their fragments as well as to the offset in their
+    // fragment's plan's nodes list.
+    for (int i = 0; i < request.fragments.size(); ++i) {
+      int node_idx = 0;
+      for (const TPlanNode& node: request.fragments[i].plan.nodes) {
+        if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
+          plan_node_to_fragment_idx_.resize(node.node_id + 1);
+          plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+        }
+        DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
+        plan_node_to_fragment_idx_[node.node_id] = i;
+        plan_node_to_plan_node_idx_[node.node_id] = node_idx;
+        ++node_idx;
       }
-      DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
-      plan_node_to_fragment_idx_[node.node_id] = i;
-      plan_node_to_plan_node_idx_[node.node_id] = node_idx;
-      ++node_idx;
     }
   }
 }
 
+void QuerySchedule::MtInit() {
+  // extract TPlanFragments and order by fragment id
+  vector<const TPlanFragment*> fragments;
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      fragments.push_back(&fragment);
+    }
+  }
+  sort(fragments.begin(), fragments.end(),
+      [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; });
+
+  DCHECK_EQ(mt_fragment_exec_params_.size(), 0);
+  for (const TPlanFragment* fragment: fragments) {
+    mt_fragment_exec_params_.emplace_back(*fragment);
+  }
+
+  // mark coordinator fragment
+  const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
+  if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true;
+    next_instance_id_.lo = 1;  // generated instance ids start at 1
+  }
+
+  // compute input fragments and find max node id
+  int max_node_id = 0;
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      for (const TPlanNode& node: fragment.plan.nodes) {
+        max_node_id = max(node.node_id, max_node_id);
+      }
+    }
+
+    // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+    for (int i = 1; i < plan_exec_info.fragments.size(); ++i) {
+      const TPlanFragment& fragment = plan_exec_info.fragments[i];
+      FragmentIdx dest_idx =
+          plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx;
+      MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx];
+      dest_params.input_fragments.push_back(fragment.idx);
+    }
+  }
+
+  // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_
+  plan_node_to_fragment_idx_.resize(max_node_id + 1);
+  plan_node_to_plan_node_idx_.resize(max_node_id + 1);
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      for (int i = 0; i < fragment.plan.nodes.size(); ++i) {
+        const TPlanNode& node = fragment.plan.nodes[i];
+        plan_node_to_fragment_idx_[node.node_id] = fragment.idx;
+        plan_node_to_plan_node_idx_[node.node_id] = i;
+      }
+    }
+  }
+}
+
+
 int64_t QuerySchedule::GetClusterMemoryEstimate() const {
   DCHECK_GT(unique_hosts_.size(), 0);
   const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * unique_hosts_.size();
@@ -122,4 +187,73 @@ void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_
   unique_hosts_ = unique_hosts;
 }
 
+TUniqueId QuerySchedule::GetNextInstanceId() {
+  TUniqueId result = next_instance_id_;
+  ++next_instance_id_.lo;
+  return result;
+}
+
+const TPlanFragment& FInstanceExecParams::fragment() const {
+  return fragment_exec_params.fragment;
+}
+
+int QuerySchedule::GetNumFragmentInstances() const {
+  if (mt_fragment_exec_params_.empty()) return num_fragment_instances_;
+  int result = 0;
+  for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) {
+    result += fragment_exec_params.instance_exec_params.size();
+  }
+  return result;
+}
+
+int QuerySchedule::GetNumRemoteFInstances() const {
+  bool has_coordinator_fragment = GetCoordFragment() != nullptr;
+  int result = GetNumFragmentInstances();
+  bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_execution && has_coordinator_fragment) --result;
+  return result;
+}
+
+int QuerySchedule::GetTotalFInstances() const {
+  int result = GetNumRemoteFInstances();
+  return GetCoordFragment() != nullptr ? result + 1 : result;
+}
+
+const TPlanFragment* QuerySchedule::GetCoordFragment() const {
+  bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
+  const TPlanFragment* fragment = is_mt_exec
+      ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
+  if (fragment->partition.type == TPartitionType::UNPARTITIONED) {
+    return fragment;
+  } else {
+    return nullptr;
+  }
+}
+
+void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
+  fragments->clear();
+  bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_exec) {
+    for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) {
+      for (const TPlanFragment& fragment: plan_info.fragments) {
+        fragments->push_back(&fragment);
+      }
+    }
+  } else {
+    for (const TPlanFragment& fragment: request_.fragments) {
+      fragments->push_back(&fragment);
+    }
+  }
+}
+
+const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
+  const TPlanFragment& coord_fragment =  request_.mt_plan_exec_info[0].fragments[0];
+  DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED);
+  const MtFragmentExecParams* fragment_params =
+      &mt_fragment_exec_params_[coord_fragment.idx];
+  DCHECK(fragment_params != nullptr);
+  DCHECK_EQ(fragment_params->instance_exec_params.size(), 1);
+  return fragment_params->instance_exec_params[0];
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index c8ebd5d..39ce268 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -20,13 +20,14 @@
 
 #include <vector>
 #include <string>
+#include <unordered_map>
 #include <boost/unordered_set.hpp>
-#include <boost/unordered_map.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include "common/global-types.h"
 #include "common/status.h"
 #include "util/promise.h"
+#include "util/container-util.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
 #include "gen-cpp/Frontend_types.h"
@@ -34,12 +35,14 @@
 namespace impala {
 
 class Coordinator;
+struct MtFragmentExecParams;
 
 /// map from scan node id to a list of scan ranges
 typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
+
 /// map from an impalad host address to the per-node assigned scan ranges;
 /// records scan range assignment for a single fragment
-typedef boost::unordered_map<TNetworkAddress, PerNodeScanRanges>
+typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
 /// execution parameters for a single fragment; used to assemble the
@@ -58,14 +61,62 @@ struct FragmentExecParams {
   int sender_id_base;
 };
 
+/// execution parameters for a single fragment instance; used to assemble the
+/// TPlanFragmentInstanceCtx
+struct FInstanceExecParams {
+  TUniqueId instance_id;
+  TNetworkAddress host; // execution backend
+  PerNodeScanRanges per_node_scan_ranges;
+
+  /// 0-based ordinal of this particular instance within its fragment (not: query-wide)
+  int per_fragment_instance_idx;
+
+  /// In its role as a data sender, a fragment instance is assigned a "sender id" to
+  /// uniquely identify it to a receiver. -1 = invalid.
+  int sender_id;
+
+  /// the parent MtFragmentExecParams
+  const MtFragmentExecParams& fragment_exec_params;
+  const TPlanFragment& fragment() const;
+
+  FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
+      int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params)
+    : instance_id(instance_id), host(host),
+      per_fragment_instance_idx(per_fragment_instance_idx),
+      sender_id(-1),
+      fragment_exec_params(fragment_exec_params) {}
+};
+
+/// Execution parameters shared between fragment instances
+struct MtFragmentExecParams {
+  /// output destinations of this fragment
+  std::vector<TPlanFragmentDestination> destinations;
+
+  /// map from node id to the number of senders (node id expected to be for an
+  /// ExchangeNode)
+  std::map<PlanNodeId, int> per_exch_num_senders;
+
+  // only needed as intermediate state during exec parameter computation;
+  // for scheduling, refer to FInstanceExecParams.per_node_scan_ranges
+  FragmentScanRangeAssignment scan_range_assignment;
+
+  bool is_coord_fragment;
+  const TPlanFragment& fragment;
+  std::vector<FragmentIdx> input_fragments;
+  std::vector<FInstanceExecParams> instance_exec_params;
+
+  MtFragmentExecParams(const TPlanFragment& fragment)
+    : is_coord_fragment(false), fragment(fragment) {}
+};
+
 /// A QuerySchedule contains all necessary information for a query coordinator to
 /// generate fragment execution requests and start query execution. If resource management
 /// is enabled, then a schedule also contains the resource reservation request
 /// and the granted resource reservation.
-/// TODO: Consider moving QuerySchedule and all Schedulers into
-/// their own lib (and out of statestore).
-/// TODO: Move all global state (e.g. profiles) to QueryExecState (after it is decoupled
-/// from ImpalaServer)
+///
+/// QuerySchedule is a container class for scheduling data, but it doesn't contain
+/// scheduling logic itself. Its state either comes from the static TQueryExecRequest
+/// or is computed by SimpleScheduler.
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
@@ -87,19 +138,86 @@ class QuerySchedule {
   int64_t GetClusterMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
-  void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+  void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+
+  /// The following 4 functions need to be replaced once we stop special-casing
+  /// the coordinator instance in the coordinator.
+  /// The replacement is a single function int GetNumFInstances() (which includes
+  /// the coordinator instance).
+
+  /// TODO-MT: remove; this is actually only the number of remote instances
+  /// (from the coordinator's perspective)
   void set_num_fragment_instances(int64_t num_fragment_instances) {
     num_fragment_instances_ = num_fragment_instances;
   }
-  int64_t num_fragment_instances() const { return num_fragment_instances_; }
+
+  /// Returns the number of fragment instances registered with this schedule.
+  /// MT: total number of fragment instances
+  /// ST: value set with set_num_fragment_instances(); excludes coord instance
+  /// (in effect the number of remote instances)
+  /// TODO-MT: get rid of special-casing of coordinator instance and always return the
+  /// total
+  int GetNumFragmentInstances() const;
+
+  /// Returns the total number of fragment instances, incl. coordinator fragment.
+  /// TODO-MT: remove
+  int GetTotalFInstances() const;
+
+  /// Returns the number of remote fragment instances (excludes coordinator).
+  /// Works for both MT and ST.
+  /// TODO-MT: remove
+  int GetNumRemoteFInstances() const;
+
+  /// Return the coordinator fragment, or nullptr if there isn't one.
+  const TPlanFragment* GetCoordFragment() const;
+
+  /// Return all fragments belonging to exec request in 'fragments'.
+  void GetTPlanFragments(std::vector<const TPlanFragment*>* fragments) const;
+
   int64_t num_scan_ranges() const { return num_scan_ranges_; }
 
-  /// Map node ids to the index of their fragment in TQueryExecRequest.fragments.
-  int32_t GetFragmentIdx(PlanNodeId id) const { return plan_node_to_fragment_idx_[id]; }
+  /// Map node ids to the id of their containing fragment.
+  FragmentIdx GetFragmentIdx(PlanNodeId id) const {
+    return plan_node_to_fragment_idx_[id];
+  }
+
+  /// Returns next instance id. Instance ids are consecutive numbers generated from
+  /// the query id.
+  /// If the query contains a coordinator fragment instance, the generated instance
+  /// ids start at 1 and the caller is responsible for assigning the correct id
+  /// to the coordinator instance. If the query does not contain a coordinator instance,
+  /// the generated instance ids start at 0.
+  TUniqueId GetNextInstanceId();
+
+  const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const {
+    return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment;
+  }
 
-  /// Map node ids to the index of the node inside their plan.nodes list.
+  /// Map node ids to the index of their node inside their plan.nodes list.
+  /// TODO-MT: remove; only needed for the ST path
   int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
+
+  const TPlanNode& GetNode(PlanNodeId id) const {
+    const TPlanFragment& fragment = GetContainingFragment(id);
+    return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
+  }
+
   std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; }
+  const std::vector<FragmentExecParams>& exec_params() const {
+    return fragment_exec_params_;
+  }
+  const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const {
+    return mt_fragment_exec_params_;
+  }
+  const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
+    return mt_fragment_exec_params_[idx];
+  }
+  MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
+    return &mt_fragment_exec_params_[idx];
+  }
+
+  const FInstanceExecParams& GetCoordInstanceExecParams() const;
+
   const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
     return unique_hosts_;
   }
@@ -111,7 +229,6 @@ class QuerySchedule {
   void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
 
  private:
-
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
   const TUniqueId& query_id_;
@@ -122,7 +239,7 @@ class QuerySchedule {
   RuntimeProfile* summary_profile_;
   RuntimeProfile::EventSequence* query_events_;
 
-  /// Maps from plan node id to its fragment index. Filled in c'tor.
+  /// Maps from plan node id to its fragment idx. Filled in c'tor.
   std::vector<int32_t> plan_node_to_fragment_idx_;
 
   /// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
@@ -132,21 +249,33 @@ class QuerySchedule {
   /// populated by Scheduler::Schedule()
   std::vector<FragmentExecParams> fragment_exec_params_;
 
+  // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams())
+  // indexed by fragment idx (TPlanFragment.idx)
+  std::vector<MtFragmentExecParams> mt_fragment_exec_params_;
+
   /// The set of hosts that the query will run on excluding the coordinator.
   boost::unordered_set<TNetworkAddress> unique_hosts_;
 
   /// Number of backends executing plan fragments on behalf of this query.
+  /// TODO-MT: remove
   int64_t num_fragment_instances_;
 
   /// Total number of scan ranges of this query.
   int64_t num_scan_ranges_;
 
+  /// Used to generate consecutive fragment instance ids.
+  TUniqueId next_instance_id_;
+
   /// Request pool to which the request was submitted for admission.
   std::string request_pool_;
 
   /// Indicates if the query has been admitted for execution.
   bool is_admitted_;
 
+  /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info.
+  /// Sets is_coord_fragment and input_fragments.
+  /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
+  void MtInit();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4c3a967..9bff424 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -46,12 +46,10 @@ class Scheduler {
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 
-  /// Populates given query schedule whose execution is to be coordinated by coord.
-  /// Assigns fragments to hosts based on scan ranges in the query exec request.
-  /// If resource management is enabled, also reserves resources from the central
-  /// resource manager (Yarn via Llama) to run the query in. This function blocks until
-  /// the reservation request has been granted or denied.
-  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule) = 0;
+  /// Populates given query schedule and assigns fragments to hosts based on scan
+  /// ranges in the query exec request. Submits schedule to admission control before
+  /// returning.
+  virtual Status Schedule(QuerySchedule* schedule) = 0;
 
   /// Releases the reserved resources (if any) from the given schedule.
   virtual Status Release(QuerySchedule* schedule) = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index f3ba9a5..5b6303e 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -20,6 +20,7 @@
 #include <atomic>
 #include <random>
 #include <vector>
+#include <algorithm>
 
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/join.hpp>
@@ -276,6 +277,7 @@ void SimpleScheduler::UpdateMembership(
       }
     }
     if (metrics_ != NULL) {
+      /// TODO-MT: fix this (do we even need to report it?)
       num_fragment_instances_metric_->set_value(current_membership_.size());
     }
   }
@@ -305,7 +307,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
   for (entry = exec_request.per_node_scan_ranges.begin();
       entry != exec_request.per_node_scan_ranges.end(); ++entry) {
     const TPlanNodeId node_id = entry->first;
-    int fragment_idx = schedule->GetFragmentIdx(node_id);
+    FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id);
     const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
     bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
 
@@ -326,11 +328,208 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
         node_id, node_replica_preference, node_random_replica, entry->second,
         exec_request.host_list, exec_at_coord, schedule->query_options(),
         total_assignment_timer, assignment));
-    schedule->AddScanRanges(entry->second.size());
+    schedule->IncNumScanRanges(entry->second.size());
   }
   return Status::OK();
 }
 
+Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) {
+  RuntimeProfile::Counter* total_assignment_timer =
+      ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
+  BackendConfigPtr backend_config = GetBackendConfig();
+  const TQueryExecRequest& exec_request = schedule->request();
+  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+    for (const auto& entry: plan_exec_info.per_node_scan_ranges) {
+      const TPlanNodeId node_id = entry.first;
+      const TPlanFragment& fragment = schedule->GetContainingFragment(node_id);
+      bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
+
+      const TPlanNode& node = schedule->GetNode(node_id);
+      DCHECK_EQ(node.node_id, node_id);
+
+      const TReplicaPreference::type* node_replica_preference =
+          node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.replica_preference
+            ? &node.hdfs_scan_node.replica_preference : NULL;
+      bool node_random_replica =
+          node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.random_replica
+            && node.hdfs_scan_node.random_replica;
+
+      FragmentScanRangeAssignment* assignment =
+          &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
+      RETURN_IF_ERROR(ComputeScanRangeAssignment(
+          *backend_config, node_id, node_replica_preference, node_random_replica,
+          entry.second, exec_request.host_list, exec_at_coord,
+          schedule->query_options(), total_assignment_timer, assignment));
+      schedule->IncNumScanRanges(entry.second.size());
+    }
+  }
+  return Status::OK();
+}
+
+void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) {
+  const TQueryExecRequest& exec_request = schedule->request();
+
+  // for each plan, compute the FInstanceExecParams for the tree of fragments
+  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+    // set instance_id, host, per_node_scan_ranges
+    MtComputeFragmentExecParams(
+        plan_exec_info,
+        schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx),
+        schedule);
+
+    // Set destinations, per_exch_num_senders, sender_id.
+    // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]];
+    // fragments[0] is an endpoint.
+    for (int i = 0; i < plan_exec_info.dest_fragment_idx.size(); ++i) {
+      int dest_idx = plan_exec_info.dest_fragment_idx[i];
+      DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
+      const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
+      DCHECK_LT(i + 1, plan_exec_info.fragments.size());
+      const TPlanFragment& src_fragment = plan_exec_info.fragments[i + 1];
+      DCHECK(src_fragment.output_sink.__isset.stream_sink);
+      MtFragmentExecParams* dest_params =
+          schedule->GetFragmentExecParams(dest_fragment.idx);
+      MtFragmentExecParams* src_params =
+          schedule->GetFragmentExecParams(src_fragment.idx);
+
+      // populate src_params->destinations
+      src_params->destinations.resize(dest_params->instance_exec_params.size());
+      for (int j = 0; j < dest_params->instance_exec_params.size(); ++j) {
+        TPlanFragmentDestination& dest = src_params->destinations[j];
+        dest.__set_fragment_instance_id(dest_params->instance_exec_params[j].instance_id);
+        dest.__set_server(dest_params->instance_exec_params[j].host);
+      }
+
+      // enumerate senders consecutively;
+      // for distributed merge we need to enumerate senders across fragment instances
+      const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
+      DCHECK(
+          sink.output_partition.type == TPartitionType::UNPARTITIONED
+            || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
+            || sink.output_partition.type == TPartitionType::RANDOM);
+      PlanNodeId exch_id = sink.dest_node_id;
+      int sender_id_base = dest_params->per_exch_num_senders[exch_id];
+      dest_params->per_exch_num_senders[exch_id] +=
+          src_params->instance_exec_params.size();
+      for (int j = 0; j < src_params->instance_exec_params.size(); ++j) {
+        FInstanceExecParams& src_instance_params = src_params->instance_exec_params[j];
+        src_instance_params.sender_id = sender_id_base + j;
+      }
+    }
+  }
+}
+
+void SimpleScheduler::MtComputeFragmentExecParams(
+    const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
+  // traverse input fragments
+  for (FragmentIdx input_fragment_idx: fragment_params->input_fragments) {
+    MtComputeFragmentExecParams(
+        plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule);
+  }
+
+  const TPlanFragment& fragment = fragment_params->fragment;
+  // TODO: deal with Union nodes
+  DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
+  // case 1: single instance executed at coordinator
+  if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    const TNetworkAddress& coord = local_backend_descriptor_.address;
+    // make sure that the coordinator instance ends up with instance idx 0
+    TUniqueId instance_id = fragment_params->is_coord_fragment
+        ? schedule->query_id()
+        : schedule->GetNextInstanceId();
+    fragment_params->instance_exec_params.emplace_back(
+      instance_id, coord, 0, *fragment_params);
+    FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+    // That instance gets all of the scan ranges, if there are any.
+    if (!fragment_params->scan_range_assignment.empty()) {
+      DCHECK_EQ(fragment_params->scan_range_assignment.size(), 1);
+      auto first_entry = fragment_params->scan_range_assignment.begin();
+      instance_params.per_node_scan_ranges = first_entry->second;
+    }
+  } else {
+    PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
+    if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
+      // case 2: leaf fragment with leftmost scan
+      // TODO: check that there's only one scan in this fragment
+      MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule);
+    } else {
+      // case 3: interior fragment without leftmost scan
+      // we assign the same hosts as those of our leftmost input fragment (so that a
+      // merge aggregation fragment runs on the hosts that provide the input data)
+      MtCreateMirrorInstances(fragment_params, schedule);
+    }
+  }
+}
+
+void SimpleScheduler::MtCreateScanInstances(
+    PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
+  int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop;
+  for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
+    // evenly divide up the scan ranges of the leftmost scan between at most
+    // <dop> instances
+    const TNetworkAddress& host = assignment_entry.first;
+    auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id);
+    DCHECK(scan_ranges_it != assignment_entry.second.end());
+    const vector<TScanRangeParams>& params_list = scan_ranges_it->second;
+
+    int64 total_size = 0;
+    for (const TScanRangeParams& params: params_list) {
+      // TODO: implement logic for hbase and kudu
+      DCHECK(params.scan_range.__isset.hdfs_file_split);
+      total_size += params.scan_range.hdfs_file_split.length;
+    }
+
+    // try to load-balance scan ranges by assigning just beyond the average number of
+    // bytes to each instance
+    // TODO: fix shortcomings introduced by uneven split sizes,
+    // this could end up assigning 0 scan ranges to an instance
+    int num_instances = ::min(max_num_instances, static_cast<int>(params_list.size()));
+    DCHECK_GT(num_instances, 0);
+    float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
+    int64_t total_assigned_bytes = 0;
+    int params_idx = 0;  // into params_list
+    for (int i = 0; i < num_instances; ++i) {
+      fragment_params->instance_exec_params.emplace_back(
+          schedule->GetNextInstanceId(), host, i, *fragment_params);
+      FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+      // threshold beyond which we want to assign to the next instance
+      int64_t threshold_total_bytes = avg_bytes_per_instance * (i + 1);
+
+      // this will have assigned all scan ranges by the last instance:
+      // for the last instance, threshold_total_bytes == total_size and
+      // total_assigned_bytes won't hit total_size until everything is assigned
+      while (params_idx < params_list.size()
+          && total_assigned_bytes < threshold_total_bytes) {
+        const TScanRangeParams& scan_range_params = params_list[params_idx];
+        instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
+            scan_range_params);
+        total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
+        ++params_idx;
+      }
+      if (params_idx >= params_list.size()) break;  // nothing left to assign
+    }
+    DCHECK_EQ(params_idx, params_list.size());  // everything got assigned
+  }
+}
+
+void SimpleScheduler::MtCreateMirrorInstances(
+    MtFragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  DCHECK_GE(fragment_params->input_fragments.size(), 1);
+  const MtFragmentExecParams* input_fragment_params =
+      schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+  int per_fragment_instance_idx = 0;
+  for (const FInstanceExecParams& input_instance_params:
+      input_fragment_params->instance_exec_params) {
+    fragment_params->instance_exec_params.emplace_back(
+        schedule->GetNextInstanceId(), input_instance_params.host,
+        per_fragment_instance_idx++, *fragment_params);
+  }
+}
+
 Status SimpleScheduler::ComputeScanRangeAssignment(
     const BackendConfig& backend_config, PlanNodeId node_id,
     const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
@@ -459,12 +658,10 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
   // assign instance ids
   int64_t num_fragment_instances = 0;
   for (FragmentExecParams& params: *fragment_exec_params) {
-    for (int j = 0; j < params.hosts.size(); ++j) {
-      int instance_idx = num_fragment_instances + j;
+    for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) {
       params.instance_ids.push_back(
-          CreateInstanceId(schedule->query_id(), instance_idx));
+          CreateInstanceId(schedule->query_id(), num_fragment_instances));
     }
-    num_fragment_instances += params.hosts.size();
   }
   if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
     // the root fragment is executed directly by the coordinator
@@ -511,11 +708,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
     QuerySchedule* schedule) {
   vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
   DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
-  vector<TPlanNodeType::type> scan_node_types;
-  scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
-  scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
-  scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);
-  scan_node_types.push_back(TPlanNodeType::KUDU_SCAN_NODE);
 
   // compute hosts of producer fragment before those of consumer fragment(s),
   // the latter might inherit the set of hosts from the former
@@ -535,6 +727,9 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
     // a UnionNode with partitioned joins or grouping aggregates as children runs on
     // at least as many hosts as the input to those children).
     if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
+      vector<TPlanNodeType::type> scan_node_types {
+        TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+        TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
       vector<TPlanNodeId> scan_nodes;
       FindNodes(fragment.plan, scan_node_types, &scan_nodes);
       vector<TPlanNodeId> exch_nodes;
@@ -562,7 +757,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
       continue;
     }
 
-    PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
+    PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
     if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
       // there is no leftmost scan; we assign the same hosts as those of our
       // leftmost input fragment (so that a partitioned aggregation fragment
@@ -606,6 +801,13 @@ PlanNodeId SimpleScheduler::FindLeftmostNode(
   return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
 }
 
+PlanNodeId SimpleScheduler::FindLeftmostScan(const TPlan& plan) {
+  vector<TPlanNodeType::type> scan_node_types {
+      TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+      TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
+  return FindLeftmostNode(plan, scan_node_types);
+}
+
 bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
   for (int i = 0; i < plan.nodes.size(); ++i) {
     if (plan.nodes[i].node_type == type) return true;
@@ -674,18 +876,35 @@ int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
   return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
 }
 
-Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
+Status SimpleScheduler::Schedule(QuerySchedule* schedule) {
   string resolved_pool;
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
       schedule->request().query_ctx, &resolved_pool));
   schedule->set_request_pool(resolved_pool);
   schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
 
-  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
-  ComputeFragmentHosts(schedule->request(), schedule);
-  ComputeFragmentExecParams(schedule->request(), schedule);
-  if (!FLAGS_disable_admission_control) {
-    RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+  bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_execution) {
+    RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule));
+    MtComputeFragmentExecParams(schedule);
+
+    // compute unique hosts
+    unordered_set<TNetworkAddress> unique_hosts;
+    for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) {
+      for (const FInstanceExecParams& i: f.instance_exec_params) {
+        unique_hosts.insert(i.host);
+      }
+    }
+    schedule->SetUniqueHosts(unique_hosts);
+
+    // TODO-MT: call AdmitQuery()
+  } else {
+    RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
+    ComputeFragmentHosts(schedule->request(), schedule);
+    ComputeFragmentExecParams(schedule->request(), schedule);
+    if (!FLAGS_disable_admission_control) {
+      RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+    }
   }
   return Status::OK();
 }
@@ -849,10 +1068,10 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
     if (!remote_read) total_local_assignments_->Increment(1);
   }
 
-  PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
-      PerNodeScanRanges());
-  vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
-      vector<TScanRangeParams>());
+  PerNodeScanRanges* scan_ranges =
+      FindOrInsert(assignment, backend.address, PerNodeScanRanges());
+  vector<TScanRangeParams>* scan_range_params_list =
+      FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
   // Add scan range.
   TScanRangeParams scan_range_params;
   scan_range_params.scan_range = scan_range_locations.scan_range;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 9b51269..b7cc83e 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -88,7 +88,7 @@ class SimpleScheduler : public Scheduler {
   /// Register with the subscription manager if required
   virtual impala::Status Init();
 
-  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
+  virtual Status Schedule(QuerySchedule* schedule);
   virtual Status Release(QuerySchedule* schedule);
 
  private:
@@ -405,7 +405,39 @@ class SimpleScheduler : public Scheduler {
   void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
 
-  /// For each fragment in exec_request, compute the hosts on which to run the instances
+  /// Compute the assignment of scan ranges to hosts for each scan node in
+  /// the schedule's TQueryExecRequest.mt_plan_exec_info.
+  /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
+  /// mt_fragment_exec_params_ with the resulting scan range assignment.
+  Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
+
+  /// Compute the MtFragmentExecParams for all plans in the schedule's
+  /// TQueryExecRequest.mt_plan_exec_info.
+  /// This includes the routing information (destinations, per_exch_num_senders,
+  /// sender_id).
+  void MtComputeFragmentExecParams(QuerySchedule* schedule);
+
+  /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
+  /// fragment_params and its input fragments via a depth-first traversal.
+  /// All fragments are part of plan_exec_info.
+  void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
+      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances of the fragment corresponding to fragment_params to run on the
+  /// selected replica hosts of the scan ranges of the node with id scan_id.
+  /// The maximum number of instances is the value of query option mt_dop.
+  /// This attempts to load balance among instances by computing the average number
+  /// of bytes per instance and then in a single pass assigning scan ranges to each
+  /// instance to roughly meet that average.
+  void MtCreateScanInstances(PlanNodeId scan_id,
+      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// For each instance of the first input fragment of the fragment corresponding to
+  /// fragment_params, create an instance for this fragment on the same host.
+  void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params,
+      QuerySchedule* schedule);
+
+  /// For each fragment in exec_request, computes hosts on which to run the instances
   /// and stores result in fragment_exec_params_.hosts.
   void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
@@ -414,6 +446,8 @@ class SimpleScheduler : public Scheduler {
   /// INVALID_PLAN_NODE_ID if no such node present.
   PlanNodeId FindLeftmostNode(
       const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
+  /// Same as FindLeftmostNode(), with 'types' set to the scan node types.
+  PlanNodeId FindLeftmostScan(const TPlan& plan);
 
   /// Return the index (w/in exec_request.fragments) of fragment that sends its output to
   /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 9069b03..76e11d1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -81,7 +81,6 @@ void FragmentMgr::FragmentExecState::ReportStatusCb(
   TReportExecStatusParams params;
   params.protocol_version = ImpalaInternalServiceVersion::V1;
   params.__set_query_id(query_ctx_.query_id);
-  params.__set_instance_state_idx( fragment_instance_ctx_.instance_state_idx);
   params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
   exec_status.SetTStatus(&params);
   params.__set_done(done);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index a98bbf9..64e9a78 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -38,10 +38,8 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory
 
 Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
   VLOG_QUERY << "ExecPlanFragment() instance_id="
-             << exec_params.fragment_instance_ctx.fragment_instance_id
-             << " coord=" << exec_params.query_ctx.coord_address
-             << " fragment instance#="
-             << exec_params.fragment_instance_ctx.instance_state_idx;
+             << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id)
+             << " coord=" << exec_params.query_ctx.coord_address;
 
   // Preparing and opening the fragment creates a thread and consumes a non-trivial
   // amount of memory. If we are already starved for memory, cancel the fragment as
@@ -146,6 +144,7 @@ void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
 
 void FragmentMgr::PublishFilter(TPublishFilterResult& return_val,
     const TPublishFilterParams& params) {
+  VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id;
   shared_ptr<FragmentExecState> fragment_exec_state =
       GetFragmentExecState(params.dst_instance_id);
   if (fragment_exec_state.get() == NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d5fd59a..7f9d862 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -845,9 +845,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
   // benchmarks show it to be slightly cheaper than contending for a
   // single generator under a lock (since random_generator is not
   // thread-safe).
-  random_generator uuid_generator;
-  uuid query_uuid = uuid_generator();
-  query_ctx->query_id = UuidToQueryId(query_uuid);
+  query_ctx->query_id = UuidToQueryId(random_generator()());
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -1077,9 +1075,8 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
 
 void ImpalaServer::ReportExecStatus(
     TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
-  VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
-            << " fragment instance#=" << params.instance_state_idx
-            << " instance_id=" << params.fragment_instance_id
+  VLOG_FILE << "ReportExecStatus()"
+            << " instance_id=" << PrintId(params.fragment_instance_id)
             << " done=" << (params.done ? "true" : "false");
   // TODO: implement something more efficient here, we're currently
   // acquiring/releasing the map lock and doing a map lookup for
@@ -1090,10 +1087,8 @@ void ImpalaServer::ReportExecStatus(
     // This is expected occasionally (since a report RPC might be in flight while
     // cancellation is happening). Return an error to the caller to get it to stop.
     const string& err = Substitute("ReportExecStatus(): Received report for unknown "
-        "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
-        " $2 done: $3)", PrintId(params.query_id),
-        params.instance_state_idx, PrintId(params.fragment_instance_id),
-        params.done);
+        "query ID (probably closed or cancelled). (instance: $0 done: $1)",
+        PrintId(params.fragment_instance_id), params.done);
     Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val);
     VLOG_QUERY << err;
     return;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 9b2fc88..d55ac54 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -373,7 +373,8 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp(
 Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     const TQueryExecRequest& query_exec_request) {
   // we always need at least one plan fragment
-  DCHECK_GT(query_exec_request.fragments.size(), 0);
+  DCHECK(query_exec_request.fragments.size() > 0
+      || query_exec_request.mt_plan_exec_info.size() > 0);
 
   if (query_exec_request.__isset.query_plan) {
     stringstream plan_ss;
@@ -424,26 +425,31 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
   // case, the query can only have a single fragment, and that fragment needs to be
   // executed by the coordinator. This check confirms that.
   // If desc_tbl is set, the query may or may not have a coordinator fragment.
+  bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
+  const TPlanFragment& fragment = is_mt_exec
+      ? query_exec_request.mt_plan_exec_info[0].fragments[0]
+      : query_exec_request.fragments[0];
   bool has_coordinator_fragment =
-      query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+      fragment.partition.type == TPartitionType::UNPARTITIONED;
   DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
 
   {
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
+    // TODO: make schedule local to coordinator and move schedule_->Release() into
+    // Coordinator::TearDown()
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
         exec_request_.query_options, &summary_profile_, query_events_));
-    coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
   }
-  Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
-
+  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  status = coord_->Exec(*schedule_, &output_expr_ctxs_);
+  coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
+  status = coord_->Exec(&output_expr_ctxs_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d0e4275..0823981 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -395,16 +395,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
         }
         break;
       }
-      case TImpalaQueryOptions::MT_NUM_CORES: {
+      case TImpalaQueryOptions::MT_DOP: {
         StringParser::ParseResult result;
-        const int32_t num_cores =
+        const int32_t dop =
             StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) {
+        if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) {
           return Status(
-              Substitute("$0 is not valid for mt_num_cores. Valid values are in "
+              Substitute("$0 is not valid for mt_dop. Valid values are in "
                 "[0, 128].", value));
         }
-        query_options->__set_mt_num_cores(num_cores);
+        query_options->__set_mt_dop(dop);
         break;
       }
       case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2c25700..b1194d3 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -79,7 +79,7 @@ class TQueryOptions;
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
-  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
+  QUERY_OPT_FN(mt_dop, MT_DOP)\
   QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
   QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index b8cd1ef..efb4f52 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -20,6 +20,7 @@
 #define IMPALA_UTIL_CONTAINER_UTIL_H
 
 #include <map>
+#include <unordered_map>
 #include <boost/unordered_map.hpp>
 
 #include "util/hash-util.h"
@@ -35,6 +36,21 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) {
   return HashUtil::Hash(&host_port.port, sizeof(host_port.port), hash);
 }
 
+}
+
+/// Hash function for std:: containers
+namespace std {
+
+template<> struct hash<impala::TNetworkAddress> {
+  std::size_t operator()(const impala::TNetworkAddress& host_port) const {
+    return impala::hash_value(host_port);
+  }
+};
+
+}
+
+namespace impala {
+
 struct HashTNetworkAddressPtr : public std::unary_function<TNetworkAddress*, size_t> {
   size_t operator()(const TNetworkAddress* const& p) const { return hash_value(*p); }
 };
@@ -49,6 +65,7 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b
 
 /// FindOrInsert(): if the key is present, return the value; if the key is not present,
 /// create a new entry (key, default_val) and return default_val.
+/// TODO: replace with single template which takes a template param
 
 template <typename K, typename V>
 V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
@@ -60,6 +77,15 @@ V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
 }
 
 template <typename K, typename V>
+V* FindOrInsert(std::unordered_map<K,V>* m, const K& key, const V& default_val) {
+  typename std::unordered_map<K,V>::iterator it = m->find(key);
+  if (it == m->end()) {
+    it = m->insert(std::make_pair(key, default_val)).first;
+  }
+  return &it->second;
+}
+
+template <typename K, typename V>
 V* FindOrInsert(boost::unordered_map<K,V>* m, const K& key, const V& default_val) {
   typename boost::unordered_map<K,V>::iterator it = m->find(key);
   if (it == m->end()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index ebbc5eb..997cc0a 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -31,12 +31,7 @@ TEST(UidUtil, FragmentInstanceId) {
 
   for (int i = 0; i < 100; ++i) {
     TUniqueId instance_id = CreateInstanceId(query_id, i);
-    EXPECT_EQ(instance_id.hi, query_id.hi);
-
-    TUniqueId qid = GetQueryId(instance_id);
-    EXPECT_EQ(qid.hi, query_id.hi);
-    EXPECT_EQ(qid.lo, query_id.lo);
-
+    EXPECT_EQ(GetQueryId(instance_id), query_id);
     EXPECT_EQ(GetInstanceIdx(instance_id), i);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index f0f87ec..de18464 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -45,7 +45,7 @@ inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id
 /// Query id: uuid with bottom 4 bytes set to 0
 /// Fragment instance id: query id with instance index stored in the bottom 4 bytes
 
-const int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
+constexpr int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
 
 inline TUniqueId UuidToQueryId(const boost::uuids::uuid& uuid) {
   TUniqueId result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 8068b63..8e88b20 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -56,7 +56,7 @@ struct TExecStats {
 // node as well as per instance stats.
 struct TPlanNodeExecSummary {
   1: required Types.TPlanNodeId node_id
-  2: required i32 fragment_id
+  2: required Types.TFragmentIdx fragment_idx
   3: required string label
   4: optional string label_detail
   5: required i32 num_children
@@ -64,15 +64,11 @@ struct TPlanNodeExecSummary {
   // Estimated stats generated by the planner
   6: optional TExecStats estimated_stats
 
-  // One entry for each BE executing this plan node.
+  // One entry for each fragment instance executing this plan node.
   7: optional list<TExecStats> exec_stats
 
-  // One entry for each BE executing this plan node. True if this plan node is still
-  // running.
-  8: optional list<bool> is_active
-
   // If true, this plan node is an exchange node that is the receiver of a broadcast.
-  9: optional bool is_broadcast
+  8: optional bool is_broadcast
 }
 
 // Progress counters for an in-flight query.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 95d6ba3..91322b2 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -340,6 +340,25 @@ struct TLoadDataResp {
   1: required Data.TResultRow load_summary
 }
 
+// Execution parameters for a single plan; component of TQueryExecRequest
+struct TPlanExecInfo {
+  // fragments[i] may consume the output of fragments[j > i];
+  // fragments[0] is the root fragment and also the coordinator fragment, if
+  // it is unpartitioned.
+  1: required list<Planner.TPlanFragment> fragments
+
+  // Specifies the destination fragment of the output of each fragment.
+  // dest_fragment_idx.size() == fragments.size() - 1 and
+  // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+  // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient
+  2: optional list<i32> dest_fragment_idx
+
+  // A map from scan node ids to a list of scan range locations.
+  // The node ids refer to scan nodes in fragments[].plan_tree
+  3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
+      per_node_scan_ranges
+}
+
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
 struct TQueryExecRequest {
   // global descriptor tbl for all fragments
@@ -348,12 +367,7 @@ struct TQueryExecRequest {
   // fragments[i] may consume the output of fragments[j > i];
   // fragments[0] is the root fragment and also the coordinator fragment, if
   // it is unpartitioned.
-  2: required list<Planner.TPlanFragment> fragments
-
-  // Multi-threaded execution: sequence of plans; the last one materializes
-  // the query result
-  // TODO: this will eventually supercede 'fragments'
-  14: optional list<Planner.TPlanFragmentTree> mt_plans
+  2: optional list<Planner.TPlanFragment> fragments
 
   // Specifies the destination fragment of the output of each fragment.
   // parent_fragment_idx.size() == fragments.size() - 1 and
@@ -365,6 +379,12 @@ struct TQueryExecRequest {
   4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
       per_node_scan_ranges
 
+  // Multi-threaded execution: exec info for all plans; the first one materializes
+  // the query result
+  // TODO: this will eventually supersede fields fragments, dest_fragment_idx,
+  // per_node_scan_ranges
+  14: optional list<TPlanExecInfo> mt_plan_exec_info
+
   // Metadata of the query result set (only for select)
   5: optional Results.TResultSetMetadata result_set_metadata
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 089524d..3ee54ae 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -181,10 +181,11 @@ struct TQueryOptions {
   // "name".
   43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
 
-  // Multi-threaded execution: number of cores per query per node.
-  // > 0: multi-threaded execution mode, with given number of cores
+  // Multi-threaded execution: degree of parallelism (= number of active threads) per
+  // query per backend.
+  // > 0: multi-threaded execution mode, with given dop
   // 0: single-threaded execution mode
-  44: optional i32 mt_num_cores = 0
+  44: optional i32 mt_dop = 0
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
@@ -252,6 +253,7 @@ struct TClientRequest {
 // TODO: Separate into FE/BE initialized vars.
 struct TQueryCtx {
   // Client request containing stmt to execute and query options.
+  // TODO: rename to client_request, we have too many requests
   1: required TClientRequest request
 
   // A globally unique id assigned to the entire query in the BE.
@@ -302,9 +304,6 @@ struct TQueryCtx {
 // fragment.
 struct TPlanFragmentCtx {
   1: required Planner.TPlanFragment fragment
-
-  // total number of instances of this fragment
-  2: required i32 num_fragment_instances
 }
 
 // A scan range plus the parameters needed to execute that scan.
@@ -329,45 +328,42 @@ struct TPlanFragmentDestination {
 // TODO: for range partitioning, we also need to specify the range boundaries
 struct TPlanFragmentInstanceCtx {
   // The globally unique fragment instance id.
-  // Format: query id + query-wide fragment index
-  // The query-wide fragment index starts at 0, so that the query id
-  // and the id of the first fragment instance (the coordinator instance)
-  // are identical.
+  // Format: query id + query-wide fragment instance index
+  // The query-wide fragment instance index starts at 0, so that the query id
+  // and the id of the first fragment instance are identical.
+  // If there is a coordinator instance, it is the first one, with index 0.
   1: required Types.TUniqueId fragment_instance_id
 
   // Index of this fragment instance accross all instances of its parent fragment,
   // range [0, TPlanFragmentCtx.num_fragment_instances).
-  2: required i32 fragment_instance_idx
-
-  // Index of this fragment instance in Coordinator::fragment_instance_states_.
-  // TODO: remove; this is subsumed by the query-wide instance idx embedded
-  // in the fragment_instance_id
-  3: required i32 instance_state_idx
+  2: required i32 per_fragment_instance_idx
 
   // Initial scan ranges for each scan node in TPlanFragment.plan_tree
-  4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+  3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
 
   // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
   // needed to create a DataStreamRecvr
-  5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+  4: required map<Types.TPlanNodeId, i32> per_exch_num_senders
 
   // Output destinations, one per output partition.
   // The partitioning of the output is specified by
   // TPlanFragment.output_sink.output_partition.
   // The number of output partitions is destinations.size().
-  6: list<TPlanFragmentDestination> destinations
+  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+  5: list<TPlanFragmentDestination> destinations
 
   // Debug options: perform some action in a particular phase of a particular node
-  7: optional Types.TPlanNodeId debug_node_id
-  8: optional PlanNodes.TExecNodePhase debug_phase
-  9: optional PlanNodes.TDebugAction debug_action
+  6: optional Types.TPlanNodeId debug_node_id
+  7: optional PlanNodes.TExecNodePhase debug_phase
+  8: optional PlanNodes.TDebugAction debug_action
 
   // The pool to which this request has been submitted. Used to update pool statistics
   // for admission control.
-  10: optional string request_pool
+  9: optional string request_pool
 
   // Id of this fragment in its role as a sender.
-  11: optional i32 sender_id
+  10: optional i32 sender_id
 }
 
 
@@ -461,31 +457,26 @@ struct TReportExecStatusParams {
   2: optional Types.TUniqueId query_id
 
   // required in V1
-  // Used to look up the fragment instance state in the coordinator, same value as
-  // TExecPlanFragmentParams.instance_state_idx.
-  3: optional i32 instance_state_idx
-
-  // required in V1
-  4: optional Types.TUniqueId fragment_instance_id
+  3: optional Types.TUniqueId fragment_instance_id
 
   // Status of fragment execution; any error status means it's done.
   // required in V1
-  5: optional Status.TStatus status
+  4: optional Status.TStatus status
 
   // If true, fragment finished executing.
   // required in V1
-  6: optional bool done
+  5: optional bool done
 
   // cumulative profile
   // required in V1
-  7: optional RuntimeProfile.TRuntimeProfileTree profile
+  6: optional RuntimeProfile.TRuntimeProfileTree profile
 
   // Cumulative structural changes made by a table sink
   // optional in V1
-  8: optional TInsertExecStatus insert_exec_status;
+  7: optional TInsertExecStatus insert_exec_status;
 
   // New errors that have not been reported to the coordinator
-  9: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+  8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
 }
 
 struct TReportExecStatusResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index da41a8e..129be2d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -213,8 +213,9 @@ enum TImpalaQueryOptions {
   // is always, since fields IDs are NYI). Valid values are "position" and "name".
   PARQUET_FALLBACK_SCHEMA_RESOLUTION,
 
-  // Multi-threaded execution: number of cores per machine
-  MT_NUM_CORES,
+  // Multi-threaded execution: degree of parallelism = number of active threads per
+  // backend
+  MT_DOP,
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index eb95585..92c8681 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -31,11 +31,14 @@ include "Partitions.thrift"
 // plan fragment, including how to produce and how to partition its output.
 // It leaves out node-specific parameters needed for the actual execution.
 struct TPlanFragment {
+  // Ordinal number of fragment within a query; range: 0..<total # fragments>
+  1: required Types.TFragmentIdx idx
+
   // display name to be shown in the runtime profile; unique within a query
-  1: required string display_name
+  2: required string display_name
 
   // no plan or descriptor table: query without From clause
-  2: optional PlanNodes.TPlan plan
+  3: optional PlanNodes.TPlan plan
 
   // exprs that produce values for slots of output tuple (one expr per slot);
   // if not set, plan fragment materializes full rows of plan_tree
@@ -74,6 +77,8 @@ struct TScanRangeLocation {
 }
 
 // A single scan range plus the hosts that serve it
+// TODO: rename to TScanRangeLocationList, having this differ from the above struct
+// by only a single letter has caused needless confusion
 struct TScanRangeLocations {
   1: required PlanNodes.TScanRange scan_range
   // non-empty list

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 770a414..30d168d 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -19,6 +19,7 @@ namespace cpp impala
 namespace java org.apache.impala.thrift
 
 typedef i64 TTimestamp
+typedef i32 TFragmentIdx
 typedef i32 TPlanNodeId
 typedef i32 TTupleId
 typedef i32 TSlotId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index adaee18..3231e33 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -51,6 +51,22 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
   public ArrayList<NodeType> getChildren() { return children_; }
 
   /**
+   * Return list of all nodes of the tree rooted at 'this', obtained
+   * through pre-order traversal.
+   */
+  public <C extends TreeNode<NodeType>> ArrayList<C> getNodesPreOrder() {
+    ArrayList<C> result = new ArrayList<C>();
+    getNodesPreOrderAux(result);
+    return result;
+  }
+
+  protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux(
+      ArrayList<C> result) {
+    result.add((C) this);
+    for (NodeType child: children_) child.getNodesPreOrderAux(result);
+  }
+
+  /**
    * Count the total number of nodes in this tree. Leaf node will return 1.
    * Non-leaf node will include all its children.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e50aca5..405eebe 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -59,6 +59,11 @@ public class Planner {
     ctx_ = new PlannerContext(analysisResult, queryCtx);
   }
 
+  public TQueryCtx getQueryCtx() { return ctx_.getQueryCtx(); }
+  public AnalysisContext.AnalysisResult getAnalysisResult() {
+    return ctx_.getAnalysisResult();
+  }
+
   /**
    * Returns a list of plan fragments for executing an analyzed parse tree.
    * May return a single-node or distributed executable plan. If enabled (through a
@@ -204,6 +209,24 @@ public class Planner {
   /**
    * Return combined explain string for all plan fragments.
    * Includes the estimated resource requirements from the request if set.
+   * Uses a default level of EXTENDED, unless overridden by the
+   * 'explain_level' query option.
+   */
+  public String getExplainString(ArrayList<PlanFragment> fragments,
+      TQueryExecRequest request) {
+    // use EXTENDED by default for all non-explain statements
+    TExplainLevel explainLevel = TExplainLevel.EXTENDED;
+    // use the query option for explain stmts and tests (e.g., planner tests)
+    if (ctx_.getAnalysisResult().isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
+      explainLevel = ctx_.getQueryOptions().getExplain_level();
+    }
+    return getExplainString(fragments, request, explainLevel);
+  }
+
+  /**
+   * Return combined explain string for all plan fragments and with an
+   * explicit explain level.
+   * Includes the estimated resource requirements from the request if set.
    */
   public String getExplainString(ArrayList<PlanFragment> fragments,
       TQueryExecRequest request, TExplainLevel explainLevel) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fe1f8f1..00a3d93 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -120,6 +120,7 @@ import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
 import org.apache.impala.thrift.TMetadataOpRequest;
+import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanFragmentTree;
 import org.apache.impala.thrift.TQueryCtx;
@@ -908,70 +909,123 @@ public class Frontend {
   }
 
   /**
-   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+   * Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'.
    */
-  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
-      throws ImpalaException {
-    // Analyze the statement
-    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
-    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
-    timeline.markEvent("Analysis finished");
-    Preconditions.checkNotNull(analysisResult.getStmt());
-    TExecRequest result = new TExecRequest();
-    result.setQuery_options(queryCtx.request.getQuery_options());
-    result.setAccess_events(analysisResult.getAccessEvents());
-    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+  private TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, Planner planner,
+      TQueryCtx queryCtx, TQueryExecRequest queryExecRequest) {
+    TPlanExecInfo result = new TPlanExecInfo();
+    ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
 
-    if (analysisResult.isCatalogOp()) {
-      result.stmt_type = TStmtType.DDL;
-      createCatalogOpRequest(analysisResult, result);
-      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
-      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
-        result.catalog_op_request.setLineage_graph(thriftLineageGraph);
-      }
-      // All DDL operations except for CTAS are done with analysis at this point.
-      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
-    } else if (analysisResult.isLoadDataStmt()) {
-      result.stmt_type = TStmtType.LOAD;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("summary", Type.STRING.toThrift()))));
-      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
-      return result;
-    } else if (analysisResult.isSetStmt()) {
-      result.stmt_type = TStmtType.SET;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("option", Type.STRING.toThrift()),
-          new TColumn("value", Type.STRING.toThrift()))));
-      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
-      return result;
+    // map from fragment to its index in TPlanExecInfo.fragments; needed for
+    // TPlanExecInfo.dest_fragment_idx
+    List<ScanNode> scanNodes = Lists.newArrayList();
+    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
+    for (int idx = 0; idx < fragments.size(); ++idx) {
+      PlanFragment fragment = fragments.get(idx);
+      Preconditions.checkNotNull(fragment.getPlanRoot());
+      fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
+      fragmentIdx.put(fragment, idx);
     }
 
-    // create TQueryExecRequest
-    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
-        || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
-        || analysisResult.isDeleteStmt());
+    // set fragment destinations
+    for (int i = 1; i < fragments.size(); ++i) {
+      PlanFragment dest = fragments.get(i).getDestFragment();
+      Integer idx = fragmentIdx.get(dest);
+      Preconditions.checkState(idx != null);
+      result.addToDest_fragment_idx(idx.intValue());
+    }
 
-    TQueryExecRequest queryExecRequest = new TQueryExecRequest();
-    // create plan
-    LOG.debug("create plan");
-    Planner planner = new Planner(analysisResult, queryCtx);
-    if (RuntimeEnv.INSTANCE.isTestEnv()
-        && queryCtx.request.query_options.mt_num_cores > 0) {
-      // TODO: this is just to be able to run tests; implement this
-      List<PlanFragment> planRoots = planner.createParallelPlans();
-      for (PlanFragment planRoot: planRoots) {
-        TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
-        queryExecRequest.addToMt_plans(thriftPlan);
+    // Set scan ranges/locations for scan nodes.
+    LOG.debug("get scan range locations");
+    Set<TTableName> tablesMissingStats = Sets.newTreeSet();
+    Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
+    for (ScanNode scanNode: scanNodes) {
+      result.putToPer_node_scan_ranges(
+          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
+      if (scanNode.isTableMissingStats()) {
+        tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
       }
-      queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
-      queryExecRequest.setQuery_ctx(queryCtx);
-      explainString.append(planner.getExplainString(
-          Lists.newArrayList(planRoots.get(0)), queryExecRequest,
-          TExplainLevel.STANDARD));
-      queryExecRequest.setQuery_plan(explainString.toString());
-      result.setQuery_exec_request(queryExecRequest);
-      return result;
+      if (scanNode.hasCorruptTableStats()) {
+        tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
+      }
+    }
+
+    for (TTableName tableName: tablesMissingStats) {
+      queryCtx.addToTables_missing_stats(tableName);
     }
+    for (TTableName tableName: tablesWithCorruptStats) {
+      queryCtx.addToTables_with_corrupt_stats(tableName);
+    }
+
+    // The fragment at this point has all state set, serialize it to thrift.
+    for (PlanFragment fragment: fragments) {
+      TPlanFragment thriftFragment = fragment.toThrift();
+      result.addToFragments(thriftFragment);
+    }
+
+    return result;
+  }
+
+  /**
+   * Create a populated TQueryExecRequest, corresponding to the supplied planner,
+   * for multi-threaded execution.
+   */
+  private TQueryExecRequest mtCreateExecRequest(
+      Planner planner, StringBuilder explainString)
+      throws ImpalaException {
+    TQueryCtx queryCtx = planner.getQueryCtx();
+    Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0);
+    // for now, always disable spilling in the backend
+    // TODO-MT: re-enable spilling
+    queryCtx.setDisable_spilling(true);
+    TQueryExecRequest result = new TQueryExecRequest();
+
+    LOG.debug("create mt plan");
+    List<PlanFragment> planRoots = planner.createParallelPlans();
+
+    // create EXPLAIN output
+    result.setQuery_ctx(queryCtx);  // needed by getExplainString()
+    explainString.append(
+        planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result));
+    result.setQuery_plan(explainString.toString());
+
+    // create per-plan exec info;
+    // also assemble list of names of tables with missing or corrupt stats for
+    // assembling a warning message
+    for (PlanFragment planRoot: planRoots) {
+      result.addToMt_plan_exec_info(
+          createPlanExecInfo(planRoot, planner, queryCtx, result));
+    }
+
+    // assign fragment ids
+    int idx = 0;
+    for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) {
+      for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
+    }
+
+    // TODO-MT: implement
+    // Compute resource requirements after scan range locations because the cost
+    // estimates of scan nodes rely on them.
+    //try {
+      //planner.computeResourceReqs(fragments, true, queryExecRequest);
+    //} catch (Exception e) {
+      //// Turn exceptions into a warning to allow the query to execute.
+      //LOG.error("Failed to compute resource requirements for query\n" +
+          //queryCtx.request.getStmt(), e);
+    //}
+
+    return result;
+  }
+
+  /**
+   * Create a populated TQueryExecRequest corresponding to the supplied planner.
+   * TODO-MT: remove this function and rename mtCreateExecRequest() to
+   * createExecRequest()
+   */
+  private TQueryExecRequest createExecRequest(
+      Planner planner, StringBuilder explainString)
+      throws ImpalaException {
+    LOG.debug("create plan");
     ArrayList<PlanFragment> fragments = planner.createPlan();
 
     List<ScanNode> scanNodes = Lists.newArrayList();
@@ -986,12 +1040,13 @@ public class Frontend {
       fragmentIdx.put(fragment, idx);
     }
 
+    TQueryExecRequest result = new TQueryExecRequest();
     // set fragment destinations
     for (int i = 1; i < fragments.size(); ++i) {
       PlanFragment dest = fragments.get(i).getDestFragment();
       Integer idx = fragmentIdx.get(dest);
       Preconditions.checkState(idx != null);
-      queryExecRequest.addToDest_fragment_idx(idx.intValue());
+      result.addToDest_fragment_idx(idx.intValue());
     }
 
     // Set scan ranges/locations for scan nodes.
@@ -1001,9 +1056,8 @@ public class Frontend {
     // Assemble a similar list for corrupt stats
     Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
     for (ScanNode scanNode: scanNodes) {
-      queryExecRequest.putToPer_node_scan_ranges(
-          scanNode.getId().asInt(),
-          scanNode.getScanRangeLocations());
+      result.putToPer_node_scan_ranges(
+          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
       if (scanNode.isTableMissingStats()) {
         tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
       }
@@ -1012,7 +1066,7 @@ public class Frontend {
       }
     }
 
-    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
+    TQueryCtx queryCtx = planner.getQueryCtx();
     for (TTableName tableName: tablesMissingStats) {
       queryCtx.addToTables_missing_stats(tableName);
     }
@@ -1022,6 +1076,7 @@ public class Frontend {
 
     // Optionally disable spilling in the backend. Allow spilling if there are plan hints
     // or if all tables have stats.
+    AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
     if (queryCtx.request.query_options.isDisable_unsafe_spills()
         && !tablesMissingStats.isEmpty()
         && !analysisResult.getAnalyzer().hasPlanHints()) {
@@ -1031,33 +1086,83 @@ public class Frontend {
     // Compute resource requirements after scan range locations because the cost
     // estimates of scan nodes rely on them.
     try {
-      planner.computeResourceReqs(fragments, true, queryExecRequest);
+      planner.computeResourceReqs(fragments, true, result);
     } catch (Exception e) {
       // Turn exceptions into a warning to allow the query to execute.
       LOG.error("Failed to compute resource requirements for query\n" +
           queryCtx.request.getStmt(), e);
     }
 
-    // The fragment at this point has all state set, serialize it to thrift.
-    for (PlanFragment fragment: fragments) {
+    // The fragment at this point has all state set, assign sequential ids
+    // and serialize to thrift.
+    for (int i = 0; i < fragments.size(); ++i) {
+      PlanFragment fragment = fragments.get(i);
       TPlanFragment thriftFragment = fragment.toThrift();
-      queryExecRequest.addToFragments(thriftFragment);
+      thriftFragment.setIdx(i);
+      result.addToFragments(thriftFragment);
     }
 
-    // Use EXTENDED by default for all non-explain statements.
-    TExplainLevel explainLevel = TExplainLevel.EXTENDED;
-    // Use the query option for explain stmts and tests (e.g., planner tests).
-    if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
-      explainLevel = queryCtx.request.query_options.getExplain_level();
+    result.setQuery_ctx(queryCtx);  // needed by getExplainString()
+    explainString.append(planner.getExplainString(fragments, result));
+    result.setQuery_plan(explainString.toString());
+    return result;
+  }
+
+  /**
+   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+   */
+  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
+      throws ImpalaException {
+    // Analyze the statement
+    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
+    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
+    timeline.markEvent("Analysis finished");
+    Preconditions.checkNotNull(analysisResult.getStmt());
+    TExecRequest result = new TExecRequest();
+    result.setQuery_options(queryCtx.request.getQuery_options());
+    result.setAccess_events(analysisResult.getAccessEvents());
+    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+
+    if (analysisResult.isCatalogOp()) {
+      result.stmt_type = TStmtType.DDL;
+      createCatalogOpRequest(analysisResult, result);
+      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
+      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
+        result.catalog_op_request.setLineage_graph(thriftLineageGraph);
+      }
+      // All DDL operations except for CTAS are done with analysis at this point.
+      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
+    } else if (analysisResult.isLoadDataStmt()) {
+      result.stmt_type = TStmtType.LOAD;
+      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+          new TColumn("summary", Type.STRING.toThrift()))));
+      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
+      return result;
+    } else if (analysisResult.isSetStmt()) {
+      result.stmt_type = TStmtType.SET;
+      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+          new TColumn("option", Type.STRING.toThrift()),
+          new TColumn("value", Type.STRING.toThrift()))));
+      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
+      return result;
     }
 
-    // Global query parameters to be set in each TPlanExecRequest.
-    queryExecRequest.setQuery_ctx(queryCtx);
+    // create TQueryExecRequest
+    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
+        || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
+        || analysisResult.isDeleteStmt());
 
-    explainString.append(
-        planner.getExplainString(fragments, queryExecRequest, explainLevel));
-    queryExecRequest.setQuery_plan(explainString.toString());
-    queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
+    Planner planner = new Planner(analysisResult, queryCtx);
+    TQueryExecRequest queryExecRequest;
+    if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) {
+      queryExecRequest = mtCreateExecRequest(planner, explainString);
+    } else {
+      queryExecRequest = createExecRequest(planner, explainString);
+    }
+    queryExecRequest.setDesc_tbl(
+        planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
+    queryExecRequest.setQuery_ctx(queryCtx);
+    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
 
     TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
     if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
@@ -1071,7 +1176,6 @@ public class Frontend {
     }
 
     result.setQuery_exec_request(queryExecRequest);
-
     if (analysisResult.isQueryStmt()) {
       // fill in the metadata
       LOG.debug("create result set metadata");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 4464203..284d7e5 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -374,9 +374,9 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   /**
-   * Produces single-node and distributed plans for testCase and compares
+   * Produces single-node, distributed, and parallel plans for testCase and compares
    * plan and scan range results.
-   * Appends the actual single-node and distributed plan as well as the printed
+   * Appends the actual plans as well as the printed
    * scan ranges to actualOutput, along with the requisite section header.
    * locations to actualScanRangeLocations; compares both to the appropriate sections
    * of 'testCase'.
@@ -430,7 +430,7 @@ public class PlannerTestBase extends FrontendTestBase {
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
     if (section == Section.PARALLELPLANS) {
-      queryCtx.request.query_options.mt_num_cores = 2;
+      queryCtx.request.query_options.mt_dop = 2;
     }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
     boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();


[3/4] incubator-impala git commit: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend

Posted by ta...@apache.org.
IMPALA-3902: Scheduler improvements for running multiple fragment
instances on a single backend

This is an extension of the scheduler and coordinator for multi-threaded
execution. It mainly removes the assumption of having one instance per
fragment per host. The approach taken here is to create parallel data
structures and control flow functions, where necessary, and otherwise to
leave the existing single-instance logic in place. The parallel
structures' and functions' names are prefixed with "Mt" to facilitate
the eventual clean-up.
Not much of an attempt was made to factor out common functionality
between the Mt- and the single-threaded version, because the
single-threaded version will disappear in a follow-on patch and
refactoring the existing code to fit into two parallel functions from
which it's being called might end up obscuring the code more than helping
it. Also, this code is relatively stable and having two parallel paths won't
cause much extra work (in terms of having to apply the same changes/fixes
twice) in the medium term.

Changes to data structures:
- QuerySchedule: per-instance and per-fragment structs with complete
  execution parameters (instead of partially relying on TQueryExecRequest);
  the per-instance execution parameter struct is a child of the per-fragment
  parameter struct
- explicit fragment id, with range 0..#fragments-1 (instead of relying on an
  index into an array in TQueryExecRequest)

Excluded:
- runtime filter handling
- anything related to RM

Change-Id: I240445199e22f009f4e72fdb8754eb8d77e3d680
Reviewed-on: http://gerrit.cloudera.org:8080/4054
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a9b9933b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a9b9933b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a9b9933b

Branch: refs/heads/master
Commit: a9b9933b5f059bd908291fd94d6f6f4fb88eeb7a
Parents: 5d9f5be
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Mon Jul 18 08:37:21 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Oct 6 00:20:36 2016 +0000

----------------------------------------------------------------------
 be/src/common/global-types.h                    |   1 +
 be/src/exec/exec-node.cc                        |   2 +-
 be/src/exec/union-node.cc                       |   2 +-
 be/src/runtime/coordinator.cc                   | 810 +++++++++++++------
 be/src/runtime/coordinator.h                    | 103 ++-
 be/src/runtime/runtime-filter-bank.cc           |   2 +
 be/src/scheduling/query-schedule.cc             | 158 +++-
 be/src/scheduling/query-schedule.h              | 155 +++-
 be/src/scheduling/scheduler.h                   |  10 +-
 be/src/scheduling/simple-scheduler.cc           | 263 +++++-
 be/src/scheduling/simple-scheduler.h            |  38 +-
 be/src/service/fragment-exec-state.cc           |   1 -
 be/src/service/fragment-mgr.cc                  |   7 +-
 be/src/service/impala-server.cc                 |  15 +-
 be/src/service/query-exec-state.cc              |  18 +-
 be/src/service/query-options.cc                 |  10 +-
 be/src/service/query-options.h                  |   2 +-
 be/src/util/container-util.h                    |  26 +
 be/src/util/uid-util-test.cc                    |   7 +-
 be/src/util/uid-util.h                          |   2 +-
 common/thrift/ExecStats.thrift                  |  10 +-
 common/thrift/Frontend.thrift                   |  32 +-
 common/thrift/ImpalaInternalService.thrift      |  61 +-
 common/thrift/ImpalaService.thrift              |   5 +-
 common/thrift/Planner.thrift                    |   9 +-
 common/thrift/Types.thrift                      |   1 +
 .../java/org/apache/impala/common/TreeNode.java |  16 +
 .../java/org/apache/impala/planner/Planner.java |  23 +
 .../org/apache/impala/service/Frontend.java     | 260 ++++--
 .../apache/impala/planner/PlannerTestBase.java  |   6 +-
 30 files changed, 1525 insertions(+), 530 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/common/global-types.h
----------------------------------------------------------------------
diff --git a/be/src/common/global-types.h b/be/src/common/global-types.h
index d4a41a5..3111e94 100644
--- a/be/src/common/global-types.h
+++ b/be/src/common/global-types.h
@@ -27,5 +27,6 @@ typedef int TupleId;
 typedef int SlotId;
 typedef int TableId;
 typedef int PlanNodeId;
+typedef int FragmentIdx;
 
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 2dce0d3..837fc09 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -264,7 +264,7 @@ Status ExecNode::CreateNode(ObjectPool* pool, const TPlanNode& tnode,
   switch (tnode.node_type) {
     case TPlanNodeType::HDFS_SCAN_NODE:
       *node = pool->Add(new HdfsScanNode(pool, tnode, descs));
-      if (state->query_options().mt_num_cores > 0) {
+      if (state->query_options().mt_dop > 0) {
         *node = pool->Add(new HdfsScanNodeMt(pool, tnode, descs));
       } else {
         *node = pool->Add(new HdfsScanNode(pool, tnode, descs));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 9fa8d28..8b56099 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -171,7 +171,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   // Evaluate and materialize the const expr lists exactly once.
   while (const_result_expr_idx_ < const_result_expr_ctx_lists_.size()) {
     // Only evaluate the const expr lists by the first fragment instance.
-    if (state->fragment_ctx().fragment_instance_idx == 0) {
+    if (state->fragment_ctx().per_fragment_instance_idx == 0) {
       // Materialize expr results into row_batch.
       RETURN_IF_ERROR(EvalAndMaterializeExprs(
           const_result_expr_ctx_lists_[const_result_expr_idx_], true, &tuple,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index e2dd1a4..df4ad7b 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -66,6 +66,7 @@
 #include "util/pretty-printer.h"
 #include "util/summary-util.h"
 #include "util/table-printer.h"
+#include "util/uid-util.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Frontend_types.h"
@@ -111,10 +112,10 @@ struct DebugOptions {
 
   // If these debug options apply to the candidate fragment instance, returns true
   // otherwise returns false.
-  bool IsApplicable(int candidate_fragment_instance_idx) {
+  bool IsApplicable(int candidate_instance_state_idx) {
     if (phase == TExecNodePhase::INVALID) return false;
     return (instance_state_idx == -1 ||
-        instance_state_idx == candidate_fragment_instance_idx);
+        instance_state_idx == candidate_instance_state_idx);
   }
 };
 
@@ -124,16 +125,35 @@ struct DebugOptions {
 /// - updates through UpdateFragmentExecStatus()
 class Coordinator::FragmentInstanceState {
  public:
-  FragmentInstanceState(int fragment_idx, const FragmentExecParams* params,
-      int instance_idx, ObjectPool* obj_pool)
-    : fragment_instance_id_(params->instance_ids[instance_idx]),
-      impalad_address_(params->hosts[instance_idx]),
-      total_split_size_(0),
+  // TODO-MT: remove this c'tor
+  FragmentInstanceState(FragmentIdx fragment_idx, const FragmentExecParams& params,
+      int per_fragment_instance_idx, ObjectPool* obj_pool)
+    : fragment_instance_id_(params.instance_ids[per_fragment_instance_idx]),
       fragment_idx_(fragment_idx),
-      instance_idx_(instance_idx),
+      per_fragment_instance_idx_(per_fragment_instance_idx),
+      impalad_address_(params.hosts[per_fragment_instance_idx]),
+      total_split_size_(0),
       rpc_sent_(false),
       done_(false),
       profile_created_(false),
+      profile_(NULL),
+      total_ranges_complete_(0),
+      rpc_latency_(0) {
+    const string& profile_name = Substitute("Instance $0 (host=$1)",
+        PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_));
+    profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
+  }
+
+  FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
+    : fragment_instance_id_(params.instance_id),
+      fragment_idx_(params.fragment().idx),
+      per_fragment_instance_idx_(params.per_fragment_instance_idx),
+      impalad_address_(params.host),
+      total_split_size_(0),
+      rpc_sent_(false),
+      done_(false),
+      profile_created_(false),
+      profile_(NULL),
       total_ranges_complete_(0),
       rpc_latency_(0) {
     const string& profile_name = Substitute("Instance $0 (host=$1)",
@@ -161,26 +181,31 @@ class Coordinator::FragmentInstanceState {
 
   // The following getters do not require lock() to be held.
   const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
+  FragmentIdx fragment_idx() const { return fragment_idx_; }
   MonotonicStopWatch* stopwatch() { return &stopwatch_; }
   const TNetworkAddress& impalad_address() const { return impalad_address_; }
   int64_t total_split_size() const { return total_split_size_; }
   bool done() const { return done_; }
-  int fragment_idx() const { return fragment_idx_; }
-  int instance_idx() const { return instance_idx_; }
+  int per_fragment_instance_idx() const { return per_fragment_instance_idx_; }
   bool rpc_sent() const { return rpc_sent_; }
   int64_t rpc_latency() const { return rpc_latency_; }
 
-  // Getters below must be accessed with lock() held
-  RuntimeProfile* profile() { return profile_; }
+  mutex* lock() { return &lock_; }
+
+  void set_status(const Status& status) { status_ = status; }
+  void set_done(bool done) { done_ = done; }
+  void set_rpc_latency(int64_t millis) {
+    DCHECK_EQ(rpc_latency_, 0);
+    rpc_latency_ = millis;
+  }
+
+  // Return values of the following functions must be accessed with lock() held
+  RuntimeProfile* profile() const { return profile_; }
+  void set_profile(RuntimeProfile* profile) { profile_ = profile; }
   FragmentInstanceCounters* aggregate_counters() { return &aggregate_counters_; }
   ErrorLogMap* error_log() { return &error_log_; }
   Status* status() { return &status_; }
 
-  mutex* lock() { return &lock_; }
-
-  void SetStatus(const Status& status) { status_ = status; }
-  void SetDone(bool done) { done_ = done; }
-
   /// Registers that the fragment instance's profile has been created and initially
   /// populated. Returns whether the profile had already been initialised so that callers
   /// can tell if they are the first to do so. Not thread-safe.
@@ -190,16 +215,17 @@ class Coordinator::FragmentInstanceState {
     return cur;
   }
 
-  void SetRpcLatency(int64_t millis) {
-    DCHECK_EQ(rpc_latency_, 0);
-    rpc_latency_ = millis;
-  }
-
  private:
   /// The unique ID of this instance of this fragment (there may be many instance of the
   /// same fragment, but this ID uniquely identifies this FragmentInstanceState).
   TUniqueId fragment_instance_id_;
 
+  // Same as TPlanFragment.idx
+  FragmentIdx fragment_idx_;
+
+  /// range: 0..<# instances of this fragment>-1
+  int per_fragment_instance_idx_;
+
   /// Wall clock timer for this fragment.
   MonotonicStopWatch stopwatch_;
 
@@ -209,12 +235,6 @@ class Coordinator::FragmentInstanceState {
   /// Summed across all splits; in bytes.
   int64_t total_split_size_;
 
-  /// Fragment idx for this ExecState
-  int fragment_idx_;
-
-  /// The 0-based instance idx.
-  int instance_idx_;
-
   /// Protects fields below. Can be held while doing an RPC, so SpinLock is a bad idea.
   /// lock ordering: Coordinator::lock_ must only be obtained *prior* to lock_
   mutex lock_;
@@ -268,10 +288,14 @@ class Coordinator::FilterState {
       disabled_(false) { }
 
   TBloomFilter* bloom_filter() { return bloom_filter_.get(); }
-  boost::unordered_set<int>* src_fragment_instance_idxs() {
-    return &src_fragment_instance_idxs_;
+  boost::unordered_set<int>* src_fragment_instance_state_idxs() {
+    return &src_fragment_instance_state_idxs_;
+  }
+  const boost::unordered_set<int>& src_fragment_instance_state_idxs() const {
+    return src_fragment_instance_state_idxs_;
   }
   std::vector<FilterTarget>* targets() { return &targets_; }
+  const std::vector<FilterTarget>& targets() const { return targets_; }
   int64_t first_arrival_time() const { return first_arrival_time_; }
   int64_t completion_time() const { return completion_time_; }
   const TPlanNodeId& src() const { return src_; }
@@ -295,7 +319,7 @@ class Coordinator::FilterState {
   std::vector<FilterTarget> targets_;
 
   // Index into fragment_instance_states_ for source fragment instances.
-  boost::unordered_set<int> src_fragment_instance_idxs_;
+  boost::unordered_set<int> src_fragment_instance_state_idxs_;
 
   /// Number of remaining backends to hear from before filter is complete.
   int pending_count_;
@@ -347,9 +371,10 @@ int64_t Coordinator::FragmentInstanceState::UpdateNumScanRangesCompleted() {
   return delta;
 }
 
-Coordinator::Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env,
+Coordinator::Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
     RuntimeProfile::EventSequence* events)
-  : exec_env_(exec_env),
+  : schedule_(schedule),
+    exec_env_(exec_env),
     has_called_wait_(false),
     returned_all_results_(false),
     executor_(NULL), // Set in Prepare()
@@ -358,7 +383,7 @@ Coordinator::Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env,
     obj_pool_(new ObjectPool()),
     query_events_(events),
     filter_routing_table_complete_(false),
-    filter_mode_(query_options.runtime_filter_mode),
+    filter_mode_(schedule.query_options().runtime_filter_mode),
     torn_down_(false) {
 }
 
@@ -423,16 +448,16 @@ static void ProcessQueryOptions(
       << "because nodes cannot be cancelled in Close()";
 }
 
-Status Coordinator::Exec(QuerySchedule& schedule,
-    vector<ExprContext*>* output_expr_ctxs) {
-  const TQueryExecRequest& request = schedule.request();
-  DCHECK_GT(request.fragments.size(), 0);
+Status Coordinator::Exec(vector<ExprContext*>* output_expr_ctxs) {
+  const TQueryExecRequest& request = schedule_.request();
+  DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
   needs_finalization_ = request.__isset.finalize_params;
   if (needs_finalization_) finalize_params_ = request.finalize_params;
 
-  VLOG_QUERY << "Exec() query_id=" << schedule.query_id();
+  VLOG_QUERY << "Exec() query_id=" << schedule_.query_id()
+      << " stmt=" << request.query_ctx.request.stmt;
   stmt_type_ = request.stmt_type;
-  query_id_ = schedule.query_id();
+  query_id_ = schedule_.query_id();
   desc_tbl_ = request.desc_tbl;
   query_ctx_ = request.query_ctx;
 
@@ -443,51 +468,35 @@ Status Coordinator::Exec(QuerySchedule& schedule,
 
   SCOPED_TIMER(query_profile_->total_time_counter());
 
-  const TNetworkAddress& coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
-
   // After the coordinator fragment is started, it may call UpdateFilter() asynchronously,
   // which waits on this barrier for completion.
-  if (schedule.num_fragment_instances() > 0) {
-    exec_complete_barrier_.reset(new CountingBarrier(schedule.num_fragment_instances()));
+  // TODO: remove special treatment of coord fragment
+  int num_remote_instances = schedule_.GetNumRemoteFInstances();
+  if (num_remote_instances > 0) {
+    exec_complete_barrier_.reset(new CountingBarrier(num_remote_instances));
   }
+  num_remaining_fragment_instances_ = num_remote_instances;
+
+  // TODO: move initial setup into a separate function; right now part of it
+  // (InitExecProfile()) depends on the coordinator fragment having been started
+
+  // initialize progress updater
+  const string& str = Substitute("Query $0", PrintId(query_id_));
+  progress_.Init(str, schedule_.num_scan_ranges());
 
   // to keep things simple, make async Cancel() calls wait until plan fragment
   // execution has been initiated, otherwise we might try to cancel fragment
   // execution at Impala daemons where it hasn't even started
   lock_guard<mutex> l(lock_);
 
-  // we run the root fragment ourselves if it is unpartitioned
-  bool has_coordinator_fragment =
-      request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
-
+  bool has_coordinator_fragment = schedule_.GetCoordFragment() != NULL;
   if (has_coordinator_fragment) {
-    executor_.reset(new PlanFragmentExecutor(
-            exec_env_, PlanFragmentExecutor::ReportStatusCallback()));
-    // If a coordinator fragment is requested (for most queries this will be the case, the
-    // exception is parallel INSERT queries), start this before starting any more plan
+    // Start this before starting any more plan
     // fragments, otherwise they start sending data before the local exchange node had a
     // chance to register with the stream mgr.
     // TODO: This is no longer necessary (see IMPALA-1599). Consider starting all
     // fragments in the same way with no coordinator special case.
-    if (filter_mode_ != TRuntimeFilterMode::OFF) {
-      UpdateFilterRoutingTable(request.fragments[0].plan.nodes, 1, 0);
-      if (schedule.num_fragment_instances() == 0) MarkFilterRoutingTableComplete();
-    }
-    TExecPlanFragmentParams rpc_params;
-    SetExecPlanFragmentParams(schedule, request.fragments[0],
-        (*schedule.exec_params())[0], 0, 0, 0, coord, &rpc_params);
-    RETURN_IF_ERROR(executor_->Prepare(rpc_params));
-
-    // Prepare output_expr_ctxs before optimizing the LLVM module. The other exprs of this
-    // coordinator fragment have been prepared in executor_->Prepare().
-    DCHECK(output_expr_ctxs != NULL);
-    RETURN_IF_ERROR(Expr::CreateExprTrees(
-        runtime_state()->obj_pool(), request.fragments[0].output_exprs,
-        output_expr_ctxs));
-    MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
-        -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
-    RETURN_IF_ERROR(Expr::Prepare(
-        *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
+    RETURN_IF_ERROR(PrepareCoordFragment(output_expr_ctxs));
   } else {
     // The coordinator instance may require a query mem tracker even if there is no
     // coordinator fragment. For example, result-caching tracks memory via the query mem
@@ -500,7 +509,7 @@ Status Coordinator::Exec(QuerySchedule& schedule,
       query_limit = query_ctx_.request.query_options.mem_limit;
     }
     MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
-        schedule.request_pool(), exec_env_->process_mem_tracker());
+        schedule_.request_pool(), exec_env_->process_mem_tracker());
     query_mem_tracker_ =
         MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
 
@@ -509,17 +518,29 @@ Status Coordinator::Exec(QuerySchedule& schedule,
   filter_mem_tracker_.reset(
       new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
 
-  // Initialize the execution profile structures.
-  InitExecProfile(request);
+  // initialize execution profile structures
+  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_execution) {
+    MtInitExecProfiles();
+    MtInitExecSummary();
+  } else {
+    InitExecProfile(request);
+  }
 
-  // Once remote fragments are started, they can start making ReportExecStatus RPCs,
-  // which will update the progress updater. So initialize it before starting remote
-  // fragments.
-  const string& str = Substitute("Query $0", PrintId(query_id_));
-  progress_.Init(str, schedule.num_scan_ranges());
+  if (num_remote_instances > 0) {
+    // pre-size fragment_instance_states_ in order to directly address by instance idx
+    // when creating FragmentInstanceStates (instead of push_back())
+    int num_fragment_instances = schedule_.GetTotalFInstances();
+    DCHECK_GT(num_fragment_instances, 0);
+    fragment_instance_states_.resize(num_fragment_instances);
+
+    if (is_mt_execution) {
+      MtStartRemoteFInstances();
+    } else {
+      StartRemoteFragments();
+    }
+    RETURN_IF_ERROR(FinishRemoteInstanceStartup());
 
-  if (schedule.num_fragment_instances() > 0) {
-    RETURN_IF_ERROR(StartRemoteFragments(&schedule));
     // If we have a coordinator fragment and remote fragments (the common case), release
     // the thread token on the coordinator fragment. This fragment spends most of the time
     // waiting and doing very little work. Holding on to the token causes underutilization
@@ -533,7 +554,7 @@ Status Coordinator::Exec(QuerySchedule& schedule,
 }
 
 void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
-    int num_hosts, int start_fragment_instance_idx) {
+    int num_hosts, int start_fragment_instance_state_idx) {
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilterRoutingTable() called although runtime filters are disabled";
   DCHECK(!filter_routing_table_complete_)
@@ -555,7 +576,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
         f->set_pending_count(pending_count);
         vector<int> src_idxs;
         for (int i = 0; i < num_hosts; ++i) {
-          src_idxs.push_back(start_fragment_instance_idx + i);
+          src_idxs.push_back(start_fragment_instance_state_idx + i);
         }
 
         // If this is a broadcast join with only non-local targets, build and publish it
@@ -567,7 +588,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
           random_shuffle(src_idxs.begin(), src_idxs.end());
           src_idxs.resize(MAX_BROADCAST_FILTER_PRODUCERS);
         }
-        f->src_fragment_instance_idxs()->insert(src_idxs.begin(), src_idxs.end());
+        f->src_fragment_instance_state_idxs()->insert(src_idxs.begin(), src_idxs.end());
       } else if (plan_node.__isset.hdfs_scan_node) {
         auto it = filter.planid_to_target_ndx.find(plan_node.node_id);
         DCHECK(it != filter.planid_to_target_ndx.end());
@@ -577,7 +598,8 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
         }
         FilterTarget target(tFilterTarget);
         for (int i = 0; i < num_hosts; ++i) {
-          target.fragment_instance_idxs.insert(start_fragment_instance_idx + i);
+          target.fragment_instance_state_idxs.insert(
+              start_fragment_instance_state_idx + i);
         }
         f->targets()->push_back(target);
       } else {
@@ -588,32 +610,82 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
   }
 }
 
-Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
-  int32_t num_fragment_instances = schedule->num_fragment_instances();
-  DCHECK_GT(num_fragment_instances , 0);
+Status Coordinator::PrepareCoordFragment(vector<ExprContext*>* output_expr_ctxs) {
+  const TQueryExecRequest& request = schedule_.request();
+  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+  if (!is_mt_execution && filter_mode_ != TRuntimeFilterMode::OFF) {
+    UpdateFilterRoutingTable(schedule_.GetCoordFragment()->plan.nodes, 1, 0);
+    if (schedule_.GetNumFragmentInstances() == 0) MarkFilterRoutingTableComplete();
+  }
+
+  // create rpc params and FragmentInstanceState for the coordinator fragment
+  TExecPlanFragmentParams rpc_params;
+  FragmentInstanceState* coord_state = nullptr;
+  if (is_mt_execution) {
+    const FInstanceExecParams& coord_params = schedule_.GetCoordInstanceExecParams();
+    MtSetExecPlanFragmentParams(coord_params, &rpc_params);
+    coord_state = obj_pool()->Add(
+        new FragmentInstanceState(coord_params, obj_pool()));
+  } else {
+    const TPlanFragment& coord_fragment = *schedule_.GetCoordFragment();
+    SetExecPlanFragmentParams(
+        coord_fragment, schedule_.exec_params()[0], 0, &rpc_params);
+    coord_state = obj_pool()->Add(
+        new FragmentInstanceState(
+          coord_fragment.idx, schedule_.exec_params()[0], 0, obj_pool()));
+    // apparently this was never called for the coordinator fragment
+    // TODO: fix this
+    //exec_state->ComputeTotalSplitSize(
+        //rpc_params.fragment_instance_ctx.per_node_scan_ranges);
+  }
+  // register state before calling Prepare(), in case it fails
+  DCHECK_EQ(GetInstanceIdx(coord_state->fragment_instance_id()), 0);
+  fragment_instance_states_.push_back(coord_state);
+  DCHECK(coord_state != nullptr);
+  DCHECK_EQ(fragment_instance_states_.size(), 1);
+  executor_.reset(new PlanFragmentExecutor(
+      exec_env_, PlanFragmentExecutor::ReportStatusCallback()));
+  RETURN_IF_ERROR(executor_->Prepare(rpc_params));
+  coord_state->set_profile(executor_->profile());
+
+  // Prepare output_expr_ctxs before optimizing the LLVM module. The other exprs of this
+  // coordinator fragment have been prepared in executor_->Prepare().
+  DCHECK(output_expr_ctxs != NULL);
+  RETURN_IF_ERROR(Expr::CreateExprTrees(
+      runtime_state()->obj_pool(), schedule_.GetCoordFragment()->output_exprs,
+      output_expr_ctxs));
+  MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
+      -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
+  RETURN_IF_ERROR(Expr::Prepare(
+      *output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
+
+  return Status::OK();
+}
+
+void Coordinator::StartRemoteFragments() {
+  int num_fragment_instances = schedule_.GetNumFragmentInstances();
   DebugOptions debug_options;
-  ProcessQueryOptions(schedule->query_options(), &debug_options);
-  const TQueryExecRequest& request = schedule->request();
+  ProcessQueryOptions(schedule_.query_options(), &debug_options);
+  const TQueryExecRequest& request = schedule_.request();
 
-  fragment_instance_states_.resize(num_fragment_instances);
-  num_remaining_fragment_instances_ = num_fragment_instances;
   VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
              << query_id_;
-
   query_events_->MarkEvent(
-      Substitute("Ready to start $0 remote fragments", num_fragment_instances));
+      Substitute("Ready to start $0 remote fragment instances", num_fragment_instances));
 
-  int instance_state_idx = 0;
   bool has_coordinator_fragment =
       request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+  int instance_state_idx = has_coordinator_fragment ? 1 : 0;
   int first_remote_fragment_idx = has_coordinator_fragment ? 1 : 0;
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
-    // Populate the runtime filter routing table. This should happen before
-    // starting the remote fragments.
+    // Populate the runtime filter routing table. This should happen before starting
+    // the remote fragments.
+    // This code anticipates the indices of the instance states created later on in
+    // ExecRemoteFragment()
     for (int fragment_idx = first_remote_fragment_idx;
          fragment_idx < request.fragments.size(); ++fragment_idx) {
-      const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
-      int num_hosts = params->hosts.size();
+      const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
+      int num_hosts = params.hosts.size();
       DCHECK_GT(num_hosts, 0);
       UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
           instance_state_idx);
@@ -622,13 +694,13 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
     MarkFilterRoutingTableComplete();
   }
 
-  instance_state_idx = 0;
+  int num_instances = 0;
   // Start one fragment instance per fragment per host (number of hosts running each
   // fragment may not be constant).
   for (int fragment_idx = first_remote_fragment_idx;
        fragment_idx < request.fragments.size(); ++fragment_idx) {
-    const FragmentExecParams* params = &(*schedule->exec_params())[fragment_idx];
-    int num_hosts = params->hosts.size();
+    const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
+    int num_hosts = params.hosts.size();
     DCHECK_GT(num_hosts, 0);
     fragment_profiles_[fragment_idx].num_instances = num_hosts;
     // Start one fragment instance for every fragment_instance required by the
@@ -636,24 +708,62 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
     // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and
     // so on.
     for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts;
-         ++fragment_instance_idx) {
+         ++fragment_instance_idx, ++num_instances) {
       DebugOptions* fragment_instance_debug_options =
           debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
       exec_env_->fragment_exec_thread_pool()->Offer(
-          bind<void>(mem_fn(&Coordinator::ExecRemoteFragment), this,
-              params, // fragment_exec_params
-              &request.fragments[fragment_idx], // plan_fragment,
-              fragment_instance_debug_options,
-              schedule,
-              instance_state_idx++,
-              fragment_idx,
-              fragment_instance_idx));
+        std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params),
+          std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options,
+          fragment_instance_idx));
+    }
+  }
+  exec_complete_barrier_->Wait();
+  query_events_->MarkEvent(
+      Substitute("All $0 remote fragments instances started", num_instances));
+}
+
+void Coordinator::MtStartRemoteFInstances() {
+  int num_fragment_instances = schedule_.GetNumFragmentInstances();
+  DebugOptions debug_options;
+  ProcessQueryOptions(schedule_.query_options(), &debug_options);
+  const TQueryExecRequest& request = schedule_.request();
+
+  VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
+             << query_id_;
+  query_events_->MarkEvent(
+      Substitute("Ready to start $0 remote fragment instances", num_fragment_instances));
+
+  // TODO: populate the runtime filter routing table. This requires local
+  // aggregation of filters prior to sending for broadcast joins, in order to
+  // avoid more complicated merge logic here.
+
+  int num_instances = 0;
+  for (const MtFragmentExecParams& fragment_params: schedule_.mt_fragment_exec_params()) {
+    if (fragment_params.is_coord_fragment) continue;
+    for (int i = 0; i < fragment_params.instance_exec_params.size();
+        ++i, ++num_instances) {
+      const FInstanceExecParams& instance_params =
+          fragment_params.instance_exec_params[i];
+      FragmentInstanceState* exec_state = obj_pool()->Add(
+          new FragmentInstanceState(instance_params, obj_pool()));
+      int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
+      fragment_instance_states_[instance_state_idx] = exec_state;
+
+      DebugOptions* instance_debug_options =
+          debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
+      exec_env_->fragment_exec_thread_pool()->Offer(
+          std::bind(&Coordinator::MtExecRemoteFInstance,
+            this, std::cref(instance_params), instance_debug_options));
     }
   }
   exec_complete_barrier_->Wait();
+  VLOG_QUERY << "started " << num_fragment_instances << " fragment instances for query "
+      << query_id_;
   query_events_->MarkEvent(
-      Substitute("All $0 remote fragments started", instance_state_idx));
+      Substitute("All $0 remote fragment instances started", num_instances));
+}
 
+Status Coordinator::FinishRemoteInstanceStartup() {
   Status status = Status::OK();
   const TMetricDef& def =
       MakeTMetricDef("fragment-latencies", TMetricKind::HISTOGRAM, TUnit::TIME_MS);
@@ -665,7 +775,8 @@ Status Coordinator::StartRemoteFragments(QuerySchedule* schedule) {
     latencies.Update(exec_state->rpc_latency());
   }
 
-  query_profile_->AddInfoString("Fragment start latencies", latencies.ToHumanReadable());
+  query_profile_->AddInfoString(
+      "Fragment instance start latencies", latencies.ToHumanReadable());
 
   if (!status.ok()) {
     DCHECK(query_status_.ok()); // nobody should have been able to cancel
@@ -694,17 +805,17 @@ string Coordinator::FilterDebugString() {
   lock_guard<SpinLock> l(filter_lock_);
   for (FilterRoutingTable::value_type& v: filter_routing_table_) {
     vector<string> row;
-    FilterState& state = v.second;
+    const FilterState& state = v.second;
     row.push_back(lexical_cast<string>(v.first));
     row.push_back(lexical_cast<string>(state.src()));
     vector<string> target_ids;
     vector<string> num_target_instances;
     vector<string> target_types;
     vector<string> partition_filter;
-    for (const FilterTarget& target: *state.targets()) {
+    for (const FilterTarget& target: state.targets()) {
       target_ids.push_back(lexical_cast<string>(target.node_id));
       num_target_instances.push_back(
-          lexical_cast<string>(target.fragment_instance_idxs.size()));
+          lexical_cast<string>(target.fragment_instance_state_idxs.size()));
       target_types.push_back(target.is_local ? "LOCAL" : "REMOTE");
       partition_filter.push_back(target.is_bound_by_partition_columns ? "true" : "false");
     }
@@ -716,7 +827,7 @@ string Coordinator::FilterDebugString() {
     if (filter_mode_ == TRuntimeFilterMode::GLOBAL) {
       int pending_count = state.completion_time() != 0L ? 0 : state.pending_count();
       row.push_back(Substitute("$0 ($1)", pending_count,
-          state.src_fragment_instance_idxs()->size()));
+          state.src_fragment_instance_state_idxs().size()));
       if (state.first_arrival_time() == 0L) {
         row.push_back("N/A");
       } else {
@@ -1041,7 +1152,7 @@ Status Coordinator::FinalizeSuccessfulInsert() {
 }
 
 Status Coordinator::FinalizeQuery() {
-  // All backends must have reported their final statuses before finalization, which is a
+  // All instances must have reported their final statuses before finalization, which is a
   // post-condition of Wait. If the query was not successful, still try to clean up the
   // staging directory.
   DCHECK(has_called_wait_);
@@ -1066,17 +1177,17 @@ Status Coordinator::FinalizeQuery() {
   return return_status;
 }
 
-Status Coordinator::WaitForAllBackends() {
+Status Coordinator::WaitForAllInstances() {
   unique_lock<mutex> l(lock_);
   while (num_remaining_fragment_instances_ > 0 && query_status_.ok()) {
-    VLOG_QUERY << "Coordinator waiting for backends to finish, "
+    VLOG_QUERY << "Coordinator waiting for fragment instances to finish, "
                << num_remaining_fragment_instances_ << " remaining";
-    backend_completion_cv_.wait(l);
+    instance_completion_cv_.wait(l);
   }
   if (query_status_.ok()) {
-    VLOG_QUERY << "All backends finished successfully.";
+    VLOG_QUERY << "All fragment instances finished successfully.";
   } else {
-    VLOG_QUERY << "All backends finished due to one or more errors.";
+    VLOG_QUERY << "All fragment instances finished due to one or more errors.";
   }
 
   return query_status_;
@@ -1100,8 +1211,8 @@ Status Coordinator::Wait() {
       RuntimeState* state = runtime_state();
       DCHECK(state != NULL);
 
-      // No other backends should have updated these structures if the coordinator has a
-      // fragment.  (Backends have a sink only if the coordinator does not)
+      // No other instances should have updated these structures if the coordinator has a
+      // fragment. (Instances have a sink only if the coordinator does not)
       DCHECK_EQ(files_to_move_.size(), 0);
       DCHECK_EQ(per_partition_status_.size(), 0);
 
@@ -1110,13 +1221,13 @@ Status Coordinator::Wait() {
       per_partition_status_ = *state->per_partition_status();
     }
   } else {
-    // Query finalization can only happen when all backends have reported
+    // Query finalization can only happen when all instances have reported
     // relevant state. They only have relevant state to report in the parallel
     // INSERT case, otherwise all the relevant state is from the coordinator
     // fragment which will be available after Open() returns.
     // Ignore the returned status if finalization is required., since FinalizeQuery() will
     // pick it up and needs to execute regardless.
-    Status status = WaitForAllBackends();
+    Status status = WaitForAllInstances();
     if (!needs_finalization_ && !status.ok()) return status;
   }
 
@@ -1178,13 +1289,13 @@ Status Coordinator::GetNext(RowBatch** batch, RuntimeState* state) {
       }
     }
 
-    // Don't return final NULL until all backends have completed.
-    // GetNext must wait for all backends to complete before
+    // Don't return final NULL until all instances have completed.
+    // GetNext must wait for all instances to complete before
     // ultimately signalling the end of execution via a NULL
     // batch. After NULL is returned, the coordinator may tear down
     // query state, and perform post-query finalization which might
-    // depend on the reports from all backends.
-    RETURN_IF_ERROR(WaitForAllBackends());
+    // depend on the reports from all instances.
+    RETURN_IF_ERROR(WaitForAllInstances());
     if (query_status_.ok()) {
       // If the query completed successfully, report aggregate query profiles.
       ReportQuerySummary();
@@ -1218,14 +1329,13 @@ void Coordinator::ValidateCollectionSlots(RowBatch* batch) {
 }
 
 void Coordinator::PrintFragmentInstanceInfo() {
-  for (int i = 0; i < fragment_instance_states_.size(); ++i) {
-    SummaryStats& acc =
-        fragment_profiles_[fragment_instance_states_[i]->fragment_idx()].bytes_assigned;
-    acc(fragment_instance_states_[i]->total_split_size());
+  for (FragmentInstanceState* state: fragment_instance_states_) {
+    SummaryStats& acc = fragment_profiles_[state->fragment_idx()].bytes_assigned;
+    acc(state->total_split_size());
   }
 
-  for (int i = (executor_.get() == NULL ? 0 : 1); i < fragment_profiles_.size(); ++i) {
-    SummaryStats& acc = fragment_profiles_[i].bytes_assigned;
+  for (int id = (executor_.get() == NULL ? 0 : 1); id < fragment_profiles_.size(); ++id) {
+    SummaryStats& acc = fragment_profiles_[id].bytes_assigned;
     double min = accumulators::min(acc);
     double max = accumulators::max(acc);
     double mean = accumulators::mean(acc);
@@ -1235,13 +1345,12 @@ void Coordinator::PrintFragmentInstanceInfo() {
       << ", max: " << PrettyPrinter::Print(max, TUnit::BYTES)
       << ", avg: " << PrettyPrinter::Print(mean, TUnit::BYTES)
       << ", stddev: " << PrettyPrinter::Print(stddev, TUnit::BYTES);
-    fragment_profiles_[i].averaged_profile->AddInfoString("split sizes", ss.str());
+    fragment_profiles_[id].averaged_profile->AddInfoString("split sizes", ss.str());
 
     if (VLOG_FILE_IS_ON) {
-      VLOG_FILE << "Byte split for fragment " << i << " " << ss.str();
-      for (int j = 0; j < fragment_instance_states_.size(); ++j) {
-        FragmentInstanceState* exec_state = fragment_instance_states_[j];
-        if (exec_state->fragment_idx() != i) continue;
+      VLOG_FILE << "Byte split for fragment " << id << " " << ss.str();
+      for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+        if (exec_state->fragment_idx() != id) continue;
         VLOG_FILE << "data volume for ipaddress " << exec_state << ": "
                   << PrettyPrinter::Print(exec_state->total_split_size(), TUnit::BYTES);
       }
@@ -1260,7 +1369,7 @@ void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
     for (int j = 0; j < plan.nodes.size(); ++j) {
       TPlanNodeExecSummary node;
       node.node_id = plan.nodes[j].node_id;
-      node.fragment_id = i;
+      node.fragment_idx = i;
       node.label = plan.nodes[j].label;
       node.__set_label_detail(plan.nodes[j].label_detail);
       node.num_children = plan.nodes[j].num_children;
@@ -1331,12 +1440,102 @@ void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
   }
 }
 
+void Coordinator::MtInitExecSummary() {
+  const TQueryExecRequest& request = schedule_.request();
+  // init exec_summary_.{nodes, exch_to_sender_map}
+  exec_summary_.__isset.nodes = true;
+  DCHECK(exec_summary_.nodes.empty());
+  for (const TPlanExecInfo& plan_exec_info: request.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      if (!fragment.__isset.plan) continue;
+
+      // eventual index of fragment's root node in exec_summary_.nodes
+      int root_node_idx = exec_summary_.nodes.size();
+
+      const TPlan& plan = fragment.plan;
+      int num_instances =
+          schedule_.GetFragmentExecParams(fragment.idx).instance_exec_params.size();
+      for (const TPlanNode& node: plan.nodes) {
+        plan_node_id_to_summary_map_[node.node_id] = exec_summary_.nodes.size();
+        exec_summary_.nodes.emplace_back();
+        TPlanNodeExecSummary& node_summary = exec_summary_.nodes.back();
+        node_summary.__set_node_id(node.node_id);
+        node_summary.__set_fragment_idx(fragment.idx);
+        node_summary.__set_label(node.label);
+        node_summary.__set_label_detail(node.label_detail);
+        node_summary.__set_num_children(node.num_children);
+        if (node.__isset.estimated_stats) {
+          node_summary.__set_estimated_stats(node.estimated_stats);
+        }
+        node_summary.exec_stats.resize(num_instances);
+      }
+
+      if (fragment.__isset.output_sink
+          && fragment.output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
+        const TDataStreamSink& sink = fragment.output_sink.stream_sink;
+        int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id];
+        if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
+          exec_summary_.nodes[exch_idx].__set_is_broadcast(true);
+        }
+        exec_summary_.__isset.exch_to_sender_map = true;
+        exec_summary_.exch_to_sender_map[exch_idx] = root_node_idx;
+      }
+    }
+  }
+}
+
+void Coordinator::MtInitExecProfiles() {
+  const TQueryExecRequest& request = schedule_.request();
+  vector<const TPlanFragment*> fragments;
+  schedule_.GetTPlanFragments(&fragments);
+  fragment_profiles_.resize(fragments.size());
+
+  // start with coordinator fragment, if there is one
+  const TPlanFragment* coord_fragment = schedule_.GetCoordFragment();
+  if (coord_fragment != NULL) {
+    DCHECK(executor_.get() != NULL);
+    PerFragmentProfileData* data = &fragment_profiles_[coord_fragment->idx];
+    data->num_instances = 1;
+    // TODO: fix this; this is not an averaged profile; we should follow the exact
+    // same structure we have for all other profiles (average + root + single
+    // instance profile)
+    data->averaged_profile = executor_->profile();
+
+    // register coordinator's fragment profile in the query profile now, before those
+    // of the fragment instances, so it shows up at the top
+    query_profile_->AddChild(executor_->profile());
+    executor_->profile()->set_name(Substitute("Coordinator Fragment $0",
+        coord_fragment->display_name));
+    CollectScanNodeCounters(executor_->profile(), &coordinator_counters_);
+  }
+
+  // Initialize the runtime profile structure. This adds the per fragment average
+  // profiles followed by the per fragment instance profiles.
+  for (const TPlanFragment* fragment: fragments) {
+    if (fragment == coord_fragment) continue;
+    PerFragmentProfileData* data = &fragment_profiles_[fragment->idx];
+    data->num_instances =
+        schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
+
+    data->averaged_profile =
+        obj_pool()->Add(new RuntimeProfile(obj_pool(),
+          Substitute("Averaged Fragment $0", fragment->display_name), true));
+    query_profile_->AddChild(data->averaged_profile, true);
+    data->root_profile =
+        obj_pool()->Add(new RuntimeProfile(obj_pool(),
+          Substitute("Fragment $0", fragment->display_name)));
+    // Note: we don't start the wall timer here for the fragment profile;
+    // it's uninteresting and misleading.
+    query_profile_->AddChild(data->root_profile);
+  }
+}
+
+
 void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
     FragmentInstanceCounters* counters) {
   vector<RuntimeProfile*> children;
   profile->GetAllChildren(&children);
-  for (int i = 0; i < children.size(); ++i) {
-    RuntimeProfile* p = children[i];
+  for (RuntimeProfile* p: children) {
     PlanNodeId id = ExecNode::GetNodeIdFromProfile(p);
 
     // This profile is not for an exec node.
@@ -1355,28 +1554,87 @@ void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
   }
 }
 
-void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_params,
-    const TPlanFragment* plan_fragment, DebugOptions* debug_options,
-    QuerySchedule* schedule, int instance_state_idx, int fragment_idx,
+void Coordinator::MtExecRemoteFInstance(
+    const FInstanceExecParams& exec_params, const DebugOptions* debug_options) {
+  NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
+  TExecPlanFragmentParams rpc_params;
+  MtSetExecPlanFragmentParams(exec_params, &rpc_params);
+  if (debug_options != NULL) {
+    rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
+    rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
+    rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
+  }
+  int instance_state_idx = GetInstanceIdx(exec_params.instance_id);
+  FragmentInstanceState* exec_state = fragment_instance_states_[instance_state_idx];
+  exec_state->ComputeTotalSplitSize(
+      rpc_params.fragment_instance_ctx.per_node_scan_ranges);
+  VLOG_FILE << "making rpc: ExecPlanFragment"
+      << " host=" << exec_state->impalad_address()
+      << " instance_id=" << PrintId(exec_state->fragment_instance_id());
+
+  // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns.
+  lock_guard<mutex> l(*exec_state->lock());
+  int64_t start = MonotonicMillis();
+
+  Status client_connect_status;
+  ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
+      exec_state->impalad_address(), &client_connect_status);
+  if (!client_connect_status.ok()) {
+    exec_state->SetInitialStatus(client_connect_status);
+    return;
+  }
+
+  TExecPlanFragmentResult thrift_result;
+  Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment,
+      rpc_params, &thrift_result);
+  exec_state->set_rpc_latency(MonotonicMillis() - start);
+
+  const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2";
+
+  if (!rpc_status.ok()) {
+    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
+        PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
+    VLOG_QUERY << err_msg;
+    exec_state->SetInitialStatus(Status(err_msg));
+    return;
+  }
+
+  Status exec_status = Status(thrift_result.status);
+  if (!exec_status.ok()) {
+    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
+        PrintId(exec_state->fragment_instance_id()),
+        exec_status.msg().GetFullMessageDetails());
+    VLOG_QUERY << err_msg;
+    exec_state->SetInitialStatus(Status(err_msg));
+    return;
+  }
+
+  exec_state->SetInitialStatus(Status::OK());
+  VLOG_FILE << "rpc succeeded: ExecPlanFragment"
+      << " instance_id=" << PrintId(exec_state->fragment_instance_id());
+}
+
+void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
+    const TPlanFragment& plan_fragment, DebugOptions* debug_options,
     int fragment_instance_idx) {
   NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
   TExecPlanFragmentParams rpc_params;
-  SetExecPlanFragmentParams(*schedule, *plan_fragment, *fragment_exec_params,
-      instance_state_idx, fragment_idx, fragment_instance_idx,
-      MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port), &rpc_params);
+  SetExecPlanFragmentParams(
+      plan_fragment, fragment_exec_params, fragment_instance_idx, &rpc_params);
   if (debug_options != NULL) {
     rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
     rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
     rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
   }
   FragmentInstanceState* exec_state = obj_pool()->Add(
-      new FragmentInstanceState(fragment_idx, fragment_exec_params, fragment_instance_idx,
-          obj_pool()));
+      new FragmentInstanceState(
+        plan_fragment.idx, fragment_exec_params, fragment_instance_idx, obj_pool()));
   exec_state->ComputeTotalSplitSize(
       rpc_params.fragment_instance_ctx.per_node_scan_ranges);
+  int instance_state_idx = GetInstanceIdx(exec_state->fragment_instance_id());
   fragment_instance_states_[instance_state_idx] = exec_state;
-  VLOG_FILE << "making rpc: ExecPlanFragment query_id=" << query_id_
-            << " instance_id=" << exec_state->fragment_instance_id()
+  VLOG_FILE << "making rpc: ExecPlanFragment"
+            << " instance_id=" << PrintId(exec_state->fragment_instance_id())
             << " host=" << exec_state->impalad_address();
 
   // Guard against concurrent UpdateExecStatus() that may arrive after RPC returns.
@@ -1395,13 +1653,14 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_par
   Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment,
       rpc_params, &thrift_result);
 
-  exec_state->SetRpcLatency(MonotonicMillis() - start);
+  exec_state->set_rpc_latency(MonotonicMillis() - start);
 
-  const string ERR_TEMPLATE = "ExecPlanRequest rpc query_id=$0 instance_id=$1 failed: $2";
+  const string ERR_TEMPLATE = "ExecPlanRequest rpc instance_id=$0 failed: $1";
 
   if (!rpc_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
-        PrintId(exec_state->fragment_instance_id()), rpc_status.msg().msg());
+    const string& err_msg =
+        Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
+          rpc_status.msg().msg());
     VLOG_QUERY << err_msg;
     exec_state->SetInitialStatus(Status(err_msg));
     return;
@@ -1409,9 +1668,9 @@ void Coordinator::ExecRemoteFragment(const FragmentExecParams* fragment_exec_par
 
   Status exec_plan_status = Status(thrift_result.status);
   if (!exec_plan_status.ok()) {
-    const string& err_msg = Substitute(ERR_TEMPLATE, PrintId(query_id()),
-        PrintId(exec_state->fragment_instance_id()),
-        exec_plan_status.msg().GetFullMessageDetails());
+    const string& err_msg =
+        Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
+          exec_plan_status.msg().GetFullMessageDetails());
     VLOG_QUERY << err_msg;
     exec_state->SetInitialStatus(Status(err_msg));
     return;
@@ -1444,8 +1703,9 @@ void Coordinator::CancelInternal() {
 }
 
 void Coordinator::CancelRemoteFragments() {
-  for (int i = 0; i < fragment_instance_states_.size(); ++i) {
-    FragmentInstanceState* exec_state = fragment_instance_states_[i];
+  for (FragmentInstanceState* exec_state: fragment_instance_states_) {
+    DCHECK(exec_state != nullptr);
+    if (exec_state->fragment_idx() == 0) continue;  // the coord fragment
 
     // If a fragment failed before we finished issuing all remote fragments,
     // this function will have been called before we finished populating
@@ -1467,7 +1727,7 @@ void Coordinator::CancelRemoteFragments() {
     if (exec_state->done()) continue;
 
     // set an error status to make sure we only cancel this once
-    exec_state->SetStatus(Status::CANCELLED);
+    exec_state->set_status(Status::CANCELLED);
 
     // if we get an error while trying to get a connection to the backend,
     // keep going
@@ -1507,14 +1767,15 @@ void Coordinator::CancelRemoteFragments() {
   }
 
   // notify that we completed with an error
-  backend_completion_cv_.notify_all();
+  instance_completion_cv_.notify_all();
 }
 
 Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& params) {
-  VLOG_FILE << "UpdateFragmentExecStatus() query_id=" << query_id_
+  VLOG_FILE << "UpdateFragmentExecStatus() "
+            << " instance=" << PrintId(params.fragment_instance_id)
             << " status=" << params.status.status_code
             << " done=" << (params.done ? "true" : "false");
-  uint32_t instance_state_idx = params.instance_state_idx;
+  int instance_state_idx = GetInstanceIdx(params.fragment_instance_id);
   if (instance_state_idx >= fragment_instance_states_.size()) {
     return Status(TErrorCode::INTERNAL_ERROR,
         Substitute("Unknown fragment instance index $0 (max known: $1)",
@@ -1531,15 +1792,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       // process a non-error message from a fragment executor that is sent
       // before query cancellation is invoked. Make sure we don't go from error status to
       // OK.
-      exec_state->SetStatus(status);
+      exec_state->set_status(status);
     }
-    exec_state->SetDone(params.done);
+    exec_state->set_done(params.done);
     if (exec_state->status()->ok()) {
       // We can't update this backend's profile if ReportQuerySummary() is running,
       // because it depends on all profiles not changing during its execution (when it
       // calls SortChildren()). ReportQuerySummary() only gets called after
-      // WaitForAllBackends() returns or at the end of CancelRemoteFragments().
-      // WaitForAllBackends() only returns after all backends have completed (in which
+      // WaitForAllInstances() returns or at the end of CancelRemoteFragments().
+      // WaitForAllInstances() only returns after all instances have completed (in which
       // case we wouldn't be in this function), or when there's an error, in which case
       // CancelRemoteFragments() is called. CancelRemoteFragments sets all exec_state's
       // statuses to cancelled.
@@ -1550,8 +1811,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       // Update the average profile for the fragment corresponding to this instance.
       exec_state->profile()->ComputeTimeInProfile();
       UpdateAverageProfile(exec_state);
-      UpdateExecSummary(exec_state->fragment_idx(), exec_state->instance_idx(),
-          exec_state->profile());
+      UpdateExecSummary(*exec_state);
     }
     if (!exec_state->SetProfileCreated()) {
       CollectScanNodeCounters(exec_state->profile(), exec_state->aggregate_counters());
@@ -1615,16 +1875,14 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     lock_guard<mutex> l(lock_);
     exec_state->stopwatch()->Stop();
     DCHECK_GT(num_remaining_fragment_instances_, 0);
-    VLOG_QUERY << "Fragment instance " << params.instance_state_idx << "("
-               << exec_state->fragment_instance_id() << ") on host "
-               << exec_state->impalad_address() << " completed, "
-               << num_remaining_fragment_instances_ - 1 << " remaining: query_id="
-               << query_id_;
+    VLOG_QUERY << "Fragment instance completed: "
+        << " id=" << PrintId(exec_state->fragment_instance_id())
+        << " host=" << exec_state->impalad_address()
+        << " remaining=" << num_remaining_fragment_instances_ - 1;
     if (VLOG_QUERY_IS_ON && num_remaining_fragment_instances_ > 1) {
       // print host/port info for the first backend that's still in progress as a
       // debugging aid for backend deadlocks
-      for (int i = 0; i < fragment_instance_states_.size(); ++i) {
-        FragmentInstanceState* exec_state = fragment_instance_states_[i];
+      for (FragmentInstanceState* exec_state: fragment_instance_states_) {
         lock_guard<mutex> l2(*exec_state->lock());
         if (!exec_state->done()) {
           VLOG_QUERY << "query_id=" << query_id_ << ": first in-progress backend: "
@@ -1634,7 +1892,7 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
       }
     }
     if (--num_remaining_fragment_instances_ == 0) {
-      backend_completion_cv_.notify_all();
+      instance_completion_cv_.notify_all();
     }
   }
 
@@ -1666,7 +1924,7 @@ bool Coordinator::PrepareCatalogUpdate(TUpdateCatalogRequest* catalog_update) {
   return catalog_update->created_partitions.size() != 0;
 }
 
-// Comparator to order fragments by descending total time
+// Comparator to order RuntimeProfiles by descending total time
 typedef struct {
   typedef pair<RuntimeProfile*, bool> Profile;
   bool operator()(const Profile& a, const Profile& b) const {
@@ -1676,56 +1934,58 @@ typedef struct {
   }
 } InstanceComparator;
 
-// Update fragment average profile information from a backend execution state.
-void Coordinator::UpdateAverageProfile(FragmentInstanceState* fragment_instance_state) {
-  int fragment_idx = fragment_instance_state->fragment_idx();
+void Coordinator::UpdateAverageProfile(FragmentInstanceState* instance_state) {
+  FragmentIdx fragment_idx = instance_state->fragment_idx();
   DCHECK_GE(fragment_idx, 0);
   DCHECK_LT(fragment_idx, fragment_profiles_.size());
-  PerFragmentProfileData& data = fragment_profiles_[fragment_idx];
+  PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
 
   // No locks are taken since UpdateAverage() and AddChild() take their own locks
-  data.averaged_profile->UpdateAverage(fragment_instance_state->profile());
-  data.root_profile->AddChild(fragment_instance_state->profile());
+  data->averaged_profile->UpdateAverage(instance_state->profile());
+  data->root_profile->AddChild(instance_state->profile());
 }
 
-// Compute fragment summary information from a backend execution state.
-void Coordinator::ComputeFragmentSummaryStats(
-    FragmentInstanceState* fragment_instance_state) {
-  int fragment_idx = fragment_instance_state->fragment_idx();
+void Coordinator::ComputeFragmentSummaryStats(FragmentInstanceState* instance_state) {
+  FragmentIdx fragment_idx = instance_state->fragment_idx();
   DCHECK_GE(fragment_idx, 0);
   DCHECK_LT(fragment_idx, fragment_profiles_.size());
-  PerFragmentProfileData& data = fragment_profiles_[fragment_idx];
+  PerFragmentProfileData* data = &fragment_profiles_[fragment_idx];
 
-  int64_t completion_time = fragment_instance_state->stopwatch()->ElapsedTime();
-  data.completion_times(completion_time);
-  data.rates(fragment_instance_state->total_split_size() / (completion_time / 1000.0
-    / 1000.0 / 1000.0));
+  int64_t completion_time = instance_state->stopwatch()->ElapsedTime();
+  data->completion_times(completion_time);
+  data->rates(instance_state->total_split_size()
+      / (completion_time / 1000.0 / 1000.0 / 1000.0));
 
   // Add the child in case it has not been added previously
   // via UpdateAverageProfile(). AddChild() will do nothing if the child
   // already exists.
-  data.root_profile->AddChild(fragment_instance_state->profile());
+  data->root_profile->AddChild(instance_state->profile());
 }
 
-void Coordinator::UpdateExecSummary(int fragment_idx, int instance_idx,
-    RuntimeProfile* profile) {
+void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state) {
   vector<RuntimeProfile*> children;
-  profile->GetAllChildren(&children);
+  instance_state.profile()->GetAllChildren(&children);
 
   lock_guard<SpinLock> l(exec_summary_lock_);
   for (int i = 0; i < children.size(); ++i) {
-    int id = ExecNode::GetNodeIdFromProfile(children[i]);
-    if (id == -1) continue;
+    int node_id = ExecNode::GetNodeIdFromProfile(children[i]);
+    if (node_id == -1) continue;
 
     TPlanNodeExecSummary& exec_summary =
-        exec_summary_.nodes[plan_node_id_to_summary_map_[id]];
+        exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]];
     if (exec_summary.exec_stats.empty()) {
       // First time, make an exec_stats for each instance this plan node is running on.
-      DCHECK_LT(fragment_idx, fragment_profiles_.size());
-      exec_summary.exec_stats.resize(fragment_profiles_[fragment_idx].num_instances);
+      // TODO-MT: remove this and initialize all runtime state prior to starting
+      // instances
+      DCHECK_LT(instance_state.fragment_idx(), fragment_profiles_.size());
+      exec_summary.exec_stats.resize(
+          fragment_profiles_[instance_state.fragment_idx()].num_instances);
     }
-    DCHECK_LT(instance_idx, exec_summary.exec_stats.size());
-    TExecStats& stats = exec_summary.exec_stats[instance_idx];
+    DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size());
+    DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
+        exec_summary.exec_stats.size());
+    TExecStats& stats =
+        exec_summary.exec_stats[instance_state.per_fragment_instance_idx()];
 
     RuntimeProfile::Counter* rows_counter = children[i]->GetCounter("RowsReturned");
     RuntimeProfile::Counter* mem_counter = children[i]->GetCounter("PeakMemoryUsage");
@@ -1750,22 +2010,20 @@ void Coordinator::ReportQuerySummary() {
   // the query has made so little progress, reporting a summary is not very useful.
   if (!has_called_wait_) return;
 
-  // The fragment has finished executing.  Update the profile to compute the
-  // fraction of time spent in each node.
-  if (executor_.get() != NULL) {
-    executor_->profile()->ComputeTimeInProfile();
-    UpdateExecSummary(0, 0, executor_->profile());
-  }
-
   if (!fragment_instance_states_.empty()) {
     // Average all remote fragments for each fragment.
-    for (int i = 0; i < fragment_instance_states_.size(); ++i) {
-      fragment_instance_states_[i]->profile()->ComputeTimeInProfile();
-      UpdateAverageProfile(fragment_instance_states_[i]);
-      ComputeFragmentSummaryStats(fragment_instance_states_[i]);
-      UpdateExecSummary(fragment_instance_states_[i]->fragment_idx(),
-          fragment_instance_states_[i]->instance_idx(),
-          fragment_instance_states_[i]->profile());
+    for (FragmentInstanceState* state: fragment_instance_states_) {
+      // TODO: make profiles uniform across all fragments so we don't have
+      // to keep special-casing the coord fragment
+      if (state->fragment_idx() == 0) {
+        state->profile()->ComputeTimeInProfile();
+        UpdateExecSummary(*state);
+      } else {
+        state->profile()->ComputeTimeInProfile();
+        UpdateAverageProfile(state);
+        ComputeFragmentSummaryStats(state);
+        UpdateExecSummary(*state);
+      }
     }
 
     InstanceComparator comparator;
@@ -1809,26 +2067,14 @@ void Coordinator::ReportQuerySummary() {
     // Map from Impalad address to peak memory usage of this query
     typedef unordered_map<TNetworkAddress, int64_t> PerNodePeakMemoryUsage;
     PerNodePeakMemoryUsage per_node_peak_mem_usage;
-    if (executor_.get() != NULL) {
-      // Coordinator fragment is not included in fragment_instance_states_.
-      RuntimeProfile::Counter* mem_usage_counter =
-          executor_->profile()->GetCounter(
-              PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER);
-      if (mem_usage_counter != NULL) {
-        TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
-        per_node_peak_mem_usage[coord] = mem_usage_counter->value();
-      }
-    }
-    for (int i = 0; i < fragment_instance_states_.size(); ++i) {
+    for (FragmentInstanceState* state: fragment_instance_states_) {
       int64_t initial_usage = 0;
       int64_t* mem_usage = FindOrInsert(&per_node_peak_mem_usage,
-          fragment_instance_states_[i]->impalad_address(), initial_usage);
+          state->impalad_address(), initial_usage);
       RuntimeProfile::Counter* mem_usage_counter =
-          fragment_instance_states_[i]->profile()->GetCounter(
-              PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER);
+          state->profile()->GetCounter(PlanFragmentExecutor::PER_HOST_PEAK_MEM_COUNTER);
       if (mem_usage_counter != NULL && mem_usage_counter->value() > *mem_usage) {
-        per_node_peak_mem_usage[fragment_instance_states_[i]->impalad_address()] =
-            mem_usage_counter->value();
+        per_node_peak_mem_usage[state->impalad_address()] = mem_usage_counter->value();
       }
     }
     stringstream info;
@@ -1844,25 +2090,49 @@ string Coordinator::GetErrorLog() {
   ErrorLogMap merged;
   {
     lock_guard<mutex> l(lock_);
+    // TODO-MT: use FragmentInstanceState::error_log_ instead
+    // as part of getting rid of the special-casing of the coordinator instance
     if (executor_.get() != NULL && executor_->runtime_state() != NULL) {
       ErrorLogMap runtime_error_log;
       executor_->runtime_state()->GetErrors(&runtime_error_log);
       MergeErrorMaps(&merged, runtime_error_log);
     }
   }
-  for (int i = 0; i < fragment_instance_states_.size(); ++i) {
-    lock_guard<mutex> l(*fragment_instance_states_[i]->lock());
-    if (fragment_instance_states_[i]->error_log()->size() > 0) {
-      MergeErrorMaps(&merged, *fragment_instance_states_[i]->error_log());
-    }
+  for (FragmentInstanceState* state: fragment_instance_states_) {
+    lock_guard<mutex> l(*state->lock());
+    if (state->error_log()->size() > 0)  MergeErrorMaps(&merged, *state->error_log());
   }
   return PrintErrorMapToString(merged);
 }
 
-void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
+void Coordinator::MtSetExecPlanFragmentParams(
+    const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) {
+  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
+  rpc_params->__set_query_ctx(query_ctx_);
+
+  TPlanFragmentCtx fragment_ctx;
+  TPlanFragmentInstanceCtx fragment_instance_ctx;
+
+  fragment_ctx.__set_fragment(params.fragment());
+  // TODO: Remove filters that weren't selected during filter routing table construction.
+  SetExecPlanDescriptorTable(params.fragment(), rpc_params);
+
+  fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
+  fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
+  fragment_instance_ctx.__set_per_exch_num_senders(
+      params.fragment_exec_params.per_exch_num_senders);
+  fragment_instance_ctx.__set_destinations(
+      params.fragment_exec_params.destinations);
+  fragment_instance_ctx.__set_sender_id(params.sender_id);
+  fragment_instance_ctx.fragment_instance_id = params.instance_id;
+  fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
+  rpc_params->__set_fragment_ctx(fragment_ctx);
+  rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
+}
+
+void Coordinator::SetExecPlanFragmentParams(
     const TPlanFragment& fragment, const FragmentExecParams& params,
-    int instance_state_idx, int fragment_idx, int fragment_instance_idx,
-    const TNetworkAddress& coord, TExecPlanFragmentParams* rpc_params) {
+    int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_query_ctx(query_ctx_);
 
@@ -1870,6 +2140,7 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
   TPlanFragmentInstanceCtx fragment_instance_ctx;
 
   fragment_ctx.__set_fragment(fragment);
+  int instance_state_idx = GetInstanceIdx(params.instance_ids[per_fragment_instance_idx]);
   // Remove filters that weren't selected during filter routing table construction.
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
     for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
@@ -1879,10 +2150,10 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
           FilterRoutingTable::iterator filter_it =
               filter_routing_table_.find(desc.filter_id);
           if (filter_it == filter_routing_table_.end()) continue;
-          FilterState* f = &filter_it->second;
+          const FilterState& f = filter_it->second;
           if (plan_node.__isset.hash_join_node) {
-            if (f->src_fragment_instance_idxs()->find(instance_state_idx) ==
-                f->src_fragment_instance_idxs()->end()) {
+            if (f.src_fragment_instance_state_idxs().find(instance_state_idx) ==
+                f.src_fragment_instance_state_idxs().end()) {
               DCHECK(desc.is_broadcast_join);
               continue;
             }
@@ -1898,22 +2169,22 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
   }
   SetExecPlanDescriptorTable(fragment, rpc_params);
 
-  TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
+  TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx];
   FragmentScanRangeAssignment::const_iterator it =
       params.scan_range_assignment.find(exec_host);
   // Scan ranges may not always be set, so use an empty structure if so.
   const PerNodeScanRanges& scan_ranges =
       (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
 
-  fragment_ctx.num_fragment_instances = params.instance_ids.size();
-  fragment_instance_ctx.__set_request_pool(schedule.request_pool());
+  fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
   fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges);
   fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders);
   fragment_instance_ctx.__set_destinations(params.destinations);
-  fragment_instance_ctx.__set_sender_id(params.sender_id_base + fragment_instance_idx);
-  fragment_instance_ctx.fragment_instance_id = params.instance_ids[fragment_instance_idx];
-  fragment_instance_ctx.fragment_instance_idx = fragment_instance_idx;
-  fragment_instance_ctx.instance_state_idx = instance_state_idx;
+  fragment_instance_ctx.__set_sender_id(
+      params.sender_id_base + per_fragment_instance_idx);
+  fragment_instance_ctx.fragment_instance_id =
+      params.instance_ids[per_fragment_instance_idx];
+  fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx;
   rpc_params->__set_fragment_ctx(fragment_ctx);
   rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
 }
@@ -1987,10 +2258,16 @@ void Coordinator::SetExecPlanDescriptorTable(const TPlanFragment& fragment,
 
   rpc_params->query_ctx.__set_desc_tbl(thrift_desc_tbl);
 }
+
 namespace {
 
-void DistributeFilters(shared_ptr<TPublishFilterParams> params, TNetworkAddress impalad,
-    TUniqueId fragment_instance_id) {
+// Make a PublishFilter rpc to 'impalad' for given fragment_instance_id
+// and params.
+// This takes by-value parameters because we cannot guarantee that the originating
+// coordinator won't be destroyed while this executes.
+// TODO: switch to references when we fix the lifecycle problems of coordinators.
+void DistributeFilters(shared_ptr<TPublishFilterParams> params,
+    TNetworkAddress impalad, TUniqueId fragment_instance_id) {
   Status status;
   ImpalaBackendConnection backend_client(
       ExecEnv::GetInstance()->impalad_client_cache(), impalad, &status);
@@ -2026,7 +2303,7 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 
   // Make a 'master' copy that will be shared by all concurrent delivery RPC attempts.
   shared_ptr<TPublishFilterParams> rpc_params(new TPublishFilterParams());
-  unordered_set<int32_t> target_fragment_instance_idxs;
+  unordered_set<int> target_fragment_instance_state_idxs;
   {
     lock_guard<SpinLock> l(filter_lock_);
     FilterRoutingTable::iterator it = filter_routing_table_.find(params.filter_id);
@@ -2060,12 +2337,13 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 
     // No more updates are pending on this filter ID. Create a distribution payload and
     // offer it to the queue.
-    for (FilterTarget target: *state->targets()) {
+    for (const FilterTarget& target: *state->targets()) {
       // Don't publish the filter to targets that are in the same fragment as the join
       // that produced it.
       if (target.is_local) continue;
-      target_fragment_instance_idxs.insert(target.fragment_instance_idxs.begin(),
-          target.fragment_instance_idxs.end());
+      target_fragment_instance_state_idxs.insert(
+          target.fragment_instance_state_idxs.begin(),
+          target.fragment_instance_state_idxs.end());
     }
 
     // Assign outgoing bloom filter.
@@ -2088,11 +2366,15 @@ void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
 
   rpc_params->filter_id = params.filter_id;
 
-  for (int32_t target_idx: target_fragment_instance_idxs) {
+  for (int target_idx: target_fragment_instance_state_idxs) {
     FragmentInstanceState* fragment_inst = fragment_instance_states_[target_idx];
     DCHECK(fragment_inst != NULL) << "Missing fragment instance: " << target_idx;
     exec_env_->rpc_pool()->Offer(bind<void>(DistributeFilters, rpc_params,
         fragment_inst->impalad_address(), fragment_inst->fragment_instance_id()));
+        // TODO: switch back to the following once we have fixed the lifecycle
+        // problems of Coordinator
+        //std::cref(fragment_inst->impalad_address()),
+        //std::cref(fragment_inst->fragment_instance_id())));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 617ccb9..bb67377 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -66,6 +66,7 @@ class TPlanExecRequest;
 class TRuntimeProfileTree;
 class RuntimeProfile;
 class TablePrinter;
+class TPlanFragment;
 
 struct DebugOptions;
 
@@ -90,9 +91,16 @@ struct DebugOptions;
 //
 /// The implementation ensures that setting an overall error status and initiating
 /// cancellation of local and all remote fragments is atomic.
+///
+/// TODO: move into separate subdirectory and move nested classes into separate files
+/// and unnest them
+///
+/// TODO: remove all data structures and functions that are superseded by their
+/// multi-threaded counterpart and remove the "Mt" prefix with which the latter
+/// is currently marked
 class Coordinator {
  public:
-  Coordinator(const TQueryOptions& query_options, ExecEnv* exec_env,
+  Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
       RuntimeProfile::EventSequence* events);
   ~Coordinator();
 
@@ -103,7 +111,7 @@ class Coordinator {
   /// Populates and prepares output_expr_ctxs from the coordinator's fragment if there is
   /// one, and LLVM optimizes them together with the fragment's other exprs.
   /// A call to Exec() must precede all other member function calls.
-  Status Exec(QuerySchedule& schedule, std::vector<ExprContext*>* output_expr_ctxs);
+  Status Exec(std::vector<ExprContext*>* output_expr_ctxs);
 
   /// Blocks until result rows are ready to be retrieved via GetNext(), or, if the
   /// query doesn't return rows, until the query finishes or is cancelled.
@@ -207,6 +215,7 @@ class Coordinator {
       boost::accumulators::tag::variance>
   > SummaryStats;
 
+  const QuerySchedule schedule_;
   ExecEnv* exec_env_;
   TUniqueId query_id_;
 
@@ -229,7 +238,9 @@ class Coordinator {
     CounterMap scan_ranges_complete_counters;
   };
 
-  /// FragmentInstanceState owned by obj_pool()
+  /// FragmentInstanceStates for all fragment instances, including that of the coordinator
+  /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
+  /// PrepareCoordFragment() and StartRemoteFragments()/MtStartRemoteFInstances().
   std::vector<FragmentInstanceState*> fragment_instance_states_;
 
   /// True if the query needs a post-execution step to tidy up
@@ -249,6 +260,9 @@ class Coordinator {
   /// Protects all fields below. This is held while making RPCs, so this lock should
   /// only be acquired if the acquiring thread is prepared to wait for a significant
   /// time.
+  /// Lock ordering is
+  /// 1. lock_
+  /// 2. FragmentInstanceState::lock_
   boost::mutex lock_;
 
   /// Overall status of the entire query; set to the first reported fragment error
@@ -286,10 +300,10 @@ class Coordinator {
   /// Number of remote fragments that have completed
   int num_remote_fragements_complete_;
 
-  /// If there is no coordinator fragment, Wait simply waits until all
-  /// backends report completion by notifying on backend_completion_cv_.
+  /// If there is no coordinator fragment, Wait() simply waits until all
+  /// backends report completion by notifying on instance_completion_cv_.
   /// Tied to lock_.
-  boost::condition_variable backend_completion_cv_;
+  boost::condition_variable instance_completion_cv_;
 
   /// Count of the number of backends for which done != true. When this
   /// hits 0, any Wait()'ing thread is notified
@@ -349,9 +363,12 @@ class Coordinator {
 
     /// Execution rates for instances of this fragment
     SummaryStats rates;
+
+    PerFragmentProfileData()
+      : averaged_profile(nullptr), num_instances(-1), root_profile(nullptr) {}
   };
 
-  /// This is indexed by fragment_idx.
+  /// This is indexed by fragment idx (TPlanFragment.idx).
   /// This array is only modified at coordinator startup and query completion and
   /// does not need locks.
   std::vector<PerFragmentProfileData> fragment_profiles_;
@@ -374,7 +391,9 @@ class Coordinator {
     TPlanNodeId node_id;
     bool is_local;
     bool is_bound_by_partition_columns;
-    boost::unordered_set<int> fragment_instance_idxs;
+
+    // indices into fragment_instance_states_
+    boost::unordered_set<int> fragment_instance_state_idxs;
 
     FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
       node_id = tFilterTarget.node_id;
@@ -415,24 +434,22 @@ class Coordinator {
   void MarkFilterRoutingTableComplete();
 
   /// Fill in rpc_params based on parameters.
-  /// 'instance_state_idx' is the index of the fragment instance state in
-  /// fragment_instance_states_.
-  /// 'fragment_idx' is the 0-based query-wide ordinal of the fragment of which it is an
-  /// instance.
-  /// 'fragment_instance_idx' is the 0-based ordinal of this particular fragment
+  /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment
   /// instance within its fragment.
-  void SetExecPlanFragmentParams(QuerySchedule& schedule, const TPlanFragment& fragment,
-      const FragmentExecParams& params, int instance_state_idx, int fragment_idx,
-      int fragment_instance_idx, const TNetworkAddress& coord,
+  void SetExecPlanFragmentParams(const TPlanFragment& fragment,
+      const FragmentExecParams& params, int per_fragment_instance_idx,
       TExecPlanFragmentParams* rpc_params);
+  void MtSetExecPlanFragmentParams(
+      const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params);
 
   /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from
   /// multiple threads. Creates a new FragmentInstanceState and registers it in
   /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad.
-  void ExecRemoteFragment(const FragmentExecParams* fragment_exec_params,
-      const TPlanFragment* plan_fragment, DebugOptions* debug_options,
-      QuerySchedule* schedule, int instance_state_idx, int fragment_idx,
+  void ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
+      const TPlanFragment& plan_fragment, DebugOptions* debug_options,
       int fragment_instance_idx);
+  void MtExecRemoteFInstance(
+      const FInstanceExecParams& exec_params, const DebugOptions* debug_options);
 
   /// Determine fragment number, given fragment id.
   int GetFragmentNum(const TUniqueId& fragment_id);
@@ -441,21 +458,10 @@ class Coordinator {
   /// Attaches split size summary to the appropriate runtime profile
   void PrintFragmentInstanceInfo();
 
-  /// Create aggregate counters for all scan nodes in any of the fragments
-  void CreateAggregateCounters(const std::vector<TPlanFragment>& fragments);
-
   /// Collect scan node counters from the profile.
   /// Assumes lock protecting profile and result is held.
   void CollectScanNodeCounters(RuntimeProfile*, FragmentInstanceCounters* result);
 
-  /// Derived counter function: aggregates throughput for node_id across all fragment
-  /// instances (id needs to be for a ScanNode).
-  int64_t ComputeTotalThroughput(int node_id);
-
-  /// Derived counter function: aggregates total completed scan ranges for node_id
-  /// across all fragment instances (id needs to be for a ScanNode).
-  int64_t ComputeTotalScanRangesComplete(int node_id);
-
   /// Runs cancel logic. Assumes that lock_ is held.
   void CancelInternal();
 
@@ -475,9 +481,9 @@ class Coordinator {
   /// Returns only when either all fragment instances have reported success or the query
   /// is in error. Returns the status of the query.
   /// It is safe to call this concurrently, but any calls must be made only after Exec().
-  /// WaitForAllBackends may be called before Wait(), but note that Wait() guarantees
+  /// WaitForAllInstances may be called before Wait(), but note that Wait() guarantees
   /// that any coordinator fragment has finished, which this method does not.
-  Status WaitForAllBackends();
+  Status WaitForAllInstances();
 
   /// Perform any post-query cleanup required. Called by Wait() only after all fragment
   /// instances have returned, or if the query has failed, in which case it only cleans up
@@ -490,17 +496,17 @@ class Coordinator {
   /// Initializes the structures in runtime profile and exec_summary_. Must be
   /// called before RPCs to start remote fragments.
   void InitExecProfile(const TQueryExecRequest& request);
+  void MtInitExecProfiles();
+
+  /// Initialize the structures to collect execution summary of every plan node
+  /// (exec_summary_ and plan_node_id_to_summary_map_)
+  void MtInitExecSummary();
 
   /// Update fragment profile information from a fragment instance state.
-  /// This is called repeatedly from UpdateFragmentExecStatus(),
-  /// and also at the end of the query from ReportQuerySummary().
-  /// This method calls UpdateAverage() and AddChild(), which obtain their own locks
-  /// on the instance state.
   void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state);
 
   /// Compute the summary stats (completion_time and rates)
-  /// for an individual fragment_profile_ based on the specified backed_exec_state.
-  /// Called only from ReportQuerySummary() below.
+  /// for an individual fragment_profile_ based on the specified instance state.
   void ComputeFragmentSummaryStats(FragmentInstanceState* fragment_instance_state);
 
   /// Outputs aggregate query profile summary.  This is assumed to be called at the end of
@@ -509,7 +515,7 @@ class Coordinator {
 
   /// Populates the summary execution stats from the profile. Can only be called when the
   /// query is done.
-  void UpdateExecSummary(int fragment_idx, int instance_idx, RuntimeProfile* profile);
+  void UpdateExecSummary(const FragmentInstanceState& instance_state);
 
   /// Determines what the permissions of directories created by INSERT statements should
   /// be if permission inheritance is enabled. Populates a map from all prefixes of
@@ -532,10 +538,23 @@ class Coordinator {
   /// SubplanNode with respect to setting collection-slots to NULL.
   void ValidateCollectionSlots(RowBatch* batch);
 
+  /// Prepare coordinator fragment for execution (update filter routing table,
+  /// prepare executor, set up output exprs) and create its FragmentInstanceState.
+  Status PrepareCoordFragment(std::vector<ExprContext*>* output_expr_ctxs);
+
   /// Starts all remote fragments contained in the schedule by issuing RPCs in parallel,
-  /// and then waiting for all of the RPCs to complete. Returns an error if there was any
-  /// error starting the fragments.
-  Status StartRemoteFragments(QuerySchedule* schedule);
+  /// and then waiting for all of the RPCs to complete.
+  void StartRemoteFragments();
+
+  /// Starts all remote fragment instances contained in the schedule by issuing RPCs in
+  /// parallel and then waiting for all of the RPCs to complete. Also sets up and
+  /// registers the state for all non-coordinator fragment instances.
+  void MtStartRemoteFInstances();
+
+  /// Calls CancelInternal() and returns an error if there was any error starting the
+  /// fragments.
+  /// Also updates query_profile_ with the startup latency histogram.
+  Status FinishRemoteInstanceStartup();
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
   /// filters that they either produce or consume. The source and target fragment

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index a25bf8d..53755f9 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -81,11 +81,13 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
   } else {
     if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
       consumed_filters_[filter_desc.filter_id] = ret;
+      VLOG_QUERY << "registered consumer filter " << filter_desc.filter_id;
     } else {
       // The filter has already been registered in this filter bank by another
       // target node.
       DCHECK_GT(filter_desc.targets.size(), 1);
       ret = consumed_filters_[filter_desc.filter_id];
+      VLOG_QUERY << "re-registered consumer filter " << filter_desc.filter_id;
     }
   }
   return ret;