You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/22 16:12:57 UTC
[1/2] incubator-impala git commit: IMPALA-4117: Factor simple
scheduler test code into own files
Repository: incubator-impala
Updated Branches:
refs/heads/master 1b9d9ea7c -> 0d0c93ec8
IMPALA-4117: Factor simple scheduler test code into own files
This change merely splits the helper classes from
simple-scheduler-test.cc into separate .h and .cc files. It does not
change the semantics of the code. Whitespace formatting has been done
with git-clang-format for any changed lines.
Change-Id: Id3a6b3336db175eb095cbeb7ec623a5957b77ccc
Reviewed-on: http://gerrit.cloudera.org:8080/4486
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d76a2b22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d76a2b22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d76a2b22
Branch: refs/heads/master
Commit: d76a2b2272ea4b2d2b9e5636c7f4e2e8da18961c
Parents: 1b9d9ea
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Sep 14 17:17:51 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Sep 22 05:19:26 2016 +0000
----------------------------------------------------------------------
be/src/scheduling/CMakeLists.txt | 1 +
be/src/scheduling/simple-scheduler-test-util.cc | 543 ++++++++++++
be/src/scheduling/simple-scheduler-test-util.h | 465 ++++++++++
be/src/scheduling/simple-scheduler-test.cc | 852 +------------------
be/src/scheduling/simple-scheduler.h | 4 +-
5 files changed, 1015 insertions(+), 850 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index 9cfb672..ef49a14 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(Scheduling STATIC
backend-config.cc
query-schedule.cc
request-pool-service.cc
+ simple-scheduler-test-util.cc
simple-scheduler.cc
)
add_dependencies(Scheduling thrift-deps)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.cc b/be/src/scheduling/simple-scheduler-test-util.cc
new file mode 100644
index 0000000..3e14ea7
--- /dev/null
+++ b/be/src/scheduling/simple-scheduler-test-util.cc
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "simple-scheduler-test-util.h"
+#include "simple-scheduler.h"
+
+#include "common/names.h"
+
+using namespace impala;
+using namespace impala::test;
+
+/// Sample 'n' elements without replacement from the set [0..N-1].
+/// This is an implementation of "Algorithm R" by J. Vitter.
+void SampleN(int n, int N, vector<int>* out) {
+ if (n == 0) return;
+ DCHECK(n <= N);
+ out->reserve(n);
+ out->clear();
+ for (int i = 0; i < n; ++i) out->push_back(i);
+ for (int i = n; i < N; ++i) {
+ // Accept element with probability n/i. Place at random position.
+ int r = rand() % i;
+ if (r < n) (*out)[r] = i;
+ }
+}
+
+/// Sample a set of 'n' elements from 'in' without replacement and copy them to
+/// 'out'.
+template <typename T>
+void SampleNElements(int n, const vector<T>& in, vector<T>* out) {
+ vector<int> idxs;
+ SampleN(n, in.size(), &idxs);
+ DCHECK_EQ(n, idxs.size());
+ out->reserve(n);
+ for (int idx : idxs) out->push_back(in[idx]);
+}
+
/// Define constants from simple-scheduler-test-util.h here.
/// Every simulated backend listens on the same port (hosts are unique by IP).
const int Cluster::BACKEND_PORT = 1000;
/// Every simulated datanode listens on the same port.
const int Cluster::DATANODE_PORT = 2000;
/// Hostnames are generated as HOSTNAME_PREFIX + host index, e.g. "host_0".
const string Cluster::HOSTNAME_PREFIX = "host_";
/// First octet of all generated IP addresses; the remaining three octets are
/// derived from the host index (see Cluster::HostIdxToIpAddr()).
const string Cluster::IP_PREFIX = "10";

/// Default size for new blocks is 1MB.
const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
+
int Cluster::AddHost(bool has_backend, bool has_datanode) {
  // The new host's index is its position at the end of 'hosts_'.
  int host_idx = hosts_.size();
  // A port of -1 marks the respective service as absent on this host.
  int be_port = has_backend ? BACKEND_PORT : -1;
  int dn_port = has_datanode ? DATANODE_PORT : -1;
  IpAddr ip = HostIdxToIpAddr(host_idx);
  // Generated IPs are unique per index; an existing entry indicates a bug.
  DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
  ip_to_idx_[ip] = host_idx;
  hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
  // Add host to lists of backend indexes per type.
  if (has_backend) backend_host_idxs_.push_back(host_idx);
  if (has_datanode) {
    datanode_host_idxs_.push_back(host_idx);
    // Datanodes are additionally partitioned by whether a backend is
    // collocated; the two lists below are disjoint.
    if (has_backend) {
      datanode_with_backend_host_idxs_.push_back(host_idx);
    } else {
      datanode_only_host_idxs_.push_back(host_idx);
    }
  }
  return host_idx;
}
+
+void Cluster::AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
+ for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
+}
+
/// Build the unique, generated hostname for host index 'host_idx', e.g.
/// "host_3" for index 3.
Hostname Cluster::HostIdxToHostname(int host_idx) {
  return HOSTNAME_PREFIX + std::to_string(host_idx);
}
+
/// Write the backend address of host 'host_idx' into 'addr'.
void Cluster::GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
  DCHECK_LT(host_idx, hosts_.size());
  // NOTE(review): 'hostname' is populated with the host's IP address, not its
  // generated hostname — presumably because the scheduler compares resolved
  // addresses. Confirm against SimpleScheduler before changing.
  addr->hostname = hosts_[host_idx].ip;
  addr->port = hosts_[host_idx].be_port;
}

/// Indexes of all hosts running both a datanode and a backend.
const vector<int>& Cluster::datanode_with_backend_host_idxs() const {
  return datanode_with_backend_host_idxs_;
}

/// Indexes of all hosts running a datanode but no backend.
const vector<int>& Cluster::datanode_only_host_idxs() const {
  return datanode_only_host_idxs_;
}
+
+IpAddr Cluster::HostIdxToIpAddr(int host_idx) {
+ DCHECK_LT(host_idx, (1 << 24));
+ string suffix;
+ for (int i = 0; i < 3; ++i) {
+ suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
+ host_idx /= 256;
+ }
+ DCHECK_EQ(0, host_idx);
+ return IP_PREFIX + suffix;
+}
+
/// Convenience overload: add a single-block table whose replicas are all
/// non-cached, by delegating with an empty cached-replica list.
void Schema::AddSingleBlockTable(
    const TableName& table_name, const vector<int>& non_cached_replica_host_idxs) {
  AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
}
+
void Schema::AddSingleBlockTable(const TableName& table_name,
    const vector<int>& non_cached_replica_host_idxs,
    const vector<int>& cached_replica_host_idxs) {
  // Table names must be unique within a schema.
  DCHECK(tables_.find(table_name) == tables_.end());
  Block block;
  int num_replicas =
      non_cached_replica_host_idxs.size() + cached_replica_host_idxs.size();
  // Non-cached replicas come first, then the cached ones; the parallel
  // 'replica_host_idx_is_cached' vector mirrors this ordering.
  block.replica_host_idxs = non_cached_replica_host_idxs;
  block.replica_host_idxs.insert(block.replica_host_idxs.end(),
      cached_replica_host_idxs.begin(), cached_replica_host_idxs.end());
  // Initialize for non-cached replicas first.
  block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false);
  // Fill up to final size for cached replicas.
  block.replica_host_idx_is_cached.insert(
      block.replica_host_idx_is_cached.end(), cached_replica_host_idxs.size(), true);
  DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size());
  DCHECK_EQ(block.replica_host_idxs.size(), num_replicas);
  // Create table
  Table table;
  table.blocks.push_back(block);
  // Insert table
  tables_.emplace(table_name, table);
}
+
/// Convenience overload: add a multi-block table with no cached replicas.
void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
    ReplicaPlacement replica_placement, int num_replicas) {
  AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
}
+
void Schema::AddMultiBlockTable(const TableName& table_name, int num_blocks,
    ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
  DCHECK_GT(num_replicas, 0);
  DCHECK(num_cached_replicas <= num_replicas);
  Table table;
  for (int i = 0; i < num_blocks; ++i) {
    Block block;
    vector<int>& replica_idxs = block.replica_host_idxs;

    // Determine replica host indexes by sampling from the subset of cluster
    // hosts that matches 'replica_placement' (see the enum in the header).
    switch (replica_placement) {
      case ReplicaPlacement::RANDOM:
        SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs);
        break;
      case ReplicaPlacement::LOCAL_ONLY:
        DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size());
        SampleNElements(
            num_replicas, cluster_.datanode_with_backend_host_idxs(), &replica_idxs);
        break;
      case ReplicaPlacement::REMOTE_ONLY:
        DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size());
        SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(), &replica_idxs);
        break;
      default:
        DCHECK(false) << "Unsupported replica placement: " << (int)replica_placement;
    }

    // Determine cached replicas: pick 'num_cached_replicas' random positions
    // among the chosen replicas and flag them in the parallel bool vector.
    vector<int> cached_replicas;
    vector<bool>& is_cached = block.replica_host_idx_is_cached;
    is_cached.resize(num_replicas, false);
    SampleN(num_cached_replicas, num_replicas, &cached_replicas);
    // Flag cached entries.
    for (const int idx : cached_replicas) is_cached[idx] = true;

    DCHECK_EQ(replica_idxs.size(), is_cached.size());
    table.blocks.push_back(block);
  }
  // Insert table. NOTE(review): unlike AddSingleBlockTable() this uses
  // operator[] and thus silently overwrites an existing table with the same
  // name instead of DCHECKing — confirm whether that is intended.
  tables_[table_name] = table;
}
+
+const Table& Schema::GetTable(const TableName& table_name) const {
+ auto it = tables_.find(table_name);
+ DCHECK(it != tables_.end());
+ return it->second;
+}
+
/// Set the replica preference query option used by subsequent scheduling.
void Plan::SetReplicaPreference(TReplicaPreference::type p) {
  query_options_.replica_preference = p;
}

/// All datanodes referenced by scan ranges added via AddTableScan().
const vector<TNetworkAddress>& Plan::referenced_datanodes() const {
  return referenced_datanodes_;
}

/// All scan range locations built so far; input for the scheduler.
const vector<TScanRangeLocations>& Plan::scan_range_locations() const {
  return scan_range_locations_;
}
+
+void Plan::AddTableScan(const TableName& table_name) {
+ const Table& table = schema_.GetTable(table_name);
+ const vector<Block>& blocks = table.blocks;
+ for (int i = 0; i < blocks.size(); ++i) {
+ const Block& block = blocks[i];
+ TScanRangeLocations scan_range_locations;
+ BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
+ scan_range_locations_.push_back(scan_range_locations);
+ }
+}
+
void Plan::BuildTScanRangeLocations(const TableName& table_name, const Block& block,
    int block_idx, TScanRangeLocations* scan_range_locations) {
  const vector<int>& replica_idxs = block.replica_host_idxs;
  const vector<bool>& is_cached = block.replica_host_idx_is_cached;
  DCHECK_EQ(replica_idxs.size(), is_cached.size());
  int num_replicas = replica_idxs.size();
  BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
  scan_range_locations->locations.resize(num_replicas);
  // One TScanRangeLocation per replica. 'host_idx' refers to the plan-local
  // datanode list (see FindOrInsertDatanodeIndex()), not the cluster index.
  for (int i = 0; i < num_replicas; ++i) {
    TScanRangeLocation& location = scan_range_locations->locations[i];
    location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]);
    location.__set_is_cached(is_cached[i]);
  }
}
+
/// Build the TScanRange (an HDFS file split) for 'block'.
void Plan::BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
    TScanRange* scan_range) {
  // Initialize locations.scan_range correctly.
  THdfsFileSplit file_split;
  // 'length' is the only member considered by the scheduler.
  file_split.length = block.length;
  // Encoding the table name and block index in the file helps debugging.
  file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
  file_split.offset = 0;
  file_split.partition_id = 0;
  // For now, we model each file by a single block.
  file_split.file_length = block.length;
  file_split.file_compression = THdfsCompression::NONE;
  file_split.mtime = 1;
  scan_range->__set_hdfs_file_split(file_split);
}
+
/// Translate a cluster-wide datanode index to the plan-local index used in
/// TScanRangeLocation. Adds the datanode to 'referenced_datanodes_' the first
/// time it is seen.
int Plan::FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
  const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
  // emplace() is a no-op if the key already exists; 'ret.second' tells us
  // whether this datanode was seen for the first time.
  auto ret = host_idx_to_datanode_idx_.emplace(
      cluster_datanode_idx, referenced_datanodes_.size());
  bool inserted_new_element = ret.second;
  if (inserted_new_element) {
    TNetworkAddress datanode;
    datanode.hostname = host.ip;
    datanode.port = host.dn_port;
    referenced_datanodes_.push_back(datanode);
  }
  return ret.first->second;
}
+
/// The following accessors count assignments / assigned bytes for a single
/// host, optionally restricted to cached, disk-local or remote reads. They
/// all delegate to CountAssignmentsIf()/CountAssignedBytesIf() with a
/// composed filter (see IsHost()/IsCached()/IsDisk()/IsRemote()).
/// NOTE(review): the byte-counting variants return 'int' while
/// CountAssignedBytesIf() returns int64_t, so totals above ~2GB would be
/// truncated. Widening the return type requires a matching header change.
int Result::NumTotalAssignments(int host_idx) const {
  return CountAssignmentsIf(IsHost(host_idx));
}

int Result::NumTotalAssignedBytes(int host_idx) const {
  return CountAssignedBytesIf(IsHost(host_idx));
}

int Result::NumCachedAssignments(int host_idx) const {
  return CountAssignmentsIf(IsCached(IsHost(host_idx)));
}

int Result::NumCachedAssignedBytes(int host_idx) const {
  return CountAssignedBytesIf(IsCached(IsHost(host_idx)));
}

int Result::NumDiskAssignments(int host_idx) const {
  return CountAssignmentsIf(IsDisk(IsHost(host_idx)));
}

int Result::NumDiskAssignedBytes(int host_idx) const {
  return CountAssignedBytesIf(IsDisk(IsHost(host_idx)));
}

int Result::NumRemoteAssignments(int host_idx) const {
  return CountAssignmentsIf(IsRemote(IsHost(host_idx)));
}

int Result::NumRemoteAssignedBytes(int host_idx) const {
  return CountAssignedBytesIf(IsRemote(IsHost(host_idx)));
}
+
+int Result::MaxNumAssignmentsPerHost() const {
+ NumAssignmentsPerBackend num_assignments_per_backend;
+ CountAssignmentsPerBackend(&num_assignments_per_backend);
+ int max_count = 0;
+ for (const auto& elem : num_assignments_per_backend) {
+ max_count = max(max_count, elem.second);
+ }
+ return max_count;
+}
+
+int64_t Result::MaxNumAssignedBytesPerHost() const {
+ NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
+ CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
+ int64_t max_assigned_bytes = 0;
+ for (const auto& elem : num_assigned_bytes_per_backend) {
+ max_assigned_bytes = max(max_assigned_bytes, elem.second);
+ }
+ return max_assigned_bytes;
+}
+
+int Result::MinNumAssignmentsPerHost() const {
+ NumAssignmentsPerBackend num_assignments_per_backend;
+ CountAssignmentsPerBackend(&num_assignments_per_backend);
+ int min_count = numeric_limits<int>::max();
+ for (const auto& elem : num_assignments_per_backend) {
+ min_count = min(min_count, elem.second);
+ }
+ DCHECK_GT(min_count, 0);
+ return min_count;
+}
+
+int64_t Result::MinNumAssignedBytesPerHost() const {
+ NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
+ CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
+ int64_t min_assigned_bytes = 0;
+ for (const auto& elem : num_assigned_bytes_per_backend) {
+ min_assigned_bytes = max(min_assigned_bytes, elem.second);
+ }
+ DCHECK_GT(min_assigned_bytes, 0);
+ return min_assigned_bytes;
+}
+
/// Number of distinct backend addresses that received at least one scan range.
int Result::NumDistinctBackends() const {
  unordered_set<IpAddr> backends;
  AssignmentCallback cb = [&backends](
      const AssignmentInfo& assignment) { backends.insert(assignment.addr.hostname); };
  ProcessAssignments(cb);
  return backends.size();
}

/// Return the assignment recorded by the 'index'-th call to Compute().
const FragmentScanRangeAssignment& Result::GetAssignment(int index) const {
  // NOTE(review): signed/unsigned comparison; 'index' is assumed to be >= 0.
  DCHECK_GT(assignments_.size(), index);
  return assignments_[index];
}

/// Append a new, empty assignment and return a pointer for the scheduler to
/// fill in. The pointer is only valid until the next call to AddAssignment().
FragmentScanRangeAssignment* Result::AddAssignment() {
  assignments_.push_back(FragmentScanRangeAssignment());
  return &assignments_.back();
}
+
/// Filter that matches every assignment.
Result::AssignmentFilter Result::Any() const {
  return [](const AssignmentInfo& assignment) { return true; };
}

/// Filter that matches assignments made to the backend of host 'host_idx'.
Result::AssignmentFilter Result::IsHost(int host_idx) const {
  TNetworkAddress expected_addr;
  plan_.cluster().GetBackendAddress(host_idx, &expected_addr);
  // Capture the address by value: the returned filter may outlive this frame.
  return [expected_addr](
      const AssignmentInfo& assignment) { return assignment.addr == expected_addr; };
}

/// Restrict 'filter' to assignments of cached replicas.
Result::AssignmentFilter Result::IsCached(AssignmentFilter filter) const {
  return [filter](const AssignmentInfo& assignment) {
    return filter(assignment) && assignment.is_cached;
  };
}

/// Restrict 'filter' to disk-local assignments (neither cached nor remote).
Result::AssignmentFilter Result::IsDisk(AssignmentFilter filter) const {
  return [filter](const AssignmentInfo& assignment) {
    return filter(assignment) && !assignment.is_cached && !assignment.is_remote;
  };
}

/// Restrict 'filter' to remote-read assignments.
Result::AssignmentFilter Result::IsRemote(AssignmentFilter filter) const {
  return [filter](const AssignmentInfo& assignment) {
    return filter(assignment) && assignment.is_remote;
  };
}
+
+void Result::ProcessAssignments(const AssignmentCallback& cb) const {
+ for (const FragmentScanRangeAssignment& assignment : assignments_) {
+ for (const auto& assignment_elem : assignment) {
+ const TNetworkAddress& addr = assignment_elem.first;
+ const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
+ for (const auto& per_node_ranges_elem : per_node_ranges) {
+ const vector<TScanRangeParams> scan_range_params_vector =
+ per_node_ranges_elem.second;
+ for (const TScanRangeParams& scan_range_params : scan_range_params_vector) {
+ const TScanRange& scan_range = scan_range_params.scan_range;
+ DCHECK(scan_range.__isset.hdfs_file_split);
+ const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
+ bool is_cached =
+ scan_range_params.__isset.is_cached ? scan_range_params.is_cached : false;
+ bool is_remote =
+ scan_range_params.__isset.is_remote ? scan_range_params.is_remote : false;
+ cb({addr, hdfs_file_split, is_cached, is_remote});
+ }
+ }
+ }
+ }
+}
+
+int Result::CountAssignmentsIf(const AssignmentFilter& filter) const {
+ int count = 0;
+ AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) {
+ if (filter(assignment)) ++count;
+ };
+ ProcessAssignments(cb);
+ return count;
+}
+
+int64_t Result::CountAssignedBytesIf(const AssignmentFilter& filter) const {
+ int64_t assigned_bytes = 0;
+ AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
+ if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length;
+ };
+ ProcessAssignments(cb);
+ return assigned_bytes;
+}
+
+void Result::CountAssignmentsPerBackend(
+ NumAssignmentsPerBackend* num_assignments_per_backend) const {
+ AssignmentCallback cb = [&num_assignments_per_backend](
+ const AssignmentInfo& assignment) {
+ ++(*num_assignments_per_backend)[assignment.addr.hostname];
+ };
+ ProcessAssignments(cb);
+}
+
+void Result::CountAssignedBytesPerBackend(
+ NumAssignedBytesPerBackend* num_assignments_per_backend) const {
+ AssignmentCallback cb = [&num_assignments_per_backend](
+ const AssignmentInfo& assignment) {
+ (*num_assignments_per_backend)[assignment.addr.hostname] +=
+ assignment.hdfs_file_split.length;
+ };
+ ProcessAssignments(cb);
+}
+
/// Construct a wrapper around a fully initialized scheduler for 'plan'.
SchedulerWrapper::SchedulerWrapper(const Plan& plan)
  : plan_(plan), metrics_("TestMetrics") {
  InitializeScheduler();
}

/// Run the scheduler's scan range assignment over the wrapped plan and append
/// the resulting assignment to 'result'.
void SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
  DCHECK(scheduler_ != NULL);

  // Compute Assignment.
  FragmentScanRangeAssignment* assignment = result->AddAssignment();
  scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL, false,
      plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
      plan_.query_options(), NULL, assignment);
}
+
/// Add a backend to the scheduler by sending a statestore topic delta
/// containing only the new host.
void SchedulerWrapper::AddBackend(const Host& host) {
  // Add to topic delta
  TTopicDelta delta;
  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
  delta.is_delta = true;
  AddHostToTopicDelta(host, &delta);
  SendTopicDelta(delta);
}

/// Remove a backend from the scheduler by sending a topic delta whose
/// deletions list contains the host's topic key (its IP address).
void SchedulerWrapper::RemoveBackend(const Host& host) {
  // Add deletion to topic delta
  TTopicDelta delta;
  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
  delta.is_delta = true;
  delta.topic_deletions.push_back(host.ip);
  SendTopicDelta(delta);
}
+
/// Send a non-delta (full) membership update containing every cluster host
/// that runs a backend (be_port >= 0).
void SchedulerWrapper::SendFullMembershipMap() {
  TTopicDelta delta;
  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
  delta.is_delta = false;
  for (const Host& host : plan_.cluster().hosts()) {
    if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
  }
  SendTopicDelta(delta);
}

/// Send a delta update with no entries and no deletions (a no-op membership
/// message).
void SchedulerWrapper::SendEmptyUpdate() {
  TTopicDelta delta;
  delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
  delta.is_delta = true;
  SendTopicDelta(delta);
}
+
/// Create the wrapped SimpleScheduler, running on the first cluster host, and
/// seed its backend maps with a full membership update.
void SchedulerWrapper::InitializeScheduler() {
  DCHECK(scheduler_ == NULL);
  DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
                                           << "hosts.";
  // The scheduler is modeled as running on host 0; its backend id is its IP.
  const Host& scheduler_host = plan_.cluster().hosts()[0];
  string scheduler_backend_id = scheduler_host.ip;
  TNetworkAddress scheduler_backend_address;
  scheduler_backend_address.hostname = scheduler_host.ip;
  scheduler_backend_address.port = scheduler_host.be_port;

  // NOTE(review): the NULL arguments presumably disable statestore, request
  // pool and webserver integration in the test — confirm against the
  // SimpleScheduler constructor.
  scheduler_.reset(new SimpleScheduler(
      NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
  scheduler_->Init();
  // Initialize the scheduler backend maps.
  SendFullMembershipMap();
}
+
/// Serialize 'host' into a TBackendDescriptor topic entry (keyed by its IP)
/// and append it to 'delta'.
void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
  DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
                             << "backend";
  // Build backend descriptor.
  TBackendDescriptor be_desc;
  be_desc.address.hostname = host.ip;
  be_desc.address.port = host.be_port;
  be_desc.ip_address = host.ip;

  // Build topic item.
  TTopicItem item;
  item.key = host.ip;
  ThriftSerializer serializer(false);
  Status status = serializer.Serialize(&be_desc, &item.value);
  DCHECK(status.ok());

  // Add to topic delta.
  delta->topic_entries.push_back(item);
}
+
/// Deliver 'delta' to the scheduler via its statestore callback,
/// UpdateMembership(). Any topic updates the scheduler produces in response
/// are discarded.
void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
  DCHECK(scheduler_ != NULL);
  // Wrap in topic delta map.
  StatestoreSubscriber::TopicDeltaMap delta_map;
  delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta);

  // Send to the scheduler.
  vector<TTopicDelta> dummy_result;
  scheduler_->UpdateMembership(delta_map, &dummy_result);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test-util.h b/be/src/scheduling/simple-scheduler-test-util.h
new file mode 100644
index 0000000..85bb1a5
--- /dev/null
+++ b/be/src/scheduling/simple-scheduler-test-util.h
@@ -0,0 +1,465 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <boost/scoped_ptr.hpp>
+
+#include "gen-cpp/ImpalaInternalService.h" // for TQueryOptions
+#include "scheduling/query-schedule.h"
+#include "util/metrics.h"
+
+namespace impala {
+
+class SimpleScheduler;
+class TTopicDelta;
+
+namespace test {
+
+typedef std::string TableName;
+
+/// Helper classes to be used by the scheduler tests.
+
+/// Overall testing approach: Each test builds a list of hosts and a plan, both to which
+/// elements can be added using various helper methods. Then scheduling can be tested
+/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified
+/// using a set of helper methods. There are also helper methods to modify the internal
+/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute().
+///
+/// The model currently comes with some known limitations:
+///
+/// - Files map 1:1 to blocks and to scan ranges.
+/// - All files have the same size (1 block of 1M). Tables that differ in size can be
+/// expressed as having a different number of blocks.
+/// - We don't support multiple backends on a single host.
+/// - Ports are assigned to hosts automatically and are not configurable by the test.
+
+// TODO: Extend the model to support files with multiple blocks.
+// TODO: Test more methods of the scheduler.
+// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
+// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single
+// node insert) but the other 2 are random.
+// TODO: Make use of the metrics provided by the scheduler.
+// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
+// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
+// TODO: Add possibility to explicitly specify the replica location per file.
+// TODO: Add methods to retrieve and verify generated file placements from plan.
+// TODO: Extend the model to specify a physical schema independently of a plan (ie,
+// tables/files, blocks, replicas and cached replicas exist independently of the
+// queries that run against them).
+
+/// File blocks store a list of all datanodes that have a replica of the block. When
+/// defining tables you can specify the desired replica placement among all available
+/// datanodes in the cluster.
+///
+/// - RANDOM means that any datanode can be picked.
+/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
+/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
+///
+/// Whether replicas will be cached or not is not determined by this value, but by
+/// additional function arguments when adding tables to the schema.
enum class ReplicaPlacement {
  RANDOM,       // Replicas may be placed on any datanode.
  LOCAL_ONLY,   // Replicas only on datanodes that also run a backend.
  REMOTE_ONLY,  // Replicas only on datanodes without a backend.
};
+
/// Host model. Each host can have either a backend, a datanode, or both. To specify that
/// a host should not act as a backend or datanode specify '-1' as the respective port.
struct Host {
  Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
    : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
  Hostname name;  // Generated, unique hostname (see Cluster::HostIdxToHostname()).
  IpAddr ip;      // Generated, unique IP address (see Cluster::HostIdxToIpAddr()).
  int be_port;    // Backend port, or -1 if the host runs no backend.
  int dn_port;    // Datanode port, or -1 if the host runs no datanode.
};
+
/// A cluster stores a list of hosts and provides various methods to add hosts to the
/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames.
/// Hosts are identified by their index into 'hosts_'; all other classes refer
/// to hosts by that index.
class Cluster {
 public:
  /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
  /// generated and are guaranteed to be unique.
  int AddHost(bool has_backend, bool has_datanode);

  /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
  void AddHosts(int num_hosts, bool has_backend, bool has_datanode);

  /// Convert a host index to a hostname.
  static Hostname HostIdxToHostname(int host_idx);

  /// Return the backend address (ip, port) for the host with index 'host_idx'.
  void GetBackendAddress(int host_idx, TNetworkAddress* addr) const;

  const std::vector<Host>& hosts() const { return hosts_; }
  int NumHosts() const { return hosts_.size(); }

  /// These methods return lists of host indexes, grouped by their type, which can be used
  /// to draw samples of random sets of hosts.
  /// TODO: Think of a nicer abstraction to expose this information.
  const std::vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
  const std::vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
  const std::vector<int>& datanode_with_backend_host_idxs() const;
  const std::vector<int>& datanode_only_host_idxs() const;

 private:
  /// Port for all backends.
  static const int BACKEND_PORT;

  /// Port for all datanodes.
  static const int DATANODE_PORT;

  /// Prefix for all generated hostnames.
  static const std::string HOSTNAME_PREFIX;

  /// First octet for all generated IP addresses.
  static const std::string IP_PREFIX;

  /// List of hosts in this cluster.
  std::vector<Host> hosts_;

  /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
  /// 'hosts_' by index and are used for random sampling.
  ///
  /// All hosts with a backend.
  std::vector<int> backend_host_idxs_;
  /// All hosts with a datanode.
  std::vector<int> datanode_host_idxs_;
  /// All hosts with a datanode and a backend.
  std::vector<int> datanode_with_backend_host_idxs_;
  /// All hosts with a datanode but no backend.
  std::vector<int> datanode_only_host_idxs_;

  /// Map from IP addresses to host indexes. Used in AddHost() to assert that
  /// generated IPs are unique.
  std::unordered_map<IpAddr, int> ip_to_idx_;

  /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
  /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
  static IpAddr HostIdxToIpAddr(int host_idx);
};
+
/// A single file block, which models one scan range. Stores the hosts holding
/// replicas of the block and, per replica, whether it is cached.
struct Block {
  /// By default all blocks are of the same size.
  int64_t length = DEFAULT_BLOCK_SIZE;

  /// Index into the cluster that owns the table that owns this block.
  std::vector<int> replica_host_idxs;

  /// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
  /// Kept parallel to 'replica_host_idxs' (same size, same ordering).
  std::vector<bool> replica_host_idx_is_cached;

  /// Default size for new blocks.
  static const int64_t DEFAULT_BLOCK_SIZE;
};
+
/// A table is modeled as a flat list of blocks; there is no partition or file
/// structure beyond that (see the file-level comment for model limitations).
struct Table {
  std::vector<Block> blocks;
};
+
/// Schema model: a set of named tables defined over the hosts of a Cluster.
class Schema {
 public:
  Schema(const Cluster& cluster) : cluster_(cluster) {}

  /// Add a table consisting of a single block to the schema with explicitly specified
  /// replica indexes for non-cached replicas and without any cached replicas. Replica
  /// indexes must refer to hosts in cluster_.hosts() by index.
  void AddSingleBlockTable(
      const TableName& table_name, const std::vector<int>& non_cached_replica_host_idxs);

  /// Add a table consisting of a single block to the schema with explicitly specified
  /// replica indexes for both non-cached and cached replicas. Values in both lists must
  /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
  /// replica can either be cached or not.
  void AddSingleBlockTable(const TableName& table_name,
      const std::vector<int>& non_cached_replica_host_idxs,
      const std::vector<int>& cached_replica_host_idxs);

  /// Add a table to the schema, selecting replica hosts according to the given replica
  /// placement preference. All replicas will be non-cached.
  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
      ReplicaPlacement replica_placement, int num_replicas);

  /// Add a table to the schema, selecting replica hosts according to the given replica
  /// placement preference. After replica selection has been done, 'num_cached_replicas'
  /// of them are marked as cached.
  void AddMultiBlockTable(const TableName& table_name, int num_blocks,
      ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas);

  /// Look up a table by name; the table must exist (DCHECKed).
  const Table& GetTable(const TableName& table_name) const;

  const Cluster& cluster() const { return cluster_; }

 private:
  /// Store a reference to the cluster, from which hosts are sampled. Test results will
  /// use the cluster to resolve host indexes to hostnames and IP addresses.
  const Cluster& cluster_;

  /// All tables of the schema, keyed by table name.
  std::unordered_map<TableName, Table> tables_;
};
+
/// Plan model. A plan contains a list of tables to scan and the query options to be used
/// during scheduling.
class Plan {
 public:
  Plan(const Schema& schema) : schema_(schema) {}

  const TQueryOptions& query_options() const { return query_options_; }

  /// Set the replica preference query option (e.g. cache-local, disk-local).
  void SetReplicaPreference(TReplicaPreference::type p);

  void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
  void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
  const Cluster& cluster() const { return schema_.cluster(); }

  const std::vector<TNetworkAddress>& referenced_datanodes() const;

  const std::vector<TScanRangeLocations>& scan_range_locations() const;

  /// Add a scan of table 'table_name' to the plan. This method will populate the internal
  /// list of TScanRangeLocations and can be called multiple times for the same table to
  /// schedule additional scans.
  void AddTableScan(const TableName& table_name);

 private:
  /// Store a reference to the schema, from which scanned tables will be read.
  const Schema& schema_;

  /// Query options passed to the scheduler; mutated via the setters above.
  TQueryOptions query_options_;

  /// List of all datanodes that are referenced by this plan. Only hosts that have an
  /// assigned scan range are added here.
  std::vector<TNetworkAddress> referenced_datanodes_;

  /// Map from plan host index to an index in 'referenced_datanodes_'.
  std::unordered_map<int, int> host_idx_to_datanode_idx_;

  /// List of all scan range locations, which can be passed to the SimpleScheduler.
  std::vector<TScanRangeLocations> scan_range_locations_;

  /// Initialize a TScanRangeLocations object in place.
  void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
      int block_idx, TScanRangeLocations* scan_range_locations);

  /// Build the TScanRange (an HDFS file split) for one block.
  void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
      TScanRange* scan_range);

  /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
  /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
  /// the new index.
  int FindOrInsertDatanodeIndex(int cluster_datanode_idx);
};
+
+class Result {
+ private:
+ /// Map to count the number of assignments per backend.
+ typedef std::unordered_map<IpAddr, int> NumAssignmentsPerBackend;
+
+ /// Map to count the number of assigned bytes per backend.
+ typedef std::unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
+
+ /// Parameter type for callbacks, which are used to filter scheduling results.
+ struct AssignmentInfo {
+ const TNetworkAddress& addr;
+ const THdfsFileSplit& hdfs_file_split;
+ bool is_cached;
+ bool is_remote;
+ };
+
+ /// These functions are used as callbacks when processing the scheduling result. They
+ /// will be called once per assigned scan range.
+ typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
+ typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
+
+ public:
+ Result(const Plan& plan) : plan_(plan) {}
+
+ /// Return the total number of scheduled assignments.
+ int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
+
+ /// Return the total number of assigned bytes.
+ int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
+
+ /// Return the number of scheduled assignments for a single host.
+ int NumTotalAssignments(int host_idx) const;
+
+ /// Return the number of assigned bytes for a single host.
+ int NumTotalAssignedBytes(int host_idx) const;
+
+ /// Return the total number of assigned cached reads.
+ int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
+
+ /// Return the total number of assigned bytes for cached reads.
+ int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); }
+
+ /// Return the total number of assigned cached reads for a single host.
+ int NumCachedAssignments(int host_idx) const;
+
+ /// Return the total number of assigned bytes for cached reads for a single host.
+ int NumCachedAssignedBytes(int host_idx) const;
+
+ /// Return the total number of assigned non-cached reads.
+ int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
+
+ /// Return the total number of assigned bytes for non-cached reads.
+ int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
+
+ /// Return the total number of assigned non-cached reads for a single host.
+ int NumDiskAssignments(int host_idx) const;
+
+ /// Return the total number of assigned bytes for non-cached reads for a single host.
+ int NumDiskAssignedBytes(int host_idx) const;
+
+ /// Return the total number of assigned remote reads.
+ int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
+
+ /// Return the total number of assigned bytes for remote reads.
+ int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); }
+
+ /// Return the total number of assigned remote reads for a single host.
+ int NumRemoteAssignments(int host_idx) const;
+
+ /// Return the total number of assigned bytes for remote reads for a single host.
+ int NumRemoteAssignedBytes(int host_idx) const;
+
+ /// Return the maximum number of assigned reads over all hosts.
+ int MaxNumAssignmentsPerHost() const;
+
 + /// Return the maximum number of assigned bytes over all hosts.
+ int64_t MaxNumAssignedBytesPerHost() const;
+
+ /// Return the minimum number of assigned reads over all hosts.
+ /// NOTE: This is computed by traversing all recorded assignments and thus will not
+ /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
+ /// 0).
+ int MinNumAssignmentsPerHost() const;
+
+ /// Return the minimum number of assigned bytes over all hosts.
+ /// NOTE: This is computed by traversing all recorded assignments and thus will not
+ /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
+ /// 0).
+ int64_t MinNumAssignedBytesPerHost() const;
+
+ /// Return the number of scan range assignments stored in this result.
+ int NumAssignments() const { return assignments_.size(); }
+
+ /// Return the number of distinct backends that have been picked by the scheduler so
+ /// far.
+ int NumDistinctBackends() const;
+
+ /// Return the full assignment for manual matching.
+ const FragmentScanRangeAssignment& GetAssignment(int index = 0) const;
+
+ /// Add an assignment to the result and return a reference, which can then be passed on
+ /// to the scheduler.
+ FragmentScanRangeAssignment* AddAssignment();
+
+ /// Reset the result to an empty state.
+ void Reset() { assignments_.clear(); }
+
+ private:
+ /// Vector to store results of consecutive scheduler runs.
+ std::vector<FragmentScanRangeAssignment> assignments_;
+
+ /// Reference to the plan, needed to look up hosts.
+ const Plan& plan_;
+
+ /// Dummy filter matching any assignment.
+ AssignmentFilter Any() const;
+
+ /// Filter to only match assignments of a particular host.
+ AssignmentFilter IsHost(int host_idx) const;
+
+ /// Filter to only match assignments of cached reads.
+ AssignmentFilter IsCached(AssignmentFilter filter) const;
+
+ /// Filter to only match assignments of non-cached, local disk reads.
+ AssignmentFilter IsDisk(AssignmentFilter filter) const;
+
+ /// Filter to only match assignments of remote reads.
+ AssignmentFilter IsRemote(AssignmentFilter filter) const;
+
+ /// Process all recorded assignments and call the supplied callback on each tuple of IP
+ /// address and scan_range it iterates over.
+ void ProcessAssignments(const AssignmentCallback& cb) const;
+
+ /// Count all assignments matching the supplied filter callback.
+ int CountAssignmentsIf(const AssignmentFilter& filter) const;
+
+ /// Count all assignments matching the supplied filter callback.
+ int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const;
+
+ /// Create a map containing the number of assigned scan ranges per node.
+ void CountAssignmentsPerBackend(
+ NumAssignmentsPerBackend* num_assignments_per_backend) const;
+
+ /// Create a map containing the number of assigned bytes per node.
+ void CountAssignedBytesPerBackend(
+ NumAssignedBytesPerBackend* num_assignments_per_backend) const;
+};
+
 +/// This class wraps the SimpleScheduler and provides helpers for easier instrumentation
+/// during tests.
+class SchedulerWrapper {
+ public:
+ SchedulerWrapper(const Plan& plan);
+
+ /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
+ void Compute(Result* result) { Compute(false, result); }
+
+ /// Call ComputeScanRangeAssignment().
+ void Compute(bool exec_at_coord, Result* result);
+
+ /// Reset the state of the scheduler by re-creating and initializing it.
+ void Reset() { InitializeScheduler(); }
+
+ /// Methods to modify the internal lists of backends maintained by the scheduler.
+
+ /// Add a backend to the scheduler.
+ void AddBackend(const Host& host);
+
+ /// Remove a backend from the scheduler.
+ void RemoveBackend(const Host& host);
+
+ /// Send a full map of the backends to the scheduler instead of deltas.
+ void SendFullMembershipMap();
+
+ /// Send an empty update message to the scheduler.
+ void SendEmptyUpdate();
+
+ private:
+ const Plan& plan_;
+ boost::scoped_ptr<SimpleScheduler> scheduler_;
+ MetricGroup metrics_;
+
+ /// Initialize the internal scheduler object. The method uses the 'real' constructor
+ /// used in the rest of the codebase, in contrast to the one that takes a list of
+ /// backends, which is only used for testing purposes. This allows us to properly
+ /// initialize the scheduler and exercise the UpdateMembership() method in tests.
+ void InitializeScheduler();
+
+ /// Add a single host to the given TTopicDelta.
+ void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const;
+
+ /// Send the given topic delta to the scheduler.
+ void SendTopicDelta(const TTopicDelta& delta);
+};
+
+} // end namespace test
+} // end namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index 7116e21..d9964af 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -15,862 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-#include <set>
-#include <vector>
-
-#include <boost/scoped_ptr.hpp>
-
#include "simple-scheduler.h"
+#include "common/logging.h"
+#include "simple-scheduler-test-util.h"
#include "testutil/gtest-util.h"
-#include "util/runtime-profile.h"
-
-#include "common/names.h"
using namespace impala;
-
-DECLARE_string(pool_conf_file);
+using namespace impala::test;
namespace impala {
-// Typedefs to make the rest of the code more readable.
-typedef string Hostname;
-typedef string IpAddr;
-typedef string TableName;
-
-/// Sample 'n' elements without replacement from the set [0..N-1].
-/// This is an implementation of "Algorithm R" by J. Vitter.
-void SampleN(int n, int N, vector<int>* out) {
- if (n == 0) return;
- DCHECK(n <= N);
- out->reserve(n);
- out->clear();
- for (int i = 0; i < n; ++i) out->push_back(i);
- for (int i = n; i < N; ++i) {
- // Accept element with probability n/i. Place at random position.
- int r = rand() % i;
- if (r < n) (*out)[r] = i;
- }
-}
-
-/// Sample a set of 'n' elements from 'in' without replacement and copy them to
-/// 'out'.
-template <typename T>
-void SampleNElements(int n, const vector<T>& in, vector<T>* out) {
- vector<int> idxs;
- SampleN(n, in.size(), &idxs);
- DCHECK_EQ(n, idxs.size());
- out->reserve(n);
- for (int idx: idxs) out->push_back(in[idx]);
-}
-
-/// Helper classes to be used by the scheduler tests.
-
-/// Overall testing approach: Each test builds a list of hosts and a plan, both to which
-/// elements can be added using various helper methods. Then scheduling can be tested
-/// by instantiating SchedulerWrapper and calling Compute(...). The result can be verified
-/// using a set of helper methods. There are also helper methods to modify the internal
-/// state of the scheduler between subsequent calls to SchedulerWrapper::Compute().
-///
-/// The model currently comes with some known limitations:
-///
-/// - Files map 1:1 to blocks and to scan ranges.
-/// - All files have the same size (1 block of 1M). Tables that differ in size can be
-/// expressed as having a different number of blocks.
-/// - We don't support multiple backends on a single host.
-/// - Ports are assigned to hosts automatically and are not configurable by the test.
-
-// TODO: Extend the model to support files with multiple blocks.
-// TODO: Test more methods of the scheduler.
-// TODO: Add support to add skewed table scans with multiple scan ranges: often there are
-// 3 replicas where there may be skew for 1 of the replicas (e.g. after a single
-// node insert) but the other 2 are random.
-// TODO: Make use of the metrics provided by the scheduler.
-// TODO: Add checks for MinNumAssignmentsPerHost() to all tests where applicable.
-// TODO: Add post-condition checks that have to hold for all successful scheduler runs.
-// TODO: Add possibility to explicitly specify the replica location per file.
-// TODO: Add methods to retrieve and verify generated file placements from plan.
-// TODO: Extend the model to specify a physical schema independently of a plan (ie,
-// tables/files, blocks, replicas and cached replicas exist independently of the
-// queries that run against them).
-
-/// File blocks store a list of all datanodes that have a replica of the block. When
-/// defining tables you can specify the desired replica placement among all available
-/// datanodes in the cluster.
-///
-/// - RANDOM means that any datanode can be picked.
-/// - LOCAL_ONLY means that only datanodes with a backend will be picked.
-/// - REMOTE_ONLY means that only datanodes without a backend will be picked.
-///
-/// Whether replicas will be cached or not is not determined by this value, but by
-/// additional function arguments when adding tables to the schema.
-enum class ReplicaPlacement {
- RANDOM,
- LOCAL_ONLY,
- REMOTE_ONLY,
-};
-
-/// Host model. Each host can have either a backend, a datanode, or both. To specify that
-/// a host should not act as a backend or datanode specify '-1' as the respective port.
-struct Host {
- Host(const Hostname& name, const IpAddr& ip, int be_port, int dn_port)
- : name(name), ip(ip), be_port(be_port), dn_port(dn_port) {}
- Hostname name;
- IpAddr ip;
- int be_port; // Backend port
- int dn_port; // Datanode port
-};
-
-/// A cluster stores a list of hosts and provides various methods to add hosts to the
-/// cluster. All hosts are guaranteed to have unique IP addresses and hostnames.
-class Cluster {
- public:
- /// Add a host and return the host's index. 'hostname' and 'ip' of the new host will be
- /// generated and are guaranteed to be unique.
- int AddHost(bool has_backend, bool has_datanode) {
- int host_idx = hosts_.size();
- int be_port = has_backend ? BACKEND_PORT : -1;
- int dn_port = has_datanode ? DATANODE_PORT : -1;
- IpAddr ip = HostIdxToIpAddr(host_idx);
- DCHECK(ip_to_idx_.find(ip) == ip_to_idx_.end());
- ip_to_idx_[ip] = host_idx;
- hosts_.push_back(Host(HostIdxToHostname(host_idx), ip, be_port, dn_port));
- // Add host to lists of backend indexes per type.
- if (has_backend) backend_host_idxs_.push_back(host_idx);
- if (has_datanode) {
- datanode_host_idxs_.push_back(host_idx);
- if (has_backend) {
- datanode_with_backend_host_idxs_.push_back(host_idx);
- } else {
- datanode_only_host_idxs_.push_back(host_idx);
- }
- }
- return host_idx;
- }
-
- /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
- void AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
- for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
- }
-
- /// Convert a host index to a hostname.
- static Hostname HostIdxToHostname(int host_idx) {
- return HOSTNAME_PREFIX + std::to_string(host_idx);
- }
-
- /// Return the backend address (ip, port) for the host with index 'host_idx'.
- void GetBackendAddress(int host_idx, TNetworkAddress* addr) const {
- DCHECK_LT(host_idx, hosts_.size());
- addr->hostname = hosts_[host_idx].ip;
- addr->port = hosts_[host_idx].be_port;
- }
-
- const vector<Host>& hosts() const { return hosts_; }
- int NumHosts() const { return hosts_.size(); }
-
- /// These methods return lists of host indexes, grouped by their type, which can be used
- /// to draw samples of random sets of hosts.
- /// TODO: Think of a nicer abstraction to expose this information.
- const vector<int>& backend_host_idxs() const { return backend_host_idxs_; }
- const vector<int>& datanode_host_idxs() const { return datanode_host_idxs_; }
-
- const vector<int>& datanode_with_backend_host_idxs() const {
- return datanode_with_backend_host_idxs_;
- }
-
- const vector<int>& datanode_only_host_idxs() const { return datanode_only_host_idxs_; }
-
- private:
- /// Port for all backends.
- static const int BACKEND_PORT;
-
- /// Port for all datanodes.
- static const int DATANODE_PORT;
-
- /// Prefix for all generated hostnames.
- static const string HOSTNAME_PREFIX;
-
- /// First octet for all generated IP addresses.
- static const string IP_PREFIX;
-
- /// List of hosts in this cluster.
- vector<Host> hosts_;
-
- /// Lists of indexes of hosts, grouped by their type. The lists reference hosts in
- /// 'hosts_' by index and are used for random sampling.
- ///
- /// All hosts with a backend.
- vector<int> backend_host_idxs_;
- /// All hosts with a datanode.
- vector<int> datanode_host_idxs_;
- /// All hosts with a datanode and a backend.
- vector<int> datanode_with_backend_host_idxs_;
- /// All hosts with a datanode but no backend.
- vector<int> datanode_only_host_idxs_;
-
- /// Map from IP addresses to host indexes.
- unordered_map<IpAddr, int> ip_to_idx_;
-
- /// Convert a host index to an IP address. The host index must be smaller than 2^24 and
- /// will specify the lower 24 bits of the IPv4 address (the lower 3 octets).
- static IpAddr HostIdxToIpAddr(int host_idx) {
- DCHECK_LT(host_idx, (1 << 24));
- string suffix;
- for (int i = 0; i < 3; ++i) {
- suffix = "." + std::to_string(host_idx % 256) + suffix; // prepend
- host_idx /= 256;
- }
- DCHECK_EQ(0, host_idx);
- return IP_PREFIX + suffix;
- }
-};
-
-const int Cluster::BACKEND_PORT = 1000;
-const int Cluster::DATANODE_PORT = 2000;
-const string Cluster::HOSTNAME_PREFIX = "host_";
-const string Cluster::IP_PREFIX = "10";
-
-struct Block {
- /// By default all blocks are of the same size.
- int64_t length = DEFAULT_BLOCK_SIZE;
-
- /// Index into the cluster that owns the table that owns this block.
- vector<int> replica_host_idxs;
-
- /// Flag for each entry in replica_host_idxs whether it is a cached replica or not.
- vector<bool> replica_host_idx_is_cached;
-
- /// Default size for new blocks.
- static const int64_t DEFAULT_BLOCK_SIZE;
-};
-/// Default size for new blocks is 1MB.
-const int64_t Block::DEFAULT_BLOCK_SIZE = 1 << 20;
-
-struct Table {
- vector<Block> blocks;
-};
-
-class Schema {
- public:
- Schema(const Cluster& cluster) : cluster_(cluster) {}
-
- /// Add a table consisting of a single block to the schema with explicitly specified
- /// replica indexes for non-cached replicas and without any cached replicas. Replica
- /// indexes must refer to hosts in cluster_.hosts() by index.
- void AddSingleBlockTable(const TableName& table_name,
- const vector<int>& non_cached_replica_host_idxs) {
- AddSingleBlockTable(table_name, non_cached_replica_host_idxs, {});
- }
-
- /// Add a table consisting of a single block to the schema with explicitly specified
- /// replica indexes for both non-cached and cached replicas. Values in both lists must
- /// refer to hosts in cluster_.hosts() by index. Both lists must be disjoint, i.e., a
- /// replica can either be cached or not.
- void AddSingleBlockTable(const TableName& table_name,
- const vector<int>& non_cached_replica_host_idxs,
- const vector<int>& cached_replica_host_idxs) {
- DCHECK(tables_.find(table_name) == tables_.end());
- Block block;
- int num_replicas = non_cached_replica_host_idxs.size() +
- cached_replica_host_idxs.size();
- block.replica_host_idxs = non_cached_replica_host_idxs;
- block.replica_host_idxs.insert(block.replica_host_idxs.end(),
- cached_replica_host_idxs.begin(), cached_replica_host_idxs.end());
- // Initialize for non-cached replicas first.
- block.replica_host_idx_is_cached.resize(non_cached_replica_host_idxs.size(), false);
- // Fill up to final size for cached replicas.
- block.replica_host_idx_is_cached.insert(block.replica_host_idx_is_cached.end(),
- cached_replica_host_idxs.size(), true);
- DCHECK_EQ(block.replica_host_idxs.size(), block.replica_host_idx_is_cached.size());
- DCHECK_EQ(block.replica_host_idxs.size(), num_replicas);
- // Create table
- Table table;
- table.blocks.push_back(block);
- // Insert table
- tables_.emplace(table_name, table);
- }
-
- /// Add a table to the schema, selecting replica hosts according to the given replica
- /// placement preference. All replicas will be non-cached.
- void AddMultiBlockTable(const TableName& table_name, int num_blocks,
- ReplicaPlacement replica_placement, int num_replicas) {
- AddMultiBlockTable(table_name, num_blocks, replica_placement, num_replicas, 0);
- }
-
- /// Add a table to the schema, selecting replica hosts according to the given replica
- /// placement preference. After replica selection has been done, 'num_cached_replicas'
- /// of them are marked as cached.
- void AddMultiBlockTable(const TableName& table_name, int num_blocks,
- ReplicaPlacement replica_placement, int num_replicas, int num_cached_replicas) {
- DCHECK_GT(num_replicas, 0);
- DCHECK(num_cached_replicas <= num_replicas);
- Table table;
- for (int i = 0; i < num_blocks; ++i) {
- Block block;
- vector<int>& replica_idxs = block.replica_host_idxs;
-
- // Determine replica host indexes.
- switch (replica_placement) {
- case ReplicaPlacement::RANDOM:
- SampleNElements(num_replicas, cluster_.datanode_host_idxs(), &replica_idxs);
- break;
- case ReplicaPlacement::LOCAL_ONLY:
- DCHECK(num_replicas <= cluster_.datanode_with_backend_host_idxs().size());
- SampleNElements(num_replicas, cluster_.datanode_with_backend_host_idxs(),
- &replica_idxs);
- break;
- case ReplicaPlacement::REMOTE_ONLY:
- DCHECK(num_replicas <= cluster_.datanode_only_host_idxs().size());
- SampleNElements(num_replicas, cluster_.datanode_only_host_idxs(),
- &replica_idxs);
- break;
- default:
- DCHECK(false) << "Unsupported replica placement: "
- << (int)replica_placement;
- }
-
- // Determine cached replicas.
- vector<int> cached_replicas;
- vector<bool>& is_cached = block.replica_host_idx_is_cached;
- is_cached.resize(num_replicas, false);
- SampleN(num_cached_replicas, num_replicas, &cached_replicas);
- // Flag cached entries.
- for (const int idx: cached_replicas) is_cached[idx] = true;
-
- DCHECK_EQ(replica_idxs.size(), is_cached.size());
- table.blocks.push_back(block);
- }
- // Insert table
- tables_[table_name] = table;
- }
-
- const Table& GetTable(const TableName& table_name) const {
- auto it = tables_.find(table_name);
- DCHECK(it != tables_.end());
- return it->second;
- }
-
- const Cluster& cluster() const { return cluster_; }
-
- private:
- /// Store a reference to the cluster, from which hosts are sampled. Test results will
- /// use the cluster to resolve host indexes to hostnames and IP addresses.
- const Cluster& cluster_;
-
- unordered_map<TableName, Table> tables_;
-};
-
-/// Plan model. A plan contains a list of tables to scan and the query options to be used
-/// during scheduling.
-class Plan {
- public:
- Plan(const Schema& schema) : schema_(schema) {}
-
- const TQueryOptions& query_options() const { return query_options_; }
-
- void SetReplicaPreference(TReplicaPreference::type p) {
- query_options_.replica_preference = p;
- }
-
- void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
- void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
- const Cluster& cluster() const { return schema_.cluster(); }
-
- const vector<TNetworkAddress>& referenced_datanodes() const {
- return referenced_datanodes_;
- }
-
- const vector<TScanRangeLocations>& scan_range_locations() const {
- return scan_range_locations_;
- }
-
- /// Add a scan of table 'table_name' to the plan. This method will populate the internal
- /// list of TScanRangeLocations and can be called multiple times for the same table to
- /// schedule additional scans.
- void AddTableScan(const TableName& table_name) {
- const Table& table = schema_.GetTable(table_name);
- const vector<Block>& blocks = table.blocks;
- for (int i = 0; i < blocks.size(); ++i) {
- const Block& block = blocks[i];
- TScanRangeLocations scan_range_locations;
- BuildTScanRangeLocations(table_name, block, i, &scan_range_locations);
- scan_range_locations_.push_back(scan_range_locations);
- }
- }
-
- private:
- /// Store a reference to the schema, from which scanned tables will be read.
- const Schema& schema_;
-
- TQueryOptions query_options_;
-
- /// List of all datanodes that are referenced by this plan. Only hosts that have an
- /// assigned scan range are added here.
- vector<TNetworkAddress> referenced_datanodes_;
-
- /// Map from plan host index to an index in 'referenced_datanodes_'.
- boost::unordered_map<int, int> host_idx_to_datanode_idx_;
-
- /// List of all scan range locations, which can be passed to the SimpleScheduler.
- vector<TScanRangeLocations> scan_range_locations_;
-
- /// Initialize a TScanRangeLocations object in place.
- void BuildTScanRangeLocations(const TableName& table_name, const Block& block,
- int block_idx, TScanRangeLocations* scan_range_locations) {
- const vector<int>& replica_idxs = block.replica_host_idxs;
- const vector<bool>& is_cached = block.replica_host_idx_is_cached;
- DCHECK_EQ(replica_idxs.size(), is_cached.size());
- int num_replicas = replica_idxs.size();
- BuildScanRange(table_name, block, block_idx, &scan_range_locations->scan_range);
- scan_range_locations->locations.resize(num_replicas);
- for (int i = 0; i < num_replicas; ++i) {
- TScanRangeLocation& location = scan_range_locations->locations[i];
- location.host_idx = FindOrInsertDatanodeIndex(replica_idxs[i]);
- location.__set_is_cached(is_cached[i]);
- }
- }
-
- void BuildScanRange(const TableName& table_name, const Block& block, int block_idx,
- TScanRange* scan_range) {
- // Initialize locations.scan_range correctly.
- THdfsFileSplit file_split;
- // 'length' is the only member considered by the scheduler.
- file_split.length = block.length;
- // Encoding the table name and block index in the file helps debugging.
- file_split.file_name = table_name + "_block_" + std::to_string(block_idx);
- file_split.offset = 0;
- file_split.partition_id = 0;
- // For now, we model each file by a single block.
- file_split.file_length = block.length;
- file_split.file_compression = THdfsCompression::NONE;
- file_split.mtime = 1;
- scan_range->__set_hdfs_file_split(file_split);
- }
-
- /// Look up the plan-local host index of 'cluster_datanode_idx'. If the host has not
- /// been added to the plan before, it will add it to 'referenced_datanodes_' and return
- /// the new index.
- int FindOrInsertDatanodeIndex(int cluster_datanode_idx) {
- const Host& host = schema_.cluster().hosts()[cluster_datanode_idx];
- auto ret = host_idx_to_datanode_idx_.emplace(
- cluster_datanode_idx, referenced_datanodes_.size());
- bool inserted_new_element = ret.second;
- if (inserted_new_element) {
- TNetworkAddress datanode;
- datanode.hostname = host.ip;
- datanode.port = host.dn_port;
- referenced_datanodes_.push_back(datanode);
- }
- return ret.first->second;
- }
-};
-
-class Result {
- private:
- /// Map to count the number of assignments per backend.
- typedef unordered_map<IpAddr, int> NumAssignmentsPerBackend;
-
- /// Map to count the number of assigned bytes per backend.
- typedef unordered_map<IpAddr, int64_t> NumAssignedBytesPerBackend;
-
- /// Parameter type for callbacks, which are used to filter scheduling results.
- struct AssignmentInfo {
- const TNetworkAddress& addr;
- const THdfsFileSplit& hdfs_file_split;
- bool is_cached;
- bool is_remote;
- };
-
- /// These functions are used as callbacks when processing the scheduling result. They
- /// will be called once per assigned scan range.
- typedef std::function<bool(const AssignmentInfo& assignment)> AssignmentFilter;
- typedef std::function<void(const AssignmentInfo& assignment)> AssignmentCallback;
-
- public:
- Result(const Plan& plan) : plan_(plan) {}
-
- /// Return the total number of scheduled assignments.
- int NumTotalAssignments() const { return CountAssignmentsIf(Any()); }
-
- /// Return the total number of assigned bytes.
- int NumTotalAssignedBytes() const { return CountAssignedBytesIf(Any()); }
-
- /// Return the number of scheduled assignments for a single host.
- int NumTotalAssignments(int host_idx) const {
- return CountAssignmentsIf(IsHost(host_idx));
- }
-
- /// Return the number of assigned bytes for a single host.
- int NumTotalAssignedBytes(int host_idx) const {
- return CountAssignedBytesIf(IsHost(host_idx));
- }
-
- /// Return the total number of assigned cached reads.
- int NumCachedAssignments() const { return CountAssignmentsIf(IsCached(Any())); }
-
- /// Return the total number of assigned bytes for cached reads.
- int NumCachedAssignedBytes() const { return CountAssignedBytesIf(IsCached(Any())); }
-
- /// Return the total number of assigned cached reads for a single host.
- int NumCachedAssignments(int host_idx) const {
- return CountAssignmentsIf(IsCached(IsHost(host_idx)));
- }
-
- /// Return the total number of assigned bytes for cached reads for a single host.
- int NumCachedAssignedBytes(int host_idx) const {
- return CountAssignedBytesIf(IsCached(IsHost(host_idx)));
- }
-
- /// Return the total number of assigned non-cached reads.
- int NumDiskAssignments() const { return CountAssignmentsIf(IsDisk(Any())); }
-
- /// Return the total number of assigned bytes for non-cached reads.
- int NumDiskAssignedBytes() const { return CountAssignedBytesIf(IsDisk(Any())); }
-
- /// Return the total number of assigned non-cached reads for a single host.
- int NumDiskAssignments(int host_idx) const {
- return CountAssignmentsIf(IsDisk(IsHost(host_idx)));
- }
-
- /// Return the total number of assigned bytes for non-cached reads for a single host.
- int NumDiskAssignedBytes(int host_idx) const {
- return CountAssignedBytesIf(IsDisk(IsHost(host_idx)));
- }
-
- /// Return the total number of assigned remote reads.
- int NumRemoteAssignments() const { return CountAssignmentsIf(IsRemote(Any())); }
-
- /// Return the total number of assigned bytes for remote reads.
- int NumRemoteAssignedBytes() const { return CountAssignedBytesIf(IsRemote(Any())); }
-
- /// Return the total number of assigned remote reads for a single host.
- int NumRemoteAssignments(int host_idx) const {
- return CountAssignmentsIf(IsRemote(IsHost(host_idx)));
- }
-
- /// Return the total number of assigned bytes for remote reads for a single host.
- int NumRemoteAssignedBytes(int host_idx) const {
- return CountAssignedBytesIf(IsRemote(IsHost(host_idx)));
- }
-
- /// Return the maximum number of assigned reads over all hosts.
- int MaxNumAssignmentsPerHost() const {
- NumAssignmentsPerBackend num_assignments_per_backend;
- CountAssignmentsPerBackend(&num_assignments_per_backend);
- int max_count = 0;
- for (const auto& elem: num_assignments_per_backend) {
- max_count = max(max_count, elem.second);
- }
- return max_count;
- }
-
- /// Return the maximum number of assigned reads over all hosts.
- int64_t MaxNumAssignedBytesPerHost() const {
- NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
- CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
- int64_t max_assigned_bytes = 0;
- for (const auto& elem: num_assigned_bytes_per_backend) {
- max_assigned_bytes = max(max_assigned_bytes, elem.second);
- }
- return max_assigned_bytes;
- }
-
- /// Return the minimum number of assigned reads over all hosts.
- /// NOTE: This is computed by traversing all recorded assignments and thus will not
- /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
- /// 0).
- int MinNumAssignmentsPerHost() const {
- NumAssignmentsPerBackend num_assignments_per_backend;
- CountAssignmentsPerBackend(&num_assignments_per_backend);
- int min_count = numeric_limits<int>::max();
- for (const auto& elem: num_assignments_per_backend) {
- min_count = min(min_count, elem.second);
- }
- DCHECK_GT(min_count, 0);
- return min_count;
- }
-
- /// Return the minimum number of assigned bytes over all hosts.
- /// NOTE: This is computed by traversing all recorded assignments and thus will not
- /// consider hosts without any assignments. Hence the minimum value to expect is 1 (not
- /// 0).
- int64_t MinNumAssignedBytesPerHost() const {
- NumAssignedBytesPerBackend num_assigned_bytes_per_backend;
- CountAssignedBytesPerBackend(&num_assigned_bytes_per_backend);
- int64_t min_assigned_bytes = 0;
- for (const auto& elem: num_assigned_bytes_per_backend) {
- min_assigned_bytes = max(min_assigned_bytes, elem.second);
- }
- DCHECK_GT(min_assigned_bytes, 0);
- return min_assigned_bytes;
- }
-
- /// Return the number of scan range assignments stored in this result.
- int NumAssignments() const { return assignments_.size(); }
-
- /// Return the number of distinct backends that have been picked by the scheduler so
- /// far.
- int NumDistinctBackends() const {
- unordered_set<IpAddr> backends;
- AssignmentCallback cb = [&backends](const AssignmentInfo& assignment) {
- backends.insert(assignment.addr.hostname);
- };
- ProcessAssignments(cb);
- return backends.size();
- }
-
- /// Return the full assignment for manual matching.
- const FragmentScanRangeAssignment& GetAssignment(int index = 0) const {
- DCHECK_GT(assignments_.size(), index);
- return assignments_[index];
- }
-
- /// Add an assignment to the result and return a reference, which can then be passed on
- /// to the scheduler.
- FragmentScanRangeAssignment* AddAssignment() {
- assignments_.push_back(FragmentScanRangeAssignment());
- return &assignments_.back();
- }
-
- /// Reset the result to an empty state.
- void Reset() { assignments_.clear(); }
-
- private:
- /// Vector to store results of consecutive scheduler runs.
- vector<FragmentScanRangeAssignment> assignments_;
-
- /// Reference to the plan, needed to look up hosts.
- const Plan& plan_;
-
- /// Dummy filter matching any assignment.
- AssignmentFilter Any() const {
- return [](const AssignmentInfo& assignment) { return true; };
- }
-
- /// Filter to only match assignments of a particular host.
- AssignmentFilter IsHost(int host_idx) const {
- TNetworkAddress expected_addr;
- plan_.cluster().GetBackendAddress(host_idx, &expected_addr);
- return [expected_addr](const AssignmentInfo& assignment) {
- return assignment.addr == expected_addr;
- };
- }
-
- /// Filter to only match assignments of cached reads.
- AssignmentFilter IsCached(AssignmentFilter filter) const {
- return [filter](const AssignmentInfo& assignment) {
- return filter(assignment) && assignment.is_cached;
- };
- }
-
- /// Filter to only match assignments of non-cached, local disk reads.
- AssignmentFilter IsDisk(AssignmentFilter filter) const {
- return [filter](const AssignmentInfo& assignment) {
- return filter(assignment) && !assignment.is_cached && !assignment.is_remote;
- };
- }
-
- /// Filter to only match assignments of remote reads.
- AssignmentFilter IsRemote(AssignmentFilter filter) const {
- return [filter](const AssignmentInfo& assignment) {
- return filter(assignment) && assignment.is_remote;
- };
- }
-
- /// Process all recorded assignments and call the supplied callback on each tuple of IP
- /// address and scan_range it iterates over.
- void ProcessAssignments(const AssignmentCallback& cb) const {
- for (const FragmentScanRangeAssignment& assignment: assignments_) {
- for (const auto& assignment_elem: assignment) {
- const TNetworkAddress& addr = assignment_elem.first;
- const PerNodeScanRanges& per_node_ranges = assignment_elem.second;
- for (const auto& per_node_ranges_elem: per_node_ranges) {
- const vector<TScanRangeParams> scan_range_params_vector =
- per_node_ranges_elem.second;
- for (const TScanRangeParams& scan_range_params: scan_range_params_vector) {
- const TScanRange& scan_range = scan_range_params.scan_range;
- DCHECK(scan_range.__isset.hdfs_file_split);
- const THdfsFileSplit& hdfs_file_split = scan_range.hdfs_file_split;
- bool is_cached = scan_range_params.__isset.is_cached
- ? scan_range_params.is_cached : false;
- bool is_remote = scan_range_params.__isset.is_remote
- ? scan_range_params.is_remote : false;
- cb({addr, hdfs_file_split, is_cached, is_remote});
- }
- }
- }
- }
- }
-
- /// Count all assignments matching the supplied filter callback.
- int CountAssignmentsIf(const AssignmentFilter& filter) const {
- int count = 0;
- AssignmentCallback cb = [&count, filter](const AssignmentInfo& assignment) {
- if (filter(assignment)) ++count;
- };
- ProcessAssignments(cb);
- return count;
- }
-
- /// Count all assignments matching the supplied filter callback.
- int64_t CountAssignedBytesIf(const AssignmentFilter& filter) const {
- int64_t assigned_bytes = 0;
- AssignmentCallback cb = [&assigned_bytes, filter](const AssignmentInfo& assignment) {
- if (filter(assignment)) assigned_bytes += assignment.hdfs_file_split.length;
- };
- ProcessAssignments(cb);
- return assigned_bytes;
- }
-
- /// Create a map containing the number of assigned scan ranges per node.
- void CountAssignmentsPerBackend(
- NumAssignmentsPerBackend* num_assignments_per_backend) const {
- AssignmentCallback cb = [&num_assignments_per_backend](
- const AssignmentInfo& assignment) {
- ++(*num_assignments_per_backend)[assignment.addr.hostname];
- };
- ProcessAssignments(cb);
- }
-
- /// Create a map containing the number of assigned bytes per node.
- void CountAssignedBytesPerBackend(
- NumAssignedBytesPerBackend* num_assignments_per_backend) const {
- AssignmentCallback cb = [&num_assignments_per_backend](
- const AssignmentInfo& assignment) {
- (*num_assignments_per_backend)[assignment.addr.hostname] +=
- assignment.hdfs_file_split.length;
- };
- ProcessAssignments(cb);
- }
-};
-
-/// This class wraps the SimpleScheduler and provides helper for easier instrumentation
-/// during tests.
-class SchedulerWrapper {
- public:
- SchedulerWrapper(const Plan& plan) : plan_(plan), metrics_("TestMetrics") {
- InitializeScheduler();
- }
-
- /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
- void Compute(Result* result) {
- Compute(false, result);
- }
-
- /// Call ComputeScanRangeAssignment().
- void Compute(bool exec_at_coord, Result* result) {
- DCHECK(scheduler_ != NULL);
-
- // Compute Assignment.
- FragmentScanRangeAssignment* assignment = result->AddAssignment();
- scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
- false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
- plan_.query_options(), NULL, assignment);
- }
-
- /// Reset the state of the scheduler by re-creating and initializing it.
- void Reset() { InitializeScheduler(); }
-
- /// Methods to modify the internal lists of backends maintained by the scheduler.
-
- /// Add a backend to the scheduler.
- void AddBackend(const Host& host) {
- // Add to topic delta
- TTopicDelta delta;
- delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
- delta.is_delta = true;
- AddHostToTopicDelta(host, &delta);
- SendTopicDelta(delta);
- }
-
- /// Remove a backend from the scheduler.
- void RemoveBackend(const Host& host) {
- // Add deletion to topic delta
- TTopicDelta delta;
- delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
- delta.is_delta = true;
- delta.topic_deletions.push_back(host.ip);
- SendTopicDelta(delta);
- }
-
- /// Send a full map of the backends to the scheduler instead of deltas.
- void SendFullMembershipMap() {
- TTopicDelta delta;
- delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
- delta.is_delta = false;
- for (const Host& host: plan_.cluster().hosts()) {
- if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
- }
- SendTopicDelta(delta);
- }
-
- /// Send an empty update message to the scheduler.
- void SendEmptyUpdate() {
- TTopicDelta delta;
- delta.topic_name = SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC;
- delta.is_delta = true;
- SendTopicDelta(delta);
- }
-
- private:
- const Plan& plan_;
- boost::scoped_ptr<SimpleScheduler> scheduler_;
- MetricGroup metrics_;
-
- /// Initialize the internal scheduler object. The method uses the 'real' constructor
- /// used in the rest of the codebase, in contrast to the one that takes a list of
- /// backends, which is only used for testing purposes. This allows us to properly
- /// initialize the scheduler and exercise the UpdateMembership() method in tests.
- void InitializeScheduler() {
- DCHECK(scheduler_ == NULL);
- DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
- << "hosts.";
- const Host& scheduler_host = plan_.cluster().hosts()[0];
- string scheduler_backend_id = scheduler_host.ip;
- TNetworkAddress scheduler_backend_address;
- scheduler_backend_address.hostname = scheduler_host.ip;
- scheduler_backend_address.port = scheduler_host.be_port;
-
- scheduler_.reset(new SimpleScheduler(
- NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
- scheduler_->Init();
- // Initialize the scheduler backend maps.
- SendFullMembershipMap();
- }
-
- /// Add a single host to the given TTopicDelta.
- void AddHostToTopicDelta(const Host& host, TTopicDelta* delta) const {
- DCHECK_GT(host.be_port, 0) << "Host cannot be added to scheduler without a running "
- << "backend";
- // Build backend descriptor.
- TBackendDescriptor be_desc;
- be_desc.address.hostname = host.ip;
- be_desc.address.port = host.be_port;
- be_desc.ip_address = host.ip;
-
- // Build topic item.
- TTopicItem item;
- item.key = host.ip;
- ThriftSerializer serializer(false);
- Status status = serializer.Serialize(&be_desc, &item.value);
- DCHECK(status.ok());
-
- // Add to topic delta.
- delta->topic_entries.push_back(item);
- }
-
- /// Send the given topic delta to the scheduler.
- void SendTopicDelta(const TTopicDelta& delta) {
- DCHECK(scheduler_ != NULL);
- // Wrap in topic delta map.
- StatestoreSubscriber::TopicDeltaMap delta_map;
- delta_map.emplace(SimpleScheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
-
- // Send to the scheduler.
- vector<TTopicDelta> dummy_result;
- scheduler_->UpdateMembership(delta_map, &dummy_result);
- }
-};
-
class SchedulerTest : public testing::Test {
protected:
SchedulerTest() { srand(0); };
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d76a2b22/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 8c96dde..9b51269 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -42,7 +42,9 @@ namespace impala {
class Coordinator;
+namespace test {
class SchedulerWrapper;
+}
/// Performs simple scheduling by matching between a list of backends configured
/// either from the statestore, or from a static list of addresses, and a list
@@ -434,7 +436,7 @@ class SimpleScheduler : public Scheduler {
int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
const TQueryExecRequest& exec_request);
- friend class impala::SchedulerWrapper;
+ friend class impala::test::SchedulerWrapper;
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
[2/2] incubator-impala git commit: IMPALA-4037,
IMPALA-4038: fix locking during query cancellation
Posted by ta...@apache.org.
IMPALA-4037,IMPALA-4038: fix locking during query cancellation
* Refactor the child query handling out of QueryExecState and clarify
locking rules.
* Avoid holding QueryExecState::lock_ while calling
Coordinator::Cancel() or ChildQuery::Cancel(), which can both do RPCs
or acquire ImpalaServer::query_exec_state_map_lock_.
* Fix a potential race between QueryExecState::Exec() and
QueryExecState::Cancel() where the cancelling thread did an unlocked
read of the 'coordinator_' field and may not have cancelled the
coordinator.
Testing:
Ran exhaustive build, ran local stress test for a bit.
Change-Id: Ibe3024803e03595ee69c47759b58e8443d7bd167
Reviewed-on: http://gerrit.cloudera.org:8080/4163
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0d0c93ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0d0c93ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0d0c93ec
Branch: refs/heads/master
Commit: 0d0c93ec8c4949940ec113192731f2adb66a0c5e
Parents: d76a2b2
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 29 11:10:38 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Thu Sep 22 10:07:52 2016 +0000
----------------------------------------------------------------------
be/src/runtime/coordinator.h | 4 +-
be/src/service/child-query.cc | 69 ++++++++++++++++
be/src/service/child-query.h | 66 +++++++++++++++
be/src/service/impala-server.cc | 13 +--
be/src/service/query-exec-state.cc | 125 ++++++++++++++++-------------
be/src/service/query-exec-state.h | 49 +++++------
tests/query_test/test_cancellation.py | 5 --
7 files changed, 227 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0e0b6d5..617ccb9 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -246,7 +246,9 @@ class Coordinator {
/// Keeps track of number of completed ranges and total scan ranges.
ProgressUpdater progress_;
- /// protects all fields below
+ /// Protects all fields below. This is held while making RPCs, so this lock should
+ /// only be acquired if the acquiring thread is prepared to wait for a significant
+ /// time.
boost::mutex lock_;
/// Overall status of the entire query; set to the first reported fragment error
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 99eb4fe..5b68de4 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -146,4 +146,73 @@ Status ChildQuery::IsCancelled() {
return Status::CANCELLED;
}
+ChildQueryExecutor::ChildQueryExecutor() : is_cancelled_(false), is_running_(false) {}
+ChildQueryExecutor::~ChildQueryExecutor() {
+ DCHECK(!is_running_);
+}
+
+void ChildQueryExecutor::ExecAsync(vector<ChildQuery>&& child_queries) {
+ DCHECK(!child_queries.empty());
+ lock_guard<SpinLock> lock(lock_);
+ DCHECK(child_queries_.empty());
+ DCHECK(child_queries_thread_.get() == NULL);
+ if (is_cancelled_) return;
+ child_queries_ = move(child_queries);
+ child_queries_thread_.reset(new Thread("query-exec-state", "async child queries",
+ bind(&ChildQueryExecutor::ExecChildQueries, this)));
+ is_running_ = true;
+}
+
+void ChildQueryExecutor::ExecChildQueries() {
+ for (ChildQuery& child_query : child_queries_) {
+ // Execute without holding 'lock_'.
+ Status status = child_query.ExecAndFetch();
+ if (!status.ok()) {
+ lock_guard<SpinLock> lock(lock_);
+ child_queries_status_ = status;
+ break;
+ }
+ }
+
+ {
+ lock_guard<SpinLock> lock(lock_);
+ is_running_ = false;
+ }
+}
+
+Status ChildQueryExecutor::WaitForAll(vector<ChildQuery*>* completed_queries) {
+ // Safe to read without lock since we don't call this concurrently with ExecAsync().
+ if (child_queries_thread_ == NULL) {
+ DCHECK(!is_running_);
+ return Status::OK();
+ }
+ child_queries_thread_->Join();
+
+ // Safe to read below fields without 'lock_' because they are immutable after the
+ // thread finishes.
+ RETURN_IF_ERROR(child_queries_status_);
+ for (ChildQuery& child_query : child_queries_) {
+ completed_queries->push_back(&child_query);
+ }
+ return Status::OK();
+}
+
+void ChildQueryExecutor::Cancel() {
+ {
+ lock_guard<SpinLock> l(lock_);
+ // Prevent more child queries from starting. After this critical section,
+ // 'child_queries_' will not be modified.
+ is_cancelled_ = true;
+ if (!is_running_) return;
+ DCHECK_EQ(child_queries_thread_ == NULL, child_queries_.empty());
+ }
+
+ // Cancel child queries without holding 'lock_'.
+ // Safe because 'child_queries_' and 'child_queries_thread_' are immutable after
+ // cancellation.
+ for (ChildQuery& child_query : child_queries_) {
+ child_query.Cancel();
+ }
+ child_queries_thread_->Join();
+}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/service/child-query.h
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h
index 2fdcc5a..1c7a20e 100644
--- a/be/src/service/child-query.h
+++ b/be/src/service/child-query.h
@@ -142,6 +142,72 @@ class ChildQuery {
bool is_cancelled_;
};
+/// Asynchronously executes a set of child queries in a separate thread.
+///
+/// ExecAsync() is called at most once per executor to execute a set of child queries
+/// asynchronously. After ExecAsync() is called, either WaitForAll() or Cancel() must be
+/// called to ensure that the child queries are no longer executing before destroying the
+/// object.
+class ChildQueryExecutor {
+ public:
+ ChildQueryExecutor();
+ ~ChildQueryExecutor();
+
+ /// Asynchronously executes 'child_queries' one by one in a new thread. 'child_queries'
+ /// must be non-empty. May clear or modify the 'child_queries' arg. Can only be called
+ /// once. Does nothing if Cancel() was already called.
+ void ExecAsync(std::vector<ChildQuery>&& child_queries);
+
+ /// Waits for all child queries to complete successfully or with an error. Returns a
+ /// non-OK status if a child query fails. Returns OK if ExecAsync() was not called,
+ /// Cancel() was called before an error occurred, or if all child queries finished
+ /// successfully. If returning OK, populates 'completed_queries' with the completed
+ /// queries. Any returned ChildQueries remain owned by the executor. Should not be
+ /// called concurrently with ExecAsync(). After WaitForAll() returns, the object can
+ /// safely be destroyed.
+ Status WaitForAll(std::vector<ChildQuery*>* completed_queries);
+
+ /// Cancels all child queries and prevents any more from starting. Returns once all
+ /// child queries are cancelled, after which the object can safely be destroyed. Can
+ /// be safely called concurrently with ExecAsync() or WaitForAll().
+ void Cancel();
+
+ private:
+ /// Serially executes the queries in child_queries_ by calling the child query's
+ /// ExecAndWait(). This function blocks until all queries complete and is run
+ /// in 'child_queries_thread_'.
+ /// Sets 'child_queries_status_'.
+ void ExecChildQueries();
+
+ /// Protects all fields below.
+ /// Should not be held at the same time as 'ChildQuery::lock_'.
+ SpinLock lock_;
+
+ /// True if cancellation of child queries has been initiated and no more child queries
+ /// should be started.
+ bool is_cancelled_;
+
+ /// True if 'child_queries_thread_' is in the process of executing child queries.
+ /// Set to false by 'child_queries_thread_' just before it exits. 'is_running_' must
+ /// be false when ChildQueryExecutor is destroyed: once execution is started,
+ /// WaitForAll() or Cancel() must be called to ensure the thread exits.
+ bool is_running_;
+
+ /// List of child queries to be executed. Not modified after it is initially populated,
+ /// so safe to read without holding 'lock_' if 'is_running_' or 'is_cancelled_' is
+ /// true, or 'child_queries_thread_' is non-NULL.
+ std::vector<ChildQuery> child_queries_;
+
+ /// Thread to execute 'child_queries_' in. Immutable after the first time it is set or
+ /// after 'is_cancelled_' is true.
+ boost::scoped_ptr<Thread> child_queries_thread_;
+
+ /// The status of the child queries. The status is OK iff all child queries complete
+ /// successfully. Otherwise, status contains the error of the first child query that
+ /// failed (child queries are executed serially and abort on the first error).
+ /// Immutable after 'child_queries_thread_' exits.
+ Status child_queries_status_;
+};
}
#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 1b10aec..d5fd59a 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -995,18 +995,9 @@ Status ImpalaServer::UpdateCatalogMetrics() {
Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflight,
const Status* cause) {
VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
- shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
+ shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
if (exec_state == NULL) return Status("Invalid or unknown query handle");
- lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
- if (check_inflight) {
- lock_guard<mutex> l2(exec_state->session()->lock);
- if (exec_state->session()->inflight_queries.find(query_id) ==
- exec_state->session()->inflight_queries.end()) {
- return Status("Query not yet running");
- }
- }
- // TODO: can we call Coordinator::Cancel() here while holding lock?
- exec_state->Cancel(cause);
+ exec_state->Cancel(check_inflight, cause);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index eb710fb..9b2fc88 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -64,6 +64,7 @@ ImpalaServer::QueryExecState::QueryExecState(
: query_ctx_(query_ctx),
last_active_time_(numeric_limits<int64_t>::max()),
ref_count_(0L),
+ child_query_executor_(new ChildQueryExecutor),
exec_env_(exec_env),
is_block_on_wait_joining_(false),
session_(session),
@@ -73,6 +74,7 @@ ImpalaServer::QueryExecState::QueryExecState(
profile_(&profile_pool_, "Query"), // assign name w/ id after planning
server_profile_(&profile_pool_, "ImpalaServer"),
summary_profile_(&profile_pool_, "Summary"),
+ is_cancelled_(false),
eos_(false),
query_state_(beeswax::QueryState::CREATED),
current_batch_(NULL),
@@ -426,9 +428,14 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
query_exec_request.fragments[0].partition.type == TPartitionType::UNPARTITIONED;
DCHECK(has_coordinator_fragment || query_exec_request.__isset.desc_tbl);
- schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
- exec_request_.query_options, &summary_profile_, query_events_));
- coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
+ {
+ lock_guard<mutex> l(lock_);
+ // Don't start executing the query if Cancel() was called concurrently with Exec().
+ if (is_cancelled_) return Status::CANCELLED;
+ schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
+ exec_request_.query_options, &summary_profile_, query_events_));
+ coord_.reset(new Coordinator(exec_request_.query_options, exec_env_, query_events_));
+ }
Status status = exec_env_->scheduler()->Schedule(coord_.get(), schedule_.get());
{
@@ -462,15 +469,17 @@ Status ImpalaServer::QueryExecState::ExecDdlRequest() {
TComputeStatsParams& compute_stats_params =
exec_request_.catalog_op_request.ddl_params.compute_stats_params;
// Add child queries for computing table and column stats.
+ vector<ChildQuery> child_queries;
if (compute_stats_params.__isset.tbl_stats_query) {
- child_queries_.push_back(
+ child_queries.push_back(
ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_));
}
if (compute_stats_params.__isset.col_stats_query) {
- child_queries_.push_back(
+ child_queries.push_back(
ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
}
- if (child_queries_.size() > 0) ExecChildQueriesAsync();
+
+ if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries));
return Status::OK();
}
@@ -600,7 +609,15 @@ Status ImpalaServer::QueryExecState::WaitInternal() {
return Status::OK();
}
- RETURN_IF_ERROR(WaitForChildQueries());
+ vector<ChildQuery*> child_queries;
+ Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
+ {
+ lock_guard<mutex> l(lock_);
+ RETURN_IF_ERROR(query_status_);
+ RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
+ }
+ query_events_->MarkEvent("Child queries finished");
+
if (coord_.get() != NULL) {
RETURN_IF_ERROR(coord_->Wait());
RETURN_IF_ERROR(Expr::Open(output_expr_ctxs_, coord_->runtime_state()));
@@ -608,8 +625,8 @@ Status ImpalaServer::QueryExecState::WaitInternal() {
}
if (catalog_op_type() == TCatalogOpType::DDL &&
- ddl_type() == TDdlType::COMPUTE_STATS && child_queries_.size() > 0) {
- RETURN_IF_ERROR(UpdateTableAndColumnStats());
+ ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
+ RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
}
if (!returns_result_set()) {
@@ -820,22 +837,40 @@ Status ImpalaServer::QueryExecState::GetRowValue(TupleRow* row, vector<void*>* r
return Status::OK();
}
-void ImpalaServer::QueryExecState::Cancel(const Status* cause) {
- // Cancel and close child queries before cancelling parent.
- for (ChildQuery& child_query: child_queries_) {
- child_query.Cancel();
- }
-
- // If the query is completed or cancelled, no need to cancel.
- if (eos_ || query_state_ == QueryState::EXCEPTION) return;
+Status ImpalaServer::QueryExecState::Cancel(bool check_inflight, const Status* cause) {
+ Coordinator* coord;
+ {
+ lock_guard<mutex> lock(lock_);
+ if (check_inflight) {
+ lock_guard<mutex> session_lock(session_->lock);
+ if (session_->inflight_queries.find(query_id()) ==
+ session_->inflight_queries.end()) {
+ return Status("Query not yet running");
+ }
+ }
- if (cause != NULL) {
- DCHECK(!cause->ok());
- UpdateQueryStatus(*cause);
- query_events_->MarkEvent("Cancelled");
- DCHECK_EQ(query_state_, QueryState::EXCEPTION);
- }
- if (coord_.get() != NULL) coord_->Cancel(cause);
+ // If the query is completed or cancelled, no need to update state.
+ bool already_done = eos_ || query_state_ == QueryState::EXCEPTION;
+ if (!already_done && cause != NULL) {
+ DCHECK(!cause->ok());
+ UpdateQueryStatus(*cause);
+ query_events_->MarkEvent("Cancelled");
+ DCHECK_EQ(query_state_, QueryState::EXCEPTION);
+ }
+ // Get a copy of the coordinator pointer while holding 'lock_'.
+ coord = coord_.get();
+ is_cancelled_ = true;
+ } // Release lock_ before doing cancellation work.
+
+ // Cancel and close child queries before cancelling parent. 'lock_' should not be held
+ // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation
+ // involves RPCs and can take quite some time.
+ child_query_executor_->Cancel();
+
+ // Cancel the parent query. 'lock_' should not be held because cancellation involves
+ // RPCs and can block for a long time.
+ if (coord != NULL) coord->Cancel(cause);
+ return Status::OK();
}
Status ImpalaServer::QueryExecState::UpdateCatalog() {
@@ -988,9 +1023,10 @@ void ImpalaServer::QueryExecState::MarkActive() {
++ref_count_;
}
-Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats() {
- DCHECK_GE(child_queries_.size(), 1);
- DCHECK_LE(child_queries_.size(), 2);
+Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats(
+ const vector<ChildQuery*>& child_queries) {
+ DCHECK_GE(child_queries.size(), 1);
+ DCHECK_LE(child_queries.size(), 2);
catalog_op_executor_.reset(
new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
@@ -998,15 +1034,15 @@ Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats() {
// ExecComputeStats(). Otherwise pass in the column stats result.
TTableSchema col_stats_schema;
TRowSet col_stats_data;
- if (child_queries_.size() > 1) {
- col_stats_schema = child_queries_[1].result_schema();
- col_stats_data = child_queries_[1].result_data();
+ if (child_queries.size() > 1) {
+ col_stats_schema = child_queries[1]->result_schema();
+ col_stats_data = child_queries[1]->result_data();
}
Status status = catalog_op_executor_->ExecComputeStats(
exec_request_.catalog_op_request.ddl_params.compute_stats_params,
- child_queries_[0].result_schema(),
- child_queries_[0].result_data(),
+ child_queries[0]->result_schema(),
+ child_queries[0]->result_data(),
col_stats_schema,
col_stats_data);
{
@@ -1030,31 +1066,6 @@ Status ImpalaServer::QueryExecState::UpdateTableAndColumnStats() {
return Status::OK();
}
-void ImpalaServer::QueryExecState::ExecChildQueriesAsync() {
- DCHECK(child_queries_thread_.get() == NULL);
- child_queries_thread_.reset(new Thread("query-exec-state", "async child queries",
- bind(&ImpalaServer::QueryExecState::ExecChildQueries, this)));
-}
-
-void ImpalaServer::QueryExecState::ExecChildQueries() {
- for (int i = 0; i < child_queries_.size(); ++i) {
- if (!child_queries_status_.ok()) return;
- child_queries_status_ = child_queries_[i].ExecAndFetch();
- }
-}
-
-Status ImpalaServer::QueryExecState::WaitForChildQueries() {
- if (child_queries_thread_.get() == NULL) return Status::OK();
- child_queries_thread_->Join();
- {
- lock_guard<mutex> l(lock_);
- RETURN_IF_ERROR(query_status_);
- RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status_));
- }
- query_events_->MarkEvent("Child queries finished");
- return Status::OK();
-}
-
void ImpalaServer::QueryExecState::ClearResultCache() {
if (result_cache_ == NULL) return;
// Update result set cache metrics and mem limit accounting.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/be/src/service/query-exec-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.h b/be/src/service/query-exec-state.h
index feb8afe..0a763ff 100644
--- a/be/src/service/query-exec-state.h
+++ b/be/src/service/query-exec-state.h
@@ -124,9 +124,12 @@ class ImpalaServer::QueryExecState {
/// Cancels the child queries and the coordinator with the given cause.
/// If cause is NULL, assume this was deliberately cancelled by the user.
/// Otherwise, sets state to EXCEPTION.
- /// Caller needs to hold lock_.
/// Does nothing if the query has reached EOS or already cancelled.
- void Cancel(const Status* cause = NULL);
+ ///
+ /// Only returns an error if 'check_inflight' is true and the query is not yet
+ /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
+ /// in-flight (for cleaning up after an error on the query issuing path).
+ Status Cancel(bool check_inflight, const Status* cause);
/// This is called when the query is done (finished, cancelled, or failed).
/// Takes lock_: callers must not hold lock() before calling.
@@ -217,7 +220,16 @@ class ImpalaServer::QueryExecState {
/// increased, and decreased once that work is completed.
uint32_t ref_count_;
- boost::mutex lock_; // protects all following fields
+ /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
+ const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
+
+ // Protects all following fields. Acquirers should be careful not to hold it for too
+ // long, e.g. during RPCs because this lock is required to make progress on various
+ // ImpalaServer requests. If held for too long it can block progress of client
+ // requests for this query, e.g. query status and cancellation. Furthermore, until
+ // IMPALA-3882 is fixed, it can indirectly block progress on all other queries.
+ boost::mutex lock_;
+
ExecEnv* exec_env_;
/// Thread for asynchronously running Wait().
@@ -282,6 +294,7 @@ class ImpalaServer::QueryExecState {
RuntimeProfile::EventSequence* query_events_;
std::vector<ExprContext*> output_expr_ctxs_;
+ bool is_cancelled_; // if true, Cancel() was called.
bool eos_; // if true, there are no more rows to return
// We enforce the invariant that query_status_ is not OK iff query_state_
// is EXCEPTION, given that lock_ is held.
@@ -308,16 +321,6 @@ class ImpalaServer::QueryExecState {
/// Start/end time of the query
TimestampValue start_time_, end_time_;
- /// List of child queries to be executed on behalf of this query.
- std::vector<ChildQuery> child_queries_;
-
- /// Thread to execute child_queries_ in and the resulting status. The status is OK iff
- /// all child queries complete successfully. Otherwise, status contains the error of the
- /// first child query that failed (child queries are executed serially and abort on the
- /// first error).
- Status child_queries_status_;
- boost::scoped_ptr<Thread> child_queries_thread_;
-
/// Executes a local catalog operation (an operation that does not need to execute
/// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op);
@@ -333,6 +336,8 @@ class ImpalaServer::QueryExecState {
/// Core logic of initiating a query or dml execution request.
/// Initiates execution of plan fragments, if there are any, and sets
/// up the output exprs for subsequent calls to FetchRows().
+ /// 'coord_' is only valid after this method is called, and may be invalid if it
+ /// returns an error.
/// Also sets up profile and pre-execution counters.
/// Non-blocking.
Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request);
@@ -386,23 +391,7 @@ class ImpalaServer::QueryExecState {
/// For example, INSERT queries update partition metadata in UpdateCatalog() using a
/// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
/// purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
- Status UpdateTableAndColumnStats();
-
- /// Asynchronously executes all child_queries_ one by one. Calls ExecChildQueries()
- /// in a new child_queries_thread_.
- void ExecChildQueriesAsync();
-
- /// Serially executes the queries in child_queries_ by calling the child query's
- /// ExecAndWait(). This function is blocking and is intended to be run in a separate
- /// thread to ensure that Exec() remains non-blocking. Sets child_queries_status_.
- /// Must not be called while holding lock_.
- void ExecChildQueries();
-
- /// Waits for all child queries to complete successfully or with an error, by joining
- /// child_queries_thread_. Returns a non-OK status if a child query fails or if the
- /// parent query is cancelled (subsequent children will not be executed). Returns OK
- /// if child_queries_thread_ is not set or if all child queries finished successfully.
- Status WaitForChildQueries();
+ Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries);
/// Sets result_cache_ to NULL and updates its associated metrics and mem consumption.
/// This function is a no-op if the cache has already been cleared.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0d0c93ec/tests/query_test/test_cancellation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index c7cc6da..265c781 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -72,11 +72,6 @@ class TestCancellation(ImpalaTestSuite):
# Ignore 'compute stats' queries for the CTAS query type.
cls.TestMatrix.add_constraint(lambda v: not (v.get_value('query_type') == 'CTAS' and
v.get_value('query').startswith('compute stats')))
- # Ignore debug actions for 'compute stats' because cancellation of 'compute stats'
- # relies on child queries eventually making forward progress, but debug actions
- # will cause child queries to hang indefinitely.
- cls.TestMatrix.add_constraint(lambda v: not (v.get_value('action') == 'WAIT' and
- v.get_value('query').startswith('compute stats')))
# tpch tables are not generated for hbase as the data loading takes a very long time.
# TODO: Add cancellation tests for hbase.
cls.TestMatrix.add_constraint(lambda v:\