Posted to commits@impala.apache.org by ta...@apache.org on 2016/10/06 00:31:16 UTC
[2/4] incubator-impala git commit: IMPALA-3902: Scheduler
improvements for running multiple fragment instances on a single backend
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index b5745c4..5ad84df 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -52,25 +52,90 @@ QuerySchedule::QuerySchedule(const TUniqueId& query_id,
query_events_(query_events),
num_fragment_instances_(0),
num_scan_ranges_(0),
+ next_instance_id_(query_id),
is_admitted_(false) {
fragment_exec_params_.resize(request.fragments.size());
- // Build two maps to map node ids to their fragments as well as to the offset in their
- // fragment's plan's nodes list.
- for (int i = 0; i < request.fragments.size(); ++i) {
- int node_idx = 0;
- for (const TPlanNode& node: request.fragments[i].plan.nodes) {
- if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
- plan_node_to_fragment_idx_.resize(node.node_id + 1);
- plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+ bool is_mt_execution = request.query_ctx.request.query_options.mt_dop > 0;
+
+ if (is_mt_execution) {
+ /// TODO-MT: remove else branch and move MtInit() logic here
+ MtInit();
+ } else {
+ // Build two maps to map node ids to their fragments as well as to the offset in their
+ // fragment's plan's nodes list.
+ for (int i = 0; i < request.fragments.size(); ++i) {
+ int node_idx = 0;
+ for (const TPlanNode& node: request.fragments[i].plan.nodes) {
+ if (plan_node_to_fragment_idx_.size() < node.node_id + 1) {
+ plan_node_to_fragment_idx_.resize(node.node_id + 1);
+ plan_node_to_plan_node_idx_.resize(node.node_id + 1);
+ }
+ DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
+ plan_node_to_fragment_idx_[node.node_id] = i;
+ plan_node_to_plan_node_idx_[node.node_id] = node_idx;
+ ++node_idx;
}
- DCHECK_EQ(plan_node_to_fragment_idx_.size(), plan_node_to_plan_node_idx_.size());
- plan_node_to_fragment_idx_[node.node_id] = i;
- plan_node_to_plan_node_idx_[node.node_id] = node_idx;
- ++node_idx;
}
}
}
+void QuerySchedule::MtInit() {
+ // extract TPlanFragments and order by fragment idx
+ vector<const TPlanFragment*> fragments;
+ for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+ fragments.push_back(&fragment);
+ }
+ }
+ sort(fragments.begin(), fragments.end(),
+ [](const TPlanFragment* a, const TPlanFragment* b) { return a->idx < b->idx; });
+
+ DCHECK_EQ(mt_fragment_exec_params_.size(), 0);
+ for (const TPlanFragment* fragment: fragments) {
+ mt_fragment_exec_params_.emplace_back(*fragment);
+ }
+
+ // mark coordinator fragment
+ const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
+ if (coord_fragment.partition.type == TPartitionType::UNPARTITIONED) {
+ mt_fragment_exec_params_[coord_fragment.idx].is_coord_fragment = true;
+ next_instance_id_.lo = 1; // generated instance ids start at 1
+ }
+
+ // compute input fragments and find max node id
+ int max_node_id = 0;
+ for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+ for (const TPlanNode& node: fragment.plan.nodes) {
+ max_node_id = max(node.node_id, max_node_id);
+ }
+ }
+
+ // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+ for (int i = 1; i < plan_exec_info.fragments.size(); ++i) {
+ const TPlanFragment& fragment = plan_exec_info.fragments[i];
+ FragmentIdx dest_idx =
+ plan_exec_info.fragments[plan_exec_info.dest_fragment_idx[i - 1]].idx;
+ MtFragmentExecParams& dest_params = mt_fragment_exec_params_[dest_idx];
+ dest_params.input_fragments.push_back(fragment.idx);
+ }
+ }
+
+ // populate plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_
+ plan_node_to_fragment_idx_.resize(max_node_id + 1);
+ plan_node_to_plan_node_idx_.resize(max_node_id + 1);
+ for (const TPlanExecInfo& plan_exec_info: request_.mt_plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_exec_info.fragments) {
+ for (int i = 0; i < fragment.plan.nodes.size(); ++i) {
+ const TPlanNode& node = fragment.plan.nodes[i];
+ plan_node_to_fragment_idx_[node.node_id] = fragment.idx;
+ plan_node_to_plan_node_idx_[node.node_id] = i;
+ }
+ }
+ }
+}
+
+
int64_t QuerySchedule::GetClusterMemoryEstimate() const {
DCHECK_GT(unique_hosts_.size(), 0);
const int64_t total_cluster_mem = GetPerHostMemoryEstimate() * unique_hosts_.size();
@@ -122,4 +187,73 @@ void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_
unique_hosts_ = unique_hosts;
}
+TUniqueId QuerySchedule::GetNextInstanceId() {
+ TUniqueId result = next_instance_id_;
+ ++next_instance_id_.lo;
+ return result;
+}
+
+const TPlanFragment& FInstanceExecParams::fragment() const {
+ return fragment_exec_params.fragment;
+}
+
+int QuerySchedule::GetNumFragmentInstances() const {
+ if (mt_fragment_exec_params_.empty()) return num_fragment_instances_;
+ int result = 0;
+ for (const MtFragmentExecParams& fragment_exec_params: mt_fragment_exec_params_) {
+ result += fragment_exec_params.instance_exec_params.size();
+ }
+ return result;
+}
+
+int QuerySchedule::GetNumRemoteFInstances() const {
+ bool has_coordinator_fragment = GetCoordFragment() != nullptr;
+ int result = GetNumFragmentInstances();
+ bool is_mt_execution = request_.query_ctx.request.query_options.mt_dop > 0;
+ if (is_mt_execution && has_coordinator_fragment) --result;
+ return result;
+}
+
+int QuerySchedule::GetTotalFInstances() const {
+ int result = GetNumRemoteFInstances();
+ return GetCoordFragment() != nullptr ? result + 1 : result;
+}
+
+const TPlanFragment* QuerySchedule::GetCoordFragment() const {
+ bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
+ const TPlanFragment* fragment = is_mt_exec
+ ? &request_.mt_plan_exec_info[0].fragments[0] : &request_.fragments[0];
+ if (fragment->partition.type == TPartitionType::UNPARTITIONED) {
+ return fragment;
+ } else {
+ return nullptr;
+ }
+}
+
+void QuerySchedule::GetTPlanFragments(vector<const TPlanFragment*>* fragments) const {
+ fragments->clear();
+ bool is_mt_exec = request_.query_ctx.request.query_options.mt_dop > 0;
+ if (is_mt_exec) {
+ for (const TPlanExecInfo& plan_info: request_.mt_plan_exec_info) {
+ for (const TPlanFragment& fragment: plan_info.fragments) {
+ fragments->push_back(&fragment);
+ }
+ }
+ } else {
+ for (const TPlanFragment& fragment: request_.fragments) {
+ fragments->push_back(&fragment);
+ }
+ }
+}
+
+const FInstanceExecParams& QuerySchedule::GetCoordInstanceExecParams() const {
+ const TPlanFragment& coord_fragment = request_.mt_plan_exec_info[0].fragments[0];
+ DCHECK_EQ(coord_fragment.partition.type, TPartitionType::UNPARTITIONED);
+ const MtFragmentExecParams* fragment_params =
+ &mt_fragment_exec_params_[coord_fragment.idx];
+ DCHECK(fragment_params != nullptr);
+ DCHECK_EQ(fragment_params->instance_exec_params.size(), 1);
+ return fragment_params->instance_exec_params[0];
+}
+
}
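
To make the instance-id scheme concrete: GetNextInstanceId() hands out consecutive ids that share the query id's hi part and advance only in the lo part, and when the query has a coordinator instance, that instance reuses the query id itself while generated ids start at 1 (see MtInit() above). A minimal standalone sketch of the same sequence, with a simplified stand-in for the Thrift TUniqueId and a hypothetical helper name:

    #include <cstdint>
    #include <vector>

    struct TUniqueId { int64_t hi; int64_t lo; };  // stand-in for the Thrift type

    // Mirrors the sequence produced by QuerySchedule::GetNextInstanceId():
    // same hi as the query id, lo advancing by one per generated instance.
    std::vector<TUniqueId> GenerateInstanceIds(
        const TUniqueId& query_id, int num_instances, bool has_coord_instance) {
      TUniqueId next = query_id;
      // the coordinator instance reuses the query id itself, so generated
      // ids start at 1
      if (has_coord_instance) next.lo = 1;
      std::vector<TUniqueId> ids;
      for (int i = 0; i < num_instances; ++i) {
        ids.push_back(next);
        ++next.lo;
      }
      return ids;
    }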
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/query-schedule.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.h b/be/src/scheduling/query-schedule.h
index c8ebd5d..39ce268 100644
--- a/be/src/scheduling/query-schedule.h
+++ b/be/src/scheduling/query-schedule.h
@@ -20,13 +20,14 @@
#include <vector>
#include <string>
+#include <unordered_map>
#include <boost/unordered_set.hpp>
-#include <boost/unordered_map.hpp>
#include <boost/scoped_ptr.hpp>
#include "common/global-types.h"
#include "common/status.h"
#include "util/promise.h"
+#include "util/container-util.h"
#include "util/runtime-profile.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "gen-cpp/Frontend_types.h"
@@ -34,12 +35,14 @@
namespace impala {
class Coordinator;
+struct MtFragmentExecParams;
/// map from scan node id to a list of scan ranges
typedef std::map<TPlanNodeId, std::vector<TScanRangeParams>> PerNodeScanRanges;
+
/// map from an impalad host address to the per-node assigned scan ranges;
/// records scan range assignment for a single fragment
-typedef boost::unordered_map<TNetworkAddress, PerNodeScanRanges>
+typedef std::unordered_map<TNetworkAddress, PerNodeScanRanges>
FragmentScanRangeAssignment;
/// execution parameters for a single fragment; used to assemble the
@@ -58,14 +61,62 @@ struct FragmentExecParams {
int sender_id_base;
};
+/// execution parameters for a single fragment instance; used to assemble the
+/// TPlanFragmentInstanceCtx
+struct FInstanceExecParams {
+ TUniqueId instance_id;
+ TNetworkAddress host; // execution backend
+ PerNodeScanRanges per_node_scan_ranges;
+
+ /// 0-based ordinal of this particular instance within its fragment (not: query-wide)
+ int per_fragment_instance_idx;
+
+ /// In its role as a data sender, a fragment instance is assigned a "sender id" to
+ /// uniquely identify it to a receiver. -1 = invalid.
+ int sender_id;
+
+ /// the parent MtFragmentExecParams
+ const MtFragmentExecParams& fragment_exec_params;
+ const TPlanFragment& fragment() const;
+
+ FInstanceExecParams(const TUniqueId& instance_id, const TNetworkAddress& host,
+ int per_fragment_instance_idx, const MtFragmentExecParams& fragment_exec_params)
+ : instance_id(instance_id), host(host),
+ per_fragment_instance_idx(per_fragment_instance_idx),
+ sender_id(-1),
+ fragment_exec_params(fragment_exec_params) {}
+};
+
+/// Execution parameters shared between fragment instances
+struct MtFragmentExecParams {
+ /// output destinations of this fragment
+ std::vector<TPlanFragmentDestination> destinations;
+
+ /// map from node id to the number of senders (node id expected to be for an
+ /// ExchangeNode)
+ std::map<PlanNodeId, int> per_exch_num_senders;
+
+ // only needed as intermediate state during exec parameter computation;
+ // for scheduling, refer to FInstanceExecParams.per_node_scan_ranges
+ FragmentScanRangeAssignment scan_range_assignment;
+
+ bool is_coord_fragment;
+ const TPlanFragment& fragment;
+ std::vector<FragmentIdx> input_fragments;
+ std::vector<FInstanceExecParams> instance_exec_params;
+
+ MtFragmentExecParams(const TPlanFragment& fragment)
+ : is_coord_fragment(false), fragment(fragment) {}
+};
+
/// A QuerySchedule contains all necessary information for a query coordinator to
/// generate fragment execution requests and start query execution. If resource management
/// is enabled, then a schedule also contains the resource reservation request
/// and the granted resource reservation.
-/// TODO: Consider moving QuerySchedule and all Schedulers into
-/// their own lib (and out of statestore).
-/// TODO: Move all global state (e.g. profiles) to QueryExecState (after it is decoupled
-/// from ImpalaServer)
+///
+/// QuerySchedule is a container class for scheduling data, but it doesn't contain
+/// scheduling logic itself. Its state either comes from the static TQueryExecRequest
+/// or is computed by SimpleScheduler.
class QuerySchedule {
public:
QuerySchedule(const TUniqueId& query_id, const TQueryExecRequest& request,
@@ -87,19 +138,86 @@ class QuerySchedule {
int64_t GetClusterMemoryEstimate() const;
/// Helper methods used by scheduler to populate this QuerySchedule.
- void AddScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+ void IncNumScanRanges(int64_t delta) { num_scan_ranges_ += delta; }
+
+ /// The following 4 functions need to be replaced once we stop special-casing
+ /// the coordinator instance in the coordinator.
+ /// The replacement is a single function int GetNumFInstances() (which includes
+ /// the coordinator instance).
+
+ /// TODO-MT: remove; this is actually only the number of remote instances
+ /// (from the coordinator's perspective)
void set_num_fragment_instances(int64_t num_fragment_instances) {
num_fragment_instances_ = num_fragment_instances;
}
- int64_t num_fragment_instances() const { return num_fragment_instances_; }
+
+ /// Returns the number of fragment instances registered with this schedule.
+ /// MT: total number of fragment instances
+ /// ST: value set with set_num_fragment_instances(); excludes coord instance
+ /// (in effect the number of remote instances)
+ /// TODO-MT: get rid of special-casing of coordinator instance and always return the
+ /// total
+ int GetNumFragmentInstances() const;
+
+ /// Returns the total number of fragment instances, incl. coordinator fragment.
+ /// TODO-MT: remove
+ int GetTotalFInstances() const;
+
+ /// Returns the number of remote fragment instances (excludes coordinator).
+ /// Works for both MT and ST.
+ /// TODO-MT: remove
+ int GetNumRemoteFInstances() const;
+
+ /// Return the coordinator fragment, or nullptr if there isn't one.
+ const TPlanFragment* GetCoordFragment() const;
+
+ /// Return all fragments belonging to exec request in 'fragments'.
+ void GetTPlanFragments(std::vector<const TPlanFragment*>* fragments) const;
+
int64_t num_scan_ranges() const { return num_scan_ranges_; }
- /// Map node ids to the index of their fragment in TQueryExecRequest.fragments.
- int32_t GetFragmentIdx(PlanNodeId id) const { return plan_node_to_fragment_idx_[id]; }
+ /// Map node ids to the id of their containing fragment.
+ FragmentIdx GetFragmentIdx(PlanNodeId id) const {
+ return plan_node_to_fragment_idx_[id];
+ }
+
+ /// Returns next instance id. Instance ids are consecutive numbers generated from
+ /// the query id.
+ /// If the query contains a coordinator fragment instance, the generated instance
+ /// ids start at 1 and the caller is responsible for assigning the correct id
+ /// to the coordinator instance. If the query does not contain a coordinator instance,
+ /// the generated instance ids start at 0.
+ TUniqueId GetNextInstanceId();
+
+ const TPlanFragment& GetContainingFragment(PlanNodeId node_id) const {
+ return mt_fragment_exec_params_[GetFragmentIdx(node_id)].fragment;
+ }
- /// Map node ids to the index of the node inside their plan.nodes list.
+ /// Map node ids to the index of their node inside their plan.nodes list.
+ /// TODO-MT: remove; only needed for the ST path
int32_t GetNodeIdx(PlanNodeId id) const { return plan_node_to_plan_node_idx_[id]; }
+
+ const TPlanNode& GetNode(PlanNodeId id) const {
+ const TPlanFragment& fragment = GetContainingFragment(id);
+ return fragment.plan.nodes[plan_node_to_plan_node_idx_[id]];
+ }
+
std::vector<FragmentExecParams>* exec_params() { return &fragment_exec_params_; }
+ const std::vector<FragmentExecParams>& exec_params() const {
+ return fragment_exec_params_;
+ }
+ const std::vector<MtFragmentExecParams>& mt_fragment_exec_params() const {
+ return mt_fragment_exec_params_;
+ }
+ const MtFragmentExecParams& GetFragmentExecParams(FragmentIdx idx) const {
+ return mt_fragment_exec_params_[idx];
+ }
+ MtFragmentExecParams* GetFragmentExecParams(FragmentIdx idx) {
+ return &mt_fragment_exec_params_[idx];
+ }
+
+ const FInstanceExecParams& GetCoordInstanceExecParams() const;
+
const boost::unordered_set<TNetworkAddress>& unique_hosts() const {
return unique_hosts_;
}
@@ -111,7 +229,6 @@ class QuerySchedule {
void SetUniqueHosts(const boost::unordered_set<TNetworkAddress>& unique_hosts);
private:
-
/// These references are valid for the lifetime of this query schedule because they
/// are all owned by the enclosing QueryExecState.
const TUniqueId& query_id_;
@@ -122,7 +239,7 @@ class QuerySchedule {
RuntimeProfile* summary_profile_;
RuntimeProfile::EventSequence* query_events_;
- /// Maps from plan node id to its fragment index. Filled in c'tor.
+ /// Maps from plan node id to its fragment idx. Filled in c'tor.
std::vector<int32_t> plan_node_to_fragment_idx_;
/// Maps from plan node id to its index in plan.nodes. Filled in c'tor.
@@ -132,21 +249,33 @@ class QuerySchedule {
/// populated by Scheduler::Schedule()
std::vector<FragmentExecParams> fragment_exec_params_;
+ // populated by Scheduler::Schedule (SimpleScheduler::ComputeMtFInstanceExecParams())
+ // indexed by fragment idx (TPlanFragment.idx)
+ std::vector<MtFragmentExecParams> mt_fragment_exec_params_;
+
/// The set of hosts that the query will run on excluding the coordinator.
boost::unordered_set<TNetworkAddress> unique_hosts_;
/// Number of backends executing plan fragments on behalf of this query.
+ /// TODO-MT: remove
int64_t num_fragment_instances_;
/// Total number of scan ranges of this query.
int64_t num_scan_ranges_;
+ /// Used to generate consecutive fragment instance ids.
+ TUniqueId next_instance_id_;
+
/// Request pool to which the request was submitted for admission.
std::string request_pool_;
/// Indicates if the query has been admitted for execution.
bool is_admitted_;
+ /// Populate mt_fragment_exec_params_ from request_.mt_plan_exec_info.
+ /// Sets is_coord_fragment and input_fragments.
+ /// Also populates plan_node_to_fragment_idx_ and plan_node_to_plan_node_idx_.
+ void MtInit();
};
}
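
Taken together, the new accessors form a lookup chain from a plan-node id to its containing fragment and its TPlanNode, as used by MtComputeScanRangeAssignment() below. A hypothetical caller on the MT path (GetContainingFragment() reads mt_fragment_exec_params_, so this only works once MtInit() has run):

    // node id -> fragment idx -> containing fragment -> node in plan.nodes
    FragmentIdx idx = schedule.GetFragmentIdx(node_id);
    const TPlanFragment& fragment = schedule.GetContainingFragment(node_id);
    const TPlanNode& node = schedule.GetNode(node_id);
    DCHECK_EQ(node.node_id, node_id);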
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4c3a967..9bff424 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -46,12 +46,10 @@ class Scheduler {
/// List of server descriptors.
typedef std::vector<TBackendDescriptor> BackendList;
- /// Populates given query schedule whose execution is to be coordinated by coord.
- /// Assigns fragments to hosts based on scan ranges in the query exec request.
- /// If resource management is enabled, also reserves resources from the central
- /// resource manager (Yarn via Llama) to run the query in. This function blocks until
- /// the reservation request has been granted or denied.
- virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule) = 0;
+ /// Populates given query schedule and assigns fragments to hosts based on scan
+ /// ranges in the query exec request. Submits schedule to admission control before
+ /// returning.
+ virtual Status Schedule(QuerySchedule* schedule) = 0;
/// Releases the reserved resources (if any) from the given schedule.
virtual Status Release(QuerySchedule* schedule) = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index f3ba9a5..5b6303e 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -20,6 +20,7 @@
#include <atomic>
#include <random>
#include <vector>
+#include <algorithm>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/join.hpp>
@@ -276,6 +277,7 @@ void SimpleScheduler::UpdateMembership(
}
}
if (metrics_ != NULL) {
+ /// TODO-MT: fix this (do we even need to report it?)
num_fragment_instances_metric_->set_value(current_membership_.size());
}
}
@@ -305,7 +307,7 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
for (entry = exec_request.per_node_scan_ranges.begin();
entry != exec_request.per_node_scan_ranges.end(); ++entry) {
const TPlanNodeId node_id = entry->first;
- int fragment_idx = schedule->GetFragmentIdx(node_id);
+ FragmentIdx fragment_idx = schedule->GetFragmentIdx(node_id);
const TPlanFragment& fragment = exec_request.fragments[fragment_idx];
bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
@@ -326,11 +328,208 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
node_id, node_replica_preference, node_random_replica, entry->second,
exec_request.host_list, exec_at_coord, schedule->query_options(),
total_assignment_timer, assignment));
- schedule->AddScanRanges(entry->second.size());
+ schedule->IncNumScanRanges(entry->second.size());
}
return Status::OK();
}
+Status SimpleScheduler::MtComputeScanRangeAssignment(QuerySchedule* schedule) {
+ RuntimeProfile::Counter* total_assignment_timer =
+ ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
+ BackendConfigPtr backend_config = GetBackendConfig();
+ const TQueryExecRequest& exec_request = schedule->request();
+ for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+ for (const auto& entry: plan_exec_info.per_node_scan_ranges) {
+ const TPlanNodeId node_id = entry.first;
+ const TPlanFragment& fragment = schedule->GetContainingFragment(node_id);
+ bool exec_at_coord = (fragment.partition.type == TPartitionType::UNPARTITIONED);
+
+ const TPlanNode& node = schedule->GetNode(node_id);
+ DCHECK_EQ(node.node_id, node_id);
+
+ const TReplicaPreference::type* node_replica_preference =
+ node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.replica_preference
+ ? &node.hdfs_scan_node.replica_preference : NULL;
+ bool node_random_replica =
+ node.__isset.hdfs_scan_node && node.hdfs_scan_node.__isset.random_replica
+ && node.hdfs_scan_node.random_replica;
+
+ FragmentScanRangeAssignment* assignment =
+ &schedule->GetFragmentExecParams(fragment.idx)->scan_range_assignment;
+ RETURN_IF_ERROR(ComputeScanRangeAssignment(
+ *backend_config, node_id, node_replica_preference, node_random_replica,
+ entry.second, exec_request.host_list, exec_at_coord,
+ schedule->query_options(), total_assignment_timer, assignment));
+ schedule->IncNumScanRanges(entry.second.size());
+ }
+ }
+ return Status::OK();
+}
+
+void SimpleScheduler::MtComputeFragmentExecParams(QuerySchedule* schedule) {
+ const TQueryExecRequest& exec_request = schedule->request();
+
+ // for each plan, compute the FInstanceExecParams for the tree of fragments
+ for (const TPlanExecInfo& plan_exec_info: exec_request.mt_plan_exec_info) {
+ // set instance_id, host, per_node_scan_ranges
+ MtComputeFragmentExecParams(
+ plan_exec_info,
+ schedule->GetFragmentExecParams(plan_exec_info.fragments[0].idx),
+ schedule);
+
+ // Set destinations, per_exch_num_senders, sender_id.
+ // fragments[f] sends its output to fragments[dest_fragment_idx[f-1]];
+ // fragments[0] is an endpoint.
+ for (int i = 0; i < plan_exec_info.dest_fragment_idx.size(); ++i) {
+ int dest_idx = plan_exec_info.dest_fragment_idx[i];
+ DCHECK_LT(dest_idx, plan_exec_info.fragments.size());
+ const TPlanFragment& dest_fragment = plan_exec_info.fragments[dest_idx];
+ DCHECK_LT(i + 1, plan_exec_info.fragments.size());
+ const TPlanFragment& src_fragment = plan_exec_info.fragments[i + 1];
+ DCHECK(src_fragment.output_sink.__isset.stream_sink);
+ MtFragmentExecParams* dest_params =
+ schedule->GetFragmentExecParams(dest_fragment.idx);
+ MtFragmentExecParams* src_params =
+ schedule->GetFragmentExecParams(src_fragment.idx);
+
+ // populate src_params->destinations
+ src_params->destinations.resize(dest_params->instance_exec_params.size());
+ for (int j = 0; j < dest_params->instance_exec_params.size(); ++j) {
+ TPlanFragmentDestination& dest = src_params->destinations[j];
+ dest.__set_fragment_instance_id(dest_params->instance_exec_params[j].instance_id);
+ dest.__set_server(dest_params->instance_exec_params[j].host);
+ }
+
+ // enumerate senders consecutively;
+ // for distributed merge we need to enumerate senders across fragment instances
+ const TDataStreamSink& sink = src_fragment.output_sink.stream_sink;
+ DCHECK(
+ sink.output_partition.type == TPartitionType::UNPARTITIONED
+ || sink.output_partition.type == TPartitionType::HASH_PARTITIONED
+ || sink.output_partition.type == TPartitionType::RANDOM);
+ PlanNodeId exch_id = sink.dest_node_id;
+ int sender_id_base = dest_params->per_exch_num_senders[exch_id];
+ dest_params->per_exch_num_senders[exch_id] +=
+ src_params->instance_exec_params.size();
+ for (int j = 0; j < src_params->instance_exec_params.size(); ++j) {
+ FInstanceExecParams& src_instance_params = src_params->instance_exec_params[j];
+ src_instance_params.sender_id = sender_id_base + j;
+ }
+ }
+ }
+}
+
+void SimpleScheduler::MtComputeFragmentExecParams(
+ const TPlanExecInfo& plan_exec_info, MtFragmentExecParams* fragment_params,
+ QuerySchedule* schedule) {
+ // traverse input fragments
+ for (FragmentIdx input_fragment_idx: fragment_params->input_fragments) {
+ MtComputeFragmentExecParams(
+ plan_exec_info, schedule->GetFragmentExecParams(input_fragment_idx), schedule);
+ }
+
+ const TPlanFragment& fragment = fragment_params->fragment;
+ // TODO: deal with Union nodes
+ DCHECK(!ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE));
+ // case 1: single instance executed at coordinator
+ if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
+ const TNetworkAddress& coord = local_backend_descriptor_.address;
+ // make sure that the coordinator instance ends up with instance idx 0
+ TUniqueId instance_id = fragment_params->is_coord_fragment
+ ? schedule->query_id()
+ : schedule->GetNextInstanceId();
+ fragment_params->instance_exec_params.emplace_back(
+ instance_id, coord, 0, *fragment_params);
+ FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+ // That instance gets all of the scan ranges, if there are any.
+ if (!fragment_params->scan_range_assignment.empty()) {
+ DCHECK_EQ(fragment_params->scan_range_assignment.size(), 1);
+ auto first_entry = fragment_params->scan_range_assignment.begin();
+ instance_params.per_node_scan_ranges = first_entry->second;
+ }
+ } else {
+ PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
+ if (leftmost_scan_id != g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
+ // case 2: leaf fragment with leftmost scan
+ // TODO: check that there's only one scan in this fragment
+ MtCreateScanInstances(leftmost_scan_id, fragment_params, schedule);
+ } else {
+ // case 3: interior fragment without leftmost scan
+ // we assign the same hosts as those of our leftmost input fragment (so that a
+ // merge aggregation fragment runs on the hosts that provide the input data)
+ MtCreateMirrorInstances(fragment_params, schedule);
+ }
+ }
+}
+
+void SimpleScheduler::MtCreateScanInstances(
+ PlanNodeId leftmost_scan_id, MtFragmentExecParams* fragment_params,
+ QuerySchedule* schedule) {
+ int max_num_instances = schedule->request().query_ctx.request.query_options.mt_dop;
+ for (const auto& assignment_entry: fragment_params->scan_range_assignment) {
+ // evenly divide up the scan ranges of the leftmost scan between at most
+ // <dop> instances
+ const TNetworkAddress& host = assignment_entry.first;
+ auto scan_ranges_it = assignment_entry.second.find(leftmost_scan_id);
+ DCHECK(scan_ranges_it != assignment_entry.second.end());
+ const vector<TScanRangeParams>& params_list = scan_ranges_it->second;
+
+ int64_t total_size = 0;
+ for (const TScanRangeParams& params: params_list) {
+ // TODO: implement logic for hbase and kudu
+ DCHECK(params.scan_range.__isset.hdfs_file_split);
+ total_size += params.scan_range.hdfs_file_split.length;
+ }
+
+ // try to load-balance scan ranges by assigning just beyond the average number of
+ // bytes to each instance
+ // TODO: fix shortcomings introduced by uneven split sizes,
+ // this could end up assigning 0 scan ranges to an instance
+ int num_instances = ::min(max_num_instances, static_cast<int>(params_list.size()));
+ DCHECK_GT(num_instances, 0);
+ float avg_bytes_per_instance = static_cast<float>(total_size) / num_instances;
+ int64_t total_assigned_bytes = 0;
+ int params_idx = 0; // into params_list
+ for (int i = 0; i < num_instances; ++i) {
+ fragment_params->instance_exec_params.emplace_back(
+ schedule->GetNextInstanceId(), host, i, *fragment_params);
+ FInstanceExecParams& instance_params = fragment_params->instance_exec_params.back();
+
+ // threshold beyond which we want to assign to the next instance
+ int64_t threshold_total_bytes = avg_bytes_per_instance * (i + 1);
+
+ // this will have assigned all scan ranges by the last instance:
+ // for the last instance, threshold_total_bytes == total_size and
+ // total_assigned_bytes won't hit total_size until everything is assigned
+ while (params_idx < params_list.size()
+ && total_assigned_bytes < threshold_total_bytes) {
+ const TScanRangeParams& scan_range_params = params_list[params_idx];
+ instance_params.per_node_scan_ranges[leftmost_scan_id].push_back(
+ scan_range_params);
+ total_assigned_bytes += scan_range_params.scan_range.hdfs_file_split.length;
+ ++params_idx;
+ }
+ if (params_idx >= params_list.size()) break; // nothing left to assign
+ }
+ DCHECK_EQ(params_idx, params_list.size()); // everything got assigned
+ }
+}
+
+void SimpleScheduler::MtCreateMirrorInstances(
+ MtFragmentExecParams* fragment_params, QuerySchedule* schedule) {
+ DCHECK_GE(fragment_params->input_fragments.size(), 1);
+ const MtFragmentExecParams* input_fragment_params =
+ schedule->GetFragmentExecParams(fragment_params->input_fragments[0]);
+ int per_fragment_instance_idx = 0;
+ for (const FInstanceExecParams& input_instance_params:
+ input_fragment_params->instance_exec_params) {
+ fragment_params->instance_exec_params.emplace_back(
+ schedule->GetNextInstanceId(), input_instance_params.host,
+ per_fragment_instance_idx++, *fragment_params);
+ }
+}
+
Status SimpleScheduler::ComputeScanRangeAssignment(
const BackendConfig& backend_config, PlanNodeId node_id,
const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
@@ -459,12 +658,10 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
// assign instance ids
int64_t num_fragment_instances = 0;
for (FragmentExecParams& params: *fragment_exec_params) {
- for (int j = 0; j < params.hosts.size(); ++j) {
- int instance_idx = num_fragment_instances + j;
+ for (int j = 0; j < params.hosts.size(); ++j, ++num_fragment_instances) {
params.instance_ids.push_back(
- CreateInstanceId(schedule->query_id(), instance_idx));
+ CreateInstanceId(schedule->query_id(), num_fragment_instances));
}
- num_fragment_instances += params.hosts.size();
}
if (exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED) {
// the root fragment is executed directly by the coordinator
@@ -511,11 +708,6 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
QuerySchedule* schedule) {
vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
- vector<TPlanNodeType::type> scan_node_types;
- scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
- scan_node_types.push_back(TPlanNodeType::HBASE_SCAN_NODE);
- scan_node_types.push_back(TPlanNodeType::DATA_SOURCE_NODE);
- scan_node_types.push_back(TPlanNodeType::KUDU_SCAN_NODE);
// compute hosts of producer fragment before those of consumer fragment(s),
// the latter might inherit the set of hosts from the former
@@ -535,6 +727,9 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
// a UnionNode with partitioned joins or grouping aggregates as children runs on
// at least as many hosts as the input to those children).
if (ContainsNode(fragment.plan, TPlanNodeType::UNION_NODE)) {
+ vector<TPlanNodeType::type> scan_node_types {
+ TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+ TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
vector<TPlanNodeId> scan_nodes;
FindNodes(fragment.plan, scan_node_types, &scan_nodes);
vector<TPlanNodeId> exch_nodes;
@@ -562,7 +757,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
continue;
}
- PlanNodeId leftmost_scan_id = FindLeftmostNode(fragment.plan, scan_node_types);
+ PlanNodeId leftmost_scan_id = FindLeftmostScan(fragment.plan);
if (leftmost_scan_id == g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID) {
// there is no leftmost scan; we assign the same hosts as those of our
// leftmost input fragment (so that a partitioned aggregation fragment
@@ -606,6 +801,13 @@ PlanNodeId SimpleScheduler::FindLeftmostNode(
return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
+PlanNodeId SimpleScheduler::FindLeftmostScan(const TPlan& plan) {
+ vector<TPlanNodeType::type> scan_node_types {
+ TPlanNodeType::HDFS_SCAN_NODE, TPlanNodeType::HBASE_SCAN_NODE,
+ TPlanNodeType::DATA_SOURCE_NODE, TPlanNodeType::KUDU_SCAN_NODE};
+ return FindLeftmostNode(plan, scan_node_types);
+}
+
bool SimpleScheduler::ContainsNode(const TPlan& plan, TPlanNodeType::type type) {
for (int i = 0; i < plan.nodes.size(); ++i) {
if (plan.nodes[i].node_type == type) return true;
@@ -674,18 +876,35 @@ int SimpleScheduler::FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
return g_ImpalaInternalService_constants.INVALID_PLAN_NODE_ID;
}
-Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
+Status SimpleScheduler::Schedule(QuerySchedule* schedule) {
string resolved_pool;
RETURN_IF_ERROR(request_pool_service_->ResolveRequestPool(
schedule->request().query_ctx, &resolved_pool));
schedule->set_request_pool(resolved_pool);
schedule->summary_profile()->AddInfoString("Request Pool", resolved_pool);
- RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
- ComputeFragmentHosts(schedule->request(), schedule);
- ComputeFragmentExecParams(schedule->request(), schedule);
- if (!FLAGS_disable_admission_control) {
- RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+ bool is_mt_execution = schedule->request().query_ctx.request.query_options.mt_dop > 0;
+ if (is_mt_execution) {
+ RETURN_IF_ERROR(MtComputeScanRangeAssignment(schedule));
+ MtComputeFragmentExecParams(schedule);
+
+ // compute unique hosts
+ unordered_set<TNetworkAddress> unique_hosts;
+ for (const MtFragmentExecParams& f: schedule->mt_fragment_exec_params()) {
+ for (const FInstanceExecParams& i: f.instance_exec_params) {
+ unique_hosts.insert(i.host);
+ }
+ }
+ schedule->SetUniqueHosts(unique_hosts);
+
+ // TODO-MT: call AdmitQuery()
+ } else {
+ RETURN_IF_ERROR(ComputeScanRangeAssignment(schedule->request(), schedule));
+ ComputeFragmentHosts(schedule->request(), schedule);
+ ComputeFragmentExecParams(schedule->request(), schedule);
+ if (!FLAGS_disable_admission_control) {
+ RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
+ }
}
return Status::OK();
}
@@ -849,10 +1068,10 @@ void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
if (!remote_read) total_local_assignments_->Increment(1);
}
- PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
- PerNodeScanRanges());
- vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
- vector<TScanRangeParams>());
+ PerNodeScanRanges* scan_ranges =
+ FindOrInsert(assignment, backend.address, PerNodeScanRanges());
+ vector<TScanRangeParams>* scan_range_params_list =
+ FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
// Add scan range.
TScanRangeParams scan_range_params;
scan_range_params.scan_range = scan_range_locations.scan_range;
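
The threshold rule in MtCreateScanInstances() above deserves a worked example. A standalone sketch (hypothetical function and sizes, simplified to plain byte counts) of the same single-pass assignment:

    #include <cstdint>
    #include <algorithm>
    #include <vector>

    // Assign scan ranges (represented by their byte sizes) to at most
    // max_instances instances, cutting over to the next instance once the
    // running total passes avg * (i + 1), as in MtCreateScanInstances().
    std::vector<std::vector<int64_t>> AssignByThreshold(
        const std::vector<int64_t>& sizes, int max_instances) {
      if (sizes.empty()) return {};
      int num_instances = std::min(max_instances, static_cast<int>(sizes.size()));
      int64_t total = 0;
      for (int64_t s : sizes) total += s;
      float avg = static_cast<float>(total) / num_instances;
      std::vector<std::vector<int64_t>> result(num_instances);
      int64_t assigned = 0;
      size_t idx = 0;
      for (int i = 0; i < num_instances && idx < sizes.size(); ++i) {
        int64_t threshold = avg * (i + 1);  // cut-over point for instance i
        while (idx < sizes.size() && assigned < threshold) {
          result[i].push_back(sizes[idx]);
          assigned += sizes[idx++];
        }
      }
      return result;
    }

For sizes {100, 100, 300, 100} and max_instances 2, avg is 300: instance 0 takes {100, 100, 300} (the running total only passes 300 after the large range) and instance 1 takes {100}, illustrating the unevenness the TODO above warns about.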
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 9b51269..b7cc83e 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -88,7 +88,7 @@ class SimpleScheduler : public Scheduler {
/// Register with the subscription manager if required
virtual impala::Status Init();
- virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
+ virtual Status Schedule(QuerySchedule* schedule);
virtual Status Release(QuerySchedule* schedule);
private:
@@ -405,7 +405,39 @@ class SimpleScheduler : public Scheduler {
void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
QuerySchedule* schedule);
- /// For each fragment in exec_request, compute the hosts on which to run the instances
+ /// Compute the assignment of scan ranges to hosts for each scan node in
+ /// the schedule's TQueryExecRequest.mt_plan_exec_info.
+ /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
+ /// mt_fragment_exec_params_ with the resulting scan range assignment.
+ Status MtComputeScanRangeAssignment(QuerySchedule* schedule);
+
+ /// Compute the MtFragmentExecParams for all plans in the schedule's
+ /// TQueryExecRequest.mt_plan_exec_info.
+ /// This includes the routing information (destinations, per_exch_num_senders,
+ /// sender_id)
+ void MtComputeFragmentExecParams(QuerySchedule* schedule);
+
+ /// Recursively create FInstanceExecParams and set per_node_scan_ranges for
+ /// fragment_params and its input fragments via a depth-first traversal.
+ /// All fragments are part of plan_exec_info.
+ void MtComputeFragmentExecParams(const TPlanExecInfo& plan_exec_info,
+ MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+ /// Create instances of the fragment corresponding to fragment_params to run on the
+ /// selected replica hosts of the scan ranges of the node with id scan_id.
+ /// The maximum number of instances is the value of query option mt_dop.
+ /// This attempts to load balance among instances by computing the average number
+ /// of bytes per instance and then in a single pass assigning scan ranges to each
+ /// instance to roughly meet that average.
+ void MtCreateScanInstances(PlanNodeId scan_id,
+ MtFragmentExecParams* fragment_params, QuerySchedule* schedule);
+
+ /// For each instance of the single input fragment of the fragment corresponding to
+ /// fragment_params, create an instance for this fragment.
+ void MtCreateMirrorInstances(MtFragmentExecParams* fragment_params,
+ QuerySchedule* schedule);
+
+ /// For each fragment in exec_request, computes hosts on which to run the instances
/// and stores result in fragment_exec_params_.hosts.
void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
QuerySchedule* schedule);
@@ -414,6 +446,8 @@ class SimpleScheduler : public Scheduler {
/// INVALID_PLAN_NODE_ID if no such node present.
PlanNodeId FindLeftmostNode(
const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
+ /// Same as FindLeftmostNode(), restricted to scan node types.
+ PlanNodeId FindLeftmostScan(const TPlan& plan);
/// Return the index (w/in exec_request.fragments) of fragment that sends its output to
/// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-exec-state.cc b/be/src/service/fragment-exec-state.cc
index 9069b03..76e11d1 100644
--- a/be/src/service/fragment-exec-state.cc
+++ b/be/src/service/fragment-exec-state.cc
@@ -81,7 +81,6 @@ void FragmentMgr::FragmentExecState::ReportStatusCb(
TReportExecStatusParams params;
params.protocol_version = ImpalaInternalServiceVersion::V1;
params.__set_query_id(query_ctx_.query_id);
- params.__set_instance_state_idx(fragment_instance_ctx_.instance_state_idx);
params.__set_fragment_instance_id(fragment_instance_ctx_.fragment_instance_id);
exec_status.SetTStatus(&params);
params.__set_done(done);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/fragment-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fragment-mgr.cc b/be/src/service/fragment-mgr.cc
index a98bbf9..64e9a78 100644
--- a/be/src/service/fragment-mgr.cc
+++ b/be/src/service/fragment-mgr.cc
@@ -38,10 +38,8 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory
Status FragmentMgr::ExecPlanFragment(const TExecPlanFragmentParams& exec_params) {
VLOG_QUERY << "ExecPlanFragment() instance_id="
- << exec_params.fragment_instance_ctx.fragment_instance_id
- << " coord=" << exec_params.query_ctx.coord_address
- << " fragment instance#="
- << exec_params.fragment_instance_ctx.instance_state_idx;
+ << PrintId(exec_params.fragment_instance_ctx.fragment_instance_id)
+ << " coord=" << exec_params.query_ctx.coord_address;
// Preparing and opening the fragment creates a thread and consumes a non-trivial
// amount of memory. If we are already starved for memory, cancel the fragment as
@@ -146,6 +144,7 @@ void FragmentMgr::CancelPlanFragment(TCancelPlanFragmentResult& return_val,
void FragmentMgr::PublishFilter(TPublishFilterResult& return_val,
const TPublishFilterParams& params) {
+ VLOG_FILE << "PublishFilter(): dst_instance_id=" << params.dst_instance_id;
shared_ptr<FragmentExecState> fragment_exec_state =
GetFragmentExecState(params.dst_instance_id);
if (fragment_exec_state.get() == NULL) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d5fd59a..7f9d862 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -845,9 +845,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
// benchmarks show it to be slightly cheaper than contending for a
// single generator under a lock (since random_generator is not
// thread-safe).
- random_generator uuid_generator;
- uuid query_uuid = uuid_generator();
- query_ctx->query_id = UuidToQueryId(query_uuid);
+ query_ctx->query_id = UuidToQueryId(random_generator()());
}
Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -1077,9 +1075,8 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
void ImpalaServer::ReportExecStatus(
TReportExecStatusResult& return_val, const TReportExecStatusParams& params) {
- VLOG_FILE << "ReportExecStatus() query_id=" << params.query_id
- << " fragment instance#=" << params.instance_state_idx
- << " instance_id=" << params.fragment_instance_id
+ VLOG_FILE << "ReportExecStatus()"
+ << " instance_id=" << PrintId(params.fragment_instance_id)
<< " done=" << (params.done ? "true" : "false");
// TODO: implement something more efficient here, we're currently
// acquiring/releasing the map lock and doing a map lookup for
@@ -1090,10 +1087,8 @@ void ImpalaServer::ReportExecStatus(
// This is expected occasionally (since a report RPC might be in flight while
// cancellation is happening). Return an error to the caller to get it to stop.
const string& err = Substitute("ReportExecStatus(): Received report for unknown "
- "query ID (probably closed or cancelled). (query_id: $0, backend: $1, instance:"
- " $2 done: $3)", PrintId(params.query_id),
- params.instance_state_idx, PrintId(params.fragment_instance_id),
- params.done);
+ "query ID (probably closed or cancelled). (instance: $0 done: $1)",
+ PrintId(params.fragment_instance_id), params.done);
Status(TErrorCode::INTERNAL_ERROR, err).SetTStatus(&return_val);
VLOG_QUERY << err;
return;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 9b2fc88..d55ac54 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -373,7 +373,8 @@ Status ImpalaServer::QueryExecState::ExecLocalCatalogOp(
Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
const TQueryExecRequest& query_exec_request) {
// we always need at least one plan fragment
- DCHECK_GT(query_exec_request.fragments.size(), 0);
+ DCHECK(query_exec_request.fragments.size() > 0
+ || query_exec_request.mt_plan_exec_info.size() > 0);
if (query_exec_request.__isset.query_plan) {
stringstream plan_ss;
@@ -424,26 +425,31 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
// case, the query can only have a single fragment, and that fragment needs to be
// executed by the coordinator. This check confirms that.
// If desc_tbl is set, the query may or may not have a coordinator fragment.
+ bool is_mt_exec = query_exec_request.query_ctx.request.query_options.mt_dop > 0;
+ const TPlanFragment& fragment = is_mt_exec
+ ? query_exec_request.mt_plan_exec_info[0].fragments[0]
+ : query_exec_request.fragments[0];
bool has_coordinator_fragment =
- query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
+ fragment.partition.type == TPartitionType::UNPARTITIONED;
DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
{
lock_guard<mutex> l(lock_);
// Don't start executing the query if Cancel() was called concurrently with Exec().
if (is_cancelled_) return Status::CANCELLED;
+ // TODO: make schedule local to coordinator and move schedule_->Release() into
+ // Coordinator::TearDown()
schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
exec_request_.query_options, &summary_profile_, query_events_));
- coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
}
- Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
-
+ Status status = exec_env_->scheduler()->Schedule(schedule_.get());
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
- status = coord_->Exec(*schedule_, &output_expr_ctxs_);
+ coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
+ status = coord_->Exec(&output_expr_ctxs_);
{
lock_guard<mutex> l(lock_);
RETURN_IF_ERROR(UpdateQueryStatus(status));
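
The net effect of the reordering above: the Coordinator is no longer handed to the scheduler and is instead constructed from an already-populated schedule. In outline (arguments as in the patch):

    schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
        exec_request_.query_options, &summary_profile_, query_events_));
    RETURN_IF_ERROR(exec_env_->scheduler()->Schedule(schedule_.get()));  // populate schedule
    coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_)); // consume it
    RETURN_IF_ERROR(coord_->Exec(&output_expr_ctxs_));

(The patch itself funnels both statuses through UpdateQueryStatus() under lock_; the RETURN_IF_ERROR form here is just shorthand.)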
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index d0e4275..0823981 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -395,16 +395,16 @@ Status impala::SetQueryOption(const string& key, const string& value,
}
break;
}
- case TImpalaQueryOptions::MT_NUM_CORES: {
+ case TImpalaQueryOptions::MT_DOP: {
StringParser::ParseResult result;
- const int32_t num_cores =
+ const int32_t dop =
StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
- if (result != StringParser::PARSE_SUCCESS || num_cores < 0 || num_cores > 128) {
+ if (result != StringParser::PARSE_SUCCESS || dop < 0 || dop > 128) {
return Status(
- Substitute("$0 is not valid for mt_num_cores. Valid values are in "
+ Substitute("$0 is not valid for mt_dop. Valid values are in "
"[0, 128].", value));
}
- query_options->__set_mt_num_cores(num_cores);
+ query_options->__set_mt_dop(dop);
break;
}
case TImpalaQueryOptions::S3_SKIP_INSERT_STAGING: {
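
With the rename, the option reads as a degree of parallelism rather than a core count; per the validation above it accepts values in [0, 128]. For example, SET MT_DOP=4; in impala-shell requests four-way intra-node parallelism, and SET MT_DOP=0; (the default) restores single-threaded execution mode.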
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 2c25700..b1194d3 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -79,7 +79,7 @@ class TQueryOptions;
QUERY_OPT_FN(max_num_runtime_filters, MAX_NUM_RUNTIME_FILTERS)\
QUERY_OPT_FN(parquet_annotate_strings_utf8, PARQUET_ANNOTATE_STRINGS_UTF8)\
QUERY_OPT_FN(parquet_fallback_schema_resolution, PARQUET_FALLBACK_SCHEMA_RESOLUTION)\
- QUERY_OPT_FN(mt_num_cores, MT_NUM_CORES)\
+ QUERY_OPT_FN(mt_dop, MT_DOP)\
QUERY_OPT_FN(s3_skip_insert_staging, S3_SKIP_INSERT_STAGING)\
QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/container-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/container-util.h b/be/src/util/container-util.h
index b8cd1ef..efb4f52 100644
--- a/be/src/util/container-util.h
+++ b/be/src/util/container-util.h
@@ -20,6 +20,7 @@
#define IMPALA_UTIL_CONTAINER_UTIL_H
#include <map>
+#include <unordered_map>
#include <boost/unordered_map.hpp>
#include "util/hash-util.h"
@@ -35,6 +36,21 @@ inline std::size_t hash_value(const TNetworkAddress& host_port) {
return HashUtil::Hash(&host_port.port, sizeof(host_port.port), hash);
}
+}
+
+/// Hash function for std:: containers
+namespace std {
+
+template<> struct hash<impala::TNetworkAddress> {
+ std::size_t operator()(const impala::TNetworkAddress& host_port) const {
+ return impala::hash_value(host_port);
+ }
+};
+
+}
+
+namespace impala {
+
struct HashTNetworkAddressPtr : public std::unary_function<TNetworkAddress*, size_t> {
size_t operator()(const TNetworkAddress* const& p) const { return hash_value(*p); }
};
@@ -49,6 +65,7 @@ struct TNetworkAddressPtrEquals : public std::unary_function<TNetworkAddress*, b
/// FindOrInsert(): if the key is present, return the value; if the key is not present,
/// create a new entry (key, default_val) and return default_val.
+/// TODO: replace with single template which takes a template param
template <typename K, typename V>
V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
@@ -60,6 +77,15 @@ V* FindOrInsert(std::map<K,V>* m, const K& key, const V& default_val) {
}
template <typename K, typename V>
+V* FindOrInsert(std::unordered_map<K,V>* m, const K& key, const V& default_val) {
+ typename std::unordered_map<K,V>::iterator it = m->find(key);
+ if (it == m->end()) {
+ it = m->insert(std::make_pair(key, default_val)).first;
+ }
+ return &it->second;
+}
+
+template <typename K, typename V>
V* FindOrInsert(boost::unordered_map<K,V>* m, const K& key, const V& default_val) {
typename boost::unordered_map<K,V>::iterator it = m->find(key);
if (it == m->end()) {
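
A brief usage sketch of the pieces added to this file (hypothetical example; TNetworkAddress and its __set_* helpers come from the generated Thrift code): the std::hash specialization is what allows TNetworkAddress to key a std::unordered_map such as FragmentScanRangeAssignment, and the new overload makes FindOrInsert work on such maps.

    #include <unordered_map>
    #include "gen-cpp/Types_types.h"   // TNetworkAddress
    #include "util/container-util.h"

    void Example() {
      std::unordered_map<impala::TNetworkAddress, int> per_host_count;
      impala::TNetworkAddress addr;
      addr.__set_hostname("host-1");
      addr.__set_port(22000);
      // first call inserts (addr, 0) and returns a pointer to the new value
      int* count = impala::FindOrInsert(&per_host_count, addr, 0);
      ++(*count);
    }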
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util-test.cc b/be/src/util/uid-util-test.cc
index ebbc5eb..997cc0a 100644
--- a/be/src/util/uid-util-test.cc
+++ b/be/src/util/uid-util-test.cc
@@ -31,12 +31,7 @@ TEST(UidUtil, FragmentInstanceId) {
for (int i = 0; i < 100; ++i) {
TUniqueId instance_id = CreateInstanceId(query_id, i);
- EXPECT_EQ(instance_id.hi, query_id.hi);
-
- TUniqueId qid = GetQueryId(instance_id);
- EXPECT_EQ(qid.hi, query_id.hi);
- EXPECT_EQ(qid.lo, query_id.lo);
-
+ EXPECT_EQ(GetQueryId(instance_id), query_id);
EXPECT_EQ(GetInstanceIdx(instance_id), i);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/be/src/util/uid-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/uid-util.h b/be/src/util/uid-util.h
index f0f87ec..de18464 100644
--- a/be/src/util/uid-util.h
+++ b/be/src/util/uid-util.h
@@ -45,7 +45,7 @@ inline void UUIDToTUniqueId(const boost::uuids::uuid& uuid, TUniqueId* unique_id
/// Query id: uuid with bottom 4 bytes set to 0
/// Fragment instance id: query id with instance index stored in the bottom 4 bytes
-const int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
+constexpr int64_t FRAGMENT_IDX_MASK = (1L << 32) - 1;
inline TUniqueId UuidToQueryId(const boost::uuids::uuid& uuid) {
TUniqueId result;
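
A comment-only sketch of the layout this mask encodes, consistent with the comments above and the assertions in uid-util-test.cc earlier in this commit (the real helpers, CreateInstanceId()/GetQueryId()/GetInstanceIdx(), live in this file):

    //   CreateInstanceId(query_id, idx).hi == query_id.hi
    //   CreateInstanceId(query_id, idx).lo == query_id.lo | idx  // bottom 4 bytes were 0
    //   GetInstanceIdx(instance_id)        == instance_id.lo & FRAGMENT_IDX_MASK
    //   GetQueryId(instance_id)            == instance_id with bottom 4 bytes of lo cleared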
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ExecStats.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ExecStats.thrift b/common/thrift/ExecStats.thrift
index 8068b63..8e88b20 100644
--- a/common/thrift/ExecStats.thrift
+++ b/common/thrift/ExecStats.thrift
@@ -56,7 +56,7 @@ struct TExecStats {
// node as well as per instance stats.
struct TPlanNodeExecSummary {
1: required Types.TPlanNodeId node_id
- 2: required i32 fragment_id
+ 2: required Types.TFragmentIdx fragment_idx
3: required string label
4: optional string label_detail
5: required i32 num_children
@@ -64,15 +64,11 @@ struct TPlanNodeExecSummary {
// Estimated stats generated by the planner
6: optional TExecStats estimated_stats
- // One entry for each BE executing this plan node.
+ // One entry for each fragment instance executing this plan node.
7: optional list<TExecStats> exec_stats
- // One entry for each BE executing this plan node. True if this plan node is still
- // running.
- 8: optional list<bool> is_active
-
// If true, this plan node is an exchange node that is the receiver of a broadcast.
- 9: optional bool is_broadcast
+ 8: optional bool is_broadcast
}
// Progress counters for an in-flight query.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 95d6ba3..91322b2 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -340,6 +340,25 @@ struct TLoadDataResp {
1: required Data.TResultRow load_summary
}
+// Execution parameters for a single plan; component of TQueryExecRequest
+struct TPlanExecInfo {
+ // fragments[i] may consume the output of fragments[j > i];
+ // fragments[0] is the root fragment and also the coordinator fragment, if
+ // it is unpartitioned.
+ 1: required list<Planner.TPlanFragment> fragments
+
+ // Specifies the destination fragment of the output of each fragment.
+ // dest_fragment_idx.size() == fragments.size() - 1 and
+ // fragments[i] sends its output to fragments[dest_fragment_idx[i-1]]
+ // TODO: remove; TPlanFragment.output_sink.dest_node_id is sufficient
+ 2: optional list<i32> dest_fragment_idx
+
+ // A map from scan node ids to a list of scan range locations.
+ // The node ids refer to scan nodes in fragments[].plan_tree
+ 3: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
+ per_node_scan_ranges
+}
+
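+// Reading the off-by-one indexing concretely: for fragments = [F0, F1, F2] and
+// dest_fragment_idx = [0, 0], F1 (i = 1) sends to fragments[dest_fragment_idx[0]]
+// = F0 and F2 (i = 2) also sends to F0; the root F0 has no entry, which is why
+// dest_fragment_idx has one fewer element than fragments.
+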
// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
struct TQueryExecRequest {
// global descriptor tbl for all fragments
@@ -348,12 +367,7 @@ struct TQueryExecRequest {
// fragments[i] may consume the output of fragments[j > i];
// fragments[0] is the root fragment and also the coordinator fragment, if
// it is unpartitioned.
- 2: required list<Planner.TPlanFragment> fragments
-
- // Multi-threaded execution: sequence of plans; the last one materializes
- // the query result
- // TODO: this will eventually supercede 'fragments'
- 14: optional list<Planner.TPlanFragmentTree> mt_plans
+ 2: optional list<Planner.TPlanFragment> fragments
// Specifies the destination fragment of the output of each fragment.
// parent_fragment_idx.size() == fragments.size() - 1 and
@@ -365,6 +379,12 @@ struct TQueryExecRequest {
4: optional map<Types.TPlanNodeId, list<Planner.TScanRangeLocations>>
per_node_scan_ranges
+ // Multi-threaded execution: exec info for all plans; the first one materializes
+ // the query result
+ // TODO: this will eventually supersede fields fragments, dest_fragment_idx,
+ // per_node_scan_ranges
+ 14: optional list<TPlanExecInfo> mt_plan_exec_info
+
// Metadata of the query result set (only for select)
5: optional Results.TResultSetMetadata result_set_metadata
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 089524d..3ee54ae 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -181,10 +181,11 @@ struct TQueryOptions {
// "name".
43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
- // Multi-threaded execution: number of cores per query per node.
- // > 0: multi-threaded execution mode, with given number of cores
+ // Multi-threaded execution: degree of parallelism (= number of active threads) per
+ // query per backend.
+ // > 0: multi-threaded execution mode, with given dop
// 0: single-threaded execution mode
- 44: optional i32 mt_num_cores = 0
+ 44: optional i32 mt_dop = 0
// If true, INSERT writes to S3 go directly to their final location rather than being
// copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
@@ -252,6 +253,7 @@ struct TClientRequest {
// TODO: Separate into FE/BE initialized vars.
struct TQueryCtx {
// Client request containing stmt to execute and query options.
+ // TODO: rename to client_request, we have too many requests
1: required TClientRequest request
// A globally unique id assigned to the entire query in the BE.
@@ -302,9 +304,6 @@ struct TQueryCtx {
// fragment.
struct TPlanFragmentCtx {
1: required Planner.TPlanFragment fragment
-
- // total number of instances of this fragment
- 2: required i32 num_fragment_instances
}
// A scan range plus the parameters needed to execute that scan.
@@ -329,45 +328,42 @@ struct TPlanFragmentDestination {
// TODO: for range partitioning, we also need to specify the range boundaries
struct TPlanFragmentInstanceCtx {
// The globally unique fragment instance id.
- // Format: query id + query-wide fragment index
- // The query-wide fragment index starts at 0, so that the query id
- // and the id of the first fragment instance (the coordinator instance)
- // are identical.
+ // Format: query id + query-wide fragment instance index
+ // The query-wide fragment instance index starts at 0, so that the query id
+ // and the id of the first fragment instance are identical.
+ // If there is a coordinator instance, it is the first one, with index 0.
1: required Types.TUniqueId fragment_instance_id
// Index of this fragment instance across all instances of its parent fragment;
// range: [0, <number of instances of the parent fragment>).
- 2: required i32 fragment_instance_idx
-
- // Index of this fragment instance in Coordinator::fragment_instance_states_.
- // TODO: remove; this is subsumed by the query-wide instance idx embedded
- // in the fragment_instance_id
- 3: required i32 instance_state_idx
+ 2: required i32 per_fragment_instance_idx
// Initial scan ranges for each scan node in TPlanFragment.plan_tree
- 4: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
+ 3: required map<Types.TPlanNodeId, list<TScanRangeParams>> per_node_scan_ranges
// Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
// needed to create a DataStreamRecvr
- 5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
+ // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+ 4: required map<Types.TPlanNodeId, i32> per_exch_num_senders
// Output destinations, one per output partition.
// The partitioning of the output is specified by
// TPlanFragment.output_sink.output_partition.
// The number of output partitions is destinations.size().
- 6: list<TPlanFragmentDestination> destinations
+ // TODO for per-query exec rpc: move these to TPlanFragmentCtx
+ 5: list<TPlanFragmentDestination> destinations
// Debug options: perform some action in a particular phase of a particular node
- 7: optional Types.TPlanNodeId debug_node_id
- 8: optional PlanNodes.TExecNodePhase debug_phase
- 9: optional PlanNodes.TDebugAction debug_action
+ 6: optional Types.TPlanNodeId debug_node_id
+ 7: optional PlanNodes.TExecNodePhase debug_phase
+ 8: optional PlanNodes.TDebugAction debug_action
// The pool to which this request has been submitted. Used to update pool statistics
// for admission control.
- 10: optional string request_pool
+ 9: optional string request_pool
// Id of this fragment in its role as a sender.
- 11: optional i32 sender_id
+ 10: optional i32 sender_id
}
@@ -461,31 +457,26 @@ struct TReportExecStatusParams {
2: optional Types.TUniqueId query_id
// required in V1
- // Used to look up the fragment instance state in the coordinator, same value as
- // TExecPlanFragmentParams.instance_state_idx.
- 3: optional i32 instance_state_idx
-
- // required in V1
- 4: optional Types.TUniqueId fragment_instance_id
+ 3: optional Types.TUniqueId fragment_instance_id
// Status of fragment execution; any error status means it's done.
// required in V1
- 5: optional Status.TStatus status
+ 4: optional Status.TStatus status
// If true, fragment finished executing.
// required in V1
- 6: optional bool done
+ 5: optional bool done
// cumulative profile
// required in V1
- 7: optional RuntimeProfile.TRuntimeProfileTree profile
+ 6: optional RuntimeProfile.TRuntimeProfileTree profile
// Cumulative structural changes made by a table sink
// optional in V1
- 8: optional TInsertExecStatus insert_exec_status;
+ 7: optional TInsertExecStatus insert_exec_status;
// New errors that have not been reported to the coordinator
- 9: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+ 8: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
}
struct TReportExecStatusResult {
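
To make the fragment_instance_id format above concrete, here is a hedged
sketch of the id arithmetic it implies (helper name hypothetical; the
authoritative construction lives in the scheduler). The instance id shares the
query id's hi bits, and its lo bits are offset by the query-wide instance
index, so index 0 reuses the query id itself.

  import org.apache.impala.thrift.TUniqueId;

  public class InstanceIdExample {
    // query-wide index 0 yields an id identical to the query id
    public static TUniqueId instanceId(TUniqueId queryId, int queryWideIdx) {
      return new TUniqueId(queryId.getHi(), queryId.getLo() + queryWideIdx);
    }
  }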
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index da41a8e..129be2d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -213,8 +213,9 @@ enum TImpalaQueryOptions {
// is always, since field IDs are NYI). Valid values are "position" and "name".
PARQUET_FALLBACK_SCHEMA_RESOLUTION,
- // Multi-threaded execution: number of cores per machine
- MT_NUM_CORES,
+ // Multi-threaded execution: degree of parallelism (= number of active threads)
+ // per query per backend
+ MT_DOP,
// If true, INSERT writes to S3 go directly to their final location rather than being
// copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
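
A usage sketch for the renamed option: code that needs to branch on the
execution mode can apply the documented contract directly (mt_dop > 0 selects
multi-threaded execution, 0 the single-threaded path). Illustrative only; the
real dispatch happens in the planner and scheduler.

  import org.apache.impala.thrift.TQueryOptions;

  public class ExecModeExample {
    // True iff the query should run in multi-threaded execution mode.
    public static boolean isMtExecution(TQueryOptions options) {
      return options.getMt_dop() > 0;
    }
  }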
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Planner.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Planner.thrift b/common/thrift/Planner.thrift
index eb95585..92c8681 100644
--- a/common/thrift/Planner.thrift
+++ b/common/thrift/Planner.thrift
@@ -31,11 +31,14 @@ include "Partitions.thrift"
// plan fragment, including how to produce and how to partition its output.
// It leaves out node-specific parameters needed for the actual execution.
struct TPlanFragment {
+ // Ordinal number of fragment within a query; range: 0..<total # fragments>
+ 1: required Types.TFragmentIdx idx
+
// display name to be shown in the runtime profile; unique within a query
- 1: required string display_name
+ 2: required string display_name
// no plan or descriptor table: query without From clause
- 2: optional PlanNodes.TPlan plan
+ 3: optional PlanNodes.TPlan plan
// exprs that produce values for slots of output tuple (one expr per slot);
// if not set, plan fragment materializes full rows of plan_tree
@@ -74,6 +77,8 @@ struct TScanRangeLocation {
}
// A single scan range plus the hosts that serve it
+// TODO: rename to TScanRangeLocationList; having this differ from the above struct
+// by only a single letter has caused needless confusion
struct TScanRangeLocations {
1: required PlanNodes.TScanRange scan_range
// non-empty list
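
Because every TPlanFragment now carries its query-wide idx (assigned
sequentially in Frontend.java below), downstream code can address fragments
positionally. A hedged sketch, assuming the usual Thrift-generated getIdx()
accessor:

  import java.util.List;
  import org.apache.impala.thrift.TPlanFragment;

  public class FragmentIdxExample {
    // Valid when 'all' holds a query's fragments ordered by idx 0..n-1.
    public static TPlanFragment lookup(List<TPlanFragment> all, TPlanFragment f) {
      return all.get(f.getIdx());
    }
  }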
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/common/thrift/Types.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Types.thrift b/common/thrift/Types.thrift
index 770a414..30d168d 100644
--- a/common/thrift/Types.thrift
+++ b/common/thrift/Types.thrift
@@ -19,6 +19,7 @@ namespace cpp impala
namespace java org.apache.impala.thrift
typedef i64 TTimestamp
+typedef i32 TFragmentIdx
typedef i32 TPlanNodeId
typedef i32 TTupleId
typedef i32 TSlotId
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/common/TreeNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/TreeNode.java b/fe/src/main/java/org/apache/impala/common/TreeNode.java
index adaee18..3231e33 100644
--- a/fe/src/main/java/org/apache/impala/common/TreeNode.java
+++ b/fe/src/main/java/org/apache/impala/common/TreeNode.java
@@ -51,6 +51,22 @@ public abstract class TreeNode<NodeType extends TreeNode<NodeType>> {
public ArrayList<NodeType> getChildren() { return children_; }
/**
+ * Return list of all nodes of the tree rooted at 'this', obtained
+ * through pre-order traversal.
+ */
+ public <C extends TreeNode<NodeType>> ArrayList<C> getNodesPreOrder() {
+ ArrayList<C> result = new ArrayList<C>();
+ getNodesPreOrderAux(result);
+ return result;
+ }
+
+ protected <C extends TreeNode<NodeType>> void getNodesPreOrderAux(
+ ArrayList<C> result) {
+ result.add((C) this);
+ for (NodeType child: children_) child.getNodesPreOrderAux(result);
+ }
+
+ /**
* Count the total number of nodes in this tree. Leaf node will return 1.
* Non-leaf node will include all its children.
*/
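
A small usage sketch for the new traversal, with a toy node type (assumes
TreeNode's usual addChild() API): the guarantee that the root comes first is
what createPlanExecInfo() below relies on when it treats element 0 of the
returned list as the plan's root fragment.

  import java.util.ArrayList;
  import org.apache.impala.common.TreeNode;

  class IntNode extends TreeNode<IntNode> {
    final int val;
    IntNode(int val) { this.val = val; }

    public static void main(String[] args) {
      // root(0) -> child(1) -> grandchild(2)
      IntNode root = new IntNode(0);
      IntNode child = new IntNode(1);
      root.addChild(child);            // addChild() assumed from TreeNode
      child.addChild(new IntNode(2));
      ArrayList<IntNode> nodes = root.getNodesPreOrder();
      for (IntNode n : nodes) System.out.println(n.val);  // prints 0, 1, 2
    }
  }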
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e50aca5..405eebe 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -59,6 +59,11 @@ public class Planner {
ctx_ = new PlannerContext(analysisResult, queryCtx);
}
+ public TQueryCtx getQueryCtx() { return ctx_.getQueryCtx(); }
+ public AnalysisContext.AnalysisResult getAnalysisResult() {
+ return ctx_.getAnalysisResult();
+ }
+
/**
* Returns a list of plan fragments for executing an analyzed parse tree.
* May return a single-node or distributed executable plan. If enabled (through a
@@ -204,6 +209,24 @@ public class Planner {
/**
* Return combined explain string for all plan fragments.
* Includes the estimated resource requirements from the request if set.
+ * Uses a default level of EXTENDED, unless overridden by the
+ * 'explain_level' query option.
+ */
+ public String getExplainString(ArrayList<PlanFragment> fragments,
+ TQueryExecRequest request) {
+ // use EXTENDED by default for all non-explain statements
+ TExplainLevel explainLevel = TExplainLevel.EXTENDED;
+ // use the query option for explain stmts and tests (e.g., planner tests)
+ if (ctx_.getAnalysisResult().isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
+ explainLevel = ctx_.getQueryOptions().getExplain_level();
+ }
+ return getExplainString(fragments, request, explainLevel);
+ }
+
+ /**
+ * Return combined explain string for all plan fragments, using an
+ * explicitly specified explain level.
+ * Includes the estimated resource requirements from the request if set.
*/
public String getExplainString(ArrayList<PlanFragment> fragments,
TQueryExecRequest request, TExplainLevel explainLevel) {
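
Call-site sketch for the two overloads ('planner', 'fragments', and 'request'
are placeholder names): the short form applies the EXTENDED-by-default rule
documented above, the long form pins the level explicitly.

  // level chosen by the default rules (EXTENDED unless explain stmt/test env)
  String defaultPlan = planner.getExplainString(fragments, request);
  // level pinned by the caller
  String standardPlan =
      planner.getExplainString(fragments, request, TExplainLevel.STANDARD);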
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index fe1f8f1..00a3d93 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -120,6 +120,7 @@ import org.apache.impala.thrift.TLineageGraph;
import org.apache.impala.thrift.TLoadDataReq;
import org.apache.impala.thrift.TLoadDataResp;
import org.apache.impala.thrift.TMetadataOpRequest;
+import org.apache.impala.thrift.TPlanExecInfo;
import org.apache.impala.thrift.TPlanFragment;
import org.apache.impala.thrift.TPlanFragmentTree;
import org.apache.impala.thrift.TQueryCtx;
@@ -908,70 +909,123 @@ public class Frontend {
}
/**
- * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+ * Return a TPlanExecInfo corresponding to the plan with root fragment 'planRoot'.
*/
- public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
- throws ImpalaException {
- // Analyze the statement
- AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
- EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
- timeline.markEvent("Analysis finished");
- Preconditions.checkNotNull(analysisResult.getStmt());
- TExecRequest result = new TExecRequest();
- result.setQuery_options(queryCtx.request.getQuery_options());
- result.setAccess_events(analysisResult.getAccessEvents());
- result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+ private TPlanExecInfo createPlanExecInfo(PlanFragment planRoot, Planner planner,
+ TQueryCtx queryCtx, TQueryExecRequest queryExecRequest) {
+ TPlanExecInfo result = new TPlanExecInfo();
+ ArrayList<PlanFragment> fragments = planRoot.getNodesPreOrder();
- if (analysisResult.isCatalogOp()) {
- result.stmt_type = TStmtType.DDL;
- createCatalogOpRequest(analysisResult, result);
- TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
- if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
- result.catalog_op_request.setLineage_graph(thriftLineageGraph);
- }
- // All DDL operations except for CTAS are done with analysis at this point.
- if (!analysisResult.isCreateTableAsSelectStmt()) return result;
- } else if (analysisResult.isLoadDataStmt()) {
- result.stmt_type = TStmtType.LOAD;
- result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
- new TColumn("summary", Type.STRING.toThrift()))));
- result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
- return result;
- } else if (analysisResult.isSetStmt()) {
- result.stmt_type = TStmtType.SET;
- result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
- new TColumn("option", Type.STRING.toThrift()),
- new TColumn("value", Type.STRING.toThrift()))));
- result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
- return result;
+ // map from fragment to its index in TPlanExecInfo.fragments; needed for
+ // TPlanExecInfo.dest_fragment_idx
+ List<ScanNode> scanNodes = Lists.newArrayList();
+ Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();
+ for (int idx = 0; idx < fragments.size(); ++idx) {
+ PlanFragment fragment = fragments.get(idx);
+ Preconditions.checkNotNull(fragment.getPlanRoot());
+ fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
+ fragmentIdx.put(fragment, idx);
}
- // create TQueryExecRequest
- Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
- || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
- || analysisResult.isDeleteStmt());
+ // set fragment destinations
+ for (int i = 1; i < fragments.size(); ++i) {
+ PlanFragment dest = fragments.get(i).getDestFragment();
+ Integer idx = fragmentIdx.get(dest);
+ Preconditions.checkState(idx != null);
+ result.addToDest_fragment_idx(idx.intValue());
+ }
- TQueryExecRequest queryExecRequest = new TQueryExecRequest();
- // create plan
- LOG.debug("create plan");
- Planner planner = new Planner(analysisResult, queryCtx);
- if (RuntimeEnv.INSTANCE.isTestEnv()
- && queryCtx.request.query_options.mt_num_cores > 0) {
- // TODO: this is just to be able to run tests; implement this
- List<PlanFragment> planRoots = planner.createParallelPlans();
- for (PlanFragment planRoot: planRoots) {
- TPlanFragmentTree thriftPlan = planRoot.treeToThrift();
- queryExecRequest.addToMt_plans(thriftPlan);
+ // Set scan ranges/locations for scan nodes.
+ LOG.debug("get scan range locations");
+ Set<TTableName> tablesMissingStats = Sets.newTreeSet();
+ Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
+ for (ScanNode scanNode: scanNodes) {
+ result.putToPer_node_scan_ranges(
+ scanNode.getId().asInt(), scanNode.getScanRangeLocations());
+ if (scanNode.isTableMissingStats()) {
+ tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
}
- queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
- queryExecRequest.setQuery_ctx(queryCtx);
- explainString.append(planner.getExplainString(
- Lists.newArrayList(planRoots.get(0)), queryExecRequest,
- TExplainLevel.STANDARD));
- queryExecRequest.setQuery_plan(explainString.toString());
- result.setQuery_exec_request(queryExecRequest);
- return result;
+ if (scanNode.hasCorruptTableStats()) {
+ tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
+ }
+ }
+
+ for (TTableName tableName: tablesMissingStats) {
+ queryCtx.addToTables_missing_stats(tableName);
}
+ for (TTableName tableName: tablesWithCorruptStats) {
+ queryCtx.addToTables_with_corrupt_stats(tableName);
+ }
+
+ // The fragments at this point have all state set; serialize them to thrift.
+ for (PlanFragment fragment: fragments) {
+ TPlanFragment thriftFragment = fragment.toThrift();
+ result.addToFragments(thriftFragment);
+ }
+
+ return result;
+ }
+
+ /**
+ * Create a populated TQueryExecRequest, corresponding to the supplied planner,
+ * for multi-threaded execution.
+ */
+ private TQueryExecRequest mtCreateExecRequest(
+ Planner planner, StringBuilder explainString)
+ throws ImpalaException {
+ TQueryCtx queryCtx = planner.getQueryCtx();
+ Preconditions.checkState(queryCtx.request.query_options.mt_dop > 0);
+ // for now, always disable spilling in the backend
+ // TODO-MT: re-enable spilling
+ queryCtx.setDisable_spilling(true);
+ TQueryExecRequest result = new TQueryExecRequest();
+
+ LOG.debug("create mt plan");
+ List<PlanFragment> planRoots = planner.createParallelPlans();
+
+ // create EXPLAIN output
+ result.setQuery_ctx(queryCtx); // needed by getExplainString()
+ explainString.append(
+ planner.getExplainString(Lists.newArrayList(planRoots.get(0)), result));
+ result.setQuery_plan(explainString.toString());
+
+ // create per-plan exec info;
+ // also collect the names of tables with missing or corrupt stats,
+ // for assembling a warning message
+ for (PlanFragment planRoot: planRoots) {
+ result.addToMt_plan_exec_info(
+ createPlanExecInfo(planRoot, planner, queryCtx, result));
+ }
+
+ // assign fragment ids
+ int idx = 0;
+ for (TPlanExecInfo planExecInfo: result.mt_plan_exec_info) {
+ for (TPlanFragment fragment: planExecInfo.fragments) fragment.setIdx(idx++);
+ }
+
+ // TODO-MT: implement
+ // Compute resource requirements after scan range locations because the cost
+ // estimates of scan nodes rely on them.
+ //try {
+ //planner.computeResourceReqs(fragments, true, queryExecRequest);
+ //} catch (Exception e) {
+ //// Turn exceptions into a warning to allow the query to execute.
+ //LOG.error("Failed to compute resource requirements for query\n" +
+ //queryCtx.request.getStmt(), e);
+ //}
+
+ return result;
+ }
+
+ /**
+ * Create a populated TQueryExecRequest, corresponding to the supplied planner,
+ * for single-threaded execution.
+ * TODO-MT: remove this function and rename mtCreateExecRequest() to
+ * createExecRequest()
+ */
+ private TQueryExecRequest createExecRequest(
+ Planner planner, StringBuilder explainString)
+ throws ImpalaException {
+ LOG.debug("create plan");
ArrayList<PlanFragment> fragments = planner.createPlan();
List<ScanNode> scanNodes = Lists.newArrayList();
@@ -986,12 +1040,13 @@ public class Frontend {
fragmentIdx.put(fragment, idx);
}
+ TQueryExecRequest result = new TQueryExecRequest();
// set fragment destinations
for (int i = 1; i < fragments.size(); ++i) {
PlanFragment dest = fragments.get(i).getDestFragment();
Integer idx = fragmentIdx.get(dest);
Preconditions.checkState(idx != null);
- queryExecRequest.addToDest_fragment_idx(idx.intValue());
+ result.addToDest_fragment_idx(idx.intValue());
}
// Set scan ranges/locations for scan nodes.
@@ -1001,9 +1056,8 @@ public class Frontend {
// Assemble a similar list for corrupt stats
Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
for (ScanNode scanNode: scanNodes) {
- queryExecRequest.putToPer_node_scan_ranges(
- scanNode.getId().asInt(),
- scanNode.getScanRangeLocations());
+ result.putToPer_node_scan_ranges(
+ scanNode.getId().asInt(), scanNode.getScanRangeLocations());
if (scanNode.isTableMissingStats()) {
tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
}
@@ -1012,7 +1066,7 @@ public class Frontend {
}
}
- queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
+ TQueryCtx queryCtx = planner.getQueryCtx();
for (TTableName tableName: tablesMissingStats) {
queryCtx.addToTables_missing_stats(tableName);
}
@@ -1022,6 +1076,7 @@ public class Frontend {
// Optionally disable spilling in the backend. Allow spilling if there are plan hints
// or if all tables have stats.
+ AnalysisContext.AnalysisResult analysisResult = planner.getAnalysisResult();
if (queryCtx.request.query_options.isDisable_unsafe_spills()
&& !tablesMissingStats.isEmpty()
&& !analysisResult.getAnalyzer().hasPlanHints()) {
@@ -1031,33 +1086,83 @@ public class Frontend {
// Compute resource requirements after scan range locations because the cost
// estimates of scan nodes rely on them.
try {
- planner.computeResourceReqs(fragments, true, queryExecRequest);
+ planner.computeResourceReqs(fragments, true, result);
} catch (Exception e) {
// Turn exceptions into a warning to allow the query to execute.
LOG.error("Failed to compute resource requirements for query\n" +
queryCtx.request.getStmt(), e);
}
- // The fragment at this point has all state set, serialize it to thrift.
- for (PlanFragment fragment: fragments) {
+ // The fragments at this point have all state set; assign sequential ids
+ // and serialize to thrift.
+ for (int i = 0; i < fragments.size(); ++i) {
+ PlanFragment fragment = fragments.get(i);
TPlanFragment thriftFragment = fragment.toThrift();
- queryExecRequest.addToFragments(thriftFragment);
+ thriftFragment.setIdx(i);
+ result.addToFragments(thriftFragment);
}
- // Use EXTENDED by default for all non-explain statements.
- TExplainLevel explainLevel = TExplainLevel.EXTENDED;
- // Use the query option for explain stmts and tests (e.g., planner tests).
- if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
- explainLevel = queryCtx.request.query_options.getExplain_level();
+ result.setQuery_ctx(queryCtx); // needed by getExplainString()
+ explainString.append(planner.getExplainString(fragments, result));
+ result.setQuery_plan(explainString.toString());
+ return result;
+ }
+
+ /**
+ * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
+ */
+ public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
+ throws ImpalaException {
+ // Analyze the statement
+ AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
+ EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
+ timeline.markEvent("Analysis finished");
+ Preconditions.checkNotNull(analysisResult.getStmt());
+ TExecRequest result = new TExecRequest();
+ result.setQuery_options(queryCtx.request.getQuery_options());
+ result.setAccess_events(analysisResult.getAccessEvents());
+ result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();
+
+ if (analysisResult.isCatalogOp()) {
+ result.stmt_type = TStmtType.DDL;
+ createCatalogOpRequest(analysisResult, result);
+ TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
+ if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
+ result.catalog_op_request.setLineage_graph(thriftLineageGraph);
+ }
+ // All DDL operations except for CTAS are done with analysis at this point.
+ if (!analysisResult.isCreateTableAsSelectStmt()) return result;
+ } else if (analysisResult.isLoadDataStmt()) {
+ result.stmt_type = TStmtType.LOAD;
+ result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+ new TColumn("summary", Type.STRING.toThrift()))));
+ result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
+ return result;
+ } else if (analysisResult.isSetStmt()) {
+ result.stmt_type = TStmtType.SET;
+ result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
+ new TColumn("option", Type.STRING.toThrift()),
+ new TColumn("value", Type.STRING.toThrift()))));
+ result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
+ return result;
}
- // Global query parameters to be set in each TPlanExecRequest.
- queryExecRequest.setQuery_ctx(queryCtx);
+ // create TQueryExecRequest
+ Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
+ || analysisResult.isCreateTableAsSelectStmt() || analysisResult.isUpdateStmt()
+ || analysisResult.isDeleteStmt());
- explainString.append(
- planner.getExplainString(fragments, queryExecRequest, explainLevel));
- queryExecRequest.setQuery_plan(explainString.toString());
- queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());
+ Planner planner = new Planner(analysisResult, queryCtx);
+ TQueryExecRequest queryExecRequest;
+ if (analysisResult.isQueryStmt() && queryCtx.request.query_options.mt_dop > 0) {
+ queryExecRequest = mtCreateExecRequest(planner, explainString);
+ } else {
+ queryExecRequest = createExecRequest(planner, explainString);
+ }
+ queryExecRequest.setDesc_tbl(
+ planner.getAnalysisResult().getAnalyzer().getDescTbl().toThrift());
+ queryExecRequest.setQuery_ctx(queryCtx);
+ queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
TLineageGraph thriftLineageGraph = analysisResult.getThriftLineageGraph();
if (thriftLineageGraph != null && thriftLineageGraph.isSetQuery_text()) {
@@ -1071,7 +1176,6 @@ public class Frontend {
}
result.setQuery_exec_request(queryExecRequest);
-
if (analysisResult.isQueryStmt()) {
// fill in the metadata
LOG.debug("create result set metadata");
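
End to end, the public entry point keeps its original shape. A hypothetical
driver (names assumed) receives the explain string through the StringBuilder
side channel and the plan through TExecRequest.query_exec_request:

  StringBuilder explainString = new StringBuilder();
  TExecRequest execRequest = frontend.createExecRequest(queryCtx, explainString);
  // With mt_dop > 0 and a query statement, query_exec_request carries
  // mt_plan_exec_info; otherwise it carries the single-threaded 'fragments'.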
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9b9933b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index 4464203..284d7e5 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -374,9 +374,9 @@ public class PlannerTestBase extends FrontendTestBase {
}
/**
- * Produces single-node and distributed plans for testCase and compares
+ * Produces single-node, distributed, and parallel plans for testCase and compares
* plan and scan range results.
- * Appends the actual single-node and distributed plan as well as the printed
+ * Appends the actual plans as well as the printed
* scan ranges to actualOutput, along with the requisite section header,
* and the printed scan range locations to actualScanRangeLocations;
* compares both to the appropriate sections of 'testCase'.
@@ -430,7 +430,7 @@ public class PlannerTestBase extends FrontendTestBase {
ImpalaInternalServiceConstants.NUM_NODES_ALL);
}
if (section == Section.PARALLELPLANS) {
- queryCtx.request.query_options.mt_num_cores = 2;
+ queryCtx.request.query_options.mt_dop = 2;
}
ArrayList<String> expectedPlan = testCase.getSectionContents(section);
boolean sectionExists = expectedPlan != null && !expectedPlan.isEmpty();