You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2016/07/26 21:49:06 UTC
[2/2] incubator-impala git commit: IMPALA-2979: Fix scheduling on remote hosts
IMPALA-2979: Fix scheduling on remote hosts
Also fixes: IMPALA-2400, IMPALA-3043
This change fixes the scheduling of scan ranges on remote hosts by adding
remote backend selection capability to SimpleScheduler. Prior to this
change, the scheduler would try to select a local backend even when
remote scheduling was requested.
This change also allows pseudo-randomized remote backend selection to
prevent convoying, which could happen when different independent
schedulers had the same internal state, e.g. after a cluster restart. To
enable the new behavior, set the query option SCHEDULE_RANDOM_REPLICA to
true.
This change also fixes IMPALA-2400: Unpredictable locality behavior
for reading Parquet files
This change also fixes IMPALA-3043: SimpleScheduler does not handle
hosts with multiple IP addresses correctly
This change also does some clean-up in scheduler.h and
simple-scheduler.{h,cc}.
Change-Id: I044f83806fcde820fcb38047cf6b8e780d803858
Reviewed-on: http://gerrit.cloudera.org:8080/3771
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0ad935b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0ad935b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0ad935b6
Branch: refs/heads/master
Commit: 0ad935b63c23029bd4cac4beefde6b3b7c0e322b
Parents: 45ff0f9
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Feb 12 16:31:36 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Jul 26 21:44:37 2016 +0000
----------------------------------------------------------------------
be/src/scheduling/scheduler.h | 11 -
be/src/scheduling/simple-scheduler-test.cc | 84 +-
be/src/scheduling/simple-scheduler.cc | 855 +++++++++++--------
be/src/scheduling/simple-scheduler.h | 420 +++++++--
be/src/service/query-options.cc | 16 +
be/src/service/query-options.h | 1 +
common/thrift/ImpalaInternalService.thrift | 13 +-
common/thrift/ImpalaService.thrift | 7 +-
.../com/cloudera/impala/analysis/TableRef.java | 11 +-
.../impala/analysis/AnalyzeStmtsTest.java | 16 +-
.../cloudera/impala/analysis/ParserTest.java | 25 +-
.../com/cloudera/impala/analysis/ToSqlTest.java | 12 +-
12 files changed, 983 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4eee9ef..39d8309 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -44,17 +44,6 @@ class Scheduler {
/// List of server descriptors.
typedef std::vector<TBackendDescriptor> BackendList;
- /// Return a host/port pair of known ImpalaInternalServices that is running on or nearby
- /// the given data location
- virtual Status GetBackend(const TNetworkAddress& data_location,
- TBackendDescriptor* backend) = 0;
-
- /// Return true if there is a backend located on the given data_location
- virtual bool HasLocalBackend(const TNetworkAddress& data_location) = 0;
-
- /// Return a list of all backends known to the scheduler
- virtual void GetAllKnownBackends(BackendList* backends) = 0;
-
/// Populates given query schedule whose execution is to be coordinated by coord.
/// Assigns fragments to hosts based on scan ranges in the query exec request.
/// If resource management is enabled, also reserves resources from the central
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index bebb1fb..d43682e 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -21,6 +21,7 @@
#include "common/logging.h"
#include "simple-scheduler.h"
+#include "util/runtime-profile.h"
#include "common/names.h"
@@ -147,7 +148,7 @@ class Cluster {
/// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
void AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
- for (int i = 0; i < num_hosts; ++i) AddHost(true, true);
+ for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
}
/// Convert a host index to a hostname.
@@ -365,6 +366,10 @@ class Plan {
const TQueryOptions& query_options() const { return query_options_; }
+ void SetReplicaPreference(TReplicaPreference::type p) {
+ query_options_.replica_preference = p;
+ }
+
void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
const Cluster& cluster() const { return schema_.cluster(); }
@@ -745,15 +750,20 @@ class SchedulerWrapper {
InitializeScheduler();
}
- /// Call ComputeScanRangeAssignment().
+ /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
void Compute(Result* result) {
+ Compute(false, result);
+ }
+
+ /// Call ComputeScanRangeAssignment().
+ void Compute(bool exec_at_coord, Result* result) {
DCHECK(scheduler_ != NULL);
// Compute Assignment.
FragmentScanRangeAssignment* assignment = result->AddAssignment();
- scheduler_->ComputeScanRangeAssignment(0, NULL, false,
- plan_.scan_range_locations(), plan_.referenced_datanodes(), false,
- plan_.query_options(), assignment);
+ scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
+ false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
+ plan_.query_options(), NULL, assignment);
}
/// Reset the state of the scheduler by re-creating and initializing it.
@@ -887,6 +897,27 @@ TEST_F(SchedulerTest, SingleHostSingleFile) {
EXPECT_EQ(0, result.NumCachedAssignments());
}
+/// Test assigning all scan ranges to the coordinator.
+TEST_F(SchedulerTest, ExecAtCoord) {
+ Cluster cluster;
+ cluster.AddHosts(3, true, true);
+
+ Schema schema(cluster);
+ schema.AddMultiBlockTable("T", 3, ReplicaPlacement::LOCAL_ONLY, 3);
+
+ Plan plan(schema);
+ plan.AddTableScan("T");
+
+ Result result(plan);
+ SchedulerWrapper scheduler(plan);
+ bool exec_at_coord = true;
+ scheduler.Compute(exec_at_coord, &result);
+
+ EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+}
+
/// Test scanning a simple table twice.
TEST_F(SchedulerTest, ScanTableTwice) {
Cluster cluster;
@@ -955,29 +986,6 @@ TEST_F(SchedulerTest, LocalReadsPickFirstReplica) {
EXPECT_EQ(0, result.NumDiskAssignments(2));
}
-/// Verify that scheduling with random_replica = true results in a pseudo-random
-/// round-robin selection of backends.
-/// Disabled, global backend rotation not implemented.
-TEST_F(SchedulerTest, DISABLED_RandomReplicaRoundRobin) {
- Cluster cluster;
- for (int i = 0; i < 10; ++i) cluster.AddHost(i < 5, true);
-
- Schema schema(cluster);
- schema.AddSingleBlockTable("T1", {0, 1, 2});
-
- Plan plan(schema);
- plan.AddTableScan("T1");
- plan.SetRandomReplica(true);
-
- Result result(plan);
- SchedulerWrapper scheduler(plan);
- for (int i = 0; i < 3; ++i) scheduler.Compute(&result);
-
- EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
- EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(1));
- EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(2));
-}
-
/// Create a medium sized cluster with 100 nodes and compute a schedule over 3 tables.
TEST_F(SchedulerTest, TestMediumSizedCluster) {
Cluster cluster;
@@ -1126,10 +1134,12 @@ TEST_F(SchedulerTest, TestDisableCachedReads) {
/// IMPALA-3019: Test for round robin reset problem. We schedule the same plan twice but
/// send an empty statestored message in between.
-TEST_F(SchedulerTest, DISABLED_EmptyStatestoreMessage) {
+/// TODO: This problem cannot occur anymore and the test is merely green for random
+/// behavior. Remove.
+TEST_F(SchedulerTest, EmptyStatestoreMessage) {
Cluster cluster;
cluster.AddHosts(3, false, true);
- cluster.AddHost(true, false);
+ cluster.AddHosts(2, true, false);
Schema schema(cluster);
schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
@@ -1141,14 +1151,20 @@ TEST_F(SchedulerTest, DISABLED_EmptyStatestoreMessage) {
SchedulerWrapper scheduler(plan);
scheduler.Compute(&result);
- EXPECT_EQ(1, result.NumDiskAssignments(0));
- EXPECT_EQ(0, result.NumDiskAssignments(1));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
+ EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(4));
result.Reset();
scheduler.SendEmptyUpdate();
scheduler.Compute(&result);
- EXPECT_EQ(0, result.NumDiskAssignments(0));
- EXPECT_EQ(1, result.NumDiskAssignments(1));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+ EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(3));
+ EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
}
/// Test sending updates to the scheduler.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 1e46c67..c2e948f 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -14,6 +14,8 @@
#include "scheduling/simple-scheduler.h"
+#include <atomic>
+#include <random>
#include <vector>
#include <boost/algorithm/string.hpp>
@@ -41,6 +43,7 @@
#include "util/llama-util.h"
#include "util/mem-info.h"
#include "util/parse-util.h"
+#include "util/runtime-profile-counters.h"
#include "gen-cpp/ResourceBrokerService_types.h"
#include "common/names.h"
@@ -65,6 +68,7 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
+
static const string BACKENDS_WEB_PAGE = "/backends";
static const string BACKENDS_TEMPLATE = "backends.tmpl";
@@ -74,19 +78,19 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
const string& backend_id, const TNetworkAddress& backend_address,
MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
RequestPoolService* request_pool_service)
- : metrics_(metrics->GetChildGroup("scheduler")),
+ : backend_config_(std::make_shared<const BackendConfig>()),
+ metrics_(metrics->GetChildGroup("scheduler")),
webserver_(webserver),
statestore_subscriber_(subscriber),
- backend_id_(backend_id),
+ local_backend_id_(backend_id),
thrift_serializer_(false),
total_assignments_(NULL),
total_local_assignments_(NULL),
- initialised_(NULL),
- update_count_(0),
+ initialized_(NULL),
resource_broker_(resource_broker),
request_pool_service_(request_pool_service) {
- backend_descriptor_.address = backend_address;
- next_nonlocal_backend_entry_ = backend_map_.begin();
+ local_backend_descriptor_.address = backend_address;
+
if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
if (!FLAGS_disable_admission_control) {
admission_controller_.reset(
@@ -118,57 +122,43 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
RequestPoolService* request_pool_service)
- : metrics_(metrics),
+ : backend_config_(std::make_shared<const BackendConfig>(backends)),
+ metrics_(metrics),
webserver_(webserver),
statestore_subscriber_(NULL),
thrift_serializer_(false),
total_assignments_(NULL),
total_local_assignments_(NULL),
- initialised_(NULL),
- update_count_(0),
+ initialized_(NULL),
resource_broker_(resource_broker),
request_pool_service_(request_pool_service) {
DCHECK(backends.size() > 0);
+ local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
// request_pool_service_ may be null in unit tests
if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
admission_controller_.reset(
new AdmissionController(request_pool_service_, metrics, TNetworkAddress()));
}
-
- for (int i = 0; i < backends.size(); ++i) {
- vector<string> ipaddrs;
- Status status = HostnameToIpAddrs(backends[i].hostname, &ipaddrs);
- if (!status.ok()) {
- VLOG(1) << "Failed to resolve " << backends[i].hostname << ": "
- << status.GetDetail();
- continue;
- }
-
- // Try to find a non-localhost address, otherwise just use the
- // first IP address returned.
- string ipaddr = ipaddrs[0];
- if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
- VLOG(1) << "Only localhost addresses found for " << backends[i].hostname;
- }
-
- BackendMap::iterator it = backend_map_.find(ipaddr);
- if (it == backend_map_.end()) {
- it = backend_map_.insert(
- make_pair(ipaddr, list<TBackendDescriptor>())).first;
- backend_ip_map_[backends[i].hostname] = ipaddr;
- }
-
- TBackendDescriptor descriptor;
- descriptor.address = MakeNetworkAddress(ipaddr, backends[i].port);
- it->second.push_back(descriptor);
- }
- next_nonlocal_backend_entry_ = backend_map_.begin();
}
Status SimpleScheduler::Init() {
LOG(INFO) << "Starting simple scheduler";
+ // Figure out what our IP address is, so that each subscriber
+ // doesn't have to resolve it on every heartbeat.
+ IpAddr ip;
+ const Hostname& hostname = local_backend_descriptor_.address.hostname;
+ Status status = HostnameToIpAddr(hostname, &ip);
+ if (!status.ok()) {
+ VLOG(1) << status.GetDetail();
+ status.AddDetail("SimpleScheduler failed to start");
+ return status;
+ }
+
+ local_backend_descriptor_.ip_address = ip;
+ LOG(INFO) << "Simple-scheduler using " << ip << " as IP address";
+
if (webserver_ != NULL) {
Webserver::UrlCallback backends_callback =
bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
@@ -188,55 +178,33 @@ Status SimpleScheduler::Init() {
RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_));
}
}
+
if (metrics_ != NULL) {
+ // This is after registering with the statestored, so we already have to synchronize
+ // access to the backend_config_ shared_ptr.
+ int num_backends = GetBackendConfig()->backend_map().size();
total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
- initialised_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
+ initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
num_fragment_instances_metric_ = metrics_->AddGauge<int64_t>(
- NUM_BACKENDS_KEY, backend_map_.size());
+ NUM_BACKENDS_KEY, num_backends);
}
if (statestore_subscriber_ != NULL) {
- // Figure out what our IP address is, so that each subscriber
- // doesn't have to resolve it on every heartbeat.
- vector<string> ipaddrs;
- const string& hostname = backend_descriptor_.address.hostname;
- Status status = HostnameToIpAddrs(hostname, &ipaddrs);
- if (!status.ok()) {
- VLOG(1) << "Failed to resolve " << hostname << ": " << status.GetDetail();
- status.AddDetail("SimpleScheduler failed to start");
- return status;
- }
- // Find a non-localhost address for this host; if one can't be
- // found use the first address returned by HostnameToIpAddrs
- string ipaddr = ipaddrs[0];
- if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
- VLOG(3) << "Only localhost addresses found for " << hostname;
- }
-
- backend_descriptor_.ip_address = ipaddr;
- LOG(INFO) << "Simple-scheduler using " << ipaddr << " as IP address";
-
if (webserver_ != NULL) {
const TNetworkAddress& webserver_address = webserver_->http_address();
if (IsWildcardAddress(webserver_address.hostname)) {
- backend_descriptor_.__set_debug_http_address(
- MakeNetworkAddress(ipaddr, webserver_address.port));
+ local_backend_descriptor_.__set_debug_http_address(
+ MakeNetworkAddress(ip, webserver_address.port));
} else {
- backend_descriptor_.__set_debug_http_address(webserver_address);
+ local_backend_descriptor_.__set_debug_http_address(webserver_address);
}
- backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
+ local_backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
}
}
return Status::OK();
}
-// Utility method to help sort backends by ascending network address
-bool TBackendDescriptorComparator(const TBackendDescriptor& a,
- const TBackendDescriptor& b) {
- return TNetworkAddressComparator(a.address, b.address);
-}
-
void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
Document* document) {
BackendList backends;
@@ -253,8 +221,6 @@ void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
void SimpleScheduler::UpdateMembership(
const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
vector<TTopicDelta>* subscriber_topic_updates) {
- ++update_count_;
- // TODO: Work on a copy if possible, or at least do resolution as a separate step
// First look to see if the topic(s) we're interested in have an update
StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
@@ -262,74 +228,59 @@ void SimpleScheduler::UpdateMembership(
if (topic != incoming_topic_deltas.end()) {
const TTopicDelta& delta = topic->second;
- // This function needs to handle both delta and non-delta updates. For delta
- // updates, it is desireable to minimize the number of copies to only
- // the added/removed items. To accomplish this, all updates are processed
- // under a lock and applied to the shared backend maps (backend_map_ and
- // backend_ip_map_) in place.
- {
- lock_guard<mutex> lock(backend_map_lock_);
- bool backend_map_changed = false;
- if (!delta.is_delta) {
- backend_map_changed = true;
- current_membership_.clear();
- backend_map_.clear();
- backend_ip_map_.clear();
- }
+ // This function needs to handle both delta and non-delta updates. To minimize the
+ // time needed to hold locks, all updates are applied to a copy of backend_config_,
+ // which is then swapped into place atomically.
+ std::shared_ptr<BackendConfig> new_backend_config;
- // Process new entries to the topic
- for (const TTopicItem& item: delta.topic_entries) {
- TBackendDescriptor be_desc;
- // Benchmarks have suggested that this method can deserialize
- // ~10m messages per second, so no immediate need to consider optimisation.
- uint32_t len = item.value.size();
- Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
- item.value.data()), &len, false, &be_desc);
- if (!status.ok()) {
- VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
- continue;
- }
- if (item.key == backend_id_ && be_desc.address != backend_descriptor_.address) {
- // Someone else has registered this subscriber ID with a
- // different address. We will try to re-register
- // (i.e. overwrite their subscription), but there is likely
- // a configuration problem.
- LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
- << be_desc.address;
- }
+ if (!delta.is_delta) {
+ current_membership_.clear();
+ new_backend_config = std::make_shared<BackendConfig>();
+ } else {
+ // Make a copy
+ lock_guard<mutex> lock(backend_config_lock_);
+ new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
+ }
- backend_map_changed = true;
- list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
- if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
- backend_map_[be_desc.ip_address].push_back(be_desc);
- }
- backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
- current_membership_.insert(make_pair(item.key, be_desc));
+ // Process new entries to the topic
+ for (const TTopicItem& item: delta.topic_entries) {
+ TBackendDescriptor be_desc;
+ // Benchmarks have suggested that this method can deserialize
+ // ~10m messages per second, so no immediate need to consider optimization.
+ uint32_t len = item.value.size();
+ Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
+ item.value.data()), &len, false, &be_desc);
+ if (!status.ok()) {
+ VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
+ continue;
}
- // Process deletions from the topic
- for (const string& backend_id: delta.topic_deletions) {
- if (current_membership_.find(backend_id) != current_membership_.end()) {
- backend_map_changed = true;
- const TBackendDescriptor& be_desc = current_membership_[backend_id];
- backend_ip_map_.erase(be_desc.address.hostname);
- list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
- be_descs->erase(
- remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
- if (be_descs->empty()) backend_map_.erase(be_desc.ip_address);
- current_membership_.erase(backend_id);
- }
+ if (item.key == local_backend_id_
+ && be_desc.address != local_backend_descriptor_.address) {
+ // Someone else has registered this subscriber ID with a different address. We
+ // will try to re-register (i.e. overwrite their subscription), but there is
+ // likely a configuration problem.
+ LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
+ << be_desc.address;
+ }
+
+ new_backend_config->AddBackend(be_desc);
+ current_membership_.insert(make_pair(item.key, be_desc));
+ }
+ // Process deletions from the topic
+ for (const string& backend_id: delta.topic_deletions) {
+ if (current_membership_.find(backend_id) != current_membership_.end()) {
+ new_backend_config->RemoveBackend(current_membership_[backend_id]);
+ current_membership_.erase(backend_id);
}
- // Update invalidated iterator.
- if (backend_map_changed) next_nonlocal_backend_entry_ = backend_map_.begin();
}
+ SetBackendConfig(new_backend_config);
// If this impalad is not in our view of the membership list, we should add it and
// tell the statestore.
- // TODO: Inject global dependencies to make this code testable.
bool is_offline = ExecEnv::GetInstance() &&
ExecEnv::GetInstance()->impala_server()->IsOffline();
if (!is_offline &&
- current_membership_.find(backend_id_) == current_membership_.end()) {
+ current_membership_.find(local_backend_id_) == current_membership_.end()) {
VLOG(1) << "Registering local backend with statestore";
subscriber_topic_updates->push_back(TTopicDelta());
TTopicDelta& update = subscriber_topic_updates->back();
@@ -337,20 +288,21 @@ void SimpleScheduler::UpdateMembership(
update.topic_entries.push_back(TTopicItem());
TTopicItem& item = update.topic_entries.back();
- item.key = backend_id_;
- Status status = thrift_serializer_.Serialize(&backend_descriptor_, &item.value);
+ item.key = local_backend_id_;
+ Status status = thrift_serializer_.Serialize(
+ &local_backend_descriptor_, &item.value);
if (!status.ok()) {
- LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic: "
- << status.GetDetail();
+ LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
+ << " " << status.GetDetail();
subscriber_topic_updates->pop_back();
}
} else if (is_offline &&
- current_membership_.find(backend_id_) != current_membership_.end()) {
+ current_membership_.find(local_backend_id_) != current_membership_.end()) {
LOG(WARNING) << "Removing offline ImpalaServer from statestore";
subscriber_topic_updates->push_back(TTopicDelta());
TTopicDelta& update = subscriber_topic_updates->back();
update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
- update.topic_deletions.push_back(backend_id_);
+ update.topic_deletions.push_back(local_backend_id_);
}
if (metrics_ != NULL) {
num_fragment_instances_metric_->set_value(current_membership_.size());
@@ -358,63 +310,24 @@ void SimpleScheduler::UpdateMembership(
}
}
-Status SimpleScheduler::GetBackend(const TNetworkAddress& data_location,
- TBackendDescriptor* backend) {
- lock_guard<mutex> lock(backend_map_lock_);
- if (backend_map_.size() == 0) {
- return Status("No backends configured");
- }
- bool local_assignment = false;
- BackendMap::iterator entry = backend_map_.find(data_location.hostname);
-
- if (entry == backend_map_.end()) {
- // backend_map_ maps ip address to backend but
- // data_location.hostname might be a hostname.
- // Find the ip address of the data_location from backend_ip_map_.
- BackendIpAddressMap::const_iterator itr =
- backend_ip_map_.find(data_location.hostname);
- if (itr != backend_ip_map_.end()) {
- entry = backend_map_.find(itr->second);
- }
- }
-
- if (entry == backend_map_.end()) {
- // round robin the ipaddress
- entry = next_nonlocal_backend_entry_;
- ++next_nonlocal_backend_entry_;
- if (next_nonlocal_backend_entry_ == backend_map_.end()) {
- next_nonlocal_backend_entry_ = backend_map_.begin();
- }
- } else {
- local_assignment = true;
- }
- DCHECK(!entry->second.empty());
- // Round-robin between impalads on the same ipaddress.
- // Pick the first one, then move it to the back of the queue
- *backend = entry->second.front();
- entry->second.pop_front();
- entry->second.push_back(*backend);
-
- if (metrics_ != NULL) {
- total_assignments_->Increment(1);
- if (local_assignment) {
- total_local_assignments_->Increment(1);
- }
- }
+SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const {
+ lock_guard<mutex> l(backend_config_lock_);
+ DCHECK(backend_config_.get() != NULL);
+ BackendConfigPtr backend_config = backend_config_;
+ return backend_config;
+}
- if (VLOG_FILE_IS_ON) {
- stringstream s;
- s << "(" << data_location;
- s << " -> " << backend->address << ")";
- VLOG_FILE << "SimpleScheduler assignment (data->backend): " << s.str();
- }
- return Status::OK();
+void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
+{
+ lock_guard<mutex> l(backend_config_lock_);
+ backend_config_ = backend_config;
}
+
void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
- lock_guard<mutex> lock(backend_map_lock_);
backends->clear();
- for (const BackendMap::value_type& backend_list: backend_map_) {
+ BackendConfigPtr backend_config = GetBackendConfig();
+ for (const BackendMap::value_type& backend_list: backend_config->backend_map()) {
backends->insert(backends->end(), backend_list.second.begin(),
backend_list.second.end());
}
@@ -423,6 +336,11 @@ void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
+ RuntimeProfile::Counter* total_assignment_timer =
+ ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
+
+ BackendConfigPtr backend_config = GetBackendConfig();
+
for (entry = exec_request.per_node_scan_ranges.begin();
entry != exec_request.per_node_scan_ranges.end(); ++entry) {
const TPlanNodeId node_id = entry->first;
@@ -436,32 +354,37 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
const TReplicaPreference::type* node_replica_preference = node.__isset.hdfs_scan_node
&& node.hdfs_scan_node.__isset.replica_preference
? &node.hdfs_scan_node.replica_preference : NULL;
- bool node_random_replica = node.__isset.hdfs_scan_node &&
- node.hdfs_scan_node.__isset.random_replica &&
- node.hdfs_scan_node.random_replica;
+
+ bool node_random_replica = node.__isset.hdfs_scan_node
+ && node.hdfs_scan_node.__isset.random_replica
+ && node.hdfs_scan_node.random_replica;
FragmentScanRangeAssignment* assignment =
&(*schedule->exec_params())[fragment_idx].scan_range_assignment;
- RETURN_IF_ERROR(ComputeScanRangeAssignment(
+ RETURN_IF_ERROR(ComputeScanRangeAssignment(*backend_config,
node_id, node_replica_preference, node_random_replica, entry->second,
- exec_request.host_list, exec_at_coord, schedule->query_options(), assignment));
+ exec_request.host_list, exec_at_coord, schedule->query_options(),
+ total_assignment_timer, assignment));
schedule->AddScanRanges(entry->second.size());
}
return Status::OK();
}
Status SimpleScheduler::ComputeScanRangeAssignment(
- PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
- bool node_random_replica, const vector<TScanRangeLocations>& locations,
+ const BackendConfig& backend_config, PlanNodeId node_id,
+ const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
+ const vector<TScanRangeLocations>& locations,
const vector<TNetworkAddress>& host_list, bool exec_at_coord,
- const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
+ const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+ FragmentScanRangeAssignment* assignment) {
+ SCOPED_TIMER(timer);
// We adjust all replicas with memory distance less than base_distance to base_distance
- // and view all replicas with equal or better distance as the same. For a full list of
- // memory distance classes see TReplicaPreference in PlanNodes.thrift.
- TReplicaPreference::type base_distance = TReplicaPreference::CACHE_LOCAL;
+ // and collect all replicas with equal or better distance as candidates. For a full list
+ // of memory distance classes see TReplicaPreference in PlanNodes.thrift.
+ TReplicaPreference::type base_distance = query_options.replica_preference;
+
// The query option to disable cached reads adjusts the memory base distance to view
- // all replicas as disk_local or worse.
- // TODO remove in CDH6
+ // all replicas as having a distance disk_local or worse.
if (query_options.disable_cached_reads &&
base_distance == TReplicaPreference::CACHE_LOCAL) {
base_distance = TReplicaPreference::DISK_LOCAL;
@@ -470,166 +393,102 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
// A preference attached to the plan node takes precedence.
if (node_replica_preference) base_distance = *node_replica_preference;
- // On otherwise equivalent disk replicas we either pick the first one, or we pick a
- // random one. Picking random ones helps with preventing hot spots across several
- // queries. On cached replica we will always break ties randomly.
- bool random_non_cached_tiebreak = node_random_replica
- || query_options.schedule_random_replica;
+ // Between otherwise equivalent backends we optionally break ties by comparing their
+ // random rank.
+ bool random_replica = query_options.schedule_random_replica || node_random_replica;
- // map from datanode host to total assigned bytes.
- unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;
- unordered_set<TNetworkAddress> remote_hosts;
- int64_t remote_bytes = 0L;
- int64_t local_bytes = 0L;
- int64_t cached_bytes = 0L;
+ AssignmentCtx assignment_ctx(backend_config, total_assignments_,
+ total_local_assignments_);
+ vector<const TScanRangeLocations*> remote_scan_range_locations;
+
+ // Loop over all scan ranges, select a backend for those with local impalads and collect
+ // all others for later processing.
for (const TScanRangeLocations& scan_range_locations: locations) {
- // Assign scans to replica with smallest memory distance.
TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
- // Assign this scan range to the host w/ the fewest assigned bytes.
- uint64_t min_assigned_bytes = numeric_limits<uint64_t>::max();
- const TNetworkAddress* data_host = NULL; // data server; not necessarily backend
- int volume_id = -1;
- bool is_cached = false;
- bool remote_read = false;
-
- // Equivalent replicas have the same adjusted memory distance and the same number of
- // assigned bytes.
- int num_equivalent_replicas = 0;
- for (const TScanRangeLocation& location: scan_range_locations.locations) {
- TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
- const TNetworkAddress& replica_host = host_list[location.host_idx];
- if (HasLocalBackend(replica_host)) {
- // Adjust whether or not this replica should count as being cached based on the
- // query option and whether it is collocated. If the DN is not collocated treat
- // the replica as not cached (network transfer dominates anyway in this case).
- // TODO: measure this in a cluster setup. Are remote reads better with caching?
- if (location.is_cached) {
- memory_distance = TReplicaPreference::CACHE_LOCAL;
- } else {
- memory_distance = TReplicaPreference::DISK_LOCAL;
- }
- } else {
- memory_distance = TReplicaPreference::REMOTE;
- }
- memory_distance = max(memory_distance, base_distance);
-
- // Named variable is needed here for template parameter deduction to work.
- uint64_t initial_bytes = 0L;
- uint64_t assigned_bytes =
- *FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
-
- bool found_new_replica = false;
-
- // Check if we can already accept based on memory distance.
- if (memory_distance < min_distance) {
- min_distance = memory_distance;
- num_equivalent_replicas = 1;
- found_new_replica = true;
- } else if (memory_distance == min_distance) {
- // Check the effective memory distance of the current replica to decide whether to
- // treat it as cached. If the actual distance has been increased to base_distance,
- // then cached_replica will be different from is_cached.
- bool cached_replica = memory_distance == TReplicaPreference::CACHE_LOCAL;
- // Load based scheduling
- if (assigned_bytes < min_assigned_bytes) {
- num_equivalent_replicas = 1;
- found_new_replica = true;
- } else if (assigned_bytes == min_assigned_bytes &&
- (random_non_cached_tiebreak || cached_replica)) {
- // We do reservoir sampling: assume we have k equivalent replicas and encounter
- // another equivalent one. Then we want to select the new one with probability
- // 1/(k+1). This is achieved by rand % k+1 == 0. Now, assume the probability for
- // one of the other replicas to be selected had been 1/k before. It will now be
- // 1/k * k/(k+1) = 1/(k+1). Thus we achieve the goal of picking a replica
- // uniformly at random.
- ++num_equivalent_replicas;
- const int r = rand(); // make debugging easier.
- found_new_replica = (r % num_equivalent_replicas == 0);
- }
- }
- if (found_new_replica) {
- min_assigned_bytes = assigned_bytes;
- data_host = &replica_host;
- volume_id = location.volume_id;
- is_cached = location.is_cached;
- remote_read = min_distance == TReplicaPreference::REMOTE;
- }
- } // end of for()
-
- int64_t scan_range_length = 0;
- if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
- scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
- } else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
- // Hack so that kudu ranges are well distributed.
- // TODO: KUDU-1133 Use the tablet size instead.
- scan_range_length = 1000;
- }
-
- if (remote_read) {
- remote_bytes += scan_range_length;
- remote_hosts.insert(*data_host);
+ // Select backend host for the current scan range.
+ if (exec_at_coord) {
+ assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
+ host_list, scan_range_locations, assignment);
} else {
- local_bytes += scan_range_length;
- if (is_cached) cached_bytes += scan_range_length;
- }
- assigned_bytes_per_host[*data_host] += scan_range_length;
-
- // translate data host to backend host
- DCHECK(data_host != NULL);
-
- TNetworkAddress exec_hostport;
- if (!exec_at_coord) {
- TBackendDescriptor backend;
- RETURN_IF_ERROR(GetBackend(*data_host, &backend));
- exec_hostport = backend.address;
- } else {
- exec_hostport = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
- }
+ // Collect backend candidates with smallest memory distance.
+ vector<IpAddr> backend_candidates;
+ if (base_distance < TReplicaPreference::REMOTE) {
+ for (const TScanRangeLocation& location: scan_range_locations.locations) {
+ const TNetworkAddress& replica_host = host_list[location.host_idx];
+ // Determine the adjusted memory distance to the closest backend for the replica
+ // host.
+ TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
+ IpAddr backend_ip;
+ bool has_local_backend = assignment_ctx.backend_config().LookUpBackendIp(
+ replica_host.hostname, &backend_ip);
+ if (has_local_backend) {
+ if (location.is_cached) {
+ memory_distance = TReplicaPreference::CACHE_LOCAL;
+ } else {
+ memory_distance = TReplicaPreference::DISK_LOCAL;
+ }
+ } else {
+ memory_distance = TReplicaPreference::REMOTE;
+ }
+ memory_distance = max(memory_distance, base_distance);
+
+ // We only need to collect backend candidates for non-remote reads, as it is the
+ // nature of remote reads that there is no backend available.
+ if (memory_distance < TReplicaPreference::REMOTE) {
+ DCHECK(has_local_backend);
+ // Check if we found a closer replica than the previous ones.
+ if (memory_distance < min_distance) {
+ min_distance = memory_distance;
+ backend_candidates.clear();
+ backend_candidates.push_back(backend_ip);
+ } else if (memory_distance == min_distance) {
+ backend_candidates.push_back(backend_ip);
+ }
+ }
+ }
+ } // End of candidate selection.
+ DCHECK(!backend_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
- PerNodeScanRanges* scan_ranges =
- FindOrInsert(assignment, exec_hostport, PerNodeScanRanges());
- vector<TScanRangeParams>* scan_range_params_list =
- FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
- // add scan range
- TScanRangeParams scan_range_params;
- scan_range_params.scan_range = scan_range_locations.scan_range;
- // Explicitly set the optional fields.
- scan_range_params.__set_volume_id(volume_id);
- scan_range_params.__set_is_cached(is_cached);
- scan_range_params.__set_is_remote(remote_read);
- scan_range_params_list->push_back(scan_range_params);
- }
+ // Check the effective memory distance of the candidates to decide whether to treat
+ // the scan range as cached.
+ bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
- if (VLOG_FILE_IS_ON) {
- VLOG_FILE << "Total remote scan volume = " <<
- PrettyPrinter::Print(remote_bytes, TUnit::BYTES);
- VLOG_FILE << "Total local scan volume = " <<
- PrettyPrinter::Print(local_bytes, TUnit::BYTES);
- VLOG_FILE << "Total cached scan volume = " <<
- PrettyPrinter::Print(cached_bytes, TUnit::BYTES);
- if (remote_hosts.size() > 0) {
- stringstream remote_node_log;
- remote_node_log << "Remote data node list: ";
- for (const TNetworkAddress& remote_host: remote_hosts) {
- remote_node_log << remote_host << " ";
- }
- VLOG_FILE << remote_node_log.str();
- }
+ // Pick backend host based on data location.
+ bool local_backend = min_distance != TReplicaPreference::REMOTE;
- for (FragmentScanRangeAssignment::value_type& entry: *assignment) {
- VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
- for (PerNodeScanRanges::value_type& per_node_scan_ranges: entry.second) {
- stringstream str;
- for (TScanRangeParams& params: per_node_scan_ranges.second) {
- str << ThriftDebugString(params) << " ";
- }
- VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
+ if (!local_backend) {
+ remote_scan_range_locations.push_back(&scan_range_locations);
+ continue;
}
- }
+ // For local reads we want to break ties by backend rank in these cases:
+ // - if it is enforced via a query option.
+ // - when selecting between cached replicas. In this case there is no OS buffer
+ // cache to worry about.
+ // Remote reads will always break ties by backend rank.
+ bool decide_local_assignment_by_rank = random_replica || cached_replica;
+ const IpAddr* backend_ip = NULL;
+ backend_ip = assignment_ctx.SelectLocalBackendHost(backend_candidates,
+ decide_local_assignment_by_rank);
+ TBackendDescriptor backend;
+ assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+ assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
+ scan_range_locations, assignment);
+ } // End of backend host selection.
+ } // End of for loop over scan ranges.
+
+ // Assign remote scans to backends.
+ for (const TScanRangeLocations* scan_range_locations: remote_scan_range_locations) {
+ const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
+ TBackendDescriptor backend;
+ assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+ assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
+ *scan_range_locations, assignment);
}
+ if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
+
return Status::OK();
}
@@ -695,7 +554,6 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
QuerySchedule* schedule) {
vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
- TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
vector<TPlanNodeType::type> scan_node_types;
scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
@@ -710,7 +568,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
FragmentExecParams& params = (*fragment_exec_params)[i];
if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
// all single-node fragments run on the coordinator host
- params.hosts.push_back(coord);
+ params.hosts.push_back(local_backend_descriptor_.address);
continue;
}
@@ -822,7 +680,7 @@ void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
// TODO: we'll need to revisit this strategy once we can partition joins
// (in which case this fragment might be executing a right outer join
// with a large build table)
- scan_hosts->push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
+ scan_hosts->push_back(local_backend_descriptor_.address);
return;
}
@@ -890,7 +748,7 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
if (!status.ok()) {
// Warn about missing table and/or column stats if necessary.
const TQueryCtx& query_ctx = schedule->request().query_ctx;
- if(!query_ctx.__isset.parent_query_id &&
+ if (!query_ctx.__isset.parent_query_id &&
query_ctx.__isset.tables_missing_stats &&
!query_ctx.tables_missing_stats.empty()) {
status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
@@ -1007,4 +865,297 @@ void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
}
}
+// Resolves 'hostname' and stores one of its IP addresses in '*ip'. A non-localhost
+// address is preferred; if none exists the first address in sorted order is kept.
+// Returns an error status if the resolution fails or yields no address at all.
+Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) {
+  // Ask the operating system's resolver for every address of 'hostname'.
+  vector<IpAddr> resolved_ips;
+  Status resolve_status = HostnameToIpAddrs(hostname, &resolved_ips);
+  const bool resolved = resolve_status.ok() && !resolved_ips.empty();
+  if (!resolved) {
+    stringstream error_msg;
+    error_msg << "Failed to resolve " << hostname << ": " << resolve_status.GetDetail();
+    return Status(error_msg.str());
+  }
+
+  // HostnameToIpAddrs() relies on glibc's getaddrinfo() and keeps its result order.
+  // RFC 3484 only defines a partial order, so sort the addresses to make the choice
+  // below independent of the order in which they were returned.
+  sort(resolved_ips.begin(), resolved_ips.end());
+
+  // Start with the first address as a fallback, then let FindFirstNonLocalhost()
+  // replace it with a non-localhost address if there is one.
+  *ip = resolved_ips[0];
+  const bool found_non_localhost = FindFirstNonLocalhost(resolved_ips, ip);
+  if (!found_non_localhost) {
+    VLOG(3) << "Only localhost addresses found for " << hostname;
+  }
+  return Status::OK();
+}
+
+// Builds a backend configuration from a static list of backend addresses. Every
+// address whose hostname can be resolved contributes one backend descriptor to
+// backend_map_ (keyed by IP address) and one hostname -> IP entry to backend_ip_map_.
+// Addresses that cannot be resolved are logged and skipped.
+SimpleScheduler::BackendConfig::BackendConfig(
+    const std::vector<TNetworkAddress>& backends) {
+  for (const TNetworkAddress& backend_address: backends) {
+    IpAddr ip;
+    Status status = HostnameToIpAddr(backend_address.hostname, &ip);
+    if (!status.ok()) {
+      VLOG(1) << status.GetDetail();
+      continue;
+    }
+
+    // Register the hostname for every resolved backend, not only for the first backend
+    // seen on an IP address. Otherwise a second hostname that resolves to an already
+    // known IP would be missing from backend_ip_map_ and hostname based lookups for it
+    // would fail.
+    backend_ip_map_[backend_address.hostname] = ip;
+
+    TBackendDescriptor descriptor;
+    descriptor.address = MakeNetworkAddress(ip, backend_address.port);
+    descriptor.ip_address = ip;
+    // operator[] creates the (empty) backend list of a host on first use.
+    backend_map_[ip].push_back(descriptor);
+  }
+}
+
+// Adds 'be_desc' to the list of backends of its host unless an equal descriptor is
+// already present, and (re-)registers the hostname -> IP mapping of the backend.
+void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
+  // operator[] creates an empty list for hosts that are seen for the first time.
+  BackendList& host_backends = backend_map_[be_desc.ip_address];
+  const bool already_known =
+      find(host_backends.begin(), host_backends.end(), be_desc) != host_backends.end();
+  if (!already_known) host_backends.push_back(be_desc);
+  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
+}
+
+// Removes 'be_desc' from the list of backends of its host and drops the host from
+// backend_map_ once its list becomes empty. The hostname -> IP mapping is only removed
+// when no remaining backend on that host is registered under the same hostname, so
+// that hostname based lookups keep working while other backends still run there.
+void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
+  auto be_descs_it = backend_map_.find(be_desc.ip_address);
+  if (be_descs_it == backend_map_.end()) {
+    // No backend is registered for this IP address, so no backend list can still
+    // depend on the mapping.
+    backend_ip_map_.erase(be_desc.address.hostname);
+    return;
+  }
+
+  BackendList* be_descs = &be_descs_it->second;
+  be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
+
+  // Check whether another backend on this host still uses the hostname.
+  bool hostname_in_use = false;
+  for (const TBackendDescriptor& remaining: *be_descs) {
+    if (remaining.address.hostname == be_desc.address.hostname) {
+      hostname_in_use = true;
+      break;
+    }
+  }
+  if (!hostname_in_use) backend_ip_map_.erase(be_desc.address.hostname);
+  if (be_descs->empty()) backend_map_.erase(be_descs_it);
+}
+
+// Looks up the IP address of the backend host identified by 'hostname', which may be
+// either a hostname or an IP address. Returns true and, if 'ip' is not NULL, stores
+// the address in '*ip' when a backend host is found. Returns false otherwise and
+// leaves '*ip' untouched.
+bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname,
+    IpAddr* ip) const {
+  // 'hostname' may already be the IP address of a known backend host.
+  const bool is_backend_ip = backend_map_.find(hostname) != backend_map_.end();
+  if (is_backend_ip) {
+    if (ip != NULL) *ip = hostname;
+    return true;
+  }
+
+  // Otherwise translate it through the hostname -> IP map.
+  auto mapping = backend_ip_map_.find(hostname);
+  if (mapping == backend_ip_map_.end()) return false;
+  if (ip != NULL) *ip = mapping->second;
+  return true;
+}
+
+// Sets up the per-assignment state: stores the backend configuration and the metric
+// counters, puts all backend hosts into a random order and remembers the position
+// (rank) of every host in that order.
+SimpleScheduler::AssignmentCtx::AssignmentCtx(
+    const BackendConfig& backend_config,
+    IntCounter* total_assignments, IntCounter* total_local_assignments)
+  : backend_config_(backend_config), first_unused_backend_idx_(0),
+    total_assignments_(total_assignments),
+    total_local_assignments_(total_local_assignments) {
+  // Collect a pointer to every entry of the backend map and shuffle the pointers.
+  random_backend_order_.reserve(backend_map().size());
+  for (auto& host_entry: backend_map()) random_backend_order_.push_back(&host_entry);
+  std::mt19937 rng(rand());
+  std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), rng);
+
+  // Build the inverse mapping from IP address to position in the shuffled order.
+  const int num_hosts = static_cast<int>(random_backend_order_.size());
+  for (int rank = 0; rank < num_hosts; ++rank) {
+    random_backend_rank_[random_backend_order_[rank]->first] = rank;
+  }
+}
+
+// Picks one host out of 'data_locations', which must not be empty. The hosts with the
+// fewest bytes assigned so far are preferred. Among those, the one with the lowest
+// random rank is chosen if 'break_ties_by_rank' is set, otherwise the one that comes
+// first in 'data_locations'. The returned pointer points into 'data_locations'.
+const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
+    const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
+  DCHECK(!data_locations.empty());
+  // Positions in 'data_locations' of the hosts sharing the smallest assigned volume.
+  vector<int> least_loaded_idxs;
+  int64_t least_assigned_bytes = numeric_limits<int64_t>::max();
+  const int num_locations = data_locations.size();
+  for (int idx = 0; idx < num_locations; ++idx) {
+    // Hosts without an entry in the assignment heap count as having zero bytes.
+    auto handle_it = assignment_heap_.find(data_locations[idx]);
+    int64_t host_bytes = 0;
+    if (handle_it != assignment_heap_.end()) {
+      host_bytes = (*handle_it->second).assigned_bytes;
+    }
+    if (host_bytes > least_assigned_bytes) continue;
+    if (host_bytes < least_assigned_bytes) {
+      // Strictly better than everything seen so far: start a new candidate set.
+      least_loaded_idxs.clear();
+      least_assigned_bytes = host_bytes;
+    }
+    least_loaded_idxs.push_back(idx);
+  }
+
+  DCHECK(!least_loaded_idxs.empty());
+  if (!break_ties_by_rank) return &data_locations[least_loaded_idxs.front()];
+
+  auto lowest_rank_it = min_element(least_loaded_idxs.begin(), least_loaded_idxs.end(),
+      [this, &data_locations](const int& lhs, const int& rhs) {
+        return GetBackendRank(data_locations[lhs]) < GetBackendRank(data_locations[rhs]);
+      });
+  return &data_locations[*lowest_rank_it];
+}
+
+// Picks a backend host for a scan range that has no local backend. Hosts that have
+// not been handed out by this function yet are used first, in random order. After
+// that the host on top of the assignment heap is returned.
+const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
+  const IpAddr* chosen_ip = NULL;
+  if (!HasUnusedBackends()) {
+    // Every backend host is expected to be in the heap by now, so the heap's top
+    // element is a valid choice.
+    DCHECK(backend_config_.NumBackends() == assignment_heap_.size());
+    chosen_ip = &(assignment_heap_.top().ip);
+  } else {
+    // Take the next host from the random order.
+    chosen_ip = GetNextUnusedBackendAndIncrement();
+  }
+  DCHECK(chosen_ip != NULL);
+  return chosen_ip;
+}
+
+// Returns true while the random backend order still contains hosts at or after
+// first_unused_backend_idx_.
+bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const {
+  return random_backend_order_.size() > first_unused_backend_idx_;
+}
+
+// Returns the IP address of the host at first_unused_backend_idx_ in the random
+// backend order and advances the index. Must only be called while HasUnusedBackends()
+// is true.
+const SimpleScheduler::IpAddr*
+    SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
+  DCHECK(HasUnusedBackends());
+  const BackendMap::value_type* host_entry =
+      random_backend_order_[first_unused_backend_idx_];
+  ++first_unused_backend_idx_;
+  const IpAddr* ip = &host_entry->first;
+  DCHECK(backend_map().find(*ip) != backend_map().end());
+  return ip;
+}
+
+// Returns the rank (position in the random backend order) recorded for host 'ip'.
+// The host is expected to have an entry in random_backend_rank_.
+int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
+  auto rank_it = random_backend_rank_.find(ip);
+  DCHECK(rank_it != random_backend_rank_.end());
+  const int rank = rank_it->second;
+  return rank;
+}
+
+// Copies into '*backend' the descriptor of a backend running on host 'backend_ip'.
+// The host must be present in the backend map and have at least one backend. If it
+// has exactly one backend, that one is returned. Otherwise the backends of the host
+// are handed out in turn, using the per-host iterator kept in next_backend_per_host_.
+void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip,
+ TBackendDescriptor* backend) {
+ BackendMap::const_iterator backend_it = backend_map().find(backend_ip);
+ DCHECK(backend_it != backend_map().end());
+ const BackendList& backends_on_host = backend_it->second;
+ DCHECK(backends_on_host.size() > 0);
+ if (backends_on_host.size() == 1) {
+ // Single backend on this host: no rotation state is needed.
+ *backend = *backends_on_host.begin();
+ } else {
+ // Fetch the iterator remembered for this host, starting at the first backend if
+ // the host has no entry yet.
+ // NOTE(review): presumably FindOrInsert() returns a pointer to the existing or the
+ // newly inserted map value - confirm against its definition.
+ BackendList::const_iterator* next_backend_on_host;
+ next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip,
+ backends_on_host.begin());
+ // The remembered iterator must refer to a backend in this host's list.
+ DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
+ != backends_on_host.end());
+ *backend = **next_backend_on_host;
+ // Rotate: advance the stored iterator and wrap around at the end of the list so
+ // that the next call for this host returns the following backend.
+ ++(*next_backend_on_host);
+ if (*next_backend_on_host == backends_on_host.end()) {
+ *next_backend_on_host = backends_on_host.begin();
+ }
+ }
+}
+
+// Records that the scan range described by 'scan_range_locations' of plan node
+// 'node_id' is executed by 'backend'. This updates the assigned bytes of the backend's
+// host in the assignment heap, the local/remote/cached byte counters and (if set) the
+// assignment metrics, and appends the scan range to 'assignment'. 'host_list' is
+// indexed by the host_idx of each replica location.
+void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
+ const TBackendDescriptor& backend, PlanNodeId node_id,
+ const vector<TNetworkAddress>& host_list,
+ const TScanRangeLocations& scan_range_locations,
+ FragmentScanRangeAssignment* assignment) {
+ // Determine the weight of this scan range. It stays 0 if the range is neither an
+ // HDFS file split nor a Kudu key range.
+ int64_t scan_range_length = 0;
+ if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
+ scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
+ } else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
+ // Hack so that kudu ranges are well distributed.
+ // TODO: KUDU-1133 Use the tablet size instead.
+ scan_range_length = 1000;
+ }
+
+ // Account the bytes to the host of the selected backend.
+ // NOTE(review): the result of LookUpBackendIp() is not checked; if the lookup fails
+ // 'backend_ip' stays empty and only the DCHECK below guards against it.
+ IpAddr backend_ip;
+ backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
+ DCHECK(backend_map().find(backend_ip) != backend_map().end());
+ assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
+ GetBackendRank(backend_ip));
+
+ // See if the read will be remote. This is not the case if the impalad runs on one of
+ // the replica's datanodes.
+ bool remote_read = true;
+ // For local reads we can set volume_id and is_cached. For remote reads HDFS will
+ // decide which replica to use so we keep those at default values.
+ int volume_id = -1;
+ bool is_cached = false;
+ // The first replica whose host resolves to the backend's IP address makes the read
+ // local and provides volume_id and is_cached.
+ for (const TScanRangeLocation& location: scan_range_locations.locations) {
+ const TNetworkAddress& replica_host = host_list[location.host_idx];
+ IpAddr replica_ip;
+ if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
+ && backend_ip == replica_ip) {
+ remote_read = false;
+ volume_id = location.volume_id;
+ is_cached = location.is_cached;
+ break;
+ }
+ }
+
+ // Update the byte counters; cached bytes are counted in addition to local bytes.
+ if (remote_read) {
+ assignment_byte_counters_.remote_bytes += scan_range_length;
+ } else {
+ assignment_byte_counters_.local_bytes += scan_range_length;
+ if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
+ }
+
+ // The metric counters are optional, but are expected to be set or unset together.
+ if (total_assignments_ != NULL) {
+ DCHECK(total_local_assignments_ != NULL);
+ total_assignments_->Increment(1);
+ if (!remote_read) total_local_assignments_->Increment(1);
+ }
+
+ // Locate the list of scan ranges for this backend address and plan node.
+ // NOTE(review): presumably FindOrInsert() creates missing entries - confirm.
+ PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
+ PerNodeScanRanges());
+ vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
+ vector<TScanRangeParams>());
+ // Add scan range.
+ TScanRangeParams scan_range_params;
+ scan_range_params.scan_range = scan_range_locations.scan_range;
+ scan_range_params.__set_volume_id(volume_id);
+ scan_range_params.__set_is_cached(is_cached);
+ scan_range_params.__set_is_remote(remote_read);
+ scan_range_params_list->push_back(scan_range_params);
+
+ if (VLOG_FILE_IS_ON) {
+ VLOG_FILE << "SimpleScheduler assignment to backend: " << backend.address
+ << "(" << (remote_read ? "remote" : "local") << " selection)";
+ }
+}
+
+// Writes the accumulated remote, local and cached scan volumes to VLOG_FILE, followed
+// by the scan ranges in 'assignment' grouped by backend address and plan node id.
+void SimpleScheduler::AssignmentCtx::PrintAssignment(
+    const FragmentScanRangeAssignment& assignment) {
+  VLOG_FILE << "Total remote scan volume = "
+            << PrettyPrinter::Print(assignment_byte_counters_.remote_bytes, TUnit::BYTES);
+  VLOG_FILE << "Total local scan volume = "
+            << PrettyPrinter::Print(assignment_byte_counters_.local_bytes, TUnit::BYTES);
+  VLOG_FILE << "Total cached scan volume = "
+            << PrettyPrinter::Print(assignment_byte_counters_.cached_bytes, TUnit::BYTES);
+
+  for (const FragmentScanRangeAssignment::value_type& backend_entry: assignment) {
+    VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(backend_entry.first);
+    for (const PerNodeScanRanges::value_type& node_entry: backend_entry.second) {
+      // Concatenate all scan ranges of this plan node into a single log line.
+      stringstream ranges_str;
+      for (const TScanRangeParams& range_params: node_entry.second) {
+        ranges_str << ThriftDebugString(range_params) << " ";
+      }
+      VLOG_FILE << "node_id=" << node_entry.first << " ranges=" << ranges_str.str();
+    }
+  }
+}
+
+// Adds 'assigned_bytes' to the heap element of host 'ip'. If the host has no element
+// yet, one is created with 'assigned_bytes' and 'rank' and its handle is remembered in
+// backend_handles_. For an existing element 'rank' is ignored.
+void SimpleScheduler::AddressableAssignmentHeap::InsertOrUpdate(const IpAddr& ip,
+    int64_t assigned_bytes, int rank) {
+  auto existing = backend_handles_.find(ip);
+  if (existing != backend_handles_.end()) {
+    // Changing the assigned bytes changes the element's key, so the heap has to
+    // restore its order afterwards. A single decrease() call is used for that.
+    // NOTE(review): presumably more assigned bytes means lower priority in this heap -
+    // confirm against the heap's comparator.
+    AssignmentHeap::handle_type heap_handle = existing->second;
+    (*heap_handle).assigned_bytes += assigned_bytes;
+    backend_heap_.decrease(heap_handle);
+    return;
+  }
+
+  // First assignment to this host: insert a new element and keep its handle.
+  AssignmentHeap::handle_type heap_handle =
+      backend_heap_.push({assigned_bytes, rank, ip});
+  backend_handles_.emplace(ip, heap_handle);
+}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 5cff7a5..a3a9290 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -20,6 +20,7 @@
#include <vector>
#include <string>
#include <list>
+#include <boost/heap/binomial_heap.hpp>
#include <boost/unordered_map.hpp>
#include <boost/thread/mutex.hpp>
@@ -28,6 +29,7 @@
#include "statestore/statestore-subscriber.h"
#include "statestore/statestore.h"
#include "util/metrics.h"
+#include "util/runtime-profile.h"
#include "scheduling/admission-controller.h"
#include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "gen-cpp/ResourceBrokerService_types.h"
@@ -42,10 +44,26 @@ class SchedulerWrapper;
/// Performs simple scheduling by matching between a list of backends configured
/// either from the statestore, or from a static list of addresses, and a list
-/// of target data locations.
-//
+/// of target data locations. The current set of backends is stored in backend_config_.
+/// When receiving changes to the backend configuration from the statestore we will make a
+/// copy of this configuration, apply the updates to the copy and atomically swap the
+/// contents of the backend_config_ pointer.
+///
/// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
-/// TODO: Handle deltas from the statestore
+/// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
+/// instead of per plan node?
+/// TODO: Remove disable_cached_reads query option in CDH6
+/// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
+/// it. Alternatively consider using Kudu's rw locks.
+/// TODO: Inject global dependencies into the class (for example ExecEnv::GetInstance(),
+/// RNG used during scheduling, FLAGS_*)
+/// to make it testable.
+/// TODO: Benchmark the performance of the scheduler. The tests need to include setups
+/// with:
+/// - Small and large number of backends.
+/// - Small and large query plans.
+/// - Scheduling query plans with concurrent updates to the internal backend
+/// configuration.
class SimpleScheduler : public Scheduler {
public:
static const std::string IMPALA_MEMBERSHIP_TOPIC;
@@ -64,20 +82,7 @@ class SimpleScheduler : public Scheduler {
Webserver* webserver, ResourceBroker* resource_broker,
RequestPoolService* request_pool_service);
- /// Return a backend such that the impalad at backend.address should be used to read
- /// data from the given data_loation
- virtual impala::Status GetBackend(const TNetworkAddress& data_location,
- TBackendDescriptor* backend);
-
- virtual void GetAllKnownBackends(BackendList* backends);
-
- virtual bool HasLocalBackend(const TNetworkAddress& data_location) {
- boost::lock_guard<boost::mutex> l(backend_map_lock_);
- BackendMap::iterator entry = backend_map_.find(data_location.hostname);
- return (entry != backend_map_.end() && entry->second.size() > 0);
- }
-
- /// Registers with the subscription manager if required
+ /// Register with the subscription manager if required
virtual impala::Status Init();
virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
@@ -87,20 +92,222 @@ class SimpleScheduler : public Scheduler {
virtual void HandleLostResource(const TUniqueId& client_resource_id);
private:
- /// Protects access to backend_map_ and backend_ip_map_, which might otherwise be
- /// updated asynchronously with respect to reads. Also protects the locality
- /// counters, which are updated in GetBackends.
- boost::mutex backend_map_lock_;
-
- /// Map from a datanode's IP address to a list of backend addresses running on that
- /// node.
- typedef boost::unordered_map<std::string, std::list<TBackendDescriptor>> BackendMap;
- BackendMap backend_map_;
-
- /// Map from a datanode's hostname to its IP address to support both hostname based
- /// lookup.
- typedef boost::unordered_map<std::string, std::string> BackendIpAddressMap;
- BackendIpAddressMap backend_ip_map_;
+ /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
+ typedef std::string Hostname;
+
+ /// Type to store IPv4 addresses.
+ typedef std::string IpAddr;
+
+ typedef std::list<TBackendDescriptor> BackendList;
+
+ /// Map from a host's IP address to a list of backends running on that node.
+ typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
+
+ /// Map from a host's IP address to the next backend to be round-robin scheduled for
+ /// that host (needed for setups with multiple backends on a single host)
+ typedef boost::unordered_map<IpAddr, BackendList::const_iterator> NextBackendPerHost;
+
+ /// Map from a hostname to its IP address to support hostname based backend lookup.
+ typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
+
+ /// Configuration class to store a list of backends per IP address and a mapping from
+ /// hostnames to IP addresses. backend_ip_map contains entries for all backends in
+ /// backend_map and needs to be updated whenever backend_map changes. Each plan node
+ /// creates a read-only copy of the scheduler's current backend_config_ to use during
+ /// scheduling.
+ class BackendConfig {
+ public:
+ BackendConfig() {}
+
+ /// Construct config from list of backends.
+ BackendConfig(const std::vector<TNetworkAddress>& backends);
+
+ void AddBackend(const TBackendDescriptor& be_desc);
+ void RemoveBackend(const TBackendDescriptor& be_desc);
+
+ /// Look up the IP address of 'hostname' in the internal backend maps and return
+ /// whether the lookup was successful. If 'hostname' itself is a valid IP address then
+ /// it is copied to 'ip' and true is returned. 'ip' can be NULL if the caller only
+ /// wants to check whether the lookup succeeds. Use this method to resolve datanode
+ /// hostnames to IP addresses during scheduling, to prevent blocking on the OS.
+ bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
+
+ int NumBackends() const { return backend_map().size(); }
+
+ const BackendMap& backend_map() const { return backend_map_; }
+ const BackendIpAddressMap& backend_ip_map() const { return backend_ip_map_; }
+
+ private:
+ BackendMap backend_map_;
+ BackendIpAddressMap backend_ip_map_;
+ };
+
+ typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
+
+ /// Internal structure to track scan range assignments for a backend host. This struct
+ /// is used as the heap element in and maintained by AddressableAssignmentHeap.
+ struct BackendAssignmentInfo {
+ /// The number of bytes assigned to a backend host.
+ int64_t assigned_bytes;
+
+ /// Each host gets assigned a random rank to break ties in a random but deterministic
+ /// order per plan node.
+ const int random_rank;
+
+ /// IP address of the backend.
+ IpAddr ip;
+
+ /// Compare two elements of this struct. The key is (assigned_bytes, random_rank).
+ bool operator>(const BackendAssignmentInfo& rhs) const {
+ if (assigned_bytes != rhs.assigned_bytes) {
+ return assigned_bytes > rhs.assigned_bytes;
+ }
+ return random_rank > rhs.random_rank;
+ }
+ };
+
+ /// Heap to compute candidates for scan range assignments. Elements are of type
+ /// BackendAssignmentInfo and track assignment information for each backend. By default
+ /// boost implements a max-heap so we use std::greater<T> to obtain a min-heap. This
+ /// will make the top() element of the heap be the backend with the lowest number of
+ /// assigned bytes and the lowest random rank.
+ typedef boost::heap::binomial_heap<BackendAssignmentInfo,
+ boost::heap::compare<std::greater<BackendAssignmentInfo>>> AssignmentHeap;
+
+ /// Map to look up handles to heap elements to modify heap element keys.
+ typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> BackendHandleMap;
+
+ /// Class to store backend information in an addressable heap. In addition to
+ /// AssignmentHeap it can be used to look up heap elements by their IP address and
+ /// update their key. For each plan node we create a new heap, so they are not shared
+ /// between concurrent invocations of the scheduler.
+ class AddressableAssignmentHeap {
+ public:
+ const AssignmentHeap& backend_heap() const { return backend_heap_; }
+ const BackendHandleMap& backend_handles() const { return backend_handles_; }
+
+ void InsertOrUpdate(const IpAddr& ip, int64_t assigned_bytes, int rank);
+
+ // Forward interface for boost::heap
+ decltype(auto) size() const { return backend_heap_.size(); }
+ decltype(auto) top() const { return backend_heap_.top(); }
+
+ // Forward interface for boost::unordered_map
+ decltype(auto) find(const IpAddr& ip) const { return backend_handles_.find(ip); }
+ decltype(auto) end() const { return backend_handles_.end(); }
+
+ private:
+ // Heap to determine next backend.
+ AssignmentHeap backend_heap_;
+ // Maps backend IPs to handles in the heap.
+ BackendHandleMap backend_handles_;
+ };
+
+ /// Class to store context information on assignments during scheduling. It is
+ /// initialized with a copy of the global backend information and assigns a random rank
+ /// to each backend to break ties in cases where multiple backends have been assigned
+ /// the same number of bytes. It tracks the number of assigned bytes, which backends
+ /// have already been used, etc. Objects of this class are created in
+ /// ComputeScanRangeAssignment() and thus don't need to be thread safe.
+ class AssignmentCtx {
+ public:
+ AssignmentCtx(const BackendConfig& backend_config, IntCounter* total_assignments,
+ IntCounter* total_local_assignments);
+
+ /// Among hosts in 'data_locations', select the one with the minimum number of
+ /// assigned bytes. If backends have been assigned equal amounts of work and
+ /// 'break_ties_by_rank' is true, then the backend rank is used to break ties.
+ /// Otherwise the first backend according to their order in 'data_locations' is
+ /// selected.
+ const IpAddr* SelectLocalBackendHost(const std::vector<IpAddr>& data_locations,
+ bool break_ties_by_rank);
+
+ /// Select a backend host for a remote read. If there are unused backend hosts, then
+ /// those will be preferred. Otherwise the one with the lowest number of assigned
+ /// bytes is picked. If backends have been assigned equal amounts of work, then the
+ /// backend rank is used to break ties.
+ const IpAddr* SelectRemoteBackendHost();
+
+ /// Return the next backend that has not been assigned to. This assumes that a
+ /// returned backend will also be assigned to. The caller must make sure that
+ /// HasUnusedBackends() is true.
+ const IpAddr* GetNextUnusedBackendAndIncrement();
+
+ /// Pick a backend in round-robin fashion from multiple backends on a single host.
+ void SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* backend);
+
+ /// Build a new TScanRangeParams object and append it to the assignment list for the
+ /// tuple (backend, node_id) in 'assignment'. Also, update assignment_heap_ and
+ /// assignment_byte_counters_, increase the counters 'total_assignments_' and
+ /// 'total_local_assignments_'. 'scan_range_locations' contains information about the
+ /// scan range and its replica locations.
+ void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id,
+ const vector<TNetworkAddress>& host_list,
+ const TScanRangeLocations& scan_range_locations,
+ FragmentScanRangeAssignment* assignment);
+
+ const BackendConfig& backend_config() const { return backend_config_; }
+ const BackendMap& backend_map() const { return backend_config_.backend_map(); }
+
+ /// Print the assignment and statistics to VLOG_FILE.
+ void PrintAssignment(const FragmentScanRangeAssignment& assignment);
+
+ private:
+ /// A struct to track various counts of assigned bytes during scheduling.
+ struct AssignmentByteCounters {
+ int64_t remote_bytes = 0;
+ int64_t local_bytes = 0;
+ int64_t cached_bytes = 0;
+ };
+
+ /// Used to look up hostnames to IP addresses and IP addresses to backend.
+ const BackendConfig& backend_config_;
+
+ // Addressable heap to select remote backends from. Elements are ordered by the number
+ // of already assigned bytes (and a random rank to break ties).
+ AddressableAssignmentHeap assignment_heap_;
+
+ /// Store a random rank per backend host to break ties between otherwise equivalent
+ /// replicas (e.g., those having the same number of assigned bytes).
+ boost::unordered_map<IpAddr, int> random_backend_rank_;
+
+ // Index into random_backend_order_. It points to the first unused backend and is used
+ // to select unused backends and insert them into the assignment_heap_.
+ int first_unused_backend_idx_;
+
+ /// Store a random permutation of backend hosts to select backends from.
+ std::vector<const BackendMap::value_type*> random_backend_order_;
+
+ /// Track round robin information per backend host.
+ NextBackendPerHost next_backend_per_host_;
+
+ /// Track number of assigned bytes that have been read from cache, locally, or
+ /// remotely.
+ AssignmentByteCounters assignment_byte_counters_;
+
+ /// Pointers to the scheduler's counters.
+ IntCounter* total_assignments_;
+ IntCounter* total_local_assignments_;
+
+ /// Return whether there are backends that have not been assigned a scan range.
+ bool HasUnusedBackends() const;
+
+ /// Return the rank of a backend.
+ int GetBackendRank(const IpAddr& ip) const;
+ };
+
+ /// The scheduler's backend configuration. When receiving changes to the backend
+ /// configuration from the statestore we will make a copy of the stored object, apply
+ /// the updates to the copy and atomically swap the contents of this pointer.
+ BackendConfigPtr backend_config_;
+
+ /// Protect access to backend_config_ which might otherwise be updated asynchronously
+ /// with respect to reads.
+ mutable boost::mutex backend_config_lock_;
+
+ /// Total number of scan ranges assigned to backends during the lifetime of the
+ /// scheduler.
+ int64_t num_assignments_;
/// Map from unique backend id to TBackendDescriptor. Used to track the known backends
/// from the statestore. It's important to track both the backend ID as well as the
@@ -116,19 +323,16 @@ class SimpleScheduler : public Scheduler {
/// Webserver for /backends. Not owned by us.
Webserver* webserver_;
- /// round robin entry in BackendMap for non-local host assignment
- BackendMap::iterator next_nonlocal_backend_entry_;
-
/// Pointer to a subscription manager (which we do not own) which is used to register
/// for dynamic updates to the set of available backends. May be NULL if the set of
/// backends is fixed.
StatestoreSubscriber* statestore_subscriber_;
- /// Unique - across the cluster - identifier for this impala backend
- const std::string backend_id_;
+ /// Unique - across the cluster - identifier for this impala backend.
+ const std::string local_backend_id_;
- /// Describes this backend, including the Impalad service address
- TBackendDescriptor backend_descriptor_;
+ /// Describe this backend, including the Impalad service address.
+ TBackendDescriptor local_backend_descriptor_;
ThriftSerializer thrift_serializer_;
@@ -136,25 +340,23 @@ class SimpleScheduler : public Scheduler {
IntCounter* total_assignments_;
IntCounter* total_local_assignments_;
- /// Initialisation metric
- BooleanProperty* initialised_;
+ /// Initialization metric
+ BooleanProperty* initialized_;
+
/// Current number of backends
IntGauge* num_fragment_instances_metric_;
- /// Counts the number of UpdateMembership invocations, to help throttle the logging.
- uint32_t update_count_;
-
- /// Protects active_reservations_ and active_client_resources_.
+ /// Protect active_reservations_ and active_client_resources_.
boost::mutex active_resources_lock_;
- /// Maps from a Llama reservation id to the coordinator of the query using that
+ /// Map from a Llama reservation id to the coordinator of the query using that
/// reservation. The map is used to cancel queries whose reservation has been preempted.
/// Entries are added in Schedule() calls that result in granted resource allocations.
/// Entries are removed in Release().
typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap;
ActiveReservationsMap active_reservations_;
- /// Maps from client resource id to the coordinator of the query using that resource.
+ /// Map from client resource id to the coordinator of the query using that resource.
/// The map is used to cancel queries whose resource(s) have been preempted.
/// Entries are added in Schedule() calls that result in granted resource allocations.
/// Entries are removed in Release().
@@ -172,12 +374,20 @@ class SimpleScheduler : public Scheduler {
/// Used to make admission decisions in 'Schedule()'
boost::scoped_ptr<AdmissionController> admission_controller_;
- /// Adds the granted reservation and resources to the active_reservations_ and
+ /// Helper methods to access backend_config_ (the shared_ptr, not its contents),
+ /// protecting the access with backend_config_lock_.
+ BackendConfigPtr GetBackendConfig() const;
+ void SetBackendConfig(const BackendConfigPtr& backend_config);
+
+ /// Return a list of all backends registered with the scheduler.
+ void GetAllKnownBackends(BackendList* backends);
+
+ /// Add the granted reservation and resources to the active_reservations_ and
/// active_client_resources_ maps, respectively.
void AddToActiveResourceMaps(
const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
- /// Removes the given reservation and resources from the active_reservations_ and
+ /// Remove the given reservation and resources from the active_reservations_ and
/// active_client_resources_ maps, respectively.
void RemoveFromActiveResourceMaps(
const TResourceBrokerReservationResponse& reservation);
@@ -194,64 +404,126 @@ class SimpleScheduler : public Scheduler {
void BackendsUrlCallback(const Webserver::ArgumentMap& args,
rapidjson::Document* document);
- /// Determines the pool for a user and query options via request_pool_service_.
- Status GetRequestPool(const std::string& user,
- const TQueryOptions& query_options, std::string* pool) const;
+ /// Determine the pool for a user and query options via request_pool_service_.
+ Status GetRequestPool(const std::string& user, const TQueryOptions& query_options,
+ std::string* pool) const;
- /// Computes the assignment of scan ranges to hosts for each scan node in schedule.
- /// Unpartitioned fragments are assigned to the coord. Populates the schedule's
+ /// Compute the assignment of scan ranges to hosts for each scan node in 'schedule'.
+ /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
/// fragment_exec_params_ with the resulting scan range assignment.
Status ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
QuerySchedule* schedule);
- /// Does a scan range assignment (returned in 'assignment') based on a list of scan
- /// range locations for a particular scan node.
- /// If exec_at_coord is true, all scan ranges will be assigned to the coord node.
- Status ComputeScanRangeAssignment(PlanNodeId node_id,
- const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
- const std::vector<TScanRangeLocations>& locations,
+ /// Process the list of scan ranges of a single plan node and compute scan range
+ /// assignments (returned in 'assignment'). The result is a mapping from hosts to their
+ /// assigned scan ranges per plan node.
+ ///
+ /// If exec_at_coord is true, all scan ranges will be assigned to the coordinator host.
+ /// Otherwise the assignment is computed for each scan range as follows:
+ ///
+ /// Scan ranges refer to data, which is usually replicated on multiple hosts. All scan
+ /// ranges where one of the replica hosts also runs an impala backend are processed
+ /// first. If more than one of the replicas run an impala backend, then the 'memory
+ /// distance' of each backend is considered. The concept of memory distance reflects the
+ /// cost of moving data into the processing backend's main memory. Reading from cached
+ /// replicas is generally considered less costly than reading from a local disk, which
+ /// in turn is cheaper than reading data from a remote node. If multiple backends of the
+ /// same memory distance are found, then the one with the least amount of previously
+ /// assigned work is picked, thus aiming to distribute the work as evenly as possible.
+ ///
+ /// Finally, scan ranges are considered which do not have an impalad backend running on
+ /// any of their data nodes. They will be load-balanced by assigned bytes across all
+ /// backends.
+ ///
+ /// The resulting assignment is influenced by the following query options:
+ ///
+ /// replica_preference:
+ /// This value is used as a minimum memory distance for all replicas. For example, by
+ /// setting this to DISK_LOCAL, all cached replicas will be treated as if they were
+ /// not cached, but local disk replicas. This can help prevent hot-spots by spreading
+ /// the assignments over more replicas. Allowed values are CACHE_LOCAL (default),
+ /// DISK_LOCAL and REMOTE.
+ ///
+ /// disable_cached_reads:
+ /// Setting this value to true is equivalent to setting replica_preference to
+ /// DISK_LOCAL and takes precedence over replica_preference. The default setting is
+ /// false.
+ ///
+ /// schedule_random_replica:
+ /// When equivalent backends with a memory distance of DISK_LOCAL are found for a scan
+ /// range (same memory distance, same amount of assigned work), then the first one
+ /// will be picked deterministically. This aims to make better use of OS buffer
+ /// caches, but can lead to performance bottlenecks on individual hosts. Setting this
+ /// option to true will randomly change the order in which equivalent replicas are
+ /// picked for different plan nodes. This helps to compute a more even assignment,
+ /// with the downside being an increased memory usage for OS buffer caches. The
+ /// default setting is false. Selection between equivalent replicas with memory
+ /// distance of CACHE_LOCAL or REMOTE happens based on a random order.
+ ///
+ /// The method takes the following parameters:
+ ///
+ /// backend_config: Backend configuration to use for scheduling.
+ /// node_id: ID of the plan node.
+ /// node_replica_preference: Query hint equivalent to replica_preference.
+ /// node_random_replica: Query hint equivalent to schedule_random_replica.
+ /// locations: List of scan ranges to be assigned to backends.
+ /// host_list: List of hosts, into which 'locations' will index.
+ /// exec_at_coord: Whether to schedule all scan ranges on the coordinator.
+ /// query_options: Query options for the current query.
+ /// timer: Tracks execution time of ComputeScanRangeAssignment.
+ /// assignment: Output parameter, to which new assignments will be added.
+ Status ComputeScanRangeAssignment(const BackendConfig& backend_config,
+ PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
+ bool node_random_replica, const std::vector<TScanRangeLocations>& locations,
const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
- const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment);
+ const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+ FragmentScanRangeAssignment* assignment);
- /// Populates fragment_exec_params_ in schedule.
+ /// Populate fragment_exec_params_ in schedule.
void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
QuerySchedule* schedule);
- /// For each fragment in exec_request, computes hosts on which to run the instances
+ /// For each fragment in exec_request, compute the hosts on which to run the instances
/// and stores result in fragment_exec_params_.hosts.
void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
QuerySchedule* schedule);
- /// Returns the id of the leftmost node of any of the given types in 'plan',
- /// or INVALID_PLAN_NODE_ID if no such node present.
+ /// Return the id of the leftmost node of any of the given types in 'plan', or
+ /// INVALID_PLAN_NODE_ID if no such node is present.
PlanNodeId FindLeftmostNode(
const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
- /// Returns the index (w/in exec_request.fragments) of fragment that sends its output
- /// to exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
- /// Returns INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node.
- int FindLeftmostInputFragment(
- int fragment_idx, const TQueryExecRequest& exec_request);
+ /// Return the index (w/in exec_request.fragments) of fragment that sends its output to
+ /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
+ /// Return INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node.
+ int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest& exec_request);
- /// Adds all hosts the given scan is executed on to scan_hosts.
+ /// Add all hosts the given scan is executed on to scan_hosts.
void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest& exec_request,
const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts);
- /// Returns true if 'plan' contains a node of the given type.
+ /// Return true if 'plan' contains a node of the given type.
bool ContainsNode(const TPlan& plan, TPlanNodeType::type type);
- /// Returns all ids of nodes in 'plan' of any of the given types.
- void FindNodes(const TPlan& plan,
- const std::vector<TPlanNodeType::type>& types, std::vector<TPlanNodeId>* results);
+ /// Return all ids of nodes in 'plan' of any of the given types.
+ void FindNodes(const TPlan& plan, const std::vector<TPlanNodeType::type>& types,
+ std::vector<TPlanNodeId>* results);
/// Returns the index (w/in exec_request.fragments) of fragment that sends its output
/// to the given exchange in the given fragment index.
int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
const TQueryExecRequest& exec_request);
+ /// Deterministically resolve a host to one of its IP addresses. This method will call
+ /// into the OS, so it can take a long time to return. Use this method to resolve
+ /// hostnames during initialization and while processing statestore updates.
+ static Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
+
friend class impala::SchedulerWrapper;
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
+ FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
+ FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomRemote);
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index bdfb831..a59a31e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -281,6 +281,22 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_optimize_partition_key_scans(
iequals(value, "true") || iequals(value, "1"));
break;
+ case TImpalaQueryOptions::REPLICA_PREFERENCE:
+ if (iequals(value, "cache_local") || iequals(value, "0")) {
+ if (query_options->disable_cached_reads) {
+ return Status("Conflicting settings: DISABLE_CACHED_READS = true and"
+ " REPLICA_PREFERENCE = CACHE_LOCAL");
+ }
+ query_options->__set_replica_preference(TReplicaPreference::CACHE_LOCAL);
+ } else if (iequals(value, "disk_local") || iequals(value, "2")) {
+ query_options->__set_replica_preference(TReplicaPreference::DISK_LOCAL);
+ } else if (iequals(value, "remote") || iequals(value, "4")) {
+ query_options->__set_replica_preference(TReplicaPreference::REMOTE);
+ } else {
+ return Status(Substitute("Invalid replica memory distance preference '$0'."
+ "Valid values are CACHE_LOCAL(0), DISK_LOCAL(2), REMOTE(4)", value));
+ }
+ break;
case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA:
query_options->__set_schedule_random_replica(
iequals(value, "true") || iequals(value, "1"));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4104f2b..5677b56 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -65,6 +65,7 @@ class TQueryOptions;
QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD)\
QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS)\
+ QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE)\
QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA)\
QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS)\