You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original) is not preserved in this plain-text version.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/31 16:06:04 UTC
[2/3] incubator-impala git commit: IMPALA-4314: Standardize on MT-related data structures
IMPALA-4314: Standardize on MT-related data structures
This removes the data structures that were "superseded" in
IMPALA-3903 and changes all control flow to use the
new data structures. The new data structures are renamed
to remove the "Mt" prefix.
Change-Id: I465d0e15e2cf17cafe4c747d34c8f595d3645151
Reviewed-on: http://gerrit.cloudera.org:8080/4853
Reviewed-by: Marcel Kornacker <ma...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d857237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d857237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d857237
Branch: refs/heads/master
Commit: 0d857237a8d5231caf9af4c30d4134b151cd1e0d
Parents: 29faca5
Author: Marcel Kornacker <ma...@cloudera.com>
Authored: Tue Oct 4 20:40:18 2016 -0400
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Mon Oct 31 16:03:32 2016 +0000
----------------------------------------------------------------------
be/src/benchmarks/expr-benchmark.cc | 2 +-
be/src/runtime/coordinator.cc | 415 ++++--------------
be/src/runtime/coordinator.h | 71 +---
be/src/scheduling/query-schedule.cc | 175 ++++----
be/src/scheduling/query-schedule.h | 77 ++--
be/src/scheduling/simple-scheduler-test-util.cc | 10 +-
be/src/scheduling/simple-scheduler-test-util.h | 14 +-
be/src/scheduling/simple-scheduler.cc | 417 +++++++------------
be/src/scheduling/simple-scheduler.h | 81 ++--
be/src/service/impala-http-handler.cc | 7 +-
be/src/service/impala-server.cc | 6 +-
be/src/service/query-exec-state.cc | 8 +-
common/thrift/Frontend.thrift | 50 +--
common/thrift/Planner.thrift | 4 +-
.../impala/planner/DataSourceScanNode.java | 4 +-
.../apache/impala/planner/HBaseScanNode.java | 6 +-
.../org/apache/impala/planner/HdfsScanNode.java | 6 +-
.../org/apache/impala/planner/KuduScanNode.java | 4 +-
.../java/org/apache/impala/planner/Planner.java | 4 +-
.../org/apache/impala/planner/ScanNode.java | 6 +-
.../org/apache/impala/service/Frontend.java | 187 ++-------
.../apache/impala/planner/PlannerTestBase.java | 187 +++++----
.../queries/PlannerTest/mt-dop-validation.test | 50 +--
23 files changed, 627 insertions(+), 1164 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index b95e6d7..444df3a 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -111,7 +111,7 @@ MemTracker tracker;
// constant query
static Status PrepareSelectList(const TExecRequest& request, ExprContext** ctx) {
const TQueryExecRequest& query_request = request.query_exec_request;
- vector<TExpr> texprs = query_request.fragments[0].output_exprs;
+ vector<TExpr> texprs = query_request.plan_exec_info[0].fragments[0].output_exprs;
DCHECK_EQ(texprs.size(), 1);
RETURN_IF_ERROR(Expr::CreateExprTree(&pool, texprs[0], ctx));
RETURN_IF_ERROR((*ctx)->Prepare(NULL, RowDescriptor(), &tracker));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 7df0bf0..f719e98 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -125,39 +125,17 @@ struct DebugOptions {
/// - updates through UpdateFragmentExecStatus()
class Coordinator::FragmentInstanceState {
public:
- // TODO-MT: remove this c'tor
- FragmentInstanceState(FragmentIdx fragment_idx, const FragmentExecParams& params,
- int per_fragment_instance_idx, ObjectPool* obj_pool)
- : fragment_instance_id_(params.instance_ids[per_fragment_instance_idx]),
- fragment_idx_(fragment_idx),
- per_fragment_instance_idx_(per_fragment_instance_idx),
- impalad_address_(params.hosts[per_fragment_instance_idx]),
- total_split_size_(0),
- rpc_sent_(false),
- done_(false),
- profile_created_(false),
- profile_(NULL),
- total_ranges_complete_(0),
- rpc_latency_(0) {
- const string& profile_name = Substitute("Instance $0 (host=$1)",
- PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_));
- profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
- }
-
FragmentInstanceState(const FInstanceExecParams& params, ObjectPool* obj_pool)
- : fragment_instance_id_(params.instance_id),
- fragment_idx_(params.fragment().idx),
- per_fragment_instance_idx_(params.per_fragment_instance_idx),
- impalad_address_(params.host),
+ : exec_params_(params),
total_split_size_(0),
+ profile_(nullptr),
+ total_ranges_complete_(0),
+ rpc_latency_(0),
rpc_sent_(false),
done_(false),
- profile_created_(false),
- profile_(NULL),
- total_ranges_complete_(0),
- rpc_latency_(0) {
+ profile_created_(false) {
const string& profile_name = Substitute("Instance $0 (host=$1)",
- PrintId(fragment_instance_id_), lexical_cast<string>(impalad_address_));
+ PrintId(params.instance_id), lexical_cast<string>(params.host));
profile_ = obj_pool->Add(new RuntimeProfile(obj_pool, profile_name));
}
@@ -182,13 +160,13 @@ class Coordinator::FragmentInstanceState {
int64_t UpdateNumScanRangesCompleted();
// The following getters do not require lock() to be held.
- const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
- FragmentIdx fragment_idx() const { return fragment_idx_; }
+ const TUniqueId& fragment_instance_id() const { return exec_params_.instance_id; }
+ FragmentIdx fragment_idx() const { return exec_params_.fragment().idx; }
MonotonicStopWatch* stopwatch() { return &stopwatch_; }
- const TNetworkAddress& impalad_address() const { return impalad_address_; }
+ const TNetworkAddress& impalad_address() const { return exec_params_.host; }
int64_t total_split_size() const { return total_split_size_; }
bool done() const { return done_; }
- int per_fragment_instance_idx() const { return per_fragment_instance_idx_; }
+ int per_fragment_instance_idx() const { return exec_params_.per_fragment_instance_idx; }
bool rpc_sent() const { return rpc_sent_; }
int64_t rpc_latency() const { return rpc_latency_; }
@@ -218,22 +196,11 @@ class Coordinator::FragmentInstanceState {
}
private:
- /// The unique ID of this instance of this fragment (there may be many instance of the
- /// same fragment, but this ID uniquely identifies this FragmentInstanceState).
- TUniqueId fragment_instance_id_;
-
- // Same as TPlanFragment.idx
- FragmentIdx fragment_idx_;
-
- /// range: 0..<# instances of this fragment>-1
- int per_fragment_instance_idx_;
+ const FInstanceExecParams& exec_params_;
/// Wall clock timer for this fragment.
MonotonicStopWatch stopwatch_;
- /// Address of ImpalaInternalService this fragment is running on.
- const TNetworkAddress impalad_address_;
-
/// Summed across all splits; in bytes.
int64_t total_split_size_;
@@ -246,16 +213,6 @@ class Coordinator::FragmentInstanceState {
/// been initiated; either way, execution must not be cancelled.
Status status_;
- /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
- /// successful.
- bool rpc_sent_;
-
- /// If true, execution terminated; do not cancel in that case.
- bool done_;
-
- /// True after the first call to profile->Update()
- bool profile_created_;
-
/// Owned by coordinator object pool provided in the c'tor
RuntimeProfile* profile_;
@@ -270,6 +227,32 @@ class Coordinator::FragmentInstanceState {
/// Time, in ms, that it took to execute the ExecRemoteFragment() RPC.
int64_t rpc_latency_;
+
+ /// If true, ExecPlanFragment() rpc has been sent - even if it was not determined to be
+ /// successful.
+ bool rpc_sent_;
+
+ /// If true, execution terminated; do not cancel in that case.
+ bool done_;
+
+ /// True after the first call to profile->Update()
+ bool profile_created_;
+};
+
+/// Represents a runtime filter target.
+struct Coordinator::FilterTarget {
+ TPlanNodeId node_id;
+ bool is_local;
+ bool is_bound_by_partition_columns;
+
+ // indices into fragment_instance_states_
+ unordered_set<int> fragment_instance_state_idxs;
+
+ FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
+ node_id = tFilterTarget.node_id;
+ is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns;
+ is_local = tFilterTarget.is_local_target;
+ }
};
@@ -410,7 +393,6 @@ TExecNodePhase::type GetExecNodePhase(const string& key) {
return TExecNodePhase::INVALID;
}
-// TODO: templatize this
TDebugAction::type GetDebugAction(const string& key) {
map<int, const char*>::const_iterator entry =
_TDebugAction_VALUES_TO_NAMES.begin();
@@ -451,7 +433,7 @@ static void ProcessQueryOptions(
Status Coordinator::Exec() {
const TQueryExecRequest& request = schedule_.request();
- DCHECK(request.fragments.size() > 0 || request.mt_plan_exec_info.size() > 0);
+ DCHECK(request.plan_exec_info.size() > 0);
needs_finalization_ = request.__isset.finalize_params;
if (needs_finalization_) finalize_params_ = request.finalize_params;
@@ -474,6 +456,10 @@ Status Coordinator::Exec() {
const string& str = Substitute("Query $0", PrintId(query_id_));
progress_.Init(str, schedule_.num_scan_ranges());
+ // runtime filters not yet supported for mt execution
+ bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+ if (is_mt_execution) filter_mode_ = TRuntimeFilterMode::OFF;
+
// to keep things simple, make async Cancel() calls wait until plan fragment
// execution has been initiated, otherwise we might try to cancel fragment
// execution at Impala daemons where it hasn't even started
@@ -494,16 +480,9 @@ Status Coordinator::Exec() {
filter_mem_tracker_.reset(
new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
- // Initialize the execution profile structures.
- bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
- if (is_mt_execution) {
- MtInitExecProfiles();
- MtInitExecSummary();
- MtStartFInstances();
- } else {
- InitExecProfile(request);
- StartFragments();
- }
+ InitExecProfiles();
+ InitExecSummary();
+ StartFInstances();
// In the error case, it's safe to return and not to get root_sink_ here to close - if
// there was an error, but the coordinator fragment was successfully started, it should
@@ -543,13 +522,16 @@ Status Coordinator::Exec() {
return Status::OK();
}
-void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
- int num_hosts, int start_fragment_instance_state_idx) {
+void Coordinator::UpdateFilterRoutingTable(const FragmentExecParams& fragment_params) {
+ DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+ int num_hosts = fragment_params.instance_exec_params.size();
+ DCHECK_GT(num_hosts, 0);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
<< "UpdateFilterRoutingTable() called although runtime filters are disabled";
DCHECK(!filter_routing_table_complete_)
<< "UpdateFilterRoutingTable() called after setting filter_routing_table_complete_";
- for (const TPlanNode& plan_node: plan_nodes) {
+
+ for (const TPlanNode& plan_node: fragment_params.fragment.plan.nodes) {
if (!plan_node.__isset.runtime_filters) continue;
for (const TRuntimeFilterDesc& filter: plan_node.runtime_filters) {
if (filter_mode_ == TRuntimeFilterMode::LOCAL && !filter.has_local_targets) {
@@ -564,10 +546,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
int pending_count = filter.is_broadcast_join ?
(filter.has_remote_targets ? 1 : 0) : num_hosts;
f->set_pending_count(pending_count);
- vector<int> src_idxs;
- for (int i = 0; i < num_hosts; ++i) {
- src_idxs.push_back(start_fragment_instance_state_idx + i);
- }
+ vector<int> src_idxs = fragment_params.GetInstanceIdxs();
// If this is a broadcast join with only non-local targets, build and publish it
// on MAX_BROADCAST_FILTER_PRODUCERS instances. If this is not a broadcast join
@@ -586,11 +565,9 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
if (filter_mode_ == TRuntimeFilterMode::LOCAL && !tFilterTarget.is_local_target) {
continue;
}
+ vector<int> idxs = fragment_params.GetInstanceIdxs();
FilterTarget target(tFilterTarget);
- for (int i = 0; i < num_hosts; ++i) {
- target.fragment_instance_state_idxs.insert(
- start_fragment_instance_state_idx + i);
- }
+ target.fragment_instance_state_idxs.insert(idxs.begin(), idxs.end());
f->targets()->push_back(target);
} else {
DCHECK(false) << "Unexpected plan node with runtime filters: "
@@ -600,7 +577,7 @@ void Coordinator::UpdateFilterRoutingTable(const vector<TPlanNode>& plan_nodes,
}
}
-void Coordinator::StartFragments() {
+void Coordinator::StartFInstances() {
int num_fragment_instances = schedule_.GetNumFragmentInstances();
DCHECK_GT(num_fragment_instances, 0);
@@ -615,78 +592,28 @@ void Coordinator::StartFragments() {
VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
<< query_id_;
query_events_->MarkEvent(
- Substitute("Ready to start $0 fragments", num_fragment_instances));
+ Substitute("Ready to start $0 fragment instances", num_fragment_instances));
+
+ // TODO-MT: populate the runtime filter routing table
+ // This requires local aggregation of filters prior to sending
+ // for broadcast joins in order to avoid more complicated merge logic here.
- int instance_state_idx = 0;
if (filter_mode_ != TRuntimeFilterMode::OFF) {
+ DCHECK_EQ(request.plan_exec_info.size(), 1);
// Populate the runtime filter routing table. This should happen before starting the
// fragment instances. This code anticipates the indices of the instance states
// created later on in ExecRemoteFragment()
- for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) {
- const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
- int num_hosts = params.hosts.size();
- DCHECK_GT(num_hosts, 0);
- UpdateFilterRoutingTable(request.fragments[fragment_idx].plan.nodes, num_hosts,
- instance_state_idx);
- instance_state_idx += num_hosts;
+ for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
+ UpdateFilterRoutingTable(fragment_params);
}
MarkFilterRoutingTableComplete();
}
int num_instances = 0;
- // Start one fragment instance per fragment per host (number of hosts running each
- // fragment may not be constant).
- for (int fragment_idx = 0; fragment_idx < request.fragments.size(); ++fragment_idx) {
- const FragmentExecParams& params = schedule_.exec_params()[fragment_idx];
- int num_hosts = params.hosts.size();
- DCHECK_GT(num_hosts, 0);
- fragment_profiles_[fragment_idx].num_instances = num_hosts;
- // Start one fragment instance for every fragment_instance required by the
- // schedule. Each fragment instance is assigned a unique ID, numbered from 0, with
- // instances for fragment ID 0 being assigned IDs [0 .. num_hosts(fragment_id_0)] and
- // so on.
- for (int fragment_instance_idx = 0; fragment_instance_idx < num_hosts;
- ++fragment_instance_idx, ++num_instances) {
- DebugOptions* fragment_instance_debug_options =
- debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
- exec_env_->fragment_exec_thread_pool()->Offer(
- std::bind(&Coordinator::ExecRemoteFragment, this, std::cref(params),
- std::cref(request.fragments[fragment_idx]), fragment_instance_debug_options,
- fragment_instance_idx));
- }
- }
- exec_complete_barrier_->Wait();
- query_events_->MarkEvent(
- Substitute("All $0 fragments instances started", num_instances));
-}
-
-void Coordinator::MtStartFInstances() {
- int num_fragment_instances = schedule_.GetNumFragmentInstances();
- DCHECK_GT(num_fragment_instances, 0);
-
- fragment_instance_states_.resize(num_fragment_instances);
- exec_complete_barrier_.reset(new CountingBarrier(num_fragment_instances));
- num_remaining_fragment_instances_ = num_fragment_instances;
-
- DebugOptions debug_options;
- ProcessQueryOptions(schedule_.query_options(), &debug_options);
- const TQueryExecRequest& request = schedule_.request();
-
- VLOG_QUERY << "starting " << num_fragment_instances << " fragment instances for query "
- << query_id_;
- query_events_->MarkEvent(
- Substitute("Ready to start $0 fragment instances", num_fragment_instances));
-
- // TODO: populate the runtime filter routing table
- // this requires local aggregation of filters prior to sending
- // for broadcast joins in order to avoid more complicated merge logic here
-
- int num_instances = 0;
- for (const MtFragmentExecParams& fragment_params: schedule_.mt_fragment_exec_params()) {
- for (int i = 0; i < fragment_params.instance_exec_params.size();
- ++i, ++num_instances) {
- const FInstanceExecParams& instance_params =
- fragment_params.instance_exec_params[i];
+ for (const FragmentExecParams& fragment_params: schedule_.fragment_exec_params()) {
+ num_instances += fragment_params.instance_exec_params.size();
+ for (const FInstanceExecParams& instance_params:
+ fragment_params.instance_exec_params) {
FragmentInstanceState* exec_state = obj_pool()->Add(
new FragmentInstanceState(instance_params, obj_pool()));
int instance_state_idx = GetInstanceIdx(instance_params.instance_id);
@@ -695,7 +622,7 @@ void Coordinator::MtStartFInstances() {
DebugOptions* instance_debug_options =
debug_options.IsApplicable(instance_state_idx) ? &debug_options : NULL;
exec_env_->fragment_exec_thread_pool()->Offer(
- std::bind(&Coordinator::MtExecRemoteFInstance,
+ std::bind(&Coordinator::ExecRemoteFInstance,
this, std::cref(instance_params), instance_debug_options));
}
}
@@ -1243,79 +1170,12 @@ void Coordinator::PrintFragmentInstanceInfo() {
}
}
-void Coordinator::InitExecProfile(const TQueryExecRequest& request) {
- // Initialize the structure to collect execution summary of every plan node.
- fragment_profiles_.resize(request.fragments.size());
- exec_summary_.__isset.nodes = true;
- for (int i = 0; i < request.fragments.size(); ++i) {
- if (!request.fragments[i].__isset.plan) continue;
- const TPlan& plan = request.fragments[i].plan;
- int fragment_first_node_idx = exec_summary_.nodes.size();
-
- for (int j = 0; j < plan.nodes.size(); ++j) {
- TPlanNodeExecSummary node;
- node.node_id = plan.nodes[j].node_id;
- node.fragment_idx = i;
- node.label = plan.nodes[j].label;
- node.__set_label_detail(plan.nodes[j].label_detail);
- node.num_children = plan.nodes[j].num_children;
-
- if (plan.nodes[j].__isset.estimated_stats) {
- node.__set_estimated_stats(plan.nodes[j].estimated_stats);
- }
-
- plan_node_id_to_summary_map_[plan.nodes[j].node_id] = exec_summary_.nodes.size();
- exec_summary_.nodes.push_back(node);
- }
-
- if (request.fragments[i].__isset.output_sink &&
- request.fragments[i].output_sink.type == TDataSinkType::DATA_STREAM_SINK) {
- const TDataStreamSink& sink = request.fragments[i].output_sink.stream_sink;
- int exch_idx = plan_node_id_to_summary_map_[sink.dest_node_id];
- if (sink.output_partition.type == TPartitionType::UNPARTITIONED) {
- exec_summary_.nodes[exch_idx].__set_is_broadcast(true);
- }
- exec_summary_.__isset.exch_to_sender_map = true;
- exec_summary_.exch_to_sender_map[exch_idx] = fragment_first_node_idx;
- }
- }
-
- // Initialize the runtime profile structure. This adds the per fragment average
- // profiles followed by the per fragment instance profiles.
- for (int i = 0; i < request.fragments.size(); ++i) {
- // Insert the avg profiles in ascending fragment number order. If there is a
- // coordinator fragment, it's been placed in fragment_profiles_[0].averaged_profile,
- // ensuring that this code will put the first averaged profile immediately after
- // it. If there is no coordinator fragment, the first averaged profile will be
- // inserted as the first child of query_profile_, and then all other averaged
- // fragments will follow.
- bool is_coordinator_fragment = (i == 0 && schedule_.GetCoordFragment() != nullptr);
- string profile_name =
- Substitute(is_coordinator_fragment ? "Coordinator Fragment $0" : "Fragment $0",
- request.fragments[i].display_name);
- fragment_profiles_[i].root_profile =
- obj_pool()->Add(new RuntimeProfile(obj_pool(), profile_name));
- if (is_coordinator_fragment) {
- fragment_profiles_[i].averaged_profile = nullptr;
- } else {
- fragment_profiles_[i].averaged_profile = obj_pool()->Add(new RuntimeProfile(
- obj_pool(),
- Substitute("Averaged Fragment $0", request.fragments[i].display_name), true));
- query_profile_->AddChild(fragment_profiles_[i].averaged_profile, true,
- (i > 0) ? fragment_profiles_[i - 1].averaged_profile : NULL);
- }
- // Note: we don't start the wall timer here for the fragment
- // profile; it's uninteresting and misleading.
- query_profile_->AddChild(fragment_profiles_[i].root_profile);
- }
-}
-
-void Coordinator::MtInitExecSummary() {
+void Coordinator::InitExecSummary() {
const TQueryExecRequest& request = schedule_.request();
// init exec_summary_.{nodes, exch_to_sender_map}
exec_summary_.__isset.nodes = true;
DCHECK(exec_summary_.nodes.empty());
- for (const TPlanExecInfo& plan_exec_info: request.mt_plan_exec_info) {
+ for (const TPlanExecInfo& plan_exec_info: request.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
if (!fragment.__isset.plan) continue;
@@ -1354,7 +1214,7 @@ void Coordinator::MtInitExecSummary() {
}
}
-void Coordinator::MtInitExecProfiles() {
+void Coordinator::InitExecProfiles() {
const TQueryExecRequest& request = schedule_.request();
vector<const TPlanFragment*> fragments;
schedule_.GetTPlanFragments(&fragments);
@@ -1370,6 +1230,7 @@ void Coordinator::MtInitExecProfiles() {
PerFragmentProfileData* data = &fragment_profiles_[fragment->idx];
data->num_instances =
schedule_.GetFragmentExecParams(fragment->idx).instance_exec_params.size();
+ // TODO-MT: stop special-casing the coordinator fragment
if (fragment != coord_fragment) {
data->averaged_profile = obj_pool()->Add(new RuntimeProfile(
obj_pool(), Substitute("Averaged Fragment $0", fragment->display_name), true));
@@ -1383,7 +1244,6 @@ void Coordinator::MtInitExecProfiles() {
}
}
-
void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
FragmentInstanceCounters* counters) {
vector<RuntimeProfile*> children;
@@ -1407,11 +1267,11 @@ void Coordinator::CollectScanNodeCounters(RuntimeProfile* profile,
}
}
-void Coordinator::MtExecRemoteFInstance(
+void Coordinator::ExecRemoteFInstance(
const FInstanceExecParams& exec_params, const DebugOptions* debug_options) {
NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
TExecPlanFragmentParams rpc_params;
- MtSetExecPlanFragmentParams(exec_params, &rpc_params);
+ SetExecPlanFragmentParams(exec_params, &rpc_params);
if (debug_options != NULL) {
rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
@@ -1467,70 +1327,6 @@ void Coordinator::MtExecRemoteFInstance(
<< " instance_id=" << PrintId(exec_state->fragment_instance_id());
}
-void Coordinator::ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
- const TPlanFragment& plan_fragment, DebugOptions* debug_options,
- int fragment_instance_idx) {
- NotifyBarrierOnExit notifier(exec_complete_barrier_.get());
- TExecPlanFragmentParams rpc_params;
- SetExecPlanFragmentParams(
- plan_fragment, fragment_exec_params, fragment_instance_idx, &rpc_params);
- if (debug_options != NULL) {
- rpc_params.fragment_instance_ctx.__set_debug_node_id(debug_options->node_id);
- rpc_params.fragment_instance_ctx.__set_debug_action(debug_options->action);
- rpc_params.fragment_instance_ctx.__set_debug_phase(debug_options->phase);
- }
- FragmentInstanceState* exec_state = obj_pool()->Add(
- new FragmentInstanceState(
- plan_fragment.idx, fragment_exec_params, fragment_instance_idx, obj_pool()));
- exec_state->ComputeTotalSplitSize(
- rpc_params.fragment_instance_ctx.per_node_scan_ranges);
- int instance_state_idx = GetInstanceIdx(exec_state->fragment_instance_id());
- fragment_instance_states_[instance_state_idx] = exec_state;
- VLOG_FILE << "making rpc: ExecPlanFragment"
- << " instance_id=" << PrintId(exec_state->fragment_instance_id())
- << " host=" << exec_state->impalad_address();
-
- int64_t start = MonotonicMillis();
-
- Status client_connect_status;
- ImpalaBackendConnection backend_client(exec_env_->impalad_client_cache(),
- exec_state->impalad_address(), &client_connect_status);
- if (!client_connect_status.ok()) {
- exec_state->SetInitialStatus(client_connect_status, true);
- return;
- }
-
- TExecPlanFragmentResult thrift_result;
- Status rpc_status = backend_client.DoRpc(&ImpalaBackendClient::ExecPlanFragment,
- rpc_params, &thrift_result);
-
- exec_state->set_rpc_latency(MonotonicMillis() - start);
-
- const string ERR_TEMPLATE = "ExecPlanRequest rpc instance_id=$0 failed: $1";
-
- if (!rpc_status.ok()) {
- const string& err_msg =
- Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
- rpc_status.msg().msg());
- VLOG_QUERY << err_msg;
- exec_state->SetInitialStatus(Status(err_msg), true);
- return;
- }
-
- Status exec_plan_status = Status(thrift_result.status);
- if (!exec_plan_status.ok()) {
- const string& err_msg =
- Substitute(ERR_TEMPLATE, PrintId(exec_state->fragment_instance_id()),
- exec_plan_status.msg().GetFullMessageDetails());
- VLOG_QUERY << err_msg;
- exec_state->SetInitialStatus(Status(err_msg), true);
- return;
- }
-
- exec_state->SetInitialStatus(Status::OK(), true);
- return;
-}
-
void Coordinator::Cancel(const Status* cause) {
lock_guard<mutex> l(lock_);
// if the query status indicates an error, cancellation has already been initiated
@@ -1834,14 +1630,6 @@ void Coordinator::UpdateExecSummary(const FragmentInstanceState& instance_state)
TPlanNodeExecSummary& exec_summary =
exec_summary_.nodes[plan_node_id_to_summary_map_[node_id]];
- if (exec_summary.exec_stats.empty()) {
- // First time, make an exec_stats for each instance this plan node is running on.
- // TODO-MT: remove this and initialize all runtime state prior to starting
- // instances
- DCHECK_LT(instance_state.fragment_idx(), fragment_profiles_.size());
- exec_summary.exec_stats.resize(
- fragment_profiles_[instance_state.fragment_idx()].num_instances);
- }
DCHECK_LT(instance_state.per_fragment_instance_idx(), exec_summary.exec_stats.size());
DCHECK_EQ(fragment_profiles_[instance_state.fragment_idx()].num_instances,
exec_summary.exec_stats.size());
@@ -1956,7 +1744,7 @@ string Coordinator::GetErrorLog() {
return PrintErrorMapToString(merged);
}
-void Coordinator::MtSetExecPlanFragmentParams(
+void Coordinator::SetExecPlanFragmentParams(
const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params) {
rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
rpc_params->__set_query_ctx(query_ctx_);
@@ -1965,35 +1753,12 @@ void Coordinator::MtSetExecPlanFragmentParams(
TPlanFragmentInstanceCtx fragment_instance_ctx;
fragment_ctx.__set_fragment(params.fragment());
- // TODO: Remove filters that weren't selected during filter routing table construction.
SetExecPlanDescriptorTable(params.fragment(), rpc_params);
- fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
- fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
- fragment_instance_ctx.__set_per_exch_num_senders(
- params.fragment_exec_params.per_exch_num_senders);
- fragment_instance_ctx.__set_destinations(
- params.fragment_exec_params.destinations);
- fragment_instance_ctx.__set_sender_id(params.sender_id);
- fragment_instance_ctx.fragment_instance_id = params.instance_id;
- fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
- rpc_params->__set_fragment_ctx(fragment_ctx);
- rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
-}
-
-void Coordinator::SetExecPlanFragmentParams(
- const TPlanFragment& fragment, const FragmentExecParams& params,
- int per_fragment_instance_idx, TExecPlanFragmentParams* rpc_params) {
- rpc_params->__set_protocol_version(ImpalaInternalServiceVersion::V1);
- rpc_params->__set_query_ctx(query_ctx_);
-
- TPlanFragmentCtx fragment_ctx;
- TPlanFragmentInstanceCtx fragment_instance_ctx;
-
- fragment_ctx.__set_fragment(fragment);
- int instance_state_idx = GetInstanceIdx(params.instance_ids[per_fragment_instance_idx]);
// Remove filters that weren't selected during filter routing table construction.
if (filter_mode_ != TRuntimeFilterMode::OFF) {
+ DCHECK(schedule_.request().query_ctx.request.query_options.mt_dop == 0);
+ int instance_idx = GetInstanceIdx(params.instance_id);
for (TPlanNode& plan_node: rpc_params->fragment_ctx.fragment.plan.nodes) {
if (plan_node.__isset.runtime_filters) {
vector<TRuntimeFilterDesc> required_filters;
@@ -2003,7 +1768,7 @@ void Coordinator::SetExecPlanFragmentParams(
if (filter_it == filter_routing_table_.end()) continue;
const FilterState& f = filter_it->second;
if (plan_node.__isset.hash_join_node) {
- if (f.src_fragment_instance_state_idxs().find(instance_state_idx) ==
+ if (f.src_fragment_instance_state_idxs().find(instance_idx) ==
f.src_fragment_instance_state_idxs().end()) {
DCHECK(desc.is_broadcast_join);
continue;
@@ -2018,24 +1783,16 @@ void Coordinator::SetExecPlanFragmentParams(
}
}
}
- SetExecPlanDescriptorTable(fragment, rpc_params);
-
- TNetworkAddress exec_host = params.hosts[per_fragment_instance_idx];
- FragmentScanRangeAssignment::const_iterator it =
- params.scan_range_assignment.find(exec_host);
- // Scan ranges may not always be set, so use an empty structure if so.
- const PerNodeScanRanges& scan_ranges =
- (it != params.scan_range_assignment.end()) ? it->second : PerNodeScanRanges();
fragment_instance_ctx.__set_request_pool(schedule_.request_pool());
- fragment_instance_ctx.__set_per_node_scan_ranges(scan_ranges);
- fragment_instance_ctx.__set_per_exch_num_senders(params.per_exch_num_senders);
- fragment_instance_ctx.__set_destinations(params.destinations);
- fragment_instance_ctx.__set_sender_id(
- params.sender_id_base + per_fragment_instance_idx);
- fragment_instance_ctx.fragment_instance_id =
- params.instance_ids[per_fragment_instance_idx];
- fragment_instance_ctx.per_fragment_instance_idx = per_fragment_instance_idx;
+ fragment_instance_ctx.__set_per_node_scan_ranges(params.per_node_scan_ranges);
+ fragment_instance_ctx.__set_per_exch_num_senders(
+ params.fragment_exec_params.per_exch_num_senders);
+ fragment_instance_ctx.__set_destinations(
+ params.fragment_exec_params.destinations);
+ fragment_instance_ctx.__set_sender_id(params.sender_id);
+ fragment_instance_ctx.fragment_instance_id = params.instance_id;
+ fragment_instance_ctx.per_fragment_instance_idx = params.per_fragment_instance_idx;
rpc_params->__set_fragment_ctx(fragment_ctx);
rpc_params->__set_fragment_instance_ctx(fragment_instance_ctx);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 176d89d..53f00eb 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -100,10 +100,6 @@ struct DebugOptions;
///
/// TODO: move into separate subdirectory and move nested classes into separate files
/// and unnest them
-///
-/// TODO: remove all data structures and functions that are superceded by their
-/// multi-threaded counterpart and remove the "Mt" prefix with which the latter
-/// is currently marked
class Coordinator {
public:
Coordinator(const QuerySchedule& schedule, ExecEnv* exec_env,
@@ -216,7 +212,8 @@ class Coordinator {
private:
class FragmentInstanceState;
- struct FilterState;
+ struct FilterTarget;
+ class FilterState;
/// Typedef for boost utility to compute averaged stats
/// TODO: including the median doesn't compile, looks like some includes are missing
@@ -253,7 +250,7 @@ class Coordinator {
/// FragmentInstanceStates for all fragment instances, including that of the coordinator
/// fragment. All elements are non-nullptr. Owned by obj_pool(). Filled in
- /// PrepareCoordFragment() and StartRemoteFragments()/MtStartRemoteFInstances().
+ /// StartFInstances().
std::vector<FragmentInstanceState*> fragment_instance_states_;
/// True if the query needs a post-execution step to tidy up
@@ -412,22 +409,6 @@ class Coordinator {
/// returned, successfully or not. Initialised during Exec().
boost::scoped_ptr<CountingBarrier> exec_complete_barrier_;
- /// Represents a runtime filter target.
- struct FilterTarget {
- TPlanNodeId node_id;
- bool is_local;
- bool is_bound_by_partition_columns;
-
- // indices into fragment_instance_states_
- boost::unordered_set<int> fragment_instance_state_idxs;
-
- FilterTarget(const TRuntimeFilterTargetDesc& tFilterTarget) {
- node_id = tFilterTarget.node_id;
- is_bound_by_partition_columns = tFilterTarget.is_bound_by_partition_columns;
- is_local = tFilterTarget.is_local_target;
- }
- };
-
/// Protects filter_routing_table_.
SpinLock filter_lock_;
@@ -444,7 +425,7 @@ class Coordinator {
RuntimeProfile::Counter* filter_updates_received_;
/// The filtering mode for this query. Set in constructor.
- const TRuntimeFilterMode::type filter_mode_;
+ TRuntimeFilterMode::type filter_mode_;
/// Tracks the memory consumed by runtime filters during aggregation. Child of
/// query_mem_tracker_.
@@ -459,22 +440,13 @@ class Coordinator {
/// Sets 'filter_routing_table_complete_' and prints the table to the profile and log.
void MarkFilterRoutingTableComplete();
- /// Fill in rpc_params based on parameters.
- /// 'per_fragment_instance_idx' is the 0-based ordinal of this particular fragment
- /// instance within its fragment.
- void SetExecPlanFragmentParams(const TPlanFragment& fragment,
- const FragmentExecParams& params, int per_fragment_instance_idx,
- TExecPlanFragmentParams* rpc_params);
- void MtSetExecPlanFragmentParams(
+ /// Fill in rpc_params based on params.
+ void SetExecPlanFragmentParams(
const FInstanceExecParams& params, TExecPlanFragmentParams* rpc_params);
/// Wrapper for ExecPlanFragment() RPC. This function will be called in parallel from
- /// multiple threads. Creates a new FragmentInstanceState and registers it in
- /// fragment_instance_states_, then calls RPC to issue fragment on remote impalad.
- void ExecRemoteFragment(const FragmentExecParams& fragment_exec_params,
- const TPlanFragment& plan_fragment, DebugOptions* debug_options,
- int fragment_instance_idx);
- void MtExecRemoteFInstance(
+ /// multiple threads.
+ void ExecRemoteFInstance(
const FInstanceExecParams& exec_params, const DebugOptions* debug_options);
/// Determine fragment number, given fragment id.
@@ -518,14 +490,13 @@ class Coordinator {
/// Moves all temporary staging files to their final destinations.
Status FinalizeSuccessfulInsert();
- /// Initializes the structures in runtime profile and exec_summary_. Must be
- /// called before RPCs to start remote fragments.
- void InitExecProfile(const TQueryExecRequest& request);
- void MtInitExecProfiles();
+ /// Initializes the structures in fragment_profiles_. Must be called before RPCs to
+ /// start remote fragments.
+ void InitExecProfiles();
/// Initialize the structures to collect execution summary of every plan node
/// (exec_summary_ and plan_node_id_to_summary_map_)
- void MtInitExecSummary();
+ void InitExecSummary();
/// Update fragment profile information from a fragment instance state.
void UpdateAverageProfile(FragmentInstanceState* fragment_instance_state);
@@ -556,13 +527,10 @@ class Coordinator {
void PopulatePathPermissionCache(hdfsFS fs, const std::string& path_str,
PermissionCache* permissions_cache);
- /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel,
- /// and then waiting for all of the RPCs to complete.
- void StartFragments();
-
- /// Starts all fragment instances contained in the schedule by issuing RPCs in parallel
- /// and then waiting for all of the RPCs to complete.
- void MtStartFInstances();
+ /// Starts all fragment instances contained in the schedule by issuing RPCs in
+ /// parallel and then waiting for all of the RPCs to complete. Also sets up and
+ /// registers the state for all fragment instances.
+ void StartFInstances();
/// Calls CancelInternal() and returns an error if there was any error starting the
/// fragments.
@@ -570,11 +538,8 @@ class Coordinator {
Status FinishInstanceStartup();
/// Build the filter routing table by iterating over all plan nodes and collecting the
- /// filters that they either produce or consume. The source and target fragment
- /// instance indexes for filters are numbered in the range
- /// [start_fragment_instance_idx .. start_fragment_instance_idx + num_hosts]
- void UpdateFilterRoutingTable(const std::vector<TPlanNode>& plan_nodes, int num_hosts,
- int start_fragment_instance_idx);
+ /// filters that they either produce or consume.
+ void UpdateFilterRoutingTable(const FragmentExecParams& fragment_params);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index 1eb36e3..e2dc7c4 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -50,39 +50,16 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
query_options_(query_options),
summary_profile_(summary_profile),
query_events_(query_events),
- num_fragment_instances_(0),
num_scan_ranges_(0),
next_instance_id_(query_id),
is_admitted_(false) {
- fragment_exec_params_.resize(request.fragments.size());
- bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
-
- if (is_mt_execution) {
- /// TODO-MT: remove else branch and move MtInit() logic here
- MtInit();
- } else {
- // Build two maps to map node ids to their fragments as well as to the offset in their
- // fragment's plan's nodes list.
- for (int i = 0; i < request.fragments.size(); ++i) {
- int node_idx = 0;
- for (const TPlanNode& node: request.fragments[i].plan.nodes) {
- if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
- plan_node_to_fragment_idx_.resize(node.node_id + 1);
- plan_node_to_plan_node_idx_.resize(node.node_id + 1);
- }
- DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
- plan_node_to_fragment_idx_[node.node_id] = i;
- plan_node_to_plan_node_idx_[node.node_id] = node_idx;
- ++node_idx;
- }
- }
- }
+ Init();
}
-void QuerySchedule::MtInit() {
- // extract TPlanFragments and order by fragment id
+void QuerySchedule::Init() {
+ // extract TPlanFragments and order by fragment idx
vector<const TPlanFragment*> fragments;
- for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
fragments.push_back(&fragment);
}
@@ -90,41 +67,34 @@ void QuerySchedule::MtInit() {
sort(fragments.begin(), fragments.end(),
[](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; });
- DCHECK_EQ(mt_fragment_exec_params_.size(), 0);
+ // this must only be called once
+ DCHECK_EQ(fragment_exec_params_.size(), 0);
for (const TPlanFragment* fragment: fragments) {
- mt_fragment_exec_params_.emplace_back(*fragment);
+ fragment_exec_params_.emplace_back(*fragment);
}
// mark coordinator fragment
- const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
- if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) {
- mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true;
- next_instance_id_.lo = 1; // generated instance ids start at 1
+ const TPlanFragment& root_fragment = request_.plan_exec_info[0].fragments[0];
+ if (request_.stmt_type == TStmtType::QUERY) {
+ fragment_exec_params_[root_fragment.idx].is_coord_fragment = true;
+ // the coordinator instance gets index 0, generated instance ids start at 1
+ next_instance_id_.lo = 1;
}
- // compute input fragments and find max node id
+ // find max node id
int max_node_id = 0;
- for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
for (const TPlanNode& node: fragment.plan.nodes) {
max_node_id = max(node.node_id, max_node_id);
}
}
-
- // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
- for (int i = 1; i < plan_exec_info.fragments.size(); ++i) {
- const TPlanFragment& fragment = plan_exec_info.fragments[i];
- FragmentIdx dest_idx =
- plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx;
- MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx];
- dest_params.input_fragments.push_back(fragment.idx);
- }
}
// populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_
plan_node_to_fragment_idx_.resize(max_node_id + 1);
plan_node_to_plan_node_idx_.resize(max_node_id + 1);
- for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
for (const TPlanFragment& fragment: plan_exec_info.fragments) {
for (int i = 0; i < fragment.plan.nodes.size(); ++i) {
const TPlanNode& node = fragment.plan.nodes[i];
@@ -133,8 +103,72 @@ void QuerySchedule::MtInit() {
}
}
}
+
+ // compute input fragments
+ for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
+ // each fragment sends its output to the fragment containing the destination node
+ // of its output sink
+ for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+ if (!fragment.output_sink.__isset.stream_sink) continue;
+ PlanNodeId dest_node_id = fragment.output_sink.stream_sink.dest_node_id;
+ FragmentIdx dest_idx = plan_node_to_fragment_idx_[dest_node_id];
+ FragmentExecParams& dest_params = fragment_exec_params_[dest_idx];
+ dest_params.input_fragments.push_back(fragment.idx);
+ }
+ }
}
+void QuerySchedule::Validate() const {
+ // all fragments have a FragmentExecParams
+ int num_fragments = 0;
+ for (const TPlanExecInfo& plan_exec_info: request_.plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+ DCHECK_LT(fragment.idx, fragment_exec_params_.size());
+ DCHECK_EQ(fragment.idx, fragment_exec_params_[fragment.idx].fragment.idx);
+ ++num_fragments;
+ }
+ }
+ DCHECK_EQ(num_fragments, fragment_exec_params_.size());
+
+ // we assigned the correct number of scan ranges per (host, node id):
+ // assemble a map from host -> (map from node id -> #scan ranges)
+ unordered_map<TNetworkAddress, map<TPlanNodeId, int>> count_map;
+ for (const FragmentExecParams& fp: fragment_exec_params_) {
+ for (const FInstanceExecParams& ip: fp.instance_exec_params) {
+ auto host_it = count_map.find(ip.host);
+ if (host_it == count_map.end()) {
+ count_map.insert(make_pair(ip.host, map<TPlanNodeId, int>()));
+ host_it = count_map.find(ip.host);
+ }
+ map<TPlanNodeId, int>& node_map = host_it->second;
+
+ for (const PerNodeScanRanges::value_type& instance_entry: ip.per_node_scan_ranges) {
+ TPlanNodeId node_id = instance_entry.first;
+ auto count_entry = node_map.find(node_id);
+ if (count_entry == node_map.end()) {
+ node_map.insert(make_pair(node_id, 0));
+ count_entry = node_map.find(node_id);
+ }
+ count_entry->second += instance_entry.second.size();
+ }
+ }
+ }
+
+ for (const FragmentExecParams& fp: fragment_exec_params_) {
+ for (const FragmentScanRangeAssignment::value_type& assignment_entry:
+ fp.scan_range_assignment) {
+ const TNetworkAddress& host = assignment_entry.first;
+ DCHECK_GT(count_map.count(host), 0);
+ map<TPlanNodeId, int>& node_map = count_map.find(host)->second;
+ for (const PerNodeScanRanges::value_type& node_assignment:
+ assignment_entry.second) {
+ TPlanNodeId node_id = node_assignment.first;
+ DCHECK_GT(node_map.count(node_id), 0);
+ DCHECK_EQ(node_map[node_id], node_assignment.second.size());
+ }
+ }
+ }
+}
int64_t QuerySchedule::GetClusterMemoryEstimate() const {
DCHECK_GT(unique_hosts_.size(), 0);
@@ -199,15 +233,8 @@ const TPlanFragment& FInstanceExecParams::fragment() const {
int QuerySchedule::GetNumFragmentInstances() const {
int result = 0;
- if (mt_fragment_exec_params_.empty()) {
- DCHECK(!fragment_exec_params_.empty());
- for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
- result += fragment_exec_params.hosts.size();
- }
- } else {
- for (const MtFragmentExecParams& fragment_exec_params : mt_fragment_exec_params_) {
- result += fragment_exec_params.instance_exec_params.size();
- }
+ for (const FragmentExecParams& fragment_exec_params : fragment_exec_params_) {
+ result += fragment_exec_params.instance_exec_params.size();
}
return result;
}
@@ -215,37 +242,35 @@ int QuerySchedule::GetNumFragmentInstances() const {
const TPlanFragment* QuerySchedule::GetCoordFragment() const {
// Only have coordinator fragment for statements that return rows.
if (request_.stmt_type != TStmtType::QUERY) return nullptr;
- bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
- const TPlanFragment* fragment = is_mt_exec
- ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
-
- return fragment;
+ const TPlanFragment* fragment = &request_.plan_exec_info[0].fragments[0];
+ DCHECK_EQ(fragment->partition.type, TPartitionType::UNPARTITIONED);
+ return fragment;
}
+
void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
fragments->clear();
- bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
- if (is_mt_exec) {
- for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) {
- for (const TPlanFragment& fragment: plan_info.fragments) {
- fragments->push_back(&fragment);
- }
- }
- } else {
- for (const TPlanFragment& fragment: request_.fragments) {
+ for (const TPlanExecInfo& plan_info: request_.plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_info.fragments) {
fragments->push_back(&fragment);
}
}
}
const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
- const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
- DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED);
- const MtFragmentExecParams* fragment_params =
- &mt_fragment_exec_params_[coord_fragment.idx];
- DCHECK(fragment_params != nullptr);
- DCHECK_EQ(fragment_params->instance_exec_params.size(), 1);
- return fragment_params->instance_exec_params[0];
+ DCHECK_EQ(request_.stmt_type, TStmtType::QUERY);
+ const TPlanFragment& coord_fragment = request_.plan_exec_info[0].fragments[0];
+ const FragmentExecParams& fragment_params = fragment_exec_params_[coord_fragment.idx];
+ DCHECK_EQ(fragment_params.instance_exec_params.size(), 1);
+ return fragment_params.instance_exec_params[0];
+}
+
+vector<int> FragmentExecParams::GetInstanceIdxs() const {
+ vector<int> result;
+ for (const FInstanceExecParams& instance_params: instance_exec_params) {
+ result.push_back(GetInstanceIdx(instance_params.instance_id));
+ }
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index 77c9cd6..703c07c 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -35,7 +35,7 @@
namespace impala {
class Coordinator;
-struct MtFragmentExecParams;
+struct FragmentExecParams;
/// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
@@ -45,22 +45,6 @@ typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
FragmentScanRangeAssignment;
-/// execution parameters for a single fragment; used to assemble the
-/// TPlanFragmentInstanceCtx;
-/// hosts.size() == instance_ids.size()
-struct FragmentExecParams {
- std::vector<TNetworkAddress> hosts; // execution backends
- std::vector<TUniqueId> instance_ids;
- std::vector<TPlanFragmentDestination> destinations;
- std::map<PlanNodeId, int> per_exch_num_senders;
- FragmentScanRangeAssignment scan_range_assignment;
- /// In its role as a data sender, a fragment instance is assigned a "sender id" to
- /// uniquely identify it to a receiver. The id that a particular fragment instance
- /// is assigned ranges from [sender_id_base, sender_id_base + N - 1], where
- /// N = hosts.size (i.e. N = number of fragment instances)
- int sender_id_base;
-};
-
/// execution parameters for a single fragment instance; used to assemble the
/// TPlanFragmentInstanceCtx
struct FInstanceExecParams {
@@ -75,12 +59,12 @@ struct FInstanceExecParams {
/// uniquely identify it to a receiver. -1 = invalid.
int sender_id;
- /// the parent MtFragmentExecParams
- const MtFragmentExecParams& fragment_exec_params;
+ /// the parent FragmentExecParams
+ const FragmentExecParams& fragment_exec_params;
const TPlanFragment& fragment() const;
FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
- int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params)
+ int per_fragment_instance_idx, const FragmentExecParams& fragment_exec_params)
: instance_id(instance_id), host(host),
per_fragment_instance_idx(per_fragment_instance_idx),
sender_id(-1),
@@ -88,7 +72,7 @@ struct FInstanceExecParams {
};
/// Execution parameters shared between fragment instances
-struct MtFragmentExecParams {
+struct FragmentExecParams {
/// output destinations of this fragment
std::vector<TPlanFragmentDestination> destinations;
@@ -105,8 +89,11 @@ struct MtFragmentExecParams {
std::vector<FragmentIdx> input_fragments;
std::vector<FInstanceExecParams> instance_exec_params;
- MtFragmentExecParams(const TPlanFragment& fragment)
+ FragmentExecParams(const TPlanFragment& fragment)
: is_coord_fragment(false), fragment(fragment) {}
+
+ // extract instance indices from instance_exec_params.instance_id
+ std::vector<int> GetInstanceIdxs() const;
};
/// A QuerySchedule contains all necessary information for a query coordinator to
@@ -123,6 +110,11 @@ class QuerySchedule {
const TQueryOptions& query_options, RuntimeProfile* summary_profile,
RuntimeProfile::EventSequence* query_events);
+ /// Verifies that the schedule is well-formed (and DCHECKs if it isn't):
+ /// - all fragments have a FragmentExecParams
+ /// - all scan ranges are assigned
+ void Validate() const;
+
const TUniqueId& query_id() const { return query_id_; }
const TQueryExecRequest& request() const { return request_; }
const TQueryOptions& query_options() const { return query_options_; }
@@ -165,30 +157,24 @@ class QuerySchedule {
TUniqueId GetNextInstanceId();
const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const {
- return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment;
+ FragmentIdx fragment_idx = GetFragmentIdx(node_id);
+ DCHECK_LT(fragment_idx, fragment_exec_params_.size());
+ return fragment_exec_params_[fragment_idx].fragment;
}
- /// Map node ids to the index of their node inside their plan.nodes list.
- /// TODO-MT: remove; only needed for the ST path
- int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
-
const TPlanNode& GetNode(PlanNodeId id) const {
const TPlanFragment& fragment = GetContainingFragment(id);
return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
}
- std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; }
- const std::vector<FragmentExecParams>& exec_params() const {
+ const std::vector<FragmentExecParams>& fragment_exec_params() const {
return fragment_exec_params_;
}
- const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const {
- return mt_fragment_exec_params_;
- }
- const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
- return mt_fragment_exec_params_[idx];
+ const FragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
+ return fragment_exec_params_[idx];
}
- MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
- return &mt_fragment_exec_params_[idx];
+ FragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
+ return &fragment_exec_params_[idx];
}
const FInstanceExecParams& GetCoordInstanceExecParams() const;
@@ -211,6 +197,8 @@ class QuerySchedule {
/// The query options from the TClientRequest
const TQueryOptions& query_options_;
+
+ /// TODO: move these into QueryState
RuntimeProfile* summary_profile_;
RuntimeProfile::EventSequence* query_events_;
@@ -220,21 +208,14 @@ class QuerySchedule {
/// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
std::vector<int32_t> plan_node_to_plan_node_idx_;
- /// vector is indexed by fragment index from TQueryExecRequest.fragments;
- /// populated by Scheduler::Schedule()
+ // populated in Init() and Scheduler::Schedule()
+ // (SimpleScheduler::ComputeFInstanceExecParams()), indexed by fragment idx
+ // (TPlanFragment.idx)
std::vector<FragmentExecParams> fragment_exec_params_;
- // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams())
- // indexed by fragment idx (TPlanFragment.idx)
- std::vector<MtFragmentExecParams> mt_fragment_exec_params_;
-
/// The set of hosts that the query will run on excluding the coordinator.
boost::unordered_set<TNetworkAddress> unique_hosts_;
- /// Number of backends executing plan fragments on behalf of this query.
- /// TODO-MT: remove
- int64_t num_fragment_instances_;
-
/// Total number of scan ranges of this query.
int64_t num_scan_ranges_;
@@ -247,10 +228,10 @@ class QuerySchedule {
/// Indicates if the query has been admitted for execution.
bool is_admitted_;
- /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info.
+ /// Populate fragment_exec_params_ from request_.plan_exec_info.
/// Sets is_coord_fragment and input_fragments.
/// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
- void MtInit();
+ void Init();
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc
index d3f5584..911279c 100644
--- a/be/src/scheduling/simple-scheduler-test-util.cc
+++ b/be/src/scheduling/simple-scheduler-test-util.cc
@@ -202,7 +202,7 @@ const vector<TNetworkAddress>& Plan::referenced_datanodes() const {
return referenced_datanodes_;
}
-const vector<TScanRangeLocations>& Plan::scan_range_locations() const {
+const vector<TScanRangeLocationList>& Plan::scan_range_locations() const {
return scan_range_locations_;
}
@@ -211,14 +211,14 @@ void Plan::AddTableScan(const TableName& table_name) {
const vector<Block>& blocks = table.blocks;
for (int i = 0; i < blocks.size(); ++i) {
const Block& block = blocks[i];
- TScanRangeLocations scan_range_locations;
- BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
+ TScanRangeLocationList scan_range_locations;
+ BuildTScanRangeLocationList(table_name, block, i, &scan_range_locations);
scan_range_locations_.push_back(scan_range_locations);
}
}
-void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block,
- int block_idx, TScanRangeLocations* scan_range_locations) {
+void Plan::BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
+ int block_idx, TScanRangeLocationList* scan_range_locations) {
const vector<int>& replica_idxs = block.replica_host_idxs;
const vector<bool>& is_cached = block.replica_host_idx_is_cached;
DCHECK_EQ(replica_idxs.size(), is_cached.size());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d857237/be/src/scheduling/simple-scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h
index ab46e2a..82190ad 100644
--- a/be/src/scheduling/simple-scheduler-test-util.h
+++ b/be/src/scheduling/simple-scheduler-test-util.h
@@ -231,11 +231,11 @@ class Plan {
const std::vector<TNetworkAddress>& referenced_datanodes() const;
- const std::vector<TScanRangeLocations>& scan_range_locations() const;
+ const std::vector<TScanRangeLocationList>& scan_range_locations() const;
/// Add a scan of table 'table_name' to the plan. This method will populate the internal
- /// list of TScanRangeLocations and can be called multiple times for the same table to
- /// schedule additional scans.
+ /// list of TScanRangeLocationList and can be called multiple times for the same table
+ /// to schedule additional scans.
void AddTableScan(const TableName& table_name);
private:
@@ -252,11 +252,11 @@ class Plan {
std::unordered_map<int, int> host_idx_to_datanode_idx_;
/// List of all scan range locations, which can be passed to the SimpleScheduler.
- std::vector<TScanRangeLocations> scan_range_locations_;
+ std::vector<TScanRangeLocationList> scan_range_locations_;
- /// Initialize a TScanRangeLocations object in place.
- void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
- int block_idx, TScanRangeLocations* scan_range_locations);
+ /// Initialize a TScanRangeLocationList object in place.
+ void BuildTScanRangeLocationList(const TableName& table_name, const Block& block,
+ int block_idx, TScanRangeLocationList* scan_range_locations);
void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
TScanRange* scan_range);