Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/31 16:06:04 UTC

[2/3] incubator-impala git commit: IMPALA-4314: Standardize on MT-related data structures

IMPALA-4314: Standardize on MT-related data structures

This removes the data structures that were superseded in
IMPALA-3903 and changes all control flow to use the
new data structures. The new data structures are renamed
to drop the "Mt" prefix.
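
For reference, the unified startup path in the coordinator now looks roughly like
the condensed C++ sketch below, adapted from the coordinator.cc hunk in this patch.
Debug options, error handling, and profile setup are omitted, so this is
illustrative rather than verbatim:

  void Coordinator::StartFInstances() {
    // One FragmentExecParams per fragment and one FInstanceExecParams per
    // instance, regardless of whether mt_dop is set.
    for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
      for (const FInstanceExecParams& instance_params:
          fragment_params.instance_exec_params) {
        FragmentInstanceState* exec_state = obj_pool()->Add(
            new FragmentInstanceState(instance_params, obj_pool()));
        fragment_instance_states_[GetInstanceIdx(instance_params.instance_id)] = exec_state;
        // Issue the ExecPlanFragment() RPC asynchronously; nullptr stands in for
        // the per-instance debug options handled in the real code.
        exec_env_->fragment_exec_thread_pool()->Offer(
            std::bind(&Coordinator::ExecRemoteFInstance, this,
              std::cref(instance_params), nullptr));
      }
    }
    // Wait for all RPCs to complete before returning.
    exec_complete_barrier_->Wait();
  }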

Change-Id: I465d0e15e2cf17cafe4c747d34c8f595d3645151
Reviewed-on: http://gerrit.cloudera.org:8080/4853
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d857237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d857237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d857237

Branch: refs/heads/master
Commit: 0d857237a8d5231caf9af4c30d4134b151cd1e0d
Parents: 29faca5
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Tue Oct 4 20:40:18 2016 -0400
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon Oct 31 16:03:32 2016 +0000

----------------------------------------------------------------------
 be/src/benchmarks/expr-benchmark.cc             |   2 +-
 be/src/runtime/coordinator.cc                   | 415 ++++--------------
 be/src/runtime/coordinator.h                    |  71 +---
 be/src/scheduling/query-schedule.cc             | 175 ++++----
 be/src/scheduling/query-schedule.h              |  77 ++--
 be/src/scheduling/simple-scheduler-test-util.cc |  10 +-
 be/src/scheduling/simple-scheduler-test-util.h  |  14 +-
 be/src/scheduling/simple-scheduler.cc           | 417 +++++++------------
 be/src/scheduling/simple-scheduler.h            |  81 ++--
 be/src/service/impala-http-handler.cc           |   7 +-
 be/src/service/impala-server.cc                 |   6 +-
 be/src/service/query-exec-state.cc              |   8 +-
 common/thrift/Frontend.thrift                   |  50 +--
 common/thrift/Planner.thrift                    |   4 +-
 .../impala/planner/DataSourceScanNode.java      |   4 +-
 .../apache/impala/planner/HBaseScanNode.java    |   6 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   6 +-
 .../org/apache/impala/planner/KuduScanNode.java |   4 +-
 .../java/org/apache/impala/planner/Planner.java |   4 +-
 .../org/apache/impala/planner/ScanNode.java     |   6 +-
 .../org/apache/impala/service/Frontend.java     | 187 ++-------
 .../apache/impala/planner/PlannerTestBase.java  | 187 +++++----
 .../queries/PlannerTest/mt-dop-validation.test  |  50 +--
 23 files changed, 627 insertions(+), 1164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b95e6d7..444df3a 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -111,7 +111,7 @@ MemTracker tracker;
 // constant query
 static Status PrepareSelectList(const TExecRequest& request, ExprContext** ctx) {
   const TQueryExecRequest& query_request = request.query_exec_request;
-  vector<TExpr> texprs = query_request.fragments[0].output_exprs;
+  vector<TExpr> texprs = query_request.plan_exec_info[0].fragments[0].output_exprs;
   DCHECK_EQ(texprs.size(), 1);
   RETURN_IF_ERROR(Expr::CreateExprTree(&pool, texprs[0], ctx));
   RETURN_IF_ERROR((*ctx)->Prepare(NULL, RowDescriptor(), &tracker));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7df0bf0..f719e98 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -125,39 +125,17 @@ struct DebugOptions {
 /// - updates through UpdateFragmentExecStatus()
 class Coordinator::FragmentInstanceState {
  public:
-  // TODO-MT: remove this c'tor
-  FragmentInstanceState(FragmentIdx fragment_idx, const FragmentExecParams& params,
-      int per_fragment_instance_idx, ObjectPool* obj_pool)
-    : fragment_instance_id_(params.instance_ids[per_fragment_instance_idx]),
-      fragment_idx_(fragment_idx),
-      per_fragment_instance_idx_(per_fragment_instance_idx),
-      impalad_address_(params.hosts[per_fragment_instance_idx]),
-      total_split_size_(0),
-      rpc_sent_(false),
-      done_(false),
-      profile_created_(false),
-      profile_(NULL),
-      total_ranges_complete_(0),
-      rpc_latency_(0) {
-    const string& profile_name = Substitute("Instance $0 (host=$1)",
-        PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_));
-    profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
-  }
-
   FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
-    : fragment_instance_id_(params.instance_id),
-      fragment_idx_(params.fragment().idx),
-      per_fragment_instance_idx_(params.per_fragment_instance_idx),
-      impalad_address_(params.host),
+    : exec_params_(params),
       total_split_size_(0),
+      profile_(nullptr),
+      total_ranges_complete_(0),
+      rpc_latency_(0),
       rpc_sent_(false),
       done_(false),
-      profile_created_(false),
-      profile_(NULL),
-      total_ranges_complete_(0),
-      rpc_latency_(0) {
+      profile_created_(false) {
     const string& profile_name = Substitute("Instance $0 (host=$1)",
-        PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_));
+        PrintId(params.instance_id), lexical_cast<string>(params.host));
     profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
   }
 
@@ -182,13 +160,13 @@ class Coordinator::FragmentInstanceState {
   int64_t UpdateNumScanRangesCompleted();
 
   // The following getters do not require lock() to be held.
-  const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
-  FragmentIdx fragment_idx() const { return fragment_idx_; }
+  const TUniqueId& fragment_instance_id() const { return exec_params_.instance_id; }
+  FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; }
   MonotonicStopWatch* stopwatch() { return &stopwatch_; }
-  const TNetworkAddress& impalad_address() const { return impalad_address_; }
+  const TNetworkAddress& impalad_address() const { return exec_params_.host; }
   int64_t total_split_size() const { return total_split_size_; }
   bool done() const { return done_; }
-  int per_fragment_instance_idx() const { return per_fragment_instance_idx_; }
+  int per_fragment_instance_idx() const { return exec_params_.per_fragment_instance_idx; }
   bool rpc_sent() const { return rpc_sent_; }
   int64_t rpc_latency() const { return rpc_latency_; }
 
@@ -218,22 +196,11 @@ class Coordinator::FragmentInstanceState {
   }
 
  private:
-  /// The unique ID of this instance of this fragment (there may be many instance of the
-  /// same fragment, but this ID uniquely identifies this FragmentInstanceState).
-  TUniqueId fragment_instance_id_;
-
-  // Same as TPlanFragment.idx
-  FragmentIdx fragment_idx_;
-
-  /// range: 0..<# instances of this fragment>-1
-  int per_fragment_instance_idx_;
+  const FInstanceExecParams& exec_params_;
 
   /// Wall clock timer for this fragment.
   MonotonicStopWatch stopwatch_;
 
-  /// Address of ImpalaInternalService this fragment is running on.
-  const TNetworkAddress impalad_address_;
-
   /// Summed across all splits; in bytes.
   int64_t total_split_size_;
 
@@ -246,16 +213,6 @@ class Coordinator::FragmentInstanceState {
   /// been initiated; either way, execution must not be cancelled.
   Status status_;
 
-  /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
-  /// successful.
-  bool rpc_sent_;
-
-  /// If true, execution terminated; do not cancel in that case.
-  bool done_;
-
-  /// True after the first call to profile->Update()
-  bool profile_created_;
-
   /// Owned by coordinator object pool provided in the c'tor
   RuntimeProfile* profile_;
 
@@ -270,6 +227,32 @@ class Coordinator::FragmentInstanceState {
 
   /// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
   int64_t rpc_latency_;
+
+  /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
+  /// successful.
+  bool rpc_sent_;
+
+  /// If true, execution terminated; do not cancel in that case.
+  bool done_;
+
+  /// True after the first call to profile->Update()
+  bool profile_created_;
+};
+
+/// Represents a runtime filter target.
+struct Coordinator::FilterTarget {
+  TPlanNodeId node_id;
+  bool is_local;
+  bool is_bound_by_partition_columns;
+
+  // indices into fragment_instance_states_
+  unordered_set<int> fragment_instance_state_idxs;
+
+  FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
+    node_id = tFilterTarget.node_id;
+    is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns;
+    is_local = tFilterTarget.is_local_target;
+  }
 };
 
 
@@ -410,7 +393,6 @@ TExecNodePhase::type GetExecNodePhase(const string& key) {
   return TExecNodePhase::INVALID;
 }
 
-// TODO: templatize this
 TDebugAction::type GetDebugAction(const string& key) {
   map<int, const char*>::const_iterator entry =
       _TDebugAction_VALUES_TO_NAMES.begin();
@@ -451,7 +433,7 @@ static void ProcessQueryOptions(
 
 Status Coordinator::Exec() {
   const TQueryExecRequest& request = schedule_.request();
-  DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
+  DCHECK(request.plan_exec_info.size() > 0);
 
   needs_finalization_ = request.__isset.finalize_params;
   if (needs_finalization_) finalize_params_ = request.finalize_params;
@@ -474,6 +456,10 @@ Status Coordinator::Exec() {
   const string& str = Substitute("Query $0", PrintId(query_id_));
   progress_.Init(str, schedule_.num_scan_ranges());
 
+  // runtime filters not yet supported for mt execution
+  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+  if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
+
   // to keep things simple, make async Cancel() calls wait until plan fragment
   // execution has been initiated, otherwise we might try to cancel fragment
   // execution at Impala daemons where it hasn't even started
@@ -494,16 +480,9 @@ Status Coordinator::Exec() {
   filter_mem_tracker_.reset(
       new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
 
-  // Initialize the execution profile structures.
-  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
-  if (is_mt_execution) {
-    MtInitExecProfiles();
-    MtInitExecSummary();
-    MtStartFInstances();
-  } else {
-    InitExecProfile(request);
-    StartFragments();
-  }
+  InitExecProfiles();
+  InitExecSummary();
+  StartFInstances();
 
   // In the error case, it's safe to return and not to get root_sink_ here to close - if
   // there was an error, but the coordinator fragment was successfully started, it should
@@ -543,13 +522,16 @@ Status Coordinator::Exec() {
   return Status::OK();
 }
 
-void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
-    int num_hosts, int start_fragment_instance_state_idx) {
+void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) {
+  DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+  int num_hosts = fragment_params.instance_exec_params.size();
+  DCHECK_GT(num_hosts, 0);
   DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
       << "UpdateFilterRoutingTable() called although runtime filters are disabled";
   DCHECK(!filter_routing_table_complete_)
       << "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_";
-  for (const TPlanNode& plan_node: plan_nodes) {
+
+  for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
     if (!plan_node.__isset.runtime_filters) continue;
     for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
       if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) {
@@ -564,10 +546,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
         int pending_count = filter.is_broadcast_join ?
             (filter.has_remote_targets ? 1 : 0) : num_hosts;
         f->set_pending_count(pending_count);
-        vector<int> src_idxs;
-        for (int i = 0; i < num_hosts; ++i) {
-          src_idxs.push_back(start_fragment_instance_state_idx + i);
-        }
+        vector<int> src_idxs = fragment_params.GetInstanceIdxs();
 
         // If this is a broadcast join with only non-local targets, build and publish it
         // on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
@@ -586,11 +565,9 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
         if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) {
           continue;
         }
+        vector<int> idxs = fragment_params.GetInstanceIdxs();
         FilterTarget target(tFilterTarget);
-        for (int i = 0; i < num_hosts; ++i) {
-          target.fragment_instance_state_idxs.insert(
-              start_fragment_instance_state_idx + i);
-        }
+        target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end());
         f->targets()->push_back(target);
       } else {
         DCHECK(false) << "Unexpected plan node with runtime filters: "
@@ -600,7 +577,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
   }
 }
 
-void Coordinator::StartFragments() {
+void Coordinator::StartFInstances() {
   int num_fragment_instances = schedule_.GetNumFragmentInstances();
   DCHECK_GT(num_fragment_instances, 0);
 
@@ -615,78 +592,28 @@ void Coordinator::StartFragments() {
   VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
              << query_id_;
   query_events_->MarkEvent(
-      Substitute("Ready to start $0 fragments", num_fragment_instances));
+      Substitute("Ready to start $0 fragment instances", num_fragment_instances));
+
+  // TODO-MT: populate the runtime filter routing table
+  // This requires local aggregation of filters prior to sending
+  // for broadcast joins in order to avoid more complicated merge logic here.
 
-  int instance_state_idx = 0;
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    DCHECK_EQ(request.plan_exec_info.size(), 1);
     // Populate the runtime filter routing table. This should happen before starting the
     // fragment instances. This code anticipates the indices of the instance states
     // created later on in ExecRemoteFragment()
-    for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) {
-      const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
-      int num_hosts = params.hosts.size();
-      DCHECK_GT(num_hosts, 0);
-      UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
-          instance_state_idx);
-      instance_state_idx += num_hosts;
+    for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
+      UpdateFilterRoutingTable(fragment_params);
     }
     MarkFilterRoutingTableComplete();
   }
 
   int num_instances = 0;
-  // Start one fragment instance per fragment per host (number of hosts running each
-  // fragment may not be constant).
-  for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) {
-    const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
-    int num_hosts = params.hosts.size();
-    DCHECK_GT(num_hosts, 0);
-    fragment_profiles_[fragment_idx].num_instances = num_hosts;
-    // Start one fragment instance for every fragment_instance required by the
-    // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with
-    // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and
-    // so on.
-    for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts;
-         ++fragment_instance_idx, ++num_instances) {
-      DebugOptions* fragment_instance_debug_options =
-          debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
-      exec_env_->fragment_exec_thread_pool()->Offer(
-        std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params),
-          std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options,
-          fragment_instance_idx));
-    }
-  }
-  exec_complete_barrier_->Wait();
-  query_events_->MarkEvent(
-      Substitute("All $0 fragments instances started", num_instances));
-}
-
-void Coordinator::MtStartFInstances() {
-  int num_fragment_instances = schedule_.GetNumFragmentInstances();
-  DCHECK_GT(num_fragment_instances, 0);
-
-  fragment_instance_states_.resize(num_fragment_instances);
-  exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances));
-  num_remaining_fragment_instances_ = num_fragment_instances;
-
-  DebugOptions debug_options;
-  ProcessQueryOptions(schedule_.query_options(), &debug_options);
-  const TQueryExecRequest& request = schedule_.request();
-
-  VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
-             << query_id_;
-  query_events_->MarkEvent(
-      Substitute("Ready to start $0 fragment instances", num_fragment_instances));
-
-  // TODO: populate the runtime filter routing table
-  // this requires local aggregation of filters prior to sending
-  // for broadcast joins in order to avoid more complicated merge logic here
-
-  int num_instances = 0;
-  for (const MtFragmentExecParams& fragment_params: schedule_.mt_fragment_exec_params()) {
-    for (int i = 0; i < fragment_params.instance_exec_params.size();
-        ++i, ++num_instances) {
-      const FInstanceExecParams& instance_params =
-          fragment_params.instance_exec_params[i];
+  for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
+    num_instances += fragment_params.instance_exec_params.size();
+    for (const FInstanceExecParams& instance_params:
+        fragment_params.instance_exec_params) {
       FragmentInstanceState* exec_state = obj_pool()->Add(
           new FragmentInstanceState(instance_params, obj_pool()));
       int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
@@ -695,7 +622,7 @@ void Coordinator::MtStartFInstances() {
       DebugOptions* instance_debug_options =
           debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
       exec_env_->fragment_exec_thread_pool()->Offer(
-          std::bind(&Coordinator::MtExecRemoteFInstance,
+          std::bind(&Coordinator::ExecRemoteFInstance,
             this, std::cref(instance_params), instance_debug_options));
     }
   }
@@ -1243,79 +1170,12 @@ void Coordinator::PrintFragmentInstanceInfo() {
   }
 }
 
-void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
-  // Initialize the structure to collect execution summary of every plan node.
-  fragment_profiles_.resize(request.fragments.size());
-  exec_summary_.__isset.nodes = true;
-  for (int i = 0; i < request.fragments.size(); ++i) {
-    if (!request.fragments[i].__isset.plan) continue;
-    const TPlan& plan = request.fragments[i].plan;
-    int fragment_first_node_idx = exec_summary_.nodes.size();
-
-    for (int j = 0; j < plan.nodes.size(); ++j) {
-      TPlanNodeExecSummary node;
-      node.node_id = plan.nodes[j].node_id;
-      node.fragment_idx = i;
-      node.label = plan.nodes[j].label;
-      node.__set_label_detail(plan.nodes[j].label_detail);
-      node.num_children = plan.nodes[j].num_children;
-
-      if (plan.nodes[j].__isset.estimated_stats) {
-        node.__set_estimated_stats(plan.nodes[j].estimated_stats);
-      }
-
-      plan_node_id_to_summary_map_[plan.nodes[j].node_id] = exec_summary_.nodes.size();
-      exec_summary_.nodes.push_back(node);
-    }
-
-    if (request.fragments[i].__isset.output_sink &&
-        request.fragments[i].output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
-      const TDataStreamSink& sink = request.fragments[i].output_sink.stream_sink;
-      int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id];
-      if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
-        exec_summary_.nodes[exch_idx].__set_is_broadcast(true);
-      }
-      exec_summary_.__isset.exch_to_sender_map = true;
-      exec_summary_.exch_to_sender_map[exch_idx] = fragment_first_node_idx;
-    }
-  }
-
-  // Initialize the runtime profile structure. This adds the per fragment average
-  // profiles followed by the per fragment instance profiles.
-  for (int i = 0; i < request.fragments.size(); ++i) {
-    // Insert the avg profiles in ascending fragment number order. If there is a
-    // coordinator fragment, it's been placed in fragment_profiles_[0].averaged_profile,
-    // ensuring that this code will put the first averaged profile immediately after
-    // it. If there is no coordinator fragment, the first averaged profile will be
-    // inserted as the first child of query_profile_, and then all other averaged
-    // fragments will follow.
-    bool is_coordinator_fragment = (i == 0 && schedule_.GetCoordFragment() != nullptr);
-    string profile_name =
-        Substitute(is_coordinator_fragment ? "Coordinator Fragment $0" : "Fragment $0",
-            request.fragments[i].display_name);
-    fragment_profiles_[i].root_profile =
-        obj_pool()->Add(new RuntimeProfile(obj_pool(), profile_name));
-    if (is_coordinator_fragment) {
-      fragment_profiles_[i].averaged_profile = nullptr;
-    } else {
-      fragment_profiles_[i].averaged_profile = obj_pool()->Add(new RuntimeProfile(
-          obj_pool(),
-          Substitute("Averaged Fragment $0", request.fragments[i].display_name), true));
-      query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true,
-          (i > 0) ? fragment_profiles_[i - 1].averaged_profile : NULL);
-    }
-    // Note: we don't start the wall timer here for the fragment
-    // profile; it's uninteresting and misleading.
-    query_profile_->AddChild(fragment_profiles_[i].root_profile);
-  }
-}
-
-void Coordinator::MtInitExecSummary() {
+void Coordinator::InitExecSummary() {
   const TQueryExecRequest& request = schedule_.request();
   // init exec_summary_.{nodes, exch_to_sender_map}
   exec_summary_.__isset.nodes = true;
   DCHECK(exec_summary_.nodes.empty());
-  for (const TPlanExecInfo& plan_exec_info: request.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) {
     for (const TPlanFragment& fragment: plan_exec_info.fragments) {
       if (!fragment.__isset.plan) continue;
 
@@ -1354,7 +1214,7 @@ void Coordinator::MtInitExecSummary() {
   }
 }
 
-void Coordinator::MtInitExecProfiles() {
+void Coordinator::InitExecProfiles() {
   const TQueryExecRequest& request = schedule_.request();
   vector<const TPlanFragment*> fragments;
   schedule_.GetTPlanFragments(&fragments);
@@ -1370,6 +1230,7 @@ void Coordinator::MtInitExecProfiles() {
     PerFragmentProfileData* data = &fragment_profiles_[fragment->idx];
     data->num_instances =
         schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
+    // TODO-MT: stop special-casing the coordinator fragment
     if (fragment != coord_fragment) {
       data->averaged_profile = obj_pool()->Add(new RuntimeProfile(
           obj_pool(), Substitute("Averaged Fragment $0", fragment->display_name), true));
@@ -1383,7 +1244,6 @@ void Coordinator::MtInitExecProfiles() {
   }
 }
 
-
 void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
     FragmentInstanceCounters* counters) {
   vector<RuntimeProfile*> children;
@@ -1407,11 +1267,11 @@ void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
   }
 }
 
-void Coordinator::MtExecRemoteFInstance(
+void Coordinator::ExecRemoteFInstance(
     const FInstanceExecParams& exec_params, const DebugOptions* debug_options) {
   NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
   TExecPlanFragmentParams rpc_params;
-  MtSetExecPlanFragmentParams(exec_params, &rpc_params);
+  SetExecPlanFragmentParams(exec_params, &rpc_params);
   if (debug_options != NULL) {
     rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
     rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
@@ -1467,70 +1327,6 @@ void Coordinator::MtExecRemoteFInstance(
       << " instance_id=" << PrintId(exec_state->fragment_instance_id());
 }
 
-void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
-    const TPlanFragment& plan_fragment, DebugOptions* debug_options,
-    int fragment_instance_idx) {
-  NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
-  TExecPlanFragmentParams rpc_params;
-  SetExecPlanFragmentParams(
-      plan_fragment, fragment_exec_params, fragment_instance_idx, &rpc_params);
-  if (debug_options != NULL) {
-    rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
-    rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
-    rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
-  }
-  FragmentInstanceState* exec_state = obj_pool()->Add(
-      new FragmentInstanceState(
-        plan_fragment.idx, fragment_exec_params, fragment_instance_idx, obj_pool()));
-  exec_state->ComputeTotalSplitSize(
-      rpc_params.fragment_instance_ctx.per_node_scan_ranges);
-  int instance_state_idx = GetInstanceIdx(exec_state->fragment_instance_id());
-  fragment_instance_states_[instance_state_idx] = exec_state;
-  VLOG_FILE << "making rpc: ExecPlanFragment"
-            << " instance_id=" << PrintId(exec_state->fragment_instance_id())
-            << " host=" << exec_state->impalad_address();
-
-  int64_t start = MonotonicMillis();
-
-  Status client_connect_status;
-  ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
-      exec_state->impalad_address(), &client_connect_status);
-  if (!client_connect_status.ok()) {
-    exec_state->SetInitialStatus(client_connect_status, true);
-    return;
-  }
-
-  TExecPlanFragmentResult thrift_result;
-  Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment,
-      rpc_params, &thrift_result);
-
-  exec_state->set_rpc_latency(MonotonicMillis() - start);
-
-  const string ERR_TEMPLATE = "ExecPlanRequest rpc instance_id=$0 failed: $1";
-
-  if (!rpc_status.ok()) {
-    const string& err_msg =
-        Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
-          rpc_status.msg().msg());
-    VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg), true);
-    return;
-  }
-
-  Status exec_plan_status = Status(thrift_result.status);
-  if (!exec_plan_status.ok()) {
-    const string& err_msg =
-        Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
-          exec_plan_status.msg().GetFullMessageDetails());
-    VLOG_QUERY << err_msg;
-    exec_state->SetInitialStatus(Status(err_msg), true);
-    return;
-  }
-
-  exec_state->SetInitialStatus(Status::OK(), true);
-  return;
-}
-
 void Coordinator::Cancel(const Status* cause) {
   lock_guard<mutex> l(lock_);
   // if the query status indicates an error, cancellation has already been initiated
@@ -1834,14 +1630,6 @@ void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state)
 
     TPlanNodeExecSummary& exec_summary =
         exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]];
-    if (exec_summary.exec_stats.empty()) {
-      // First time, make an exec_stats for each instance this plan node is running on.
-      // TODO-MT: remove this and initialize all runtime state prior to starting
-      // instances
-      DCHECK_LT(instance_state.fragment_idx(), fragment_profiles_.size());
-      exec_summary.exec_stats.resize(
-          fragment_profiles_[instance_state.fragment_idx()].num_instances);
-    }
     DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size());
     DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
         exec_summary.exec_stats.size());
@@ -1956,7 +1744,7 @@ string Coordinator::GetErrorLog() {
   return PrintErrorMapToString(merged);
 }
 
-void Coordinator::MtSetExecPlanFragmentParams(
+void Coordinator::SetExecPlanFragmentParams(
     const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) {
   rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
   rpc_params->__set_query_ctx(query_ctx_);
@@ -1965,35 +1753,12 @@ void Coordinator::MtSetExecPlanFragmentParams(
   TPlanFragmentInstanceCtx fragment_instance_ctx;
 
   fragment_ctx.__set_fragment(params.fragment());
-  // TODO: Remove filters that weren't selected during filter routing table construction.
   SetExecPlanDescriptorTable(params.fragment(), rpc_params);
 
-  fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
-  fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
-  fragment_instance_ctx.__set_per_exch_num_senders(
-      params.fragment_exec_params.per_exch_num_senders);
-  fragment_instance_ctx.__set_destinations(
-      params.fragment_exec_params.destinations);
-  fragment_instance_ctx.__set_sender_id(params.sender_id);
-  fragment_instance_ctx.fragment_instance_id = params.instance_id;
-  fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
-  rpc_params->__set_fragment_ctx(fragment_ctx);
-  rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
-}
-
-void Coordinator::SetExecPlanFragmentParams(
-    const TPlanFragment& fragment, const FragmentExecParams& params,
-    int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params) {
-  rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
-  rpc_params->__set_query_ctx(query_ctx_);
-
-  TPlanFragmentCtx fragment_ctx;
-  TPlanFragmentInstanceCtx fragment_instance_ctx;
-
-  fragment_ctx.__set_fragment(fragment);
-  int instance_state_idx = GetInstanceIdx(params.instance_ids[per_fragment_instance_idx]);
   // Remove filters that weren't selected during filter routing table construction.
   if (filter_mode_ != TRuntimeFilterMode::OFF) {
+    DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+    int instance_idx = GetInstanceIdx(params.instance_id);
     for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
       if (plan_node.__isset.runtime_filters) {
         vector<TRuntimeFilterDesc> required_filters;
@@ -2003,7 +1768,7 @@ void Coordinator::SetExecPlanFragmentParams(
           if (filter_it == filter_routing_table_.end()) continue;
           const FilterState& f = filter_it->second;
           if (plan_node.__isset.hash_join_node) {
-            if (f.src_fragment_instance_state_idxs().find(instance_state_idx) ==
+            if (f.src_fragment_instance_state_idxs().find(instance_idx) ==
                 f.src_fragment_instance_state_idxs().end()) {
               DCHECK(desc.is_broadcast_join);
               continue;
@@ -2018,24 +1783,16 @@ void Coordinator::SetExecPlanFragmentParams(
       }
     }
   }
-  SetExecPlanDescriptorTable(fragment, rpc_params);
-
-  TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx];
-  FragmentScanRangeAssignment::const_iterator it =
-      params.scan_range_assignment.find(exec_host);
-  // Scan ranges may not always be set, so use an empty structure if so.
-  const PerNodeScanRanges& scan_ranges =
-      (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
 
   fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
-  fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges);
-  fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders);
-  fragment_instance_ctx.__set_destinations(params.destinations);
-  fragment_instance_ctx.__set_sender_id(
-      params.sender_id_base + per_fragment_instance_idx);
-  fragment_instance_ctx.fragment_instance_id =
-      params.instance_ids[per_fragment_instance_idx];
-  fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx;
+  fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
+  fragment_instance_ctx.__set_per_exch_num_senders(
+      params.fragment_exec_params.per_exch_num_senders);
+  fragment_instance_ctx.__set_destinations(
+      params.fragment_exec_params.destinations);
+  fragment_instance_ctx.__set_sender_id(params.sender_id);
+  fragment_instance_ctx.fragment_instance_id = params.instance_id;
+  fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
   rpc_params->__set_fragment_ctx(fragment_ctx);
   rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 176d89d..53f00eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -100,10 +100,6 @@ struct DebugOptions;
 ///
 /// TODO: move into separate subdirectory and move nested classes into separate files
 /// and unnest them
-///
-/// TODO: remove all data structures and functions that are superceded by their
-/// multi-threaded counterpart and remove the "Mt" prefix with which the latter
-/// is currently marked
 class Coordinator {
  public:
   Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
@@ -216,7 +212,8 @@ class Coordinator {
 
  private:
   class FragmentInstanceState;
-  struct FilterState;
+  struct FilterTarget;
+  class FilterState;
 
   /// Typedef for boost utility to compute averaged stats
   /// TODO: including the median doesn't compile, looks like some includes are missing
@@ -253,7 +250,7 @@ class Coordinator {
 
   /// FragmentInstanceStates for all fragment instances, including that of the coordinator
   /// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
-  /// PrepareCoordFragment() and StartRemoteFragments()/MtStartRemoteFInstances().
+  /// StartFInstances().
   std::vector<FragmentInstanceState*> fragment_instance_states_;
 
   /// True if the query needs a post-execution step to tidy up
@@ -412,22 +409,6 @@ class Coordinator {
   /// returned, successfully or not. Initialised during Exec().
   boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
 
-  /// Represents a runtime filter target.
-  struct FilterTarget {
-    TPlanNodeId node_id;
-    bool is_local;
-    bool is_bound_by_partition_columns;
-
-    // indices into fragment_instance_states_
-    boost::unordered_set<int> fragment_instance_state_idxs;
-
-    FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
-      node_id = tFilterTarget.node_id;
-      is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns;
-      is_local = tFilterTarget.is_local_target;
-    }
-  };
-
   /// Protects filter_routing_table_.
   SpinLock filter_lock_;
 
@@ -444,7 +425,7 @@ class Coordinator {
   RuntimeProfile::Counter* filter_updates_received_;
 
   /// The filtering mode for this query. Set in constructor.
-  const TRuntimeFilterMode::type filter_mode_;
+  TRuntimeFilterMode::type filter_mode_;
 
   /// Tracks the memory consumed by runtime filters during aggregation. Child of
   /// query_mem_tracker_.
@@ -459,22 +440,13 @@ class Coordinator {
   /// Sets 'filter_routing_table_complete_' and prints the table to the profile and log.
   void MarkFilterRoutingTableComplete();
 
-  /// Fill in rpc_params based on parameters.
-  /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment
-  /// instance within its fragment.
-  void SetExecPlanFragmentParams(const TPlanFragment& fragment,
-      const FragmentExecParams& params, int per_fragment_instance_idx,
-      TExecPlanFragmentParams* rpc_params);
-  void MtSetExecPlanFragmentParams(
+  /// Fill in rpc_params based on params.
+  void SetExecPlanFragmentParams(
       const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params);
 
   /// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from
-  /// multiple threads. Creates a new FragmentInstanceState and registers it in
-  /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad.
-  void ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
-      const TPlanFragment& plan_fragment, DebugOptions* debug_options,
-      int fragment_instance_idx);
-  void MtExecRemoteFInstance(
+  /// multiple threads.
+  void ExecRemoteFInstance(
       const FInstanceExecParams& exec_params, const DebugOptions* debug_options);
 
   /// Determine fragment number, given fragment id.
@@ -518,14 +490,13 @@ class Coordinator {
   /// Moves all temporary staging files to their final destinations.
   Status FinalizeSuccessfulInsert();
 
-  /// Initializes the structures in runtime profile and exec_summary_. Must be
-  /// called before RPCs to start remote fragments.
-  void InitExecProfile(const TQueryExecRequest& request);
-  void MtInitExecProfiles();
+  /// Initializes the structures in fragment_profiles_. Must be called before RPCs to
+  /// start remote fragments.
+  void InitExecProfiles();
 
   /// Initialize the structures to collect execution summary of every plan node
   /// (exec_summary_ and plan_node_id_to_summary_map_)
-  void MtInitExecSummary();
+  void InitExecSummary();
 
   /// Update fragment profile information from a fragment instance state.
   void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state);
@@ -556,13 +527,10 @@ class Coordinator {
   void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
       PermissionCache* permissions_cache);
 
-  /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel,
-  /// and then waiting for all of the RPCs to complete.
-  void StartFragments();
-
-  /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel
-  /// and then waiting for all of the RPCs to complete.
-  void MtStartFInstances();
+  /// Starts all fragment instances contained in the schedule by issuing RPCs in
+  /// parallel and then waiting for all of the RPCs to complete. Also sets up and
+  /// registers the state for all fragment instances.
+  void StartFInstances();
 
   /// Calls CancelInternal() and returns an error if there was any error starting the
   /// fragments.
@@ -570,11 +538,8 @@ class Coordinator {
   Status FinishInstanceStartup();
 
   /// Build the filter routing table by iterating over all plan nodes and collecting the
-  /// filters that they either produce or consume. The source and target fragment
-  /// instance indexes for filters are numbered in the range
-  /// [start_fragment_instance_idx .. start_fragment_instance_idx + num_hosts]
-  void UpdateFilterRoutingTable(const std::vector<TPlanNode>& plan_nodes, int num_hosts,
-      int start_fragment_instance_idx);
+  /// filters that they either produce or consume.
+  void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 1eb36e3..e2dc7c4 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -50,39 +50,16 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     query_options_(query_options),
     summary_profile_(summary_profile),
     query_events_(query_events),
-    num_fragment_instances_(0),
     num_scan_ranges_(0),
     next_instance_id_(query_id),
     is_admitted_(false) {
-  fragment_exec_params_.resize(request.fragments.size());
-  bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
-
-  if (is_mt_execution) {
-    /// TODO-MT: remove else branch and move MtInit() logic here
-    MtInit();
-  } else {
-    // Build two maps to map node ids to their fragments as well as to the offset in their
-    // fragment's plan's nodes list.
-    for (int i = 0; i < request.fragments.size(); ++i) {
-      int node_idx = 0;
-      for (const TPlanNode& node: request.fragments[i].plan.nodes) {
-        if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
-          plan_node_to_fragment_idx_.resize(node.node_id + 1);
-          plan_node_to_plan_node_idx_.resize(node.node_id + 1);
-        }
-        DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
-        plan_node_to_fragment_idx_[node.node_id] = i;
-        plan_node_to_plan_node_idx_[node.node_id] = node_idx;
-        ++node_idx;
-      }
-    }
-  }
+  Init();
 }
 
-void QuerySchedule::MtInit() {
-  // extract TPlanFragments and order by fragment id
+void QuerySchedule::Init() {
+  // extract TPlanFragments and order by fragment idx
   vector<const TPlanFragment*> fragments;
-  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
     for (const TPlanFragment& fragment: plan_exec_info.fragments) {
       fragments.push_back(&fragment);
     }
@@ -90,41 +67,34 @@ void QuerySchedule::MtInit() {
   sort(fragments.begin(), fragments.end(),
       [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; });
 
-  DCHECK_EQ(mt_fragment_exec_params_.size(), 0);
+  // this must only be called once
+  DCHECK_EQ(fragment_exec_params_.size(), 0);
   for (const TPlanFragment* fragment: fragments) {
-    mt_fragment_exec_params_.emplace_back(*fragment);
+    fragment_exec_params_.emplace_back(*fragment);
   }
 
   // mark coordinator fragment
-  const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
-  if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) {
-    mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true;
-    next_instance_id_.lo = 1;  // generated instance ids start at 1
+  const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
+  if (request_.stmt_type == TStmtType::QUERY) {
+    fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
+    // the coordinator instance gets index 0, generated instance ids start at 1
+    next_instance_id_.lo = 1;
   }
 
-  // compute input fragments and find max node id
+  // find max node id
   int max_node_id = 0;
-  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
     for (const TPlanFragment& fragment: plan_exec_info.fragments) {
       for (const TPlanNode& node: fragment.plan.nodes) {
         max_node_id = max(node.node_id, max_node_id);
       }
     }
-
-    // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
-    for (int i = 1; i < plan_exec_info.fragments.size(); ++i) {
-      const TPlanFragment& fragment = plan_exec_info.fragments[i];
-      FragmentIdx dest_idx =
-          plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx;
-      MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx];
-      dest_params.input_fragments.push_back(fragment.idx);
-    }
   }
 
   // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_
   plan_node_to_fragment_idx_.resize(max_node_id + 1);
   plan_node_to_plan_node_idx_.resize(max_node_id + 1);
-  for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
     for (const TPlanFragment& fragment: plan_exec_info.fragments) {
       for (int i = 0; i < fragment.plan.nodes.size(); ++i) {
         const TPlanNode& node = fragment.plan.nodes[i];
@@ -133,8 +103,72 @@ void QuerySchedule::MtInit() {
       }
     }
   }
+
+  // compute input fragments
+  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
+    // each fragment sends its output to the fragment containing the destination node
+    // of its output sink
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      if (!fragment.output_sink.__isset.stream_sink) continue;
+      PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id;
+      FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id];
+      FragmentExecParams& dest_params = fragment_exec_params_[dest_idx];
+      dest_params.input_fragments.push_back(fragment.idx);
+    }
+  }
 }
 
+void QuerySchedule::Validate() const {
+  // all fragments have a FragmentExecParams
+  int num_fragments = 0;
+  for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+      DCHECK_LT(fragment.idx, fragment_exec_params_.size());
+      DCHECK_EQ(fragment.idx, fragment_exec_params_[fragment.idx].fragment.idx);
+      ++num_fragments;
+    }
+  }
+  DCHECK_EQ(num_fragments, fragment_exec_params_.size());
+
+  // we assigned the correct number of scan ranges per (host, node id):
+  // assemble a map from host -> (map from node id -> #scan ranges)
+  unordered_map<TNetworkAddress, map<TPlanNodeId, int>> count_map;
+  for (const FragmentExecParams& fp: fragment_exec_params_) {
+    for (const FInstanceExecParams& ip: fp.instance_exec_params) {
+      auto host_it = count_map.find(ip.host);
+      if (host_it == count_map.end()) {
+        count_map.insert(make_pair(ip.host, map<TPlanNodeId, int>()));
+        host_it = count_map.find(ip.host);
+      }
+      map<TPlanNodeId, int>& node_map = host_it->second;
+
+      for (const PerNodeScanRanges::value_type& instance_entry: ip.per_node_scan_ranges) {
+        TPlanNodeId node_id = instance_entry.first;
+        auto count_entry = node_map.find(node_id);
+        if (count_entry == node_map.end()) {
+          node_map.insert(make_pair(node_id, 0));
+          count_entry = node_map.find(node_id);
+        }
+        count_entry->second += instance_entry.second.size();
+      }
+    }
+  }
+
+  for (const FragmentExecParams& fp: fragment_exec_params_) {
+    for (const FragmentScanRangeAssignment::value_type& assignment_entry:
+        fp.scan_range_assignment) {
+      const TNetworkAddress& host = assignment_entry.first;
+      DCHECK_GT(count_map.count(host), 0);
+      map<TPlanNodeId, int>& node_map = count_map.find(host)->second;
+      for (const PerNodeScanRanges::value_type& node_assignment:
+          assignment_entry.second) {
+        TPlanNodeId node_id = node_assignment.first;
+        DCHECK_GT(node_map.count(node_id), 0);
+        DCHECK_EQ(node_map[node_id], node_assignment.second.size());
+      }
+    }
+  }
+}
 
 int64_t QuerySchedule::GetClusterMemoryEstimate() const {
   DCHECK_GT(unique_hosts_.size(), 0);
@@ -199,15 +233,8 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
 
 int QuerySchedule::GetNumFragmentInstances() const {
   int result = 0;
-  if (mt_fragment_exec_params_.empty()) {
-    DCHECK(!fragment_exec_params_.empty());
-    for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
-      result += fragment_exec_params.hosts.size();
-    }
-  } else {
-    for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) {
-      result += fragment_exec_params.instance_exec_params.size();
-    }
+  for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
+    result += fragment_exec_params.instance_exec_params.size();
   }
   return result;
 }
@@ -215,37 +242,35 @@ int QuerySchedule::GetNumFragmentInstances() const {
 const TPlanFragment* QuerySchedule::GetCoordFragment() const {
   // Only have coordinator fragment for statements that return rows.
   if (request_.stmt_type != TStmtType::QUERY) return nullptr;
-  bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
-  const TPlanFragment* fragment = is_mt_exec
-      ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
-
-    return fragment;
+  const TPlanFragment* fragment = &request_.plan_exec_info[0].fragments[0];
+  DCHECK_EQ(fragment->partition.type, TPartitionType::UNPARTITIONED);
+  return fragment;
 }
 
+
 void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
   fragments->clear();
-  bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
-  if (is_mt_exec) {
-    for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) {
-      for (const TPlanFragment& fragment: plan_info.fragments) {
-        fragments->push_back(&fragment);
-      }
-    }
-  } else {
-    for (const TPlanFragment& fragment: request_.fragments) {
+  for (const TPlanExecInfo& plan_info: request_.plan_exec_info) {
+    for (const TPlanFragment& fragment: plan_info.fragments) {
       fragments->push_back(&fragment);
     }
   }
 }
 
 const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
-  const TPlanFragment& coord_fragment =  request_.mt_plan_exec_info[0].fragments[0];
-  DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED);
-  const MtFragmentExecParams* fragment_params =
-      &mt_fragment_exec_params_[coord_fragment.idx];
-  DCHECK(fragment_params != nullptr);
-  DCHECK_EQ(fragment_params->instance_exec_params.size(), 1);
-  return fragment_params->instance_exec_params[0];
+  DCHECK_EQ(request_.stmt_type, TStmtType::QUERY);
+  const TPlanFragment& coord_fragment =  request_.plan_exec_info[0].fragments[0];
+  const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
+  DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
+  return fragment_params.instance_exec_params[0];
+}
+
+vector<int> FragmentExecParams::GetInstanceIdxs() const {
+  vector<int> result;
+  for (const FInstanceExecParams& instance_params: instance_exec_params) {
+    result.push_back(GetInstanceIdx(instance_params.instance_id));
+  }
+  return result;
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 77c9cd6..703c07c 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -35,7 +35,7 @@
 namespace impala {
 
 class Coordinator;
-struct MtFragmentExecParams;
+struct FragmentExecParams;
 
 /// map from scan node id to a list of scan ranges
 typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
@@ -45,22 +45,6 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
 typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
     FragmentScanRangeAssignment;
 
-/// execution parameters for a single fragment; used to assemble the
-/// TPlanFragmentInstanceCtx;
-/// hosts.size() == instance_ids.size()
-struct FragmentExecParams {
-  std::vector<TNetworkAddress> hosts; // execution backends
-  std::vector<TUniqueId> instance_ids;
-  std::vector<TPlanFragmentDestination> destinations;
-  std::map<PlanNodeId, int> per_exch_num_senders;
-  FragmentScanRangeAssignment scan_range_assignment;
-  /// In its role as a data sender, a fragment instance is assigned a "sender id" to
-  /// uniquely identify it to a receiver. The id that a particular fragment instance
-  /// is assigned ranges from [sender_id_base, sender_id_base + N - 1], where
-  /// N = hosts.size (i.e. N = number of fragment instances)
-  int sender_id_base;
-};
-
 /// execution parameters for a single fragment instance; used to assemble the
 /// TPlanFragmentInstanceCtx
 struct FInstanceExecParams {
@@ -75,12 +59,12 @@ struct FInstanceExecParams {
   /// uniquely identify it to a receiver. -1 = invalid.
   int sender_id;
 
-  /// the parent MtFragmentExecParams
-  const MtFragmentExecParams& fragment_exec_params;
+  /// the parent FragmentExecParams
+  const FragmentExecParams& fragment_exec_params;
   const TPlanFragment& fragment() const;
 
   FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
-      int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params)
+      int per_fragment_instance_idx, const FragmentExecParams& fragment_exec_params)
     : instance_id(instance_id), host(host),
       per_fragment_instance_idx(per_fragment_instance_idx),
       sender_id(-1),
@@ -88,7 +72,7 @@ struct FInstanceExecParams {
 };
 
 /// Execution parameters shared between fragment instances
-struct MtFragmentExecParams {
+struct FragmentExecParams {
   /// output destinations of this fragment
   std::vector<TPlanFragmentDestination> destinations;
 
@@ -105,8 +89,11 @@ struct MtFragmentExecParams {
   std::vector<FragmentIdx> input_fragments;
   std::vector<FInstanceExecParams> instance_exec_params;
 
-  MtFragmentExecParams(const TPlanFragment& fragment)
+  FragmentExecParams(const TPlanFragment& fragment)
     : is_coord_fragment(false), fragment(fragment) {}
+
+  // extract instance indices from instance_exec_params.instance_id
+  std::vector<int> GetInstanceIdxs() const;
 };
 
 /// A QuerySchedule contains all necessary information for a query coordinator to
@@ -123,6 +110,11 @@ class QuerySchedule {
       const TQueryOptions& query_options, RuntimeProfile* summary_profile,
       RuntimeProfile::EventSequence* query_events);
 
+  /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
+  /// - all fragments have a FragmentExecParams
+  /// - all scan ranges are assigned
+  void Validate() const;
+
   const TUniqueId& query_id() const { return query_id_; }
   const TQueryExecRequest& request() const { return request_; }
   const TQueryOptions& query_options() const { return query_options_; }
@@ -165,30 +157,24 @@ class QuerySchedule {
   TUniqueId GetNextInstanceId();
 
   const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const {
-    return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment;
+    FragmentIdx fragment_idx = GetFragmentIdx(node_id);
+    DCHECK_LT(fragment_idx, fragment_exec_params_.size());
+    return fragment_exec_params_[fragment_idx].fragment;
   }
 
-  /// Map node ids to the index of their node inside their plan.nodes list.
-  /// TODO-MT: remove; only needed for the ST path
-  int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
-
   const TPlanNode& GetNode(PlanNodeId id) const {
     const TPlanFragment& fragment = GetContainingFragment(id);
     return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
   }
 
-  std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; }
-  const std::vector<FragmentExecParams>& exec_params() const {
+  const std::vector<FragmentExecParams>& fragment_exec_params() const {
     return fragment_exec_params_;
   }
-  const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const {
-    return mt_fragment_exec_params_;
-  }
-  const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
-    return mt_fragment_exec_params_[idx];
+  const FragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
+    return fragment_exec_params_[idx];
   }
-  MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
-    return &mt_fragment_exec_params_[idx];
+  FragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
+    return &fragment_exec_params_[idx];
   }
 
   const FInstanceExecParams& GetCoordInstanceExecParams() const;
@@ -211,6 +197,8 @@ class QuerySchedule {
 
   /// The query options from the TClientRequest
   const TQueryOptions& query_options_;
+
+  /// TODO: move these into QueryState
   RuntimeProfile* summary_profile_;
   RuntimeProfile::EventSequence* query_events_;
 
@@ -220,21 +208,14 @@ class QuerySchedule {
   /// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
   std::vector<int32_t> plan_node_to_plan_node_idx_;
 
-  /// vector is indexed by fragment index from TQueryExecRequest.fragments;
-  /// populated by Scheduler::Schedule()
+  // populated in Init() and Scheduler::Schedule()
+  // (SimpleScheduler::ComputeFInstanceExecParams()), indexed by fragment idx
+  // (TPlanFragment.idx)
   std::vector<FragmentExecParams> fragment_exec_params_;
 
-  // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams())
-  // indexed by fragment idx (TPlanFragment.idx)
-  std::vector<MtFragmentExecParams> mt_fragment_exec_params_;
-
   /// The set of hosts that the query will run on excluding the coordinator.
   boost::unordered_set<TNetworkAddress> unique_hosts_;
 
-  /// Number of backends executing plan fragments on behalf of this query.
-  /// TODO-MT: remove
-  int64_t num_fragment_instances_;
-
   /// Total number of scan ranges of this query.
   int64_t num_scan_ranges_;
 
@@ -247,10 +228,10 @@ class QuerySchedule {
   /// Indicates if the query has been admitted for execution.
   bool is_admitted_;
 
-  /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info.
+  /// Populate fragment_exec_params_ from request_.plan_exec_info.
   /// Sets is_coord_fragment and input_fragments.
   /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
-  void MtInit();
+  void Init();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc
index d3f5584..911279c 100644
--- a/be/src/scheduling/simple-scheduler-test-util.cc
+++ b/be/src/scheduling/simple-scheduler-test-util.cc
@@ -202,7 +202,7 @@ const vector<TNetworkAddress>& Plan::referenced_datanodes() const {
   return referenced_datanodes_;
 }
 
-const vector<TScanRangeLocations>& Plan::scan_range_locations() const {
+const vector<TScanRangeLocationList>& Plan::scan_range_locations() const {
   return scan_range_locations_;
 }
 
@@ -211,14 +211,14 @@ void Plan::AddTableScan(const TableName& table_name) {
   const vector<Block>& blocks = table.blocks;
   for (int i = 0; i < blocks.size(); ++i) {
     const Block& block = blocks[i];
-    TScanRangeLocations scan_range_locations;
-    BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
+    TScanRangeLocationList scan_range_locations;
+    BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations);
     scan_range_locations_.push_back(scan_range_locations);
   }
 }
 
-void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block,
-    int block_idx, TScanRangeLocations* scan_range_locations) {
+void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
+    int block_idx, TScanRangeLocationList* scan_range_locations) {
   const vector<int>& replica_idxs = block.replica_host_idxs;
   const vector<bool>& is_cached = block.replica_host_idx_is_cached;
   DCHECK_EQ(replica_idxs.size(), is_cached.size());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h
index ab46e2a..82190ad 100644
--- a/be/src/scheduling/simple-scheduler-test-util.h
+++ b/be/src/scheduling/simple-scheduler-test-util.h
@@ -231,11 +231,11 @@ class Plan {
 
   const std::vector<TNetworkAddress>& referenced_datanodes() const;
 
-  const std::vector<TScanRangeLocations>& scan_range_locations() const;
+  const std::vector<TScanRangeLocationList>& scan_range_locations() const;
 
   /// Add a scan of table 'table_name' to the plan. This method will populate the internal
-  /// list of TScanRangeLocations and can be called multiple times for the same table to
-  /// schedule additional scans.
+  /// list of TScanRangeLocationList and can be called multiple times for the same table
+  /// to schedule additional scans.
   void AddTableScan(const TableName& table_name);
 
  private:
@@ -252,11 +252,11 @@ class Plan {
   std::unordered_map<int, int> host_idx_to_datanode_idx_;
 
   /// List of all scan range locations, which can be passed to the SimpleScheduler.
-  std::vector<TScanRangeLocations> scan_range_locations_;
+  std::vector<TScanRangeLocationList> scan_range_locations_;
 
-  /// Initialize a TScanRangeLocations object in place.
-  void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
-      int block_idx, TScanRangeLocations* scan_range_locations);
+  /// Initialize a TScanRangeLocationList object in place.
+  void BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
+      int block_idx, TScanRangeLocationList* scan_range_locations);
 
   void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
       TScanRange* scan_range);