You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/06 00:31:16 UTC

[2/4] incubator-impala git commit: IMPALA-3902: Scheduler improvements for running multiple fragment instances on a single backend

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index b5745c4..5ad84df 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -52,25 +52,90 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     query_events_(query_events),
     num_fragment_instances_(0),
     num_scan_ranges_(0),
+    next_instance_id_(query_id),
     is_admitted_(false) {
   fragment_exec_params_.resize(request.fragments.size());
-  // Build two maps to map node ids to their fragments as well as to the offset in their
-  // fragment's plan's nodes list.
-  for (int i = 0; i < request.fragments.size(); ++i) {
-    int node_idx = 0;
-    for (const TPlanNode& node: request.fragments[i].plan.nodes) {
-      if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
-        plan_node_to_fragment_idx_.resize(node.node_id + 1);
-        plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+
+  if (is_mt_execution) {
+    /// TODO-MT: remove else branch and move MtInit() logic here
+    MtInit();
+  } else {
+    // Build two maps to map node ids to their fragments as well as to the offset in their
+    // fragment's plan's nodes list.
+    for (int i = 0; i < request.fragments.size(); ++i) {
+      int node_idx = 0;
+      for (const TPlanNode& node: request.fragments[i].plan.nodes) {
+        if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
+          plan_node_to_fragment_idx_.resize(node.node_id + 1);
+          plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+        }
+        DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
+        plan_node_to_fragment_idx_[node.node_id] = i;
+        plan_node_to_plan_node_idx_[node.node_id] = node_idx;
+        ++node_idx;
       }
-      DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
-      plan_node_to_fragment_idx_[node.node_id] = i;
-      plan_node_to_plan_node_idx_[node.node_id] = node_idx;
-      ++node_idx;
     }
   }
 }
 
+/// Populates mt_fragment_exec_params_ (one entry per fragment, indexed by
+/// TPlanFragment.idx) from request_.mt_plan_exec_info, marks the coordinator fragment,
+/// records for each fragment the fragments that send their output to it, and fills
+/// plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
+void QuerySchedule::MtInit() {
+  // mt_plan_exec_info[0].fragments[0] is dereferenced below
+  DCHECK(!request_.mt_plan_exec_info.empty());
+  DCHECK(!request_.mt_plan_exec_info[0].fragments.empty());
+
+  // extract TPlanFragments and order by fragment idx
+  vector<const TPlanFragment*> fragments;
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      fragments.push_back(&fragment);
+    }
+  }
+  sort(fragments.begin(), fragments.end(),
+      [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; });
+
+  DCHECK_EQ(mt_fragment_exec_params_.size(), 0);
+  mt_fragment_exec_params_.reserve(fragments.size());
+  for (const TPlanFragment* fragment: fragments) {
+    // mt_fragment_exec_params_ is indexed by TPlanFragment.idx everywhere below, which
+    // is only valid if the idxs are dense and start at 0
+    DCHECK_EQ(fragment->idx, mt_fragment_exec_params_.size());
+    mt_fragment_exec_params_.emplace_back(*fragment);
+  }
+
+  // mark coordinator fragment
+  const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
+  if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) {
+    mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true;
+    next_instance_id_.lo = 1;  // generated instance ids start at 1
+  }
+
+  // compute input fragments and find max node id
+  int max_node_id = 0;
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      for (const TPlanNode& node: fragment.plan.nodes) {
+        max_node_id = max(node.node_id, max_node_id);
+      }
+    }
+
+    // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+    for (int i = 1; i < plan_exec_info.fragments.size(); ++i) {
+      const TPlanFragment& fragment = plan_exec_info.fragments[i];
+      FragmentIdx dest_idx =
+          plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx;
+      MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx];
+      dest_params.input_fragments.push_back(fragment.idx);
+    }
+  }
+
+  // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_;
+  // both are indexed by plan node id, hence sized to max_node_id + 1
+  plan_node_to_fragment_idx_.resize(max_node_id + 1);
+  plan_node_to_plan_node_idx_.resize(max_node_id + 1);
+  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      for (int i = 0; i < fragment.plan.nodes.size(); ++i) {
+        const TPlanNode& node = fragment.plan.nodes[i];
+        plan_node_to_fragment_idx_[node.node_id] = fragment.idx;
+        plan_node_to_plan_node_idx_[node.node_id] = i;
+      }
+    }
+  }
+}
+
+
 int64_t QuerySchedule::GetClusterMemoryEstimate() const {
   DCHECK_GT(unique_hosts_.size(), 0);
   const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * unique_hosts_.size();
@@ -122,4 +187,73 @@ void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_
   unique_hosts_ = unique_hosts;
 }
 
+/// Hands out the current value of next_instance_id_ and advances its 'lo' part by one
+/// for the next caller.
+TUniqueId QuerySchedule::GetNextInstanceId() {
+  TUniqueId instance_id = next_instance_id_;
+  next_instance_id_.lo += 1;
+  return instance_id;
+}
+
+/// Returns the plan fragment of the parent MtFragmentExecParams.
+const TPlanFragment& FInstanceExecParams::fragment() const {
+  const MtFragmentExecParams& parent_params = fragment_exec_params;
+  return parent_params.fragment;
+}
+
+/// Returns num_fragment_instances_ if no MtFragmentExecParams exist; otherwise the
+/// sum of the instance counts over all entries of mt_fragment_exec_params_.
+int QuerySchedule::GetNumFragmentInstances() const {
+  if (mt_fragment_exec_params_.empty()) return num_fragment_instances_;
+  int num_instances = 0;
+  for (size_t i = 0; i < mt_fragment_exec_params_.size(); ++i) {
+    num_instances += mt_fragment_exec_params_[i].instance_exec_params.size();
+  }
+  return num_instances;
+}
+
+/// Returns GetNumFragmentInstances(), reduced by one if mt_dop > 0 and the query has
+/// a coordinator fragment.
+int QuerySchedule::GetNumRemoteFInstances() const {
+  const TPlanFragment* coord_fragment = GetCoordFragment();
+  int num_instances = GetNumFragmentInstances();
+  const bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0;
+  if (!is_mt_execution || coord_fragment == nullptr) return num_instances;
+  // don't count the coordinator instance
+  return num_instances - 1;
+}
+
+/// Returns GetNumRemoteFInstances(), plus one if the query has a coordinator fragment.
+int QuerySchedule::GetTotalFInstances() const {
+  int total = GetNumRemoteFInstances();
+  if (GetCoordFragment() != nullptr) ++total;
+  return total;
+}
+
+/// Returns the first fragment of the request (taken from mt_plan_exec_info[0] if
+/// mt_dop > 0, otherwise from request_.fragments) if it is UNPARTITIONED, and nullptr
+/// otherwise.
+const TPlanFragment* QuerySchedule::GetCoordFragment() const {
+  const TPlanFragment& root_fragment =
+      request_.query_ctx.request.query_options.mt_dop > 0
+        ? request_.mt_plan_exec_info[0].fragments[0]
+        : request_.fragments[0];
+  if (root_fragment.partition.type != TPartitionType::UNPARTITIONED) return nullptr;
+  return &root_fragment;
+}
+
+/// Replaces the contents of 'fragments' with pointers to all fragments of the request:
+/// those of every entry of mt_plan_exec_info if mt_dop > 0, otherwise those in
+/// request_.fragments.
+void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
+  fragments->clear();
+  if (request_.query_ctx.request.query_options.mt_dop <= 0) {
+    for (const TPlanFragment& fragment: request_.fragments) {
+      fragments->push_back(&fragment);
+    }
+    return;
+  }
+  for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_info.fragments) {
+      fragments->push_back(&fragment);
+    }
+  }
+}
+
+/// Returns the exec params of the single instance of the coordinator fragment
+/// (request_.mt_plan_exec_info[0].fragments[0]), which must be UNPARTITIONED.
+const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
+  const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
+  DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED);
+  // check the index itself; a null check on the address of a vector element can
+  // never fire
+  DCHECK_LT(coord_fragment.idx, mt_fragment_exec_params_.size());
+  const MtFragmentExecParams& fragment_params =
+      mt_fragment_exec_params_[coord_fragment.idx];
+  DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
+  return fragment_params.instance_exec_params[0];
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index c8ebd5d..39ce268 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -20,13 +20,14 @@
 
 #include <vector>
 #include <string>
+#include <unordered_map>
 #include <boost/unordered_set.hpp>
-#include <boost/unordered_map.hpp>
 #include <boost/scoped_ptr.hpp>
 
 #include "common/global-types.h"
 #include "common/status.h"
 #include "util/promise.h"
+#include "util/container-util.h"
 #include "util/runtime-profile.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
 #include "gen-cpp/Frontend_types.h"
@@ -34,12 +35,14 @@
 namespace impala {
 
 class Coordinator;
+struct MtFragmentExecParams;
 
 /// map from scan node id to a list of scan ranges
 typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
+
 /// map from an impalad host address to the per-node assigned scan ranges;
 /// records scan range assignment for a single fragment
-typedef boost::unordered_map<TNetworkAddress, PerNodeScanRanges>
+typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
 /// execution parameters for a single fragment; used to assemble the
@@ -58,14 +61,62 @@ struct FragmentExecParams {
   int sender_id_base;
 };
 
+/// Execution parameters for a single fragment instance; used to assemble the
+/// TPlanFragmentInstanceCtx.
+struct FInstanceExecParams {
+  TUniqueId instance_id;
+  TNetworkAddress host; // execution backend
+  PerNodeScanRanges per_node_scan_ranges;
+
+  /// 0-based ordinal of this particular instance within its fragment (i.e. not
+  /// query-wide)
+  int per_fragment_instance_idx;
+
+  /// In its role as a data sender, a fragment instance is assigned a "sender id" to
+  /// uniquely identify it to a receiver. -1 = invalid.
+  int sender_id;
+
+  /// The parent MtFragmentExecParams. Held by reference, so the parent must stay at
+  /// the same address for as long as this object is used.
+  const MtFragmentExecParams& fragment_exec_params;
+
+  /// Returns the TPlanFragment of the parent MtFragmentExecParams.
+  const TPlanFragment& fragment() const;
+
+  /// Initializes sender_id to -1 (invalid) and leaves per_node_scan_ranges empty.
+  FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
+      int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params)
+    : instance_id(instance_id), host(host),
+      per_fragment_instance_idx(per_fragment_instance_idx),
+      sender_id(-1),
+      fragment_exec_params(fragment_exec_params) {}
+};
+
+/// Execution parameters shared between the instances of a single fragment.
+struct MtFragmentExecParams {
+  /// output destinations of this fragment
+  std::vector<TPlanFragmentDestination> destinations;
+
+  /// map from node id to the number of senders (node id expected to be for an
+  /// ExchangeNode)
+  std::map<PlanNodeId, int> per_exch_num_senders;
+
+  /// only needed as intermediate state during exec parameter computation;
+  /// for scheduling, refer to FInstanceExecParams.per_node_scan_ranges
+  FragmentScanRangeAssignment scan_range_assignment;
+
+  /// true if this is the coordinator fragment; false until set by the owner
+  bool is_coord_fragment;
+
+  /// the plan fragment these parameters belong to; held by reference, not owned
+  const TPlanFragment& fragment;
+
+  /// idxs (TPlanFragment.idx) of the fragments that send their output to this fragment
+  std::vector<FragmentIdx> input_fragments;
+
+  /// one entry per instance of this fragment
+  std::vector<FInstanceExecParams> instance_exec_params;
+
+  /// Initializes is_coord_fragment to false; all containers start out empty.
+  MtFragmentExecParams(const TPlanFragment& fragment)
+    : is_coord_fragment(false), fragment(fragment) {}
+};
+
 /// A QuerySchedule contains all necessary information for a query coordinator to
 /// generate fragment execution requests and start query execution. If resource management
 /// is enabled, then a schedule also contains the resource reservation request
 /// and the granted resource reservation.
-/// TODO: Consider moving QuerySchedule and all Schedulers into
-/// their own lib (and out of statestore).
-/// TODO: Move all global state (e.g. profiles) to QueryExecState (after it is decoupled
-/// from ImpalaServer)
+///
+/// QuerySchedule is a container class for scheduling data, but it doesn't contain
+/// scheduling logic itself. Its state either comes from the static TQueryExecRequest
+/// or is computed by SimpleScheduler.
 class QuerySchedule {
  public:
   QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
@@ -87,19 +138,86 @@ class QuerySchedule {
   int64_t GetClusterMemoryEstimate() const;
 
   /// Helper methods used by scheduler to populate this QuerySchedule.
-  void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+  void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+
+  /// The following 4 functions need to be replaced once we stop special-casing
+  /// the coordinator instance in the coordinator.
+  /// The replacement is a single function int GetNumFInstances() (which includes
+  /// the coordinator instance).
+
+  /// TODO-MT: remove; this is actually only the number of remote instances
+  /// (from the coordinator's perspective)
   void set_num_fragment_instances(int64_t num_fragment_instances) {
     num_fragment_instances_ = num_fragment_instances;
   }
-  int64_t num_fragment_instances() const { return num_fragment_instances_; }
+
+  /// Returns the number of fragment instances registered with this schedule.
+  /// MT: total number of fragment instances
+  /// ST: value set with set_num_fragment_instances(); excludes coord instance
+  /// (in effect the number of remote instances)
+  /// TODO-MT: get rid of special-casing of coordinator instance and always return the
+  /// total
+  int GetNumFragmentInstances() const;
+
+  /// Returns the total number of fragment instances, incl. coordinator fragment.
+  /// TODO-MT: remove
+  int GetTotalFInstances() const;
+
+  /// Returns the number of remote fragment instances (excludes coordinator).
+  /// Works for both MT and ST.
+  /// TODO-MT: remove
+  int GetNumRemoteFInstances() const;
+
+  /// Return the coordinator fragment, or nullptr if there isn't one.
+  const TPlanFragment* GetCoordFragment() const;
+
+  /// Return all fragments belonging to exec request in 'fragments'.
+  void GetTPlanFragments(std::vector<const TPlanFragment*>* fragments) const;
+
   int64_t num_scan_ranges() const { return num_scan_ranges_; }
 
-  /// Map node ids to the index of their fragment in TQueryExecRequest.fragments.
-  int32_t GetFragmentIdx(PlanNodeId id) const { return plan_node_to_fragment_idx_[id]; }
+  /// Map node ids to the id of their containing fragment.
+  FragmentIdx GetFragmentIdx(PlanNodeId id) const {
+    return plan_node_to_fragment_idx_[id];
+  }
+
+  /// Returns next instance id. Instance ids are consecutive numbers generated from
+  /// the query id.
+  /// If the query contains a coordinator fragment instance, the generated instance
+  /// ids start at 1 and the caller is responsible for assigning the correct id
+  /// to the coordinator instance. If the query does not contain a coordinator instance,
+  /// the generated instance ids start at 0.
+  TUniqueId GetNextInstanceId();
+
+  const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const {
+    return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment;
+  }
 
-  /// Map node ids to the index of the node inside their plan.nodes list.
+  /// Map node ids to the index of their node inside their plan.nodes list.
+  /// TODO-MT: remove; only needed for the ST path
   int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
+
+  const TPlanNode& GetNode(PlanNodeId id) const {
+    const TPlanFragment& fragment = GetContainingFragment(id);
+    return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
+  }
+
   std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; }
+  const std::vector<FragmentExecParams>& exec_params() const {
+    return fragment_exec_params_;
+  }
+  const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const {
+    return mt_fragment_exec_params_;
+  }
+  const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
+    return mt_fragment_exec_params_[idx];
+  }
+  MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
+    return &mt_fragment_exec_params_[idx];
+  }
+
+  const FInstanceExecParams& GetCoordInstanceExecParams() const;
+
   const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
     return unique_hosts_;
   }
@@ -111,7 +229,6 @@ class QuerySchedule {
   void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
 
  private:
-
   /// These references are valid for the lifetime of this query schedule because they
   /// are all owned by the enclosing QueryExecState.
   const TUniqueId& query_id_;
@@ -122,7 +239,7 @@ class QuerySchedule {
   RuntimeProfile* summary_profile_;
   RuntimeProfile::EventSequence* query_events_;
 
-  /// Maps from plan node id to its fragment index. Filled in c'tor.
+  /// Maps from plan node id to its fragment idx. Filled in c'tor.
   std::vector<int32_t> plan_node_to_fragment_idx_;
 
   /// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
@@ -132,21 +249,33 @@ class QuerySchedule {
   /// populated by Scheduler::Schedule()
   std::vector<FragmentExecParams> fragment_exec_params_;
 
+  // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams())
+  // indexed by fragment idx (TPlanFragment.idx)
+  std::vector<MtFragmentExecParams> mt_fragment_exec_params_;
+
   /// The set of hosts that the query will run on excluding the coordinator.
   boost::unordered_set<TNetworkAddress> unique_hosts_;
 
   /// Number of backends executing plan fragments on behalf of this query.
+  /// TODO-MT: remove
   int64_t num_fragment_instances_;
 
   /// Total number of scan ranges of this query.
   int64_t num_scan_ranges_;
 
+  /// Used to generate consecutive fragment instance ids.
+  TUniqueId next_instance_id_;
+
   /// Request pool to which the request was submitted for admission.
   std::string request_pool_;
 
   /// Indicates if the query has been admitted for execution.
   bool is_admitted_;
 
+  /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info.
+  /// Sets is_coord_fragment and input_fragments.
+  /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
+  void MtInit();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4c3a967..9bff424 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -46,12 +46,10 @@ class Scheduler {
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 
-  /// Populates given query schedule whose execution is to be coordinated by coord.
-  /// Assigns fragments to hosts based on scan ranges in the query exec request.
-  /// If resource management is enabled, also reserves resources from the central
-  /// resource manager (Yarn via Llama) to run the query in. This function blocks until
-  /// the reservation request has been granted or denied.
-  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule) = 0;
+  /// Populates given query schedule and assigns fragments to hosts based on scan
+  /// ranges in the query exec request. Submits schedule to admission control before
+  /// returning.
+  virtual Status Schedule(QuerySchedule* schedule) = 0;
 
   /// Releases the reserved resources (if any) from the given schedule.
   virtual Status Release(QuerySchedule* schedule) = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index f3ba9a5..5b6303e 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -20,6 +20,7 @@
 #include <atomic>
 #include <random>
 #include <vector>
+#include <algorithm>
 
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/join.hpp>
@@ -276,6 +277,7 @@ void SimpleScheduler::UpdateMembership(
       }
     }
     if (metrics_ != NULL) {
+      /// TODO-MT: fix this (do we even need to report it?)
       num_fragment_instances_metric_->set_value(current_membership_.size());
     }
   }
@@ -305,7 +307,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
   for (entry = exec_request.per_node_scan_ranges.begin();
       entry != exec_request.per_node_scan_ranges.end(); ++entry) {
     const TPlanNodeId node_id = entry->first;
-    int fragment_idx = schedule->GetFragmentIdx(node_id);
+    FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id);
     const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
     bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
 
@@ -326,11 +328,208 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
         node_id, node_replica_preference, node_random_replica, entry->second,
         exec_request.host_list, exec_at_coord, schedule->query_options(),
         total_assignment_timer, assignment));
-    schedule->AddScanRanges(entry->second.size());
+    schedule->IncNumScanRanges(entry->second.size());
   }
   return Status::OK();
 }
 
+/// Computes the scan range assignment for every scan node listed in the
+/// per_node_scan_ranges of each TPlanExecInfo of the schedule's request and stores it
+/// in the scan_range_assignment of the fragment containing the node. Also adds the
+/// number of scan ranges to the schedule. Returns the first error encountered.
+Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) {
+  RuntimeProfile::Counter* assignment_timer =
+      ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
+  BackendConfigPtr backend_config = GetBackendConfig();
+  const TQueryExecRequest& exec_request = schedule->request();
+  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+    for (const auto& entry: plan_exec_info.per_node_scan_ranges) {
+      const TPlanNodeId scan_node_id = entry.first;
+      const auto& scan_range_locations = entry.second;
+
+      // UNPARTITIONED fragments run at the coordinator
+      const TPlanFragment& fragment = schedule->GetContainingFragment(scan_node_id);
+      const bool exec_at_coord =
+          fragment.partition.type == TPartitionType::UNPARTITIONED;
+
+      const TPlanNode& node = schedule->GetNode(scan_node_id);
+      DCHECK_EQ(node.node_id, scan_node_id);
+
+      // per-node overrides are only present on hdfs scan nodes
+      const TReplicaPreference::type* replica_preference = nullptr;
+      bool random_replica = false;
+      if (node.__isset.hdfs_scan_node) {
+        const auto& hdfs_scan_node = node.hdfs_scan_node;
+        if (hdfs_scan_node.__isset.replica_preference) {
+          replica_preference = &hdfs_scan_node.replica_preference;
+        }
+        random_replica =
+            hdfs_scan_node.__isset.random_replica && hdfs_scan_node.random_replica;
+      }
+
+      MtFragmentExecParams* fragment_params =
+          schedule->GetFragmentExecParams(fragment.idx);
+      FragmentScanRangeAssignment* assignment = &fragment_params->scan_range_assignment;
+      RETURN_IF_ERROR(ComputeScanRangeAssignment(
+          *backend_config, scan_node_id, replica_preference, random_replica,
+          scan_range_locations, exec_request.host_list, exec_at_coord,
+          schedule->query_options(), assignment_timer, assignment));
+      schedule->IncNumScanRanges(scan_range_locations.size());
+    }
+  }
+  return Status::OK();
+}
+
+/// For every plan of the request, creates the fragment instances of the plan's tree of
+/// fragments and then wires senders to receivers: sets each sending fragment's
+/// destinations, the receiving fragment's per-exchange sender count and the sender id
+/// of every sending instance.
+void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) {
+  const TQueryExecRequest& exec_request = schedule->request();
+
+  for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+    const auto& plan_fragments = plan_exec_info.fragments;
+    const auto& dest_fragment_idx = plan_exec_info.dest_fragment_idx;
+
+    // instance_id, host and per_node_scan_ranges are set by the recursive overload,
+    // starting at the root fragment of this plan
+    MtComputeFragmentExecParams(
+        plan_exec_info, schedule->GetFragmentExecParams(plan_fragments[0].idx),
+        schedule);
+
+    // Set destinations, per_exch_num_senders, sender_id.
+    // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]];
+    // fragments[0] is an endpoint.
+    for (int i = 0; i < dest_fragment_idx.size(); ++i) {
+      const int dest_pos = dest_fragment_idx[i];
+      DCHECK_LT(dest_pos, plan_fragments.size());
+      const TPlanFragment& dest_fragment = plan_fragments[dest_pos];
+      const int src_pos = i + 1;
+      DCHECK_LT(src_pos, plan_fragments.size());
+      const TPlanFragment& src_fragment = plan_fragments[src_pos];
+      DCHECK(src_fragment.output_sink.__isset.stream_sink);
+      MtFragmentExecParams* dest_params =
+          schedule->GetFragmentExecParams(dest_fragment.idx);
+      MtFragmentExecParams* src_params =
+          schedule->GetFragmentExecParams(src_fragment.idx);
+
+      // one destination per instance of the receiving fragment
+      const int num_dest_instances = dest_params->instance_exec_params.size();
+      src_params->destinations.resize(num_dest_instances);
+      for (int j = 0; j < num_dest_instances; ++j) {
+        const FInstanceExecParams& dest_instance = dest_params->instance_exec_params[j];
+        TPlanFragmentDestination& dest = src_params->destinations[j];
+        dest.__set_fragment_instance_id(dest_instance.instance_id);
+        dest.__set_server(dest_instance.host);
+      }
+
+      // enumerate senders consecutively;
+      // for distributed merge we need to enumerate senders across fragment instances
+      const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
+      DCHECK(
+          sink.output_partition.type == TPartitionType::UNPARTITIONED
+            || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
+            || sink.output_partition.type == TPartitionType::RANDOM);
+      const PlanNodeId exch_id = sink.dest_node_id;
+      // the senders already registered for this exchange determine our first sender id
+      int& num_senders = dest_params->per_exch_num_senders[exch_id];
+      int next_sender_id = num_senders;
+      num_senders += src_params->instance_exec_params.size();
+      for (FInstanceExecParams& src_instance: src_params->instance_exec_params) {
+        src_instance.sender_id = next_sender_id++;
+      }
+    }
+  }
+}
+
+/// Creates the instances of the fragment described by 'fragment_params' after
+/// recursively creating those of all of its input fragments.
+/// An UNPARTITIONED fragment gets a single instance on the local backend; a fragment
+/// with a leftmost scan gets its instances from MtCreateScanInstances(); any other
+/// fragment gets them from MtCreateMirrorInstances().
+void SimpleScheduler::MtComputeFragmentExecParams(
+    const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
+  // the instances of the input fragments have to exist before our own are created
+  for (FragmentIdx input_idx: fragment_params->input_fragments) {
+    MtComputeFragmentExecParams(
+        plan_exec_info, schedule->GetFragmentExecParams(input_idx), schedule);
+  }
+
+  const TPlanFragment& fragment = fragment_params->fragment;
+  // TODO: deal with Union nodes
+  DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
+
+  if (fragment.partition.type != TPartitionType::UNPARTITIONED) {
+    const PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
+    if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
+      // interior fragment without leftmost scan:
+      // we assign the same hosts as those of our leftmost input fragment (so that a
+      // merge aggregation fragment runs on the hosts that provide the input data)
+      MtCreateMirrorInstances(fragment_params, schedule);
+    } else {
+      // leaf fragment with leftmost scan
+      // TODO: check that there's only one scan in this fragment
+      MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule);
+    }
+    return;
+  }
+
+  // unpartitioned fragment: single instance executed at coordinator
+  const TNetworkAddress& coord_address = local_backend_descriptor_.address;
+  // make sure that the coordinator instance ends up with instance idx 0
+  TUniqueId instance_id;
+  if (fragment_params->is_coord_fragment) {
+    instance_id = schedule->query_id();
+  } else {
+    instance_id = schedule->GetNextInstanceId();
+  }
+  fragment_params->instance_exec_params.emplace_back(
+      instance_id, coord_address, 0, *fragment_params);
+  FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+  // That instance gets all of the scan ranges, if there are any.
+  const FragmentScanRangeAssignment& assignment = fragment_params->scan_range_assignment;
+  if (assignment.empty()) return;
+  DCHECK_EQ(assignment.size(), 1);
+  instance_params.per_node_scan_ranges = assignment.begin()->second;
+}
+
+/// Creates the instances of a fragment whose leftmost scan is 'leftmost_scan_id'.
+/// For every host in fragment_params->scan_range_assignment this creates at most mt_dop
+/// instances (and no more than the host has scan ranges for that scan) and divides the
+/// host's scan ranges for 'leftmost_scan_id' between them by size in bytes.
+void SimpleScheduler::MtCreateScanInstances(
+    PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params,
+    QuerySchedule* schedule) {
+  int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop;
+  // ordinal of the next instance within this fragment; this keeps counting across
+  // hosts so that it is unique within the fragment rather than within a host
+  int per_fragment_instance_idx = 0;
+  for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
+    // evenly divide up the scan ranges of the leftmost scan between at most
+    // <dop> instances
+    const TNetworkAddress& host = assignment_entry.first;
+    auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id);
+    DCHECK(scan_ranges_it != assignment_entry.second.end());
+    const vector<TScanRangeParams>& params_list = scan_ranges_it->second;
+
+    int64_t total_size = 0;
+    for (const TScanRangeParams& params: params_list) {
+      // TODO: implement logic for hbase and kudu
+      DCHECK(params.scan_range.__isset.hdfs_file_split);
+      total_size += params.scan_range.hdfs_file_split.length;
+    }
+
+    // try to load-balance scan ranges by assigning just beyond the average number of
+    // bytes to each instance
+    // TODO: fix shortcomings introduced by uneven split sizes,
+    // this could end up assigning 0 scan ranges to an instance
+    int num_instances = ::min(max_num_instances, static_cast<int>(params_list.size()));
+    DCHECK_GT(num_instances, 0);
+    // double rather than float: a float cannot represent large byte counts exactly
+    double avg_bytes_per_instance = static_cast<double>(total_size) / num_instances;
+    int64_t total_assigned_bytes = 0;
+    int params_idx = 0;  // into params_list
+    for (int i = 0; i < num_instances; ++i) {
+      fragment_params->instance_exec_params.emplace_back(
+          schedule->GetNextInstanceId(), host, per_fragment_instance_idx++,
+          *fragment_params);
+      FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+      // threshold beyond which we want to assign to the next instance
+      int64_t threshold_total_bytes =
+          static_cast<int64_t>(avg_bytes_per_instance * (i + 1));
+      // The last instance takes whatever is left, independent of the threshold, so
+      // that neither rounding of the threshold nor zero-length splits can leave scan
+      // ranges unassigned.
+      const bool is_last_instance = (i == num_instances - 1);
+
+      while (params_idx < params_list.size()
+          && (is_last_instance || total_assigned_bytes < threshold_total_bytes)) {
+        const TScanRangeParams& scan_range_params = params_list[params_idx];
+        instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
+            scan_range_params);
+        total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
+        ++params_idx;
+      }
+      if (params_idx >= params_list.size()) break;  // nothing left to assign
+    }
+    DCHECK_EQ(params_idx, params_list.size());  // everything got assigned
+  }
+}
+
+/// Creates one instance of the fragment described by 'fragment_params' for every
+/// instance of its first input fragment, on the same host as that input instance.
+void SimpleScheduler::MtCreateMirrorInstances(
+    MtFragmentExecParams* fragment_params, QuerySchedule* schedule) {
+  DCHECK_GE(fragment_params->input_fragments.size(), 1);
+  const MtFragmentExecParams* leftmost_input_params =
+      schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+  const vector<FInstanceExecParams>& input_instances =
+      leftmost_input_params->instance_exec_params;
+  for (int instance_idx = 0; instance_idx < input_instances.size(); ++instance_idx) {
+    fragment_params->instance_exec_params.emplace_back(
+        schedule->GetNextInstanceId(), input_instances[instance_idx].host,
+        instance_idx, *fragment_params);
+  }
+}
+
 Status SimpleScheduler::ComputeScanRangeAssignment(
     const BackendConfig& backend_config, PlanNodeId node_id,
     const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
@@ -459,12 +658,10 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
   // assign instance ids
   int64_t num_fragment_instances = 0;
   for (FragmentExecParams& params: *fragment_exec_params) {
-    for (int j = 0; j < params.hosts.size(); ++j) {
-      int instance_idx = num_fragment_instances + j;
+    for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) {
       params.instance_ids.push_back(
-          CreateInstanceId(schedule->query_id(), instance_idx));
+          CreateInstanceId(schedule->query_id(), num_fragment_instances));
     }
-    num_fragment_instances += params.hosts.size();
   }
   if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
     // the root fragment is executed directly by the coordinator
@@ -511,11 +708,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
     QuerySchedule* schedule) {
   vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
   DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
-  vector<TPlanNodeType::type> scan_node_types;
-  scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
-  scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
-  scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);
-  scan_node_types.push_back(TPlanNodeType::KUDU_SCAN_NODE);
 
   // compute hosts of producer fragment before those of consumer fragment(s),
   // the latter might inherit the set of hosts from the former
@@ -535,6 +727,9 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
     // a UnionNode with partitioned joins or grouping aggregates as children runs on
     // at least as many hosts as the input to those children).
     if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
+      vector<TPlanNodeType::type> scan_node_types {
+        TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+        TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
       vector<TPlanNodeId> scan_nodes;
       FindNodes(fragment.plan, scan_node_types, &scan_nodes);
       vector<TPlanNodeId> exch_nodes;
@@ -562,7 +757,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
       continue;
     }
 
-    PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
+    PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
     if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
       // there is no leftmost scan; we assign the same hosts as those of our
       // leftmost input fragment (so that a partitioned aggregation fragment
@@ -606,6 +801,13 @@ PlanNodeId SimpleScheduler::FindLeftmostNode(
   return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
 }
 
+PlanNodeId SimpleScheduler::FindLeftmostScan(const TPlan& plan) {
+  vector<TPlanNodeType::type> scan_node_types {
+      TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+      TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
+  return FindLeftmostNode(plan, scan_node_types);
+}
+
 bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
   for (int i = 0; i < plan.nodes.size(); ++i) {
     if (plan.nodes[i].node_type == type) return true;
@@ -674,18 +876,35 @@ int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
   return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
 }
 
-Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
+Status SimpleScheduler::Schedule(QuerySchedule* schedule) {
   string resolved_pool;
   RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
       schedule->request().query_ctx, &resolved_pool));
   schedule->set_request_pool(resolved_pool);
   schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
 
-  RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
-  ComputeFragmentHosts(schedule->request(), schedule);
-  ComputeFragmentExecParams(schedule->request(), schedule);
-  if (!FLAGS_disable_admission_control) {
-    RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+  bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_execution) {
+    RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule));
+    MtComputeFragmentExecParams(schedule);
+
+    // compute unique hosts
+    unordered_set<TNetworkAddress> unique_hosts;
+    for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) {
+      for (const FInstanceExecParams& i: f.instance_exec_params) {
+        unique_hosts.insert(i.host);
+      }
+    }
+    schedule->SetUniqueHosts(unique_hosts);
+
+    // TODO-MT: call AdmitQuery()
+  } else {
+    RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
+    ComputeFragmentHosts(schedule->request(), schedule);
+    ComputeFragmentExecParams(schedule->request(), schedule);
+    if (!FLAGS_disable_admission_control) {
+      RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+    }
   }
   return Status::OK();
 }
@@ -849,10 +1068,10 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
     if (!remote_read) total_local_assignments_->Increment(1);
   }
 
-  PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
-      PerNodeScanRanges());
-  vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
-      vector<TScanRangeParams>());
+  PerNodeScanRanges* scan_ranges =
+      FindOrInsert(assignment, backend.address, PerNodeScanRanges());
+  vector<TScanRangeParams>* scan_range_params_list =
+      FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
   // Add scan range.
   TScanRangeParams scan_range_params;
   scan_range_params.scan_range = scan_range_locations.scan_range;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 9b51269..b7cc83e 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -88,7 +88,7 @@ class SimpleScheduler : public Scheduler {
   /// Register with the subscription manager if required
   virtual impala::Status Init();
 
-  virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
+  virtual Status Schedule(QuerySchedule* schedule);
   virtual Status Release(QuerySchedule* schedule);
 
  private:
@@ -405,7 +405,39 @@ class SimpleScheduler : public Scheduler {
   void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
 
-  /// For each fragment in exec_request, compute the hosts on which to run the instances
+  /// Compute the assignment of scan ranges to hosts for each scan node in
+  /// the schedule's TQueryExecRequest.mt_plan_exec_info.
+  /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
+  /// mt_fragment_exec_params_ with the resulting scan range assignment.
+  Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
+
+  /// Compute the MtFragmentExecParams for all plans in the schedule's
+  /// TQueryExecRequest.mt_plan_exec_info.
+  /// This includes the routing information (destinations, per_exch_num_senders,
+  /// sender_id)
+  void MtComputeFragmentExecParams(QuerySchedule* schedule);
+
+  /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
+  /// fragment_params and its input fragments via a depth-first traversal.
+  /// All fragments are part of plan_exec_info.
+  void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
+      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// Create instances of the fragment corresponding to fragment_params to run on the
+  /// selected replica hosts of the scan ranges of the node with id scan_id.
+  /// The maximum number of instances is the value of query option mt_dop.
+  /// This attempts to load balance among instances by computing the average number
+  /// of bytes per instance and then in a single pass assigning scan ranges to each
+  /// instance to roughly meet that average.
+  void MtCreateScanInstances(PlanNodeId scan_id,
+      MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+  /// For each instance of the single input fragment of the fragment corresponding to
+  /// fragment_params, create an instance for this fragment.
+  void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params,
+      QuerySchedule* schedule);
+
+  /// For each fragment in exec_request, computes hosts on which to run the instances
   /// and stores result in fragment_exec_params_.hosts.
   void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
@@ -414,6 +446,8 @@ class SimpleScheduler : public Scheduler {
   /// INVALID_PLAN_NODE_ID if no such node present.
   PlanNodeId FindLeftmostNode(
       const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
+  /// Same as FindLeftmostNode(), but only considers scan nodes.
+  PlanNodeId FindLeftmostScan(const TPlan& plan);
 
   /// Return the index (w/in exec_request.fragments) of fragment that sends its output to
   /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 9069b03..76e11d1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -81,7 +81,6 @@ void FragmentMgr::FragmentExecState::ReportStatusCb(
   TReportExecStatusParams params;
   params.protocol_version = ImpalaInternalServiceVersion::V1;
   params.__set_query_id(query_ctx_.query_id);
-  params.__set_instance_state_idx( fragment_instance_ctx_.instance_state_idx);
   params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
   exec_status.SetTStatus(&params);
   params.__set_done(done);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index a98bbf9..64e9a78 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -38,10 +38,8 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory
 
 Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
   VLOG_QUERY << "ExecPlanFragment() instance_id="
-             << exec_params.fragment_instance_ctx.fragment_instance_id
-             << " coord=" << exec_params.query_ctx.coord_address
-             << " fragment instance#="
-             << exec_params.fragment_instance_ctx.instance_state_idx;
+             << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id)
+             << " coord=" << exec_params.query_ctx.coord_address;
 
   // Preparing and opening the fragment creates a thread and consumes a non-trivial
   // amount of memory. If we are already starved for memory, cancel the fragment as
@@ -146,6 +144,7 @@ void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
 
 void FragmentMgr::PublishFilter(TPublishFilterResult& return_val,
     const TPublishFilterParams& params) {
+  VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id;
   shared_ptr<FragmentExecState> fragment_exec_state =
       GetFragmentExecState(params.dst_instance_id);
   if (fragment_exec_state.get() == NULL) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d5fd59a..7f9d862 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -845,9 +845,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
   // benchmarks show it to be slightly cheaper than contending for a
   // single generator under a lock (since random_generator is not
   // thread-safe).
-  random_generator uuid_generator;
-  uuid query_uuid = uuid_generator();
-  query_ctx->query_id = UuidToQueryId(query_uuid);
+  query_ctx->query_id = UuidToQueryId(random_generator()());
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -1077,9 +1075,8 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
 
 void ImpalaServer::ReportExecStatus(
     TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
-  VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
-            << " fragment instance#=" << params.instance_state_idx
-            << " instance_id=" << params.fragment_instance_id
+  VLOG_FILE << "ReportExecStatus()"
+            << " instance_id=" << PrintId(params.fragment_instance_id)
             << " done=" << (params.done ? "true" : "false");
   // TODO: implement something more efficient here, we're currently
   // acquiring/releasing the map lock and doing a map lookup for
@@ -1090,10 +1087,8 @@ void ImpalaServer::ReportExecStatus(
     // This is expected occasionally (since a report RPC might be in flight while
     // cancellation is happening). Return an error to the caller to get it to stop.
     const string& err = Substitute("ReportExecStatus(): Received report for unknown "
-        "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
-        " $2 done: $3)", PrintId(params.query_id),
-        params.instance_state_idx, PrintId(params.fragment_instance_id),
-        params.done);
+        "query ID (probably closed or cancelled). (instance: $0 done: $1)",
+        PrintId(params.fragment_instance_id), params.done);
     Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val);
     VLOG_QUERY << err;
     return;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 9b2fc88..d55ac54 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -373,7 +373,8 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp(
 Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     const TQueryExecRequest& query_exec_request) {
   // we always need at least one plan fragment
-  DCHECK_GT(query_exec_request.fragments.size(), 0);
+  DCHECK(query_exec_request.fragments.size() > 0
+      || query_exec_request.mt_plan_exec_info.size() > 0);
 
   if (query_exec_request.__isset.query_plan) {
     stringstream plan_ss;
@@ -424,26 +425,31 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
   // case, the query can only have a single fragment, and that fragment needs to be
   // executed by the coordinator. This check confirms that.
   // If desc_tbl is set, the query may or may not have a coordinator fragment.
+  bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
+  const TPlanFragment& fragment = is_mt_exec
+      ? query_exec_request.mt_plan_exec_info[0].fragments[0]
+      : query_exec_request.fragments[0];
   bool has_coordinator_fragment =
-      query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+      fragment.partition.type == TPartitionType::UNPARTITIONED;
   DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
 
   {
     lock_guard<mutex> l(lock_);
     // Don't start executing the query if Cancel() was called concurrently with Exec().
     if (is_cancelled_) return Status::CANCELLED;
+    // TODO: make schedule local to coordinator and move schedule_->Release() into
+    // Coordinator::TearDown()
     schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
         exec_request_.query_options, &summary_profile_, query_events_));
-    coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
   }
-  Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
-
+  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
-  status = coord_->Exec(*schedule_, &output_expr_ctxs_);
+  coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
+  status = coord_->Exec(&output_expr_ctxs_);
   {
     lock_guard<mutex> l(lock_);
     RETURN_IF_ERROR(UpdateQueryStatus(status));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d0e4275..0823981 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -395,16 +395,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
         }
         break;
       }
-      case TImpalaQueryOptions::MT_NUM_CORES: {
+      case TImpalaQueryOptions::MT_DOP: {
         StringParser::ParseResult result;
-        const int32_t num_cores =
+        const int32_t dop =
             StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
-        if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) {
+        if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) {
           return Status(
-              Substitute("$0 is not valid for mt_num_cores. Valid values are in "
+              Substitute("$0 is not valid for mt_dop. Valid values are in "
                 "[0, 128].", value));
         }
-        query_options->__set_mt_num_cores(num_cores);
+        query_options->__set_mt_dop(dop);
         break;
       }
       case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2c25700..b1194d3 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -79,7 +79,7 @@ class TQueryOptions;
   QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
   QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
   QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
-  QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
+  QUERY_OPT_FN(mt_dop, MT_DOP)\
   QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
   QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index b8cd1ef..efb4f52 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -20,6 +20,7 @@
 #define IMPALA_UTIL_CONTAINER_UTIL_H
 
 #include <map>
+#include <unordered_map>
 #include <boost/unordered_map.hpp>
 
 #include "util/hash-util.h"
@@ -35,6 +36,21 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) {
   return HashUtil::Hash(&host_port.port, sizeof(host_port.port), hash);
 }
 
+}
+
+/// std::hash specialization so TNetworkAddress can be used as a key in std:: containers
+namespace std {
+
+template<> struct hash<impala::TNetworkAddress> {
+  std::size_t operator()(const impala::TNetworkAddress& host_port) const {
+    return impala::hash_value(host_port);
+  }
+};
+
+}
+
+namespace impala {
+
 struct HashTNetworkAddressPtr : public std::unary_function<TNetworkAddress*, size_t> {
   size_t operator()(const TNetworkAddress* const& p) const { return hash_value(*p); }
 };
@@ -49,6 +65,7 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b
 
 /// FindOrInsert(): if the key is present, return the value; if the key is not present,
 /// create a new entry (key, default_val) and return default_val.
+/// TODO: replace these overloads with a single template parameterized on the map type
 
 template <typename K, typename V>
 V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
@@ -60,6 +77,15 @@ V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
 }
 
 template <typename K, typename V>
+V* FindOrInsert(std::unordered_map<K,V>* m, const K& key, const V& default_val) {
+  typename std::unordered_map<K,V>::iterator it = m->find(key);
+  if (it == m->end()) {
+    it = m->insert(std::make_pair(key, default_val)).first;
+  }
+  return &it->second;
+}
+
+template <typename K, typename V>
 V* FindOrInsert(boost::unordered_map<K,V>* m, const K& key, const V& default_val) {
   typename boost::unordered_map<K,V>::iterator it = m->find(key);
   if (it == m->end()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index ebbc5eb..997cc0a 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -31,12 +31,7 @@ TEST(UidUtil, FragmentInstanceId) {
 
   for (int i = 0; i < 100; ++i) {
     TUniqueId instance_id = CreateInstanceId(query_id, i);
-    EXPECT_EQ(instance_id.hi, query_id.hi);
-
-    TUniqueId qid = GetQueryId(instance_id);
-    EXPECT_EQ(qid.hi, query_id.hi);
-    EXPECT_EQ(qid.lo, query_id.lo);
-
+    EXPECT_EQ(GetQueryId(instance_id), query_id);
     EXPECT_EQ(GetInstanceIdx(instance_id), i);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index f0f87ec..de18464 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -45,7 +45,7 @@ inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id
 /// Query id: uuid with bottom 4 bytes set to 0
 /// Fragment instance id: query id with instance index stored in the bottom 4 bytes
 
-const int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
+constexpr int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
 
 inline TUniqueId UuidToQueryId(const boost::uuids::uuid& uuid) {
   TUniqueId result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 8068b63..8e88b20 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -56,7 +56,7 @@ struct TExecStats {
 // node as well as per instance stats.
 struct TPlanNodeExecSummary {
   1: required Types.TPlanNodeId node_id
-  2: required i32 fragment_id
+  2: required Types.TFragmentIdx fragment_idx
   3: required string label
   4: optional string label_detail
   5: required i32 num_children
@@ -64,15 +64,11 @@ struct TPlanNodeExecSummary {
   // Estimated stats generated by the planner
   6: optional TExecStats estimated_stats
 
-  // One entry for each BE executing this plan node.
+  // One entry for each fragment instance executing this plan node.
   7: optional list<TExecStats> exec_stats
 
-  // One entry for each BE executing this plan node. True if this plan node is still
-  // running.
-  8: optional list<bool> is_active
-
   // If true, this plan node is an exchange node that is the receiver of a broadcast.
-  9: optional bool is_broadcast
+  8: optional bool is_broadcast
 }
 
 // Progress counters for an in-flight query.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 95d6ba3..91322b2 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -340,6 +340,25 @@ struct TLoadDataResp {
   1: required Data.TResultRow load_summary
 }
 
+// Execution parameters for a single plan; component of TQueryExecRequest
+struct TPlanExecInfo {
+  // fragments[i] may consume the output of fragments[j > i];
+  // fragments[0] is the root fragment and also the coordinator fragment, if
+  // it is unpartitioned.
+  1: required list<Planner.TPlanFragment> fragments
+
+  // Specifies the destination fragment of the output of each fragment.
+  // dest_fragment_idx.size() == fragments.size() - 1 and
+  // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+  // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient
+  2: optional list<i32> dest_fragment_idx
+
+  // A map from scan node ids to a list of scan range locations.
+  // The node ids refer to scan nodes in fragments[].plan_tree
+  3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
+      per_node_scan_ranges
+}
+
 // Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
 struct TQueryExecRequest {
   // global descriptor tbl for all fragments
@@ -348,12 +367,7 @@ struct TQueryExecRequest {
   // fragments[i] may consume the output of fragments[j > i];
   // fragments[0] is the root fragment and also the coordinator fragment, if
   // it is unpartitioned.
-  2: required list<Planner.TPlanFragment> fragments
-
-  // Multi-threaded execution: sequence of plans; the last one materializes
-  // the query result
-  // TODO: this will eventually supercede 'fragments'
-  14: optional list<Planner.TPlanFragmentTree> mt_plans
+  2: optional list<Planner.TPlanFragment> fragments
 
   // Specifies the destination fragment of the output of each fragment.
   // parent_fragment_idx.size() == fragments.size() - 1 and
@@ -365,6 +379,12 @@ struct TQueryExecRequest {
   4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
       per_node_scan_ranges
 
+  // Multi-threaded execution: exec info for all plans; the first one materializes
+  // the query result
+  // TODO: this will eventually supersede fields fragments, dest_fragment_idx,
+  // per_node_scan_ranges
+  14: optional list<TPlanExecInfo> mt_plan_exec_info
+
   // Metadata of the query result set (only for select)
   5: optional Results.TResultSetMetadata result_set_metadata
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 089524d..3ee54ae 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -181,10 +181,11 @@ struct TQueryOptions {
   // "name".
   43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
 
-  // Multi-threaded execution: number of cores per query per node.
-  // > 0: multi-threaded execution mode, with given number of cores
+  // Multi-threaded execution: degree of parallelism (= number of active threads) per
+  // query per backend.
+  // > 0: multi-threaded execution mode, with given dop
   // 0: single-threaded execution mode
-  44: optional i32 mt_num_cores = 0
+  44: optional i32 mt_dop = 0
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
@@ -252,6 +253,7 @@ struct TClientRequest {
 // TODO: Separate into FE/BE initialized vars.
 struct TQueryCtx {
   // Client request containing stmt to execute and query options.
+  // TODO: rename to client_request, we have too many requests
   1: required TClientRequest request
 
   // A globally unique id assigned to the entire query in the BE.
@@ -302,9 +304,6 @@ struct TQueryCtx {
 // fragment.
 struct TPlanFragmentCtx {
   1: required Planner.TPlanFragment fragment
-
-  // total number of instances of this fragment
-  2: required i32 num_fragment_instances
 }
 
 // A scan range plus the parameters needed to execute that scan.
@@ -329,45 +328,42 @@ struct TPlanFragmentDestination {
 // TODO: for range partitioning, we also need to specify the range boundaries
 struct TPlanFragmentInstanceCtx {
   // The globally unique fragment instance id.
-  // Format: query id + query-wide fragment index
-  // The query-wide fragment index starts at 0, so that the query id
-  // and the id of the first fragment instance (the coordinator instance)
-  // are identical.
+  // Format: query id + query-wide fragment instance index
+  // The query-wide fragment instance index starts at 0, so that the query id
+  // and the id of the first fragment instance are identical.
+  // If there is a coordinator instance, it is the first one, with index 0.
   1: required Types.TUniqueId fragment_instance_id
 
   // Index of this fragment instance accross all instances of its parent fragment,
   // range [0, TPlanFragmentCtx.num_fragment_instances).
-  2: required i32 fragment_instance_idx
-
-  // Index of this fragment instance in Coordinator::fragment_instance_states_.
-  // TODO: remove; this is subsumed by the query-wide instance idx embedded
-  // in the fragment_instance_id
-  3: required i32 instance_state_idx
+  2: required i32 per_fragment_instance_idx
 
   // Initial scan ranges for each scan node in TPlanFragment.plan_tree
-  4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+  3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
 
   // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
   // needed to create a DataStreamRecvr
-  5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+  4: required map<Types.TPlanNodeId, i32> per_exch_num_senders
 
   // Output destinations, one per output partition.
   // The partitioning of the output is specified by
   // TPlanFragment.output_sink.output_partition.
   // The number of output partitions is destinations.size().
-  6: list<TPlanFragmentDestination> destinations
+  // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+  5: list<TPlanFragmentDestination> destinations
 
   // Debug options: perform some action in a particular phase of a particular node
-  7: optional Types.TPlanNodeId debug_node_id
-  8: optional PlanNodes.TExecNodePhase debug_phase
-  9: optional PlanNodes.TDebugAction debug_action
+  6: optional Types.TPlanNodeId debug_node_id
+  7: optional PlanNodes.TExecNodePhase debug_phase
+  8: optional PlanNodes.TDebugAction debug_action
 
   // The pool to which this request has been submitted. Used to update pool statistics
   // for admission control.
-  10: optional string request_pool
+  9: optional string request_pool
 
   // Id of this fragment in its role as a sender.
-  11: optional i32 sender_id
+  10: optional i32 sender_id
 }
 
 
@@ -461,31 +457,26 @@ struct TReportExecStatusParams {
   2: optional Types.TUniqueId query_id
 
   // required in V1
-  // Used to look up the fragment instance state in the coordinator, same value as
-  // TExecPlanFragmentParams.instance_state_idx.
-  3: optional i32 instance_state_idx
-
-  // required in V1
-  4: optional Types.TUniqueId fragment_instance_id
+  3: optional Types.TUniqueId fragment_instance_id
 
   // Status of fragment execution; any error status means it's done.
   // required in V1
-  5: optional Status.TStatus status
+  4: optional Status.TStatus status
 
   // If true, fragment finished executing.
   // required in V1
-  6: optional bool done
+  5: optional bool done
 
   // cumulative profile
   // required in V1
-  7: optional RuntimeProfile.TRuntimeProfileTree profile
+  6: optional RuntimeProfile.TRuntimeProfileTree profile
 
   // Cumulative structural changes made by a table sink
   // optional in V1
-  8: optional TInsertExecStatus insert_exec_status;
+  7: optional TInsertExecStatus insert_exec_status;
 
   // New errors that have not been reported to the coordinator
-  9: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+  8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
 }
 
 struct TReportExecStatusResult {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index da41a8e..129be2d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -213,8 +213,9 @@ enum TImpalaQueryOptions {
   // is always, since fields IDs are NYI). Valid values are "position" and "name".
   PARQUET_FALLBACK_SCHEMA_RESOLUTION,
 
-  // Multi-threaded execution: number of cores per machine
-  MT_NUM_CORES,
+  // Multi-threaded execution: degree of parallelism = number of active threads per
+  // backend
+  MT_DOP,
 
   // If true, INSERT writes to S3 go directly to their final location rather than being
   // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index eb95585..92c8681 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -31,11 +31,14 @@ include "Partitions.thrift"
 // plan fragment, including how to produce and how to partition its output.
 // It leaves out node-specific parameters needed for the actual execution.
 struct TPlanFragment {
+  // Ordinal number of fragment within a query; range: [0, <total # fragments>)
+  1: required Types.TFragmentIdx idx
+
   // display name to be shown in the runtime profile; unique within a query
-  1: required string display_name
+  2: required string display_name
 
   // no plan or descriptor table: query without From clause
-  2: optional PlanNodes.TPlan plan
+  3: optional PlanNodes.TPlan plan
 
   // exprs that produce values for slots of output tuple (one expr per slot);
   // if not set, plan fragment materializes full rows of plan_tree
@@ -74,6 +77,8 @@ struct TScanRangeLocation {
 }
 
 // A single scan range plus the hosts that serve it
+// TODO: rename to TScanRangeLocationList; having this differ from the above struct
+// by only a single letter has caused needless confusion
 struct TScanRangeLocations {
   1: required PlanNodes.TScanRange scan_range
   // non-empty list

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 770a414..30d168d 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -19,6 +19,7 @@ namespace cpp impala
 namespace java org.apache.impala.thrift
 
 typedef i64 TTimestamp
+typedef i32 TFragmentIdx
 typedef i32 TPlanNodeId
 typedef i32 TTupleId
 typedef i32 TSlotId

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index adaee18..3231e33 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -51,6 +51,22 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
   public ArrayList<NodeType> getChildren() { return children_; }
 
   /**
+   * Return list of all nodes of the tree rooted at 'this', obtained
+   * through pre-order traversal.
+   */
+  public <C extends TreeNode<NodeType>> ArrayList<C> getNodesPreOrder() {
+    ArrayList<C> result = new ArrayList<C>();
+    getNodesPreOrderAux(result);
+    return result;
+  }
+
+  protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux(
+      ArrayList<C> result) {
+    result.add((C) this);
+    for (NodeType child: children_) child.getNodesPreOrderAux(result);
+  }
+
+  /**
    * Count the total number of nodes in this tree. Leaf node will return 1.
    * Non-leaf node will include all its children.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e50aca5..405eebe 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -59,6 +59,11 @@ public class Planner {
     ctx_ = new PlannerContext(analysisResult, queryCtx);
   }
 
+  public TQueryCtx getQueryCtx() { return ctx_.getQueryCtx(); }
+  public AnalysisContext.AnalysisResult getAnalysisResult() {
+    return ctx_.getAnalysisResult();
+  }
+
   /**
    * Returns a list of plan fragments for executing an analyzed parse tree.
    * May return a single-node or distributed executable plan. If enabled (through a
@@ -204,6 +209,24 @@ public class Planner {
   /**
    * Return combined explain string for all plan fragments.
    * Includes the estimated resource requirements from the request if set.
+   * Uses a default level of EXTENDED, unless overridden by the
+   * 'explain_level' query option.
+   */
+  public String getExplainString(ArrayList<PlanFragment> fragments,
+      TQueryExecRequest request) {
+    // use EXTENDED by default for all non-explain statements
+    TExplainLevel explainLevel = TExplainLevel.EXTENDED;
+    // use the query option for explain stmts and tests (e.g., planner tests)
+    if (ctx_.getAnalysisResult().isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
+      explainLevel = ctx_.getQueryOptions().getExplain_level();
+    }
+    return getExplainString(fragments, request, explainLevel);
+  }
+
+  /**
+   * Return combined explain string for all plan fragments and with an
+   * explicit explain level.
+   * Includes the estimated resource requirements from the request if set.
    */
   public String getExplainString(ArrayList<PlanFragment> fragments,
       TQueryExecRequest request, TExplainLevel explainLevel) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fe1f8f1..00a3d93 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -120,6 +120,7 @@ import org.apache.impala.thrift.TLineageGraph;
 import org.apache.impala.thrift.TLoadDataReq;
 import org.apache.impala.thrift.TLoadDataResp;
 import org.apache.impala.thrift.TMetadataOpRequest;
+import org.apache.impala.thrift.TPlanExecInfo;
 import org.apache.impala.thrift.TPlanFragment;
 import org.apache.impala.thrift.TPlanFragmentTree;
 import org.apache.impala.thrift.TQueryCtx;
@@ -908,70 +909,123 @@ public class Frontend {
   }
 
   /**
-   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+   * Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'.
    */
-  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
-      throws ImpalaException {
-    // Analyze the statement
-    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
-    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
-    timeline.markEvent("Analysis finished");
-    Preconditions.checkNotNull(analysisResult.getStmt());
-    TExecRequest result = new TExecRequest();
-    result.setQuery_options(queryCtx.request.getQuery_options());
-    result.setAccess_events(analysisResult.getAccessEvents());
-    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+  private TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, Planner planner,
+      TQueryCtx queryCtx, TQueryExecRequest queryExecRequest) {
+    TPlanExecInfo result = new TPlanExecInfo();
+    ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
 
-    if (analysisResult.isCatalogOp()) {
-      result.stmt_type = TStmtType.DDL;
-      createCatalogOpRequest(analysisResult, result);
-      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
-      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
-        result.catalog_op_request.setLineage_graph(thriftLineageGraph);
-      }
-      // All DDL operations except for CTAS are done with analysis at this point.
-      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
-    } else if (analysisResult.isLoadDataStmt()) {
-      result.stmt_type = TStmtType.LOAD;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("summary", Type.STRING.toThrift()))));
-      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
-      return result;
-    } else if (analysisResult.isSetStmt()) {
-      result.stmt_type = TStmtType.SET;
-      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
-          new TColumn("option", Type.STRING.toThrift()),
-          new TColumn("value", Type.STRING.toThrift()))));
-      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
-      return result;
+    // map from fragment to its index in TPlanExecInfo.fragments; needed for
+    // TPlanExecInfo.dest_fragment_idx
+    List<ScanNode> scanNodes = Lists.newArrayList();
+    Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
+    for (int idx = 0; idx < fragments.size(); ++idx) {
+      PlanFragment fragment = fragments.get(idx);
+      Preconditions.checkNotNull(fragment.getPlanRoot());
+      fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
+      fragmentIdx.put(fragment, idx);
     }
 
-    // create TQueryExecRequest
-    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
-        || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
-        || analysisResult.isDeleteStmt());
+    // set fragment destinations
+    for (int i = 1; i < fragments.size(); ++i) {
+      PlanFragment dest = fragments.get(i).getDestFragment();
+      Integer idx = fragmentIdx.get(dest);
+      Preconditions.checkState(idx != null);
+      result.addToDest_fragment_idx(idx.intValue());
+    }
 
-    TQueryExecRequest queryExecRequest = new TQueryExecRequest();
-    // create plan
-    LOG.debug("create plan");
-    Planner planner = new Planner(analysisResult, queryCtx);
-    if (RuntimeEnv.INSTANCE.isTestEnv()
-        && queryCtx.request.query_options.mt_num_cores > 0) {
-      // TODO: this is just to be able to run tests; implement this
-      List<PlanFragment> planRoots = planner.createParallelPlans();
-      for (PlanFragment planRoot: planRoots) {
-        TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
-        queryExecRequest.addToMt_plans(thriftPlan);
+    // Set scan ranges/locations for scan nodes.
+    LOG.debug("get scan range locations");
+    Set<TTableName> tablesMissingStats = Sets.newTreeSet();
+    Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
+    for (ScanNode scanNode: scanNodes) {
+      result.putToPer_node_scan_ranges(
+          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
+      if (scanNode.isTableMissingStats()) {
+        tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
       }
-      queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
-      queryExecRequest.setQuery_ctx(queryCtx);
-      explainString.append(planner.getExplainString(
-          Lists.newArrayList(planRoots.get(0)), queryExecRequest,
-          TExplainLevel.STANDARD));
-      queryExecRequest.setQuery_plan(explainString.toString());
-      result.setQuery_exec_request(queryExecRequest);
-      return result;
+      if (scanNode.hasCorruptTableStats()) {
+        tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
+      }
+    }
+
+    for (TTableName tableName: tablesMissingStats) {
+      queryCtx.addToTables_missing_stats(tableName);
     }
+    for (TTableName tableName: tablesWithCorruptStats) {
+      queryCtx.addToTables_with_corrupt_stats(tableName);
+    }
+
+    // The fragment at this point has all state set, serialize it to thrift.
+    for (PlanFragment fragment: fragments) {
+      TPlanFragment thriftFragment = fragment.toThrift();
+      result.addToFragments(thriftFragment);
+    }
+
+    return result;
+  }
+
+  /**
+   * Create a populated TQueryExecRequest, corresponding to the supplied planner,
+   * for multi-threaded execution.
+   */
+  private TQueryExecRequest mtCreateExecRequest(
+      Planner planner, StringBuilder explainString)
+      throws ImpalaException {
+    TQueryCtx queryCtx = planner.getQueryCtx();
+    Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0);
+    // for now, always disable spilling in the backend
+    // TODO-MT: re-enable spilling
+    queryCtx.setDisable_spilling(true);
+    TQueryExecRequest result = new TQueryExecRequest();
+
+    LOG.debug("create mt plan");
+    List<PlanFragment> planRoots = planner.createParallelPlans();
+
+    // create EXPLAIN output
+    result.setQuery_ctx(queryCtx);  // needed by getExplainString()
+    explainString.append(
+        planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result));
+    result.setQuery_plan(explainString.toString());
+
+    // create per-plan exec info;
+    // also assemble list of names of tables with missing or corrupt stats for
+    // assembling a warning message
+    for (PlanFragment planRoot: planRoots) {
+      result.addToMt_plan_exec_info(
+          createPlanExecInfo(planRoot, planner, queryCtx, result));
+    }
+
+    // assign fragment indices, numbered consecutively across all plans
+    int idx = 0;
+    for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) {
+      for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
+    }
+
+    // TODO-MT: implement
+    // Compute resource requirements after scan range locations because the cost
+    // estimates of scan nodes rely on them.
+    //try {
+      //planner.computeResourceReqs(fragments, true, queryExecRequest);
+    //} catch (Exception e) {
+      //// Turn exceptions into a warning to allow the query to execute.
+      //LOG.error("Failed to compute resource requirements for query\n" +
+          //queryCtx.request.getStmt(), e);
+    //}
+
+    return result;
+  }
+
+  /**
+   * Create a populated TQueryExecRequest corresponding to the supplied TQueryCtx.
+   * TODO-MT: remove this function and rename mtCreateExecRequest() to
+   * createExecRequest()
+   */
+  private TQueryExecRequest createExecRequest(
+      Planner planner, StringBuilder explainString)
+      throws ImpalaException {
+    LOG.debug("create plan");
     ArrayList<PlanFragment> fragments = planner.createPlan();
 
     List<ScanNode> scanNodes = Lists.newArrayList();
@@ -986,12 +1040,13 @@ public class Frontend {
       fragmentIdx.put(fragment, idx);
     }
 
+    TQueryExecRequest result = new TQueryExecRequest();
     // set fragment destinations
     for (int i = 1; i < fragments.size(); ++i) {
       PlanFragment dest = fragments.get(i).getDestFragment();
       Integer idx = fragmentIdx.get(dest);
       Preconditions.checkState(idx != null);
-      queryExecRequest.addToDest_fragment_idx(idx.intValue());
+      result.addToDest_fragment_idx(idx.intValue());
     }
 
     // Set scan ranges/locations for scan nodes.
@@ -1001,9 +1056,8 @@ public class Frontend {
     // Assemble a similar list for corrupt stats
     Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
     for (ScanNode scanNode: scanNodes) {
-      queryExecRequest.putToPer_node_scan_ranges(
-          scanNode.getId().asInt(),
-          scanNode.getScanRangeLocations());
+      result.putToPer_node_scan_ranges(
+          scanNode.getId().asInt(), scanNode.getScanRangeLocations());
       if (scanNode.isTableMissingStats()) {
         tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
       }
@@ -1012,7 +1066,7 @@ public class Frontend {
       }
     }
 
-    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
+    TQueryCtx queryCtx = planner.getQueryCtx();
     for (TTableName tableName: tablesMissingStats) {
       queryCtx.addToTables_missing_stats(tableName);
     }
@@ -1022,6 +1076,7 @@ public class Frontend {
 
     // Optionally disable spilling in the backend. Allow spilling if there are plan hints
     // or if all tables have stats.
+    AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
     if (queryCtx.request.query_options.isDisable_unsafe_spills()
         && !tablesMissingStats.isEmpty()
         && !analysisResult.getAnalyzer().hasPlanHints()) {
@@ -1031,33 +1086,83 @@ public class Frontend {
     // Compute resource requirements after scan range locations because the cost
     // estimates of scan nodes rely on them.
     try {
-      planner.computeResourceReqs(fragments, true, queryExecRequest);
+      planner.computeResourceReqs(fragments, true, result);
     } catch (Exception e) {
       // Turn exceptions into a warning to allow the query to execute.
       LOG.error("Failed to compute resource requirements for query\n" +
           queryCtx.request.getStmt(), e);
     }
 
-    // The fragment at this point has all state set, serialize it to thrift.
-    for (PlanFragment fragment: fragments) {
+    // The fragment at this point has all state set, assign sequential ids
+    // and serialize to thrift.
+    for (int i = 0; i < fragments.size(); ++i) {
+      PlanFragment fragment = fragments.get(i);
       TPlanFragment thriftFragment = fragment.toThrift();
-      queryExecRequest.addToFragments(thriftFragment);
+      thriftFragment.setIdx(i);
+      result.addToFragments(thriftFragment);
     }
 
-    // Use EXTENDED by default for all non-explain statements.
-    TExplainLevel explainLevel = TExplainLevel.EXTENDED;
-    // Use the query option for explain stmts and tests (e.g., planner tests).
-    if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
-      explainLevel = queryCtx.request.query_options.getExplain_level();
+    result.setQuery_ctx(queryCtx);  // needed by getExplainString()
+    explainString.append(planner.getExplainString(fragments, result));
+    result.setQuery_plan(explainString.toString());
+    return result;
+  }
+
+  /**
+   * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+   */
+  public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
+      throws ImpalaException {
+    // Analyze the statement
+    AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
+    EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
+    timeline.markEvent("Analysis finished");
+    Preconditions.checkNotNull(analysisResult.getStmt());
+    TExecRequest result = new TExecRequest();
+    result.setQuery_options(queryCtx.request.getQuery_options());
+    result.setAccess_events(analysisResult.getAccessEvents());
+    result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+
+    if (analysisResult.isCatalogOp()) {
+      result.stmt_type = TStmtType.DDL;
+      createCatalogOpRequest(analysisResult, result);
+      TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
+      if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
+        result.catalog_op_request.setLineage_graph(thriftLineageGraph);
+      }
+      // All DDL operations except for CTAS are done with analysis at this point.
+      if (!analysisResult.isCreateTableAsSelectStmt()) return result;
+    } else if (analysisResult.isLoadDataStmt()) {
+      result.stmt_type = TStmtType.LOAD;
+      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+          new TColumn("summary", Type.STRING.toThrift()))));
+      result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
+      return result;
+    } else if (analysisResult.isSetStmt()) {
+      result.stmt_type = TStmtType.SET;
+      result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+          new TColumn("option", Type.STRING.toThrift()),
+          new TColumn("value", Type.STRING.toThrift()))));
+      result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
+      return result;
     }
 
-    // Global query parameters to be set in each TPlanExecRequest.
-    queryExecRequest.setQuery_ctx(queryCtx);
+    // create TQueryExecRequest
+    Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
+        || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
+        || analysisResult.isDeleteStmt());
 
-    explainString.append(
-        planner.getExplainString(fragments, queryExecRequest, explainLevel));
-    queryExecRequest.setQuery_plan(explainString.toString());
-    queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
+    Planner planner = new Planner(analysisResult, queryCtx);
+    TQueryExecRequest queryExecRequest;
+    if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) {
+      queryExecRequest = mtCreateExecRequest(planner, explainString);
+    } else {
+      queryExecRequest = createExecRequest(planner, explainString);
+    }
+    queryExecRequest.setDesc_tbl(
+        planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
+    queryExecRequest.setQuery_ctx(queryCtx);
+    queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
 
     TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
     if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
@@ -1071,7 +1176,6 @@ public class Frontend {
     }
 
     result.setQuery_exec_request(queryExecRequest);
-
     if (analysisResult.isQueryStmt()) {
       // fill in the metadata
       LOG.debug("create result set metadata");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 4464203..284d7e5 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -374,9 +374,9 @@ public class PlannerTestBase extends FrontendTestBase {
   }
 
   /**
-   * Produces single-node and distributed plans for testCase and compares
+   * Produces single-node, distributed, and parallel plans for testCase and compares
    * plan and scan range results.
-   * Appends the actual single-node and distributed plan as well as the printed
+   * Appends the actual plans as well as the printed
    * scan ranges to actualOutput, along with the requisite section header.
    * locations to actualScanRangeLocations; compares both to the appropriate sections
    * of 'testCase'.
@@ -430,7 +430,7 @@ public class PlannerTestBase extends FrontendTestBase {
           ImpalaInternalServiceConstants.NUM_NODES_ALL);
     }
     if (section == Section.PARALLELPLANS) {
-      queryCtx.request.query_options.mt_num_cores = 2;
+      queryCtx.request.query_options.mt_dop = 2;
     }
     ArrayList<String> expectedPlan = testCase.getSectionContents(section);
     boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();