You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2016/07/26 21:49:06 UTC

[2/2] incubator-impala git commit: IMPALA-2979: Fix scheduling on remote hosts

IMPALA-2979: Fix scheduling on remote hosts

Also fixes: IMPALA-2400, IMPALA-3043

This change fixes scheduling scan-ranges on remote hosts by adding
remote backend selection capability to SimpleScheduler. Prior to this
change, the scheduler would try to select a local backend even when
remote scheduling was requested.

This change also allows pseudo-randomized remote backend selection to
prevent convoying, which could happen when different independent
schedulers had the same internal state, e.g. after a cluster restart. To
enable the new behavior, set the query option SCHEDULE_RANDOM_REPLICA to
true.

This change also fixes IMPALA-2400: Unpredictable locality behavior
for reading Parquet files

This change also fixes IMPALA-3043: SimpleScheduler does not handle
hosts with multiple IP addresses correctly

This change also does some clean-up in scheduler.h and
simple-scheduler.{h,cc}.

Change-Id: I044f83806fcde820fcb38047cf6b8e780d803858
Reviewed-on: http://gerrit.cloudera.org:8080/3771
Reviewed-by: Lars Volker <lv...@cloudera.com>
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0ad935b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0ad935b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0ad935b6

Branch: refs/heads/master
Commit: 0ad935b63c23029bd4cac4beefde6b3b7c0e322b
Parents: 45ff0f9
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Feb 12 16:31:36 2016 -0800
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Tue Jul 26 21:44:37 2016 +0000

----------------------------------------------------------------------
 be/src/scheduling/scheduler.h                   |  11 -
 be/src/scheduling/simple-scheduler-test.cc      |  84 +-
 be/src/scheduling/simple-scheduler.cc           | 855 +++++++++++--------
 be/src/scheduling/simple-scheduler.h            | 420 +++++++--
 be/src/service/query-options.cc                 |  16 +
 be/src/service/query-options.h                  |   1 +
 common/thrift/ImpalaInternalService.thrift      |  13 +-
 common/thrift/ImpalaService.thrift              |   7 +-
 .../com/cloudera/impala/analysis/TableRef.java  |  11 +-
 .../impala/analysis/AnalyzeStmtsTest.java       |  16 +-
 .../cloudera/impala/analysis/ParserTest.java    |  25 +-
 .../com/cloudera/impala/analysis/ToSqlTest.java |  12 +-
 12 files changed, 983 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4eee9ef..39d8309 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -44,17 +44,6 @@ class Scheduler {
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 
-  /// Return a host/port pair of known ImpalaInternalServices that is running on or nearby
-  /// the given data location
-  virtual Status GetBackend(const TNetworkAddress& data_location,
-      TBackendDescriptor* backend) = 0;
-
-  /// Return true if there is a backend located on the given data_location
-  virtual bool HasLocalBackend(const TNetworkAddress& data_location) = 0;
-
-  /// Return a list of all backends known to the scheduler
-  virtual void GetAllKnownBackends(BackendList* backends) = 0;
-
   /// Populates given query schedule whose execution is to be coordinated by coord.
   /// Assigns fragments to hosts based on scan ranges in the query exec request.
   /// If resource management is enabled, also reserves resources from the central

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler-test.cc b/be/src/scheduling/simple-scheduler-test.cc
index bebb1fb..d43682e 100644
--- a/be/src/scheduling/simple-scheduler-test.cc
+++ b/be/src/scheduling/simple-scheduler-test.cc
@@ -21,6 +21,7 @@
 
 #include "common/logging.h"
 #include "simple-scheduler.h"
+#include "util/runtime-profile.h"
 
 #include "common/names.h"
 
@@ -147,7 +148,7 @@ class Cluster {
 
   /// Add a number of hosts with the same properties by repeatedly calling AddHost(..).
   void AddHosts(int num_hosts, bool has_backend, bool has_datanode) {
-    for (int i = 0; i < num_hosts; ++i) AddHost(true, true);
+    for (int i = 0; i < num_hosts; ++i) AddHost(has_backend, has_datanode);
   }
 
   /// Convert a host index to a hostname.
@@ -365,6 +366,10 @@ class Plan {
 
   const TQueryOptions& query_options() const { return query_options_; }
 
+  void SetReplicaPreference(TReplicaPreference::type p) {
+    query_options_.replica_preference = p;
+  }
+
   void SetRandomReplica(bool b) { query_options_.schedule_random_replica = b; }
   void SetDisableCachedReads(bool b) { query_options_.disable_cached_reads = b; }
   const Cluster& cluster() const { return schema_.cluster(); }
@@ -745,15 +750,20 @@ class SchedulerWrapper {
     InitializeScheduler();
   }
 
-  /// Call ComputeScanRangeAssignment().
+  /// Call ComputeScanRangeAssignment() with exec_at_coord set to false.
   void Compute(Result* result) {
+    Compute(false, result);
+  }
+
+  /// Call ComputeScanRangeAssignment().
+  void Compute(bool exec_at_coord, Result* result) {
     DCHECK(scheduler_ != NULL);
 
     // Compute Assignment.
     FragmentScanRangeAssignment* assignment = result->AddAssignment();
-    scheduler_->ComputeScanRangeAssignment(0, NULL, false,
-        plan_.scan_range_locations(), plan_.referenced_datanodes(), false,
-        plan_.query_options(), assignment);
+    scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
+        false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
+        plan_.query_options(), NULL, assignment);
   }
 
   /// Reset the state of the scheduler by re-creating and initializing it.
@@ -887,6 +897,27 @@ TEST_F(SchedulerTest, SingleHostSingleFile) {
   EXPECT_EQ(0, result.NumCachedAssignments());
 }
 
+/// Test assigning all scan ranges to the coordinator.
+TEST_F(SchedulerTest, ExecAtCoord) {
+  Cluster cluster;
+  cluster.AddHosts(3, true, true);
+
+  Schema schema(cluster);
+  schema.AddMultiBlockTable("T", 3, ReplicaPlacement::LOCAL_ONLY, 3);
+
+  Plan plan(schema);
+  plan.AddTableScan("T");
+
+  Result result(plan);
+  SchedulerWrapper scheduler(plan);
+  bool exec_at_coord = true;
+  scheduler.Compute(exec_at_coord, &result);
+
+  EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+}
+
 /// Test scanning a simple table twice.
 TEST_F(SchedulerTest, ScanTableTwice) {
   Cluster cluster;
@@ -955,29 +986,6 @@ TEST_F(SchedulerTest, LocalReadsPickFirstReplica) {
   EXPECT_EQ(0, result.NumDiskAssignments(2));
 }
 
-/// Verify that scheduling with random_replica = true results in a pseudo-random
-/// round-robin selection of backends.
-/// Disabled, global backend rotation not implemented.
-TEST_F(SchedulerTest, DISABLED_RandomReplicaRoundRobin) {
-  Cluster cluster;
-  for (int i = 0; i < 10; ++i) cluster.AddHost(i < 5, true);
-
-  Schema schema(cluster);
-  schema.AddSingleBlockTable("T1", {0, 1, 2});
-
-  Plan plan(schema);
-  plan.AddTableScan("T1");
-  plan.SetRandomReplica(true);
-
-  Result result(plan);
-  SchedulerWrapper scheduler(plan);
-  for (int i = 0; i < 3; ++i) scheduler.Compute(&result);
-
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(0));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(1));
-  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes(2));
-}
-
 /// Create a medium sized cluster with 100 nodes and compute a schedule over 3 tables.
 TEST_F(SchedulerTest, TestMediumSizedCluster) {
   Cluster cluster;
@@ -1126,10 +1134,12 @@ TEST_F(SchedulerTest, TestDisableCachedReads) {
 
 /// IMPALA-3019: Test for round robin reset problem. We schedule the same plan twice but
 /// send an empty statestored message in between.
-TEST_F(SchedulerTest, DISABLED_EmptyStatestoreMessage) {
+/// TODO: This problem cannot occur anymore and the test is merely green for random
+/// behavior. Remove.
+TEST_F(SchedulerTest, EmptyStatestoreMessage) {
   Cluster cluster;
   cluster.AddHosts(3, false, true);
-  cluster.AddHost(true, false);
+  cluster.AddHosts(2, true, false);
 
   Schema schema(cluster);
   schema.AddMultiBlockTable("T1", 1, ReplicaPlacement::RANDOM, 3);
@@ -1141,14 +1151,20 @@ TEST_F(SchedulerTest, DISABLED_EmptyStatestoreMessage) {
   SchedulerWrapper scheduler(plan);
 
   scheduler.Compute(&result);
-  EXPECT_EQ(1, result.NumDiskAssignments(0));
-  EXPECT_EQ(0, result.NumDiskAssignments(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(3));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(4));
   result.Reset();
 
   scheduler.SendEmptyUpdate();
   scheduler.Compute(&result);
-  EXPECT_EQ(0, result.NumDiskAssignments(0));
-  EXPECT_EQ(1, result.NumDiskAssignments(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
+  EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(3));
+  EXPECT_EQ(0, result.NumTotalAssignedBytes(4));
 }
 
 /// Test sending updates to the scheduler.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.cc b/be/src/scheduling/simple-scheduler.cc
index 1e46c67..c2e948f 100644
--- a/be/src/scheduling/simple-scheduler.cc
+++ b/be/src/scheduling/simple-scheduler.cc
@@ -14,6 +14,8 @@
 
 #include "scheduling/simple-scheduler.h"
 
+#include <atomic>
+#include <random>
 #include <vector>
 
 #include <boost/algorithm/string.hpp>
@@ -41,6 +43,7 @@
 #include "util/llama-util.h"
 #include "util/mem-info.h"
 #include "util/parse-util.h"
+#include "util/runtime-profile-counters.h"
 #include "gen-cpp/ResourceBrokerService_types.h"
 
 #include "common/names.h"
@@ -65,6 +68,7 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
+
 static const string BACKENDS_WEB_PAGE = "/backends";
 static const string BACKENDS_TEMPLATE = "backends.tmpl";
 
@@ -74,19 +78,19 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
     const string& backend_id, const TNetworkAddress& backend_address,
     MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
     RequestPoolService* request_pool_service)
-  : metrics_(metrics->GetChildGroup("scheduler")),
+  : backend_config_(std::make_shared<const BackendConfig>()),
+    metrics_(metrics->GetChildGroup("scheduler")),
     webserver_(webserver),
     statestore_subscriber_(subscriber),
-    backend_id_(backend_id),
+    local_backend_id_(backend_id),
     thrift_serializer_(false),
     total_assignments_(NULL),
     total_local_assignments_(NULL),
-    initialised_(NULL),
-    update_count_(0),
+    initialized_(NULL),
     resource_broker_(resource_broker),
     request_pool_service_(request_pool_service) {
-  backend_descriptor_.address = backend_address;
-  next_nonlocal_backend_entry_ = backend_map_.begin();
+  local_backend_descriptor_.address = backend_address;
+
   if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
   if (!FLAGS_disable_admission_control) {
     admission_controller_.reset(
@@ -118,57 +122,43 @@ SimpleScheduler::SimpleScheduler(StatestoreSubscriber* subscriber,
 SimpleScheduler::SimpleScheduler(const vector<TNetworkAddress>& backends,
     MetricGroup* metrics, Webserver* webserver, ResourceBroker* resource_broker,
     RequestPoolService* request_pool_service)
-  : metrics_(metrics),
+  : backend_config_(std::make_shared<const BackendConfig>(backends)),
+    metrics_(metrics),
     webserver_(webserver),
     statestore_subscriber_(NULL),
     thrift_serializer_(false),
     total_assignments_(NULL),
     total_local_assignments_(NULL),
-    initialised_(NULL),
-    update_count_(0),
+    initialized_(NULL),
     resource_broker_(resource_broker),
     request_pool_service_(request_pool_service) {
   DCHECK(backends.size() > 0);
+  local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
   if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
   // request_pool_service_ may be null in unit tests
   if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
     admission_controller_.reset(
         new AdmissionController(request_pool_service_, metrics, TNetworkAddress()));
   }
-
-  for (int i = 0; i < backends.size(); ++i) {
-    vector<string> ipaddrs;
-    Status status = HostnameToIpAddrs(backends[i].hostname, &ipaddrs);
-    if (!status.ok()) {
-      VLOG(1) << "Failed to resolve " << backends[i].hostname << ": "
-              << status.GetDetail();
-      continue;
-    }
-
-    // Try to find a non-localhost address, otherwise just use the
-    // first IP address returned.
-    string ipaddr = ipaddrs[0];
-    if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
-      VLOG(1) << "Only localhost addresses found for " << backends[i].hostname;
-    }
-
-    BackendMap::iterator it = backend_map_.find(ipaddr);
-    if (it == backend_map_.end()) {
-      it = backend_map_.insert(
-          make_pair(ipaddr, list<TBackendDescriptor>())).first;
-      backend_ip_map_[backends[i].hostname] = ipaddr;
-    }
-
-    TBackendDescriptor descriptor;
-    descriptor.address = MakeNetworkAddress(ipaddr, backends[i].port);
-    it->second.push_back(descriptor);
-  }
-  next_nonlocal_backend_entry_ = backend_map_.begin();
 }
 
 Status SimpleScheduler::Init() {
   LOG(INFO) << "Starting simple scheduler";
 
+  // Figure out what our IP address is, so that each subscriber
+  // doesn't have to resolve it on every heartbeat.
+  IpAddr ip;
+  const Hostname& hostname = local_backend_descriptor_.address.hostname;
+  Status status = HostnameToIpAddr(hostname, &ip);
+  if (!status.ok()) {
+    VLOG(1) << status.GetDetail();
+    status.AddDetail("SimpleScheduler failed to start");
+    return status;
+  }
+
+  local_backend_descriptor_.ip_address = ip;
+  LOG(INFO) << "Simple-scheduler using " << ip << " as IP address";
+
   if (webserver_ != NULL) {
     Webserver::UrlCallback backends_callback =
         bind<void>(mem_fn(&SimpleScheduler::BackendsUrlCallback), this, _1, _2);
@@ -188,55 +178,33 @@ Status SimpleScheduler::Init() {
       RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_));
     }
   }
+
   if (metrics_ != NULL) {
+    // This is after registering with the statestored, so we already have to synchronize
+    // access to the backend_config_ shared_ptr.
+    int num_backends = GetBackendConfig()->backend_map().size();
     total_assignments_ = metrics_->AddCounter<int64_t>(ASSIGNMENTS_KEY, 0);
     total_local_assignments_ = metrics_->AddCounter<int64_t>(LOCAL_ASSIGNMENTS_KEY, 0);
-    initialised_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
+    initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
     num_fragment_instances_metric_ = metrics_->AddGauge<int64_t>(
-        NUM_BACKENDS_KEY, backend_map_.size());
+        NUM_BACKENDS_KEY, num_backends);
   }
 
   if (statestore_subscriber_ != NULL) {
-    // Figure out what our IP address is, so that each subscriber
-    // doesn't have to resolve it on every heartbeat.
-    vector<string> ipaddrs;
-    const string& hostname = backend_descriptor_.address.hostname;
-    Status status = HostnameToIpAddrs(hostname, &ipaddrs);
-    if (!status.ok()) {
-      VLOG(1) << "Failed to resolve " << hostname << ": " << status.GetDetail();
-      status.AddDetail("SimpleScheduler failed to start");
-      return status;
-    }
-    // Find a non-localhost address for this host; if one can't be
-    // found use the first address returned by HostnameToIpAddrs
-    string ipaddr = ipaddrs[0];
-    if (!FindFirstNonLocalhost(ipaddrs, &ipaddr)) {
-      VLOG(3) << "Only localhost addresses found for " << hostname;
-    }
-
-    backend_descriptor_.ip_address = ipaddr;
-    LOG(INFO) << "Simple-scheduler using " << ipaddr << " as IP address";
-
     if (webserver_ != NULL) {
       const TNetworkAddress& webserver_address = webserver_->http_address();
       if (IsWildcardAddress(webserver_address.hostname)) {
-        backend_descriptor_.__set_debug_http_address(
-            MakeNetworkAddress(ipaddr, webserver_address.port));
+        local_backend_descriptor_.__set_debug_http_address(
+            MakeNetworkAddress(ip, webserver_address.port));
       } else {
-        backend_descriptor_.__set_debug_http_address(webserver_address);
+        local_backend_descriptor_.__set_debug_http_address(webserver_address);
       }
-      backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
+      local_backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
     }
   }
   return Status::OK();
 }
 
-// Utility method to help sort backends by ascending network address
-bool TBackendDescriptorComparator(const TBackendDescriptor& a,
-    const TBackendDescriptor& b) {
-  return TNetworkAddressComparator(a.address, b.address);
-}
-
 void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
   BackendList backends;
@@ -253,8 +221,6 @@ void SimpleScheduler::BackendsUrlCallback(const Webserver::ArgumentMap& args,
 void SimpleScheduler::UpdateMembership(
     const StatestoreSubscriber::TopicDeltaMap& incoming_topic_deltas,
     vector<TTopicDelta>* subscriber_topic_updates) {
-  ++update_count_;
-  // TODO: Work on a copy if possible, or at least do resolution as a separate step
   // First look to see if the topic(s) we're interested in have an update
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
       incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
@@ -262,74 +228,59 @@ void SimpleScheduler::UpdateMembership(
   if (topic != incoming_topic_deltas.end()) {
     const TTopicDelta& delta = topic->second;
 
-    // This function needs to handle both delta and non-delta updates. For delta
-    // updates, it is desireable to minimize the number of copies to only
-    // the added/removed items. To accomplish this, all updates are processed
-    // under a lock and applied to the shared backend maps (backend_map_ and
-    // backend_ip_map_) in place.
-    {
-      lock_guard<mutex> lock(backend_map_lock_);
-      bool backend_map_changed = false;
-      if (!delta.is_delta) {
-        backend_map_changed = true;
-        current_membership_.clear();
-        backend_map_.clear();
-        backend_ip_map_.clear();
-      }
+    // This function needs to handle both delta and non-delta updates. To minimize the
+    // time needed to hold locks, all updates are applied to a copy of backend_config_,
+    // which is then swapped into place atomically.
+    std::shared_ptr<BackendConfig> new_backend_config;
 
-      // Process new entries to the topic
-      for (const TTopicItem& item: delta.topic_entries) {
-        TBackendDescriptor be_desc;
-        // Benchmarks have suggested that this method can deserialize
-        // ~10m messages per second, so no immediate need to consider optimisation.
-        uint32_t len = item.value.size();
-        Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
-            item.value.data()), &len, false, &be_desc);
-        if (!status.ok()) {
-          VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
-          continue;
-        }
-        if (item.key == backend_id_ && be_desc.address != backend_descriptor_.address) {
-          // Someone else has registered this subscriber ID with a
-          // different address. We will try to re-register
-          // (i.e. overwrite their subscription), but there is likely
-          // a configuration problem.
-          LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
-                                   << be_desc.address;
-        }
+    if (!delta.is_delta) {
+      current_membership_.clear();
+      new_backend_config = std::make_shared<BackendConfig>();
+    } else {
+      // Make a copy
+      lock_guard<mutex> lock(backend_config_lock_);
+      new_backend_config = std::make_shared<BackendConfig>(*backend_config_);
+    }
 
-        backend_map_changed = true;
-        list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
-        if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
-          backend_map_[be_desc.ip_address].push_back(be_desc);
-        }
-        backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
-        current_membership_.insert(make_pair(item.key, be_desc));
+    // Process new entries to the topic
+    for (const TTopicItem& item: delta.topic_entries) {
+      TBackendDescriptor be_desc;
+      // Benchmarks have suggested that this method can deserialize
+      // ~10m messages per second, so no immediate need to consider optimization.
+      uint32_t len = item.value.size();
+      Status status = DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(
+          item.value.data()), &len, false, &be_desc);
+      if (!status.ok()) {
+        VLOG(2) << "Error deserializing membership topic item with key: " << item.key;
+        continue;
       }
-      // Process deletions from the topic
-      for (const string& backend_id: delta.topic_deletions) {
-        if (current_membership_.find(backend_id) != current_membership_.end()) {
-          backend_map_changed = true;
-          const TBackendDescriptor& be_desc = current_membership_[backend_id];
-          backend_ip_map_.erase(be_desc.address.hostname);
-          list<TBackendDescriptor>* be_descs = &backend_map_[be_desc.ip_address];
-          be_descs->erase(
-              remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
-          if (be_descs->empty()) backend_map_.erase(be_desc.ip_address);
-          current_membership_.erase(backend_id);
-        }
+      if (item.key == local_backend_id_
+          && be_desc.address != local_backend_descriptor_.address) {
+        // Someone else has registered this subscriber ID with a different address. We
+        // will try to re-register (i.e. overwrite their subscription), but there is
+        // likely a configuration problem.
+        LOG_EVERY_N(WARNING, 30) << "Duplicate subscriber registration from address: "
+          << be_desc.address;
+      }
+
+      new_backend_config->AddBackend(be_desc);
+      current_membership_.insert(make_pair(item.key, be_desc));
+    }
+    // Process deletions from the topic
+    for (const string& backend_id: delta.topic_deletions) {
+      if (current_membership_.find(backend_id) != current_membership_.end()) {
+        new_backend_config->RemoveBackend(current_membership_[backend_id]);
+        current_membership_.erase(backend_id);
       }
-      // Update invalidated iterator.
-      if (backend_map_changed) next_nonlocal_backend_entry_ = backend_map_.begin();
     }
+    SetBackendConfig(new_backend_config);
 
     // If this impalad is not in our view of the membership list, we should add it and
     // tell the statestore.
-    // TODO: Inject global dependencies to make this code testable.
     bool is_offline = ExecEnv::GetInstance() &&
         ExecEnv::GetInstance()->impala_server()->IsOffline();
     if (!is_offline &&
-        current_membership_.find(backend_id_) == current_membership_.end()) {
+        current_membership_.find(local_backend_id_) == current_membership_.end()) {
       VLOG(1) << "Registering local backend with statestore";
       subscriber_topic_updates->push_back(TTopicDelta());
       TTopicDelta& update = subscriber_topic_updates->back();
@@ -337,20 +288,21 @@ void SimpleScheduler::UpdateMembership(
       update.topic_entries.push_back(TTopicItem());
 
       TTopicItem& item = update.topic_entries.back();
-      item.key = backend_id_;
-      Status status = thrift_serializer_.Serialize(&backend_descriptor_, &item.value);
+      item.key = local_backend_id_;
+      Status status = thrift_serializer_.Serialize(
+          &local_backend_descriptor_, &item.value);
       if (!status.ok()) {
-        LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic: "
-                     << status.GetDetail();
+        LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
+                     << " " << status.GetDetail();
         subscriber_topic_updates->pop_back();
       }
     } else if (is_offline &&
-        current_membership_.find(backend_id_) != current_membership_.end()) {
+        current_membership_.find(local_backend_id_) != current_membership_.end()) {
       LOG(WARNING) << "Removing offline ImpalaServer from statestore";
       subscriber_topic_updates->push_back(TTopicDelta());
       TTopicDelta& update = subscriber_topic_updates->back();
       update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
-      update.topic_deletions.push_back(backend_id_);
+      update.topic_deletions.push_back(local_backend_id_);
     }
     if (metrics_ != NULL) {
       num_fragment_instances_metric_->set_value(current_membership_.size());
@@ -358,63 +310,24 @@ void SimpleScheduler::UpdateMembership(
   }
 }
 
-Status SimpleScheduler::GetBackend(const TNetworkAddress& data_location,
-    TBackendDescriptor* backend) {
-  lock_guard<mutex> lock(backend_map_lock_);
-  if (backend_map_.size() == 0) {
-    return Status("No backends configured");
-  }
-  bool local_assignment = false;
-  BackendMap::iterator entry = backend_map_.find(data_location.hostname);
-
-  if (entry == backend_map_.end()) {
-    // backend_map_ maps ip address to backend but
-    // data_location.hostname might be a hostname.
-    // Find the ip address of the data_location from backend_ip_map_.
-    BackendIpAddressMap::const_iterator itr =
-        backend_ip_map_.find(data_location.hostname);
-    if (itr != backend_ip_map_.end()) {
-      entry = backend_map_.find(itr->second);
-    }
-  }
-
-  if (entry == backend_map_.end()) {
-    // round robin the ipaddress
-    entry = next_nonlocal_backend_entry_;
-    ++next_nonlocal_backend_entry_;
-    if (next_nonlocal_backend_entry_ == backend_map_.end()) {
-      next_nonlocal_backend_entry_ = backend_map_.begin();
-    }
-  } else {
-    local_assignment = true;
-  }
-  DCHECK(!entry->second.empty());
-  // Round-robin between impalads on the same ipaddress.
-  // Pick the first one, then move it to the back of the queue
-  *backend = entry->second.front();
-  entry->second.pop_front();
-  entry->second.push_back(*backend);
-
-  if (metrics_ != NULL) {
-    total_assignments_->Increment(1);
-    if (local_assignment) {
-      total_local_assignments_->Increment(1);
-    }
-  }
+SimpleScheduler::BackendConfigPtr SimpleScheduler::GetBackendConfig() const {
+  lock_guard<mutex> l(backend_config_lock_);
+  DCHECK(backend_config_.get() != NULL);
+  BackendConfigPtr backend_config = backend_config_;
+  return backend_config;
+}
 
-  if (VLOG_FILE_IS_ON) {
-    stringstream s;
-    s << "(" << data_location;
-    s << " -> " << backend->address << ")";
-    VLOG_FILE << "SimpleScheduler assignment (data->backend):  " << s.str();
-  }
-  return Status::OK();
+void SimpleScheduler::SetBackendConfig(const BackendConfigPtr& backend_config)
+{
+  lock_guard<mutex> l(backend_config_lock_);
+  backend_config_ = backend_config;
 }
 
+
 void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
-  lock_guard<mutex> lock(backend_map_lock_);
   backends->clear();
-  for (const BackendMap::value_type& backend_list: backend_map_) {
+  BackendConfigPtr backend_config = GetBackendConfig();
+  for (const BackendMap::value_type& backend_list: backend_config->backend_map()) {
     backends->insert(backends->end(), backend_list.second.begin(),
                      backend_list.second.end());
   }
@@ -423,6 +336,11 @@ void SimpleScheduler::GetAllKnownBackends(BackendList* backends) {
 Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
     QuerySchedule* schedule) {
   map<TPlanNodeId, vector<TScanRangeLocations>>::const_iterator entry;
+  RuntimeProfile::Counter* total_assignment_timer =
+      ADD_TIMER(schedule->summary_profile(), "ComputeScanRangeAssignmentTimer");
+
+  BackendConfigPtr backend_config = GetBackendConfig();
+
   for (entry = exec_request.per_node_scan_ranges.begin();
       entry != exec_request.per_node_scan_ranges.end(); ++entry) {
     const TPlanNodeId node_id = entry->first;
@@ -436,32 +354,37 @@ Status SimpleScheduler::ComputeScanRangeAssignment(const TQueryExecRequest& exec
     const TReplicaPreference::type* node_replica_preference = node.__isset.hdfs_scan_node
         && node.hdfs_scan_node.__isset.replica_preference
         ? &node.hdfs_scan_node.replica_preference : NULL;
-    bool node_random_replica = node.__isset.hdfs_scan_node &&
-        node.hdfs_scan_node.__isset.random_replica &&
-        node.hdfs_scan_node.random_replica;
+
+    bool node_random_replica = node.__isset.hdfs_scan_node
+        && node.hdfs_scan_node.__isset.random_replica
+        && node.hdfs_scan_node.random_replica;
 
     FragmentScanRangeAssignment* assignment =
         &(*schedule->exec_params())[fragment_idx].scan_range_assignment;
-    RETURN_IF_ERROR(ComputeScanRangeAssignment(
+    RETURN_IF_ERROR(ComputeScanRangeAssignment(*backend_config,
         node_id, node_replica_preference, node_random_replica, entry->second,
-        exec_request.host_list, exec_at_coord, schedule->query_options(), assignment));
+        exec_request.host_list, exec_at_coord, schedule->query_options(),
+        total_assignment_timer, assignment));
     schedule->AddScanRanges(entry->second.size());
   }
   return Status::OK();
 }
 
 Status SimpleScheduler::ComputeScanRangeAssignment(
-    PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
-    bool node_random_replica, const vector<TScanRangeLocations>& locations,
+    const BackendConfig& backend_config, PlanNodeId node_id,
+    const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
+    const vector<TScanRangeLocations>& locations,
     const vector<TNetworkAddress>& host_list, bool exec_at_coord,
-    const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment) {
+    const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+    FragmentScanRangeAssignment* assignment) {
+  SCOPED_TIMER(timer);
   // We adjust all replicas with memory distance less than base_distance to base_distance
-  // and view all replicas with equal or better distance as the same. For a full list of
-  // memory distance classes see TReplicaPreference in PlanNodes.thrift.
-  TReplicaPreference::type base_distance = TReplicaPreference::CACHE_LOCAL;
+  // and collect all replicas with equal or better distance as candidates. For a full list
+  // of memory distance classes see TReplicaPreference in PlanNodes.thrift.
+  TReplicaPreference::type base_distance = query_options.replica_preference;
+
   // The query option to disable cached reads adjusts the memory base distance to view
-  // all replicas as disk_local or worse.
-  // TODO remove in CDH6
+  // all replicas as having a distance disk_local or worse.
   if (query_options.disable_cached_reads &&
       base_distance == TReplicaPreference::CACHE_LOCAL) {
     base_distance = TReplicaPreference::DISK_LOCAL;
@@ -470,166 +393,102 @@ Status SimpleScheduler::ComputeScanRangeAssignment(
   // A preference attached to the plan node takes precedence.
   if (node_replica_preference) base_distance = *node_replica_preference;
 
-  // On otherwise equivalent disk replicas we either pick the first one, or we pick a
-  // random one. Picking random ones helps with preventing hot spots across several
-  // queries. On cached replica we will always break ties randomly.
-  bool random_non_cached_tiebreak = node_random_replica
-      || query_options.schedule_random_replica;
+  // Between otherwise equivalent backends we optionally break ties by comparing their
+  // random rank.
+  bool random_replica = query_options.schedule_random_replica || node_random_replica;
 
-  // map from datanode host to total assigned bytes.
-  unordered_map<TNetworkAddress, uint64_t> assigned_bytes_per_host;
-  unordered_set<TNetworkAddress> remote_hosts;
-  int64_t remote_bytes = 0L;
-  int64_t local_bytes = 0L;
-  int64_t cached_bytes = 0L;
+  AssignmentCtx assignment_ctx(backend_config, total_assignments_,
+      total_local_assignments_);
 
+  vector<const TScanRangeLocations*> remote_scan_range_locations;
+
+  // Loop over all scan ranges, select a backend for those with local impalads and collect
+  // all others for later processing.
   for (const TScanRangeLocations& scan_range_locations: locations) {
-    // Assign scans to replica with smallest memory distance.
     TReplicaPreference::type min_distance = TReplicaPreference::REMOTE;
-    // Assign this scan range to the host w/ the fewest assigned bytes.
-    uint64_t min_assigned_bytes = numeric_limits<uint64_t>::max();
-    const TNetworkAddress* data_host = NULL;  // data server; not necessarily backend
-    int volume_id = -1;
-    bool is_cached = false;
-    bool remote_read = false;
-
-    // Equivalent replicas have the same adjusted memory distance and the same number of
-    // assigned bytes.
-    int num_equivalent_replicas = 0;
-    for (const TScanRangeLocation& location: scan_range_locations.locations) {
-      TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
-      const TNetworkAddress& replica_host = host_list[location.host_idx];
-      if (HasLocalBackend(replica_host)) {
-        // Adjust whether or not this replica should count as being cached based on the
-        // query option and whether it is collocated. If the DN is not collocated treat
-        // the replica as not cached (network transfer dominates anyway in this case).
-        // TODO: measure this in a cluster setup. Are remote reads better with caching?
-        if (location.is_cached) {
-          memory_distance = TReplicaPreference::CACHE_LOCAL;
-        } else {
-          memory_distance = TReplicaPreference::DISK_LOCAL;
-        }
-      } else {
-        memory_distance = TReplicaPreference::REMOTE;
-      }
-      memory_distance = max(memory_distance, base_distance);
-
-      // Named variable is needed here for template parameter deduction to work.
-      uint64_t initial_bytes = 0L;
-      uint64_t assigned_bytes =
-          *FindOrInsert(&assigned_bytes_per_host, replica_host, initial_bytes);
-
-      bool found_new_replica = false;
-
-      // Check if we can already accept based on memory distance.
-      if (memory_distance < min_distance) {
-        min_distance = memory_distance;
-        num_equivalent_replicas = 1;
-        found_new_replica = true;
-      } else if (memory_distance == min_distance) {
-        // Check the effective memory distance of the current replica to decide whether to
-        // treat it as cached. If the actual distance has been increased to base_distance,
-        // then cached_replica will be different from is_cached.
-        bool cached_replica = memory_distance == TReplicaPreference::CACHE_LOCAL;
-        // Load based scheduling
-        if (assigned_bytes < min_assigned_bytes) {
-          num_equivalent_replicas = 1;
-          found_new_replica = true;
-        } else if (assigned_bytes == min_assigned_bytes &&
-            (random_non_cached_tiebreak || cached_replica)) {
-          // We do reservoir sampling: assume we have k equivalent replicas and encounter
-          // another equivalent one. Then we want to select the new one with probability
-          // 1/(k+1). This is achieved by rand % k+1 == 0. Now, assume the probability for
-          // one of the other replicas to be selected had been 1/k before. It will now be
-          // 1/k * k/(k+1) = 1/(k+1). Thus we achieve the goal of picking a replica
-          // uniformly at random.
-          ++num_equivalent_replicas;
-          const int r = rand();  // make debugging easier.
-          found_new_replica = (r % num_equivalent_replicas == 0);
-        }
-      }
 
-      if (found_new_replica) {
-        min_assigned_bytes = assigned_bytes;
-        data_host = &replica_host;
-        volume_id = location.volume_id;
-        is_cached = location.is_cached;
-        remote_read = min_distance == TReplicaPreference::REMOTE;
-      }
-    }  // end of for()
-
-    int64_t scan_range_length = 0;
-    if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
-      scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
-    } else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
-      // Hack so that kudu ranges are well distributed.
-      // TODO: KUDU-1133 Use the tablet size instead.
-      scan_range_length = 1000;
-    }
-
-    if (remote_read) {
-      remote_bytes += scan_range_length;
-      remote_hosts.insert(*data_host);
+    // Select backend host for the current scan range.
+    if (exec_at_coord) {
+      assignment_ctx.RecordScanRangeAssignment(local_backend_descriptor_, node_id,
+          host_list, scan_range_locations, assignment);
     } else {
-      local_bytes += scan_range_length;
-      if (is_cached) cached_bytes += scan_range_length;
-    }
-    assigned_bytes_per_host[*data_host] += scan_range_length;
-
-    // translate data host to backend host
-    DCHECK(data_host != NULL);
-
-    TNetworkAddress exec_hostport;
-    if (!exec_at_coord) {
-      TBackendDescriptor backend;
-      RETURN_IF_ERROR(GetBackend(*data_host, &backend));
-      exec_hostport = backend.address;
-    } else {
-      exec_hostport = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
-    }
+      // Collect backend candidates with smallest memory distance.
+      vector<IpAddr> backend_candidates;
+      if (base_distance < TReplicaPreference::REMOTE) {
+        for (const TScanRangeLocation& location: scan_range_locations.locations) {
+          const TNetworkAddress& replica_host = host_list[location.host_idx];
+          // Determine the adjusted memory distance to the closest backend for the replica
+          // host.
+          TReplicaPreference::type memory_distance = TReplicaPreference::REMOTE;
+          IpAddr backend_ip;
+          bool has_local_backend = assignment_ctx.backend_config().LookUpBackendIp(
+              replica_host.hostname, &backend_ip);
+          if (has_local_backend) {
+            if (location.is_cached) {
+              memory_distance = TReplicaPreference::CACHE_LOCAL;
+            } else {
+              memory_distance = TReplicaPreference::DISK_LOCAL;
+            }
+          } else {
+            memory_distance = TReplicaPreference::REMOTE;
+          }
+          memory_distance = max(memory_distance, base_distance);
+
+          // We only need to collect backend candidates for non-remote reads, as it is the
+          // nature of remote reads that there is no backend available.
+          if (memory_distance < TReplicaPreference::REMOTE) {
+            DCHECK(has_local_backend);
+            // Check if we found a closer replica than the previous ones.
+            if (memory_distance < min_distance) {
+              min_distance = memory_distance;
+              backend_candidates.clear();
+              backend_candidates.push_back(backend_ip);
+            } else if (memory_distance == min_distance) {
+              backend_candidates.push_back(backend_ip);
+            }
+          }
+        }
+      }  // End of candidate selection.
+      DCHECK(!backend_candidates.empty() || min_distance == TReplicaPreference::REMOTE);
 
-    PerNodeScanRanges* scan_ranges =
-        FindOrInsert(assignment, exec_hostport, PerNodeScanRanges());
-    vector<TScanRangeParams>* scan_range_params_list =
-        FindOrInsert(scan_ranges, node_id, vector<TScanRangeParams>());
-    // add scan range
-    TScanRangeParams scan_range_params;
-    scan_range_params.scan_range = scan_range_locations.scan_range;
-    // Explicitly set the optional fields.
-    scan_range_params.__set_volume_id(volume_id);
-    scan_range_params.__set_is_cached(is_cached);
-    scan_range_params.__set_is_remote(remote_read);
-    scan_range_params_list->push_back(scan_range_params);
-  }
+      // Check the effective memory distance of the candidates to decide whether to treat
+      // the scan range as cached.
+      bool cached_replica = min_distance == TReplicaPreference::CACHE_LOCAL;
 
-  if (VLOG_FILE_IS_ON) {
-    VLOG_FILE << "Total remote scan volume = " <<
-        PrettyPrinter::Print(remote_bytes, TUnit::BYTES);
-    VLOG_FILE << "Total local scan volume = " <<
-        PrettyPrinter::Print(local_bytes, TUnit::BYTES);
-    VLOG_FILE << "Total cached scan volume = " <<
-        PrettyPrinter::Print(cached_bytes, TUnit::BYTES);
-    if (remote_hosts.size() > 0) {
-      stringstream remote_node_log;
-      remote_node_log << "Remote data node list: ";
-      for (const TNetworkAddress& remote_host: remote_hosts) {
-        remote_node_log << remote_host << " ";
-      }
-      VLOG_FILE << remote_node_log.str();
-    }
+      // Pick backend host based on data location.
+      bool local_backend = min_distance != TReplicaPreference::REMOTE;
 
-    for (FragmentScanRangeAssignment::value_type& entry: *assignment) {
-      VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
-      for (PerNodeScanRanges::value_type& per_node_scan_ranges: entry.second) {
-        stringstream str;
-        for (TScanRangeParams& params: per_node_scan_ranges.second) {
-          str << ThriftDebugString(params) << " ";
-        }
-        VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
+      if (!local_backend) {
+        remote_scan_range_locations.push_back(&scan_range_locations);
+        continue;
       }
-    }
+      // For local reads we want to break ties by backend rank in these cases:
+      // - if it is enforced via a query option.
+      // - when selecting between cached replicas. In this case there is no OS buffer
+      //   cache to worry about.
+      // Remote reads will always break ties by backend rank.
+      bool decide_local_assignment_by_rank = random_replica || cached_replica;
+      const IpAddr* backend_ip = NULL;
+      backend_ip = assignment_ctx.SelectLocalBackendHost(backend_candidates,
+          decide_local_assignment_by_rank);
+      TBackendDescriptor backend;
+      assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+      assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
+          scan_range_locations, assignment);
+    }  // End of backend host selection.
+  }  // End of for loop over scan ranges.
+
+  // Assign remote scans to backends.
+  for (const TScanRangeLocations* scan_range_locations: remote_scan_range_locations) {
+    const IpAddr* backend_ip = assignment_ctx.SelectRemoteBackendHost();
+    TBackendDescriptor backend;
+    assignment_ctx.SelectBackendOnHost(*backend_ip, &backend);
+    assignment_ctx.RecordScanRangeAssignment(backend, node_id, host_list,
+        *scan_range_locations, assignment);
   }
 
+  if (VLOG_FILE_IS_ON) assignment_ctx.PrintAssignment(*assignment);
+
   return Status::OK();
 }
 
@@ -695,7 +554,6 @@ void SimpleScheduler::ComputeFragmentExecParams(const TQueryExecRequest& exec_re
 void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request,
     QuerySchedule* schedule) {
   vector<FragmentExecParams>* fragment_exec_params = schedule->exec_params();
-  TNetworkAddress coord = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
   DCHECK_EQ(fragment_exec_params->size(), exec_request.fragments.size());
   vector<TPlanNodeType::type> scan_node_types;
   scan_node_types.push_back(TPlanNodeType::HDFS_SCAN_NODE);
@@ -710,7 +568,7 @@ void SimpleScheduler::ComputeFragmentHosts(const TQueryExecRequest& exec_request
     FragmentExecParams& params = (*fragment_exec_params)[i];
     if (fragment.partition.type == TPartitionType::UNPARTITIONED) {
       // all single-node fragments run on the coordinator host
-      params.hosts.push_back(coord);
+      params.hosts.push_back(local_backend_descriptor_.address);
       continue;
     }
 
@@ -822,7 +680,7 @@ void SimpleScheduler::GetScanHosts(TPlanNodeId scan_id,
     // TODO: we'll need to revisit this strategy once we can partition joins
     // (in which case this fragment might be executing a right outer join
     // with a large build table)
-    scan_hosts->push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
+    scan_hosts->push_back(local_backend_descriptor_.address);
     return;
   }
 
@@ -890,7 +748,7 @@ Status SimpleScheduler::Schedule(Coordinator* coord, QuerySchedule* schedule) {
     if (!status.ok()) {
       // Warn about missing table and/or column stats if necessary.
       const TQueryCtx& query_ctx = schedule->request().query_ctx;
-      if(!query_ctx.__isset.parent_query_id &&
+      if (!query_ctx.__isset.parent_query_id &&
           query_ctx.__isset.tables_missing_stats &&
           !query_ctx.tables_missing_stats.empty()) {
         status.AddDetail(GetTablesMissingStatsWarning(query_ctx.tables_missing_stats));
@@ -1007,4 +865,297 @@ void SimpleScheduler::HandleLostResource(const TUniqueId& client_resource_id) {
   }
 }
 
+Status SimpleScheduler::HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) {
+  // Try to resolve via the operating system.
+  vector<IpAddr> ipaddrs;
+  Status status = HostnameToIpAddrs(hostname, &ipaddrs);
+  if (!status.ok() || ipaddrs.empty()) {
+    stringstream ss;
+    ss << "Failed to resolve " << hostname << ": " << status.GetDetail();
+    return Status(ss.str());
+  }
+
+  // HostnameToIpAddrs() calls getaddrinfo() from glibc and will preserve the order of the
+  // result. RFC 3484 only specifies a partial order so we need to sort the addresses
+  // before picking the first non-localhost one.
+  sort(ipaddrs.begin(), ipaddrs.end());
+
+  // Try to find a non-localhost address, otherwise just use the first IP address
+  // returned.
+  *ip = ipaddrs[0];
+  if (!FindFirstNonLocalhost(ipaddrs, ip)) {
+    VLOG(3) << "Only localhost addresses found for " << hostname;
+  }
+  return Status::OK();
+}
+
+SimpleScheduler::BackendConfig::BackendConfig(
+    const std::vector<TNetworkAddress>& backends) {
+  // Construct backend_map and backend_ip_map.
+  for (int i = 0; i < backends.size(); ++i) {
+    IpAddr ip;
+    Status status = HostnameToIpAddr(backends[i].hostname, &ip);
+    if (!status.ok()) {
+      VLOG(1) << status.GetDetail();
+      continue;
+    }
+
+    BackendMap::iterator it = backend_map_.find(ip);
+    if (it == backend_map_.end()) {
+      it = backend_map_.insert(
+          make_pair(ip, BackendList())).first;
+      backend_ip_map_[backends[i].hostname] = ip;
+    }
+
+    TBackendDescriptor descriptor;
+    descriptor.address = MakeNetworkAddress(ip, backends[i].port);
+    descriptor.ip_address = ip;
+    it->second.push_back(descriptor);
+  }
+}
+
+void SimpleScheduler::BackendConfig::AddBackend(const TBackendDescriptor& be_desc) {
+  BackendList* be_descs = &backend_map_[be_desc.ip_address];
+  if (find(be_descs->begin(), be_descs->end(), be_desc) == be_descs->end()) {
+    be_descs->push_back(be_desc);
+  }
+  backend_ip_map_[be_desc.address.hostname] = be_desc.ip_address;
+}
+
+void SimpleScheduler::BackendConfig::RemoveBackend(const TBackendDescriptor& be_desc) {
+  backend_ip_map_.erase(be_desc.address.hostname);
+  auto be_descs_it = backend_map_.find(be_desc.ip_address);
+  if (be_descs_it != backend_map_.end()) {
+    BackendList* be_descs = &be_descs_it->second;
+    be_descs->erase(remove(be_descs->begin(), be_descs->end(), be_desc), be_descs->end());
+    if (be_descs->empty()) backend_map_.erase(be_descs_it);
+  }
+}
+
+bool SimpleScheduler::BackendConfig::LookUpBackendIp(const Hostname& hostname,
+    IpAddr* ip) const {
+  // Check if hostname is already the IP address of a known backend.
+  if (backend_map_.find(hostname) != backend_map_.end()) {
+    if (ip) *ip = hostname;
+    return true;
+  }
+  auto it = backend_ip_map_.find(hostname);
+  if (it != backend_ip_map_.end()) {
+    if (ip) *ip = it->second;
+    return true;
+  }
+  return false;
+}
+
+SimpleScheduler::AssignmentCtx::AssignmentCtx(
+    const BackendConfig& backend_config,
+    IntCounter* total_assignments, IntCounter* total_local_assignments)
+  : backend_config_(backend_config), first_unused_backend_idx_(0),
+    total_assignments_(total_assignments),
+    total_local_assignments_(total_local_assignments) {
+  random_backend_order_.reserve(backend_map().size());
+  for (auto& v: backend_map()) random_backend_order_.push_back(&v);
+  std::mt19937 g(rand());
+  std::shuffle(random_backend_order_.begin(), random_backend_order_.end(), g);
+  // Initialize inverted map for backend rank lookups
+  int i = 0;
+  for (const BackendMap::value_type* v: random_backend_order_) {
+    random_backend_rank_[v->first] = i++;
+  }
+}
+
+const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectLocalBackendHost(
+    const std::vector<IpAddr>& data_locations, bool break_ties_by_rank) {
+  DCHECK(!data_locations.empty());
+  // List of candidate indexes into 'data_locations'.
+  vector<int> candidates_idxs;
+  // Find locations with minimum number of assigned bytes.
+  int64_t min_assigned_bytes = numeric_limits<int64_t>::max();
+  for (int i = 0; i < data_locations.size(); ++i) {
+    const IpAddr& backend_ip = data_locations[i];
+    int64_t assigned_bytes = 0;
+    auto handle_it = assignment_heap_.find(backend_ip);
+    if (handle_it != assignment_heap_.end()) {
+      assigned_bytes = (*handle_it->second).assigned_bytes;
+    }
+    if (assigned_bytes < min_assigned_bytes) {
+      candidates_idxs.clear();
+      min_assigned_bytes = assigned_bytes;
+    }
+    if (assigned_bytes == min_assigned_bytes) candidates_idxs.push_back(i);
+  }
+
+  DCHECK(!candidates_idxs.empty());
+  auto min_rank_idx = candidates_idxs.begin();
+  if (break_ties_by_rank) {
+    min_rank_idx = min_element(candidates_idxs.begin(), candidates_idxs.end(),
+        [&data_locations, this](const int& a, const int& b) {
+          return GetBackendRank(data_locations[a]) < GetBackendRank(data_locations[b]);
+        });
+  }
+  return &data_locations[*min_rank_idx];
+}
+
+const SimpleScheduler::IpAddr* SimpleScheduler::AssignmentCtx::SelectRemoteBackendHost() {
+  const IpAddr* candidate_ip;
+  if (HasUnusedBackends()) {
+    // Pick next unused backend.
+    candidate_ip = GetNextUnusedBackendAndIncrement();
+  } else {
+    // Pick next backend from assignment_heap. All backends must have been inserted into
+    // the heap at this point.
+    DCHECK(backend_config_.NumBackends() == assignment_heap_.size());
+    candidate_ip = &(assignment_heap_.top().ip);
+  }
+  DCHECK(candidate_ip != NULL);
+  return candidate_ip;
+}
+
+bool SimpleScheduler::AssignmentCtx::HasUnusedBackends() const {
+  return first_unused_backend_idx_ < random_backend_order_.size();
+}
+
+const SimpleScheduler::IpAddr*
+    SimpleScheduler::AssignmentCtx::GetNextUnusedBackendAndIncrement() {
+  DCHECK(HasUnusedBackends());
+  const IpAddr* ip = &(random_backend_order_[first_unused_backend_idx_++])->first;
+  DCHECK(backend_map().find(*ip) != backend_map().end());
+  return ip;
+}
+
+int SimpleScheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
+  auto it = random_backend_rank_.find(ip);
+  DCHECK(it != random_backend_rank_.end());
+  return it->second;
+}
+
+void SimpleScheduler::AssignmentCtx::SelectBackendOnHost(const IpAddr& backend_ip,
+    TBackendDescriptor* backend) {
+  BackendMap::const_iterator backend_it = backend_map().find(backend_ip);
+  DCHECK(backend_it != backend_map().end());
+  const BackendList& backends_on_host = backend_it->second;
+  DCHECK(backends_on_host.size() > 0);
+  if (backends_on_host.size() == 1) {
+    *backend = *backends_on_host.begin();
+  } else {
+    BackendList::const_iterator* next_backend_on_host;
+    next_backend_on_host = FindOrInsert(&next_backend_per_host_, backend_ip,
+        backends_on_host.begin());
+    DCHECK(find(backends_on_host.begin(), backends_on_host.end(), **next_backend_on_host)
+        != backends_on_host.end());
+    *backend = **next_backend_on_host;
+    // Rotate
+    ++(*next_backend_on_host);
+    if (*next_backend_on_host == backends_on_host.end()) {
+      *next_backend_on_host = backends_on_host.begin();
+    }
+  }
+}
+
+void SimpleScheduler::AssignmentCtx::RecordScanRangeAssignment(
+    const TBackendDescriptor& backend, PlanNodeId node_id,
+    const vector<TNetworkAddress>& host_list,
+    const TScanRangeLocations& scan_range_locations,
+    FragmentScanRangeAssignment* assignment) {
+  int64_t scan_range_length = 0;
+  if (scan_range_locations.scan_range.__isset.hdfs_file_split) {
+    scan_range_length = scan_range_locations.scan_range.hdfs_file_split.length;
+  } else if (scan_range_locations.scan_range.__isset.kudu_key_range) {
+    // Hack so that kudu ranges are well distributed.
+    // TODO: KUDU-1133 Use the tablet size instead.
+    scan_range_length = 1000;
+  }
+
+  IpAddr backend_ip;
+  backend_config_.LookUpBackendIp(backend.address.hostname, &backend_ip);
+  DCHECK(backend_map().find(backend_ip) != backend_map().end());
+  assignment_heap_.InsertOrUpdate(backend_ip, scan_range_length,
+      GetBackendRank(backend_ip));
+
+  // Determine whether the read is remote. It is local if the selected backend runs on
+  // the same IP address as one of the scan range's replica hosts.
+  bool remote_read = true;
+  // For local reads we can set volume_id and is_cached. For remote reads HDFS will
+  // decide which replica to use so we keep those at default values.
+  int volume_id = -1;
+  bool is_cached = false;
+  for (const TScanRangeLocation& location: scan_range_locations.locations) {
+    const TNetworkAddress& replica_host = host_list[location.host_idx];
+    IpAddr replica_ip;
+    if (backend_config_.LookUpBackendIp(replica_host.hostname, &replica_ip)
+        && backend_ip == replica_ip) {
+      remote_read = false;
+      volume_id = location.volume_id;
+      is_cached = location.is_cached;
+      break;
+    }
+  }
+
+  if (remote_read) {
+    assignment_byte_counters_.remote_bytes += scan_range_length;
+  } else {
+    assignment_byte_counters_.local_bytes += scan_range_length;
+    if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
+  }
+
+  if (total_assignments_ != NULL) {
+    DCHECK(total_local_assignments_ != NULL);
+    total_assignments_->Increment(1);
+    if (!remote_read) total_local_assignments_->Increment(1);
+  }
+
+  PerNodeScanRanges* scan_ranges = FindOrInsert(assignment, backend.address,
+      PerNodeScanRanges());
+  vector<TScanRangeParams>* scan_range_params_list = FindOrInsert(scan_ranges, node_id,
+      vector<TScanRangeParams>());
+  // Add scan range.
+  TScanRangeParams scan_range_params;
+  scan_range_params.scan_range = scan_range_locations.scan_range;
+  scan_range_params.__set_volume_id(volume_id);
+  scan_range_params.__set_is_cached(is_cached);
+  scan_range_params.__set_is_remote(remote_read);
+  scan_range_params_list->push_back(scan_range_params);
+
+  if (VLOG_FILE_IS_ON) {
+    VLOG_FILE << "SimpleScheduler assignment to backend: " << backend.address
+        << "(" << (remote_read ? "remote" : "local") << " selection)";
+  }
+}
+
+void SimpleScheduler::AssignmentCtx::PrintAssignment(
+    const FragmentScanRangeAssignment& assignment) {
+  VLOG_FILE << "Total remote scan volume = " <<
+    PrettyPrinter::Print(assignment_byte_counters_.remote_bytes, TUnit::BYTES);
+  VLOG_FILE << "Total local scan volume = " <<
+    PrettyPrinter::Print(assignment_byte_counters_.local_bytes, TUnit::BYTES);
+  VLOG_FILE << "Total cached scan volume = " <<
+    PrettyPrinter::Print(assignment_byte_counters_.cached_bytes, TUnit::BYTES);
+
+  for (const FragmentScanRangeAssignment::value_type& entry: assignment) {
+    VLOG_FILE << "ScanRangeAssignment: server=" << ThriftDebugString(entry.first);
+    for (const PerNodeScanRanges::value_type& per_node_scan_ranges: entry.second) {
+      stringstream str;
+      for (const TScanRangeParams& params: per_node_scan_ranges.second) {
+        str << ThriftDebugString(params) << " ";
+      }
+      VLOG_FILE << "node_id=" << per_node_scan_ranges.first << " ranges=" << str.str();
+    }
+  }
+}
+
+void SimpleScheduler::AddressableAssignmentHeap::InsertOrUpdate(const IpAddr& ip,
+    int64_t assigned_bytes, int rank) {
+  auto handle_it = backend_handles_.find(ip);
+  if (handle_it == backend_handles_.end()) {
+    AssignmentHeap::handle_type handle = backend_heap_.push({assigned_bytes, rank, ip});
+    backend_handles_.emplace(ip, handle);
+  } else {
+    // The heap must be re-ordered after every update. Only assigned_bytes is modified
+    // here, so a single call to decrease() is sufficient.
+    AssignmentHeap::handle_type handle = handle_it->second;
+    (*handle).assigned_bytes += assigned_bytes;
+    backend_heap_.decrease(handle);
+  }
+}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/scheduling/simple-scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/simple-scheduler.h b/be/src/scheduling/simple-scheduler.h
index 5cff7a5..a3a9290 100644
--- a/be/src/scheduling/simple-scheduler.h
+++ b/be/src/scheduling/simple-scheduler.h
@@ -20,6 +20,7 @@
 #include <vector>
 #include <string>
 #include <list>
+#include <boost/heap/binomial_heap.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/thread/mutex.hpp>
 
@@ -28,6 +29,7 @@
 #include "statestore/statestore-subscriber.h"
 #include "statestore/statestore.h"
 #include "util/metrics.h"
+#include "util/runtime-profile.h"
 #include "scheduling/admission-controller.h"
 #include "gen-cpp/Types_types.h"  // for TNetworkAddress
 #include "gen-cpp/ResourceBrokerService_types.h"
@@ -42,10 +44,26 @@ class SchedulerWrapper;
 
 /// Performs simple scheduling by matching between a list of backends configured
 /// either from the statestore, or from a static list of addresses, and a list
-/// of target data locations.
-//
+/// of target data locations. The current set of backends is stored in backend_config_.
+/// When receiving changes to the backend configuration from the statestore we will make a
+/// copy of this configuration, apply the updates to the copy and atomically swap the
+/// contents of the backend_config_ pointer.
+///
 /// TODO: Notice when there are duplicate statestore registrations (IMPALA-23)
-/// TODO: Handle deltas from the statestore
+/// TODO: Track assignments (assignment_ctx in ComputeScanRangeAssignment) per query
+///       instead of per plan node?
+/// TODO: Remove disable_cached_reads query option in CDH6
+/// TODO: Replace the usage of shared_ptr with atomic_shared_ptr once compilers support
+///       it. Alternatively consider using Kudu's rw locks.
+/// TODO: Inject global dependencies into the class (for example ExecEnv::GetInstance(),
+///       RNG used during scheduling, FLAGS_*)
+///       to make it testable.
+/// TODO: Benchmark the performance of the scheduler. The tests need to include setups
+///       with:
+///         - Small and large number of backends.
+///         - Small and large query plans.
+///         - Scheduling query plans with concurrent updates to the internal backend
+///           configuration.
 class SimpleScheduler : public Scheduler {
  public:
   static const std::string IMPALA_MEMBERSHIP_TOPIC;
@@ -64,20 +82,7 @@ class SimpleScheduler : public Scheduler {
       Webserver* webserver, ResourceBroker* resource_broker,
       RequestPoolService* request_pool_service);
 
-  /// Return a backend such that the impalad at backend.address should be used to read
-  /// data from the given data_loation
-  virtual impala::Status GetBackend(const TNetworkAddress& data_location,
-      TBackendDescriptor* backend);
-
-  virtual void GetAllKnownBackends(BackendList* backends);
-
-  virtual bool HasLocalBackend(const TNetworkAddress& data_location) {
-    boost::lock_guard<boost::mutex> l(backend_map_lock_);
-    BackendMap::iterator entry = backend_map_.find(data_location.hostname);
-    return (entry != backend_map_.end() && entry->second.size() > 0);
-  }
-
-  /// Registers with the subscription manager if required
+  /// Register with the subscription manager if required
   virtual impala::Status Init();
 
   virtual Status Schedule(Coordinator* coord, QuerySchedule* schedule);
@@ -87,20 +92,222 @@ class SimpleScheduler : public Scheduler {
   virtual void HandleLostResource(const TUniqueId& client_resource_id);
 
  private:
-  /// Protects access to backend_map_ and backend_ip_map_, which might otherwise be
-  /// updated asynchronously with respect to reads. Also protects the locality
-  /// counters, which are updated in GetBackends.
-  boost::mutex backend_map_lock_;
-
-  /// Map from a datanode's IP address to a list of backend addresses running on that
-  /// node.
-  typedef boost::unordered_map<std::string, std::list<TBackendDescriptor>> BackendMap;
-  BackendMap backend_map_;
-
-  /// Map from a datanode's hostname to its IP address to support both hostname based
-  /// lookup.
-  typedef boost::unordered_map<std::string, std::string> BackendIpAddressMap;
-  BackendIpAddressMap backend_ip_map_;
+  /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses.
+  typedef std::string Hostname;
+
+  /// Type to store IPv4 addresses.
+  typedef std::string IpAddr;
+
+  typedef std::list<TBackendDescriptor> BackendList;
+
+  /// Map from a host's IP address to a list of backends running on that node.
+  typedef boost::unordered_map<IpAddr, BackendList> BackendMap;
+
+  /// Map from a host's IP address to the next backend to be round-robin scheduled for
+  /// that host (needed for setups with multiple backends on a single host)
+  typedef boost::unordered_map<IpAddr, BackendList::const_iterator> NextBackendPerHost;
+
+  /// Map from a hostname to its IP address to support hostname based backend lookup.
+  typedef boost::unordered_map<Hostname, IpAddr> BackendIpAddressMap;
+
+  /// Configuration class to store a list of backends per IP address and a mapping from
+  /// hostnames to IP addresses. backend_ip_map contains entries for all backends in
+  /// backend_map and needs to be updated whenever backend_map changes. Each plan node
+  /// creates a read-only copy of the scheduler's current backend_config_ to use during
+  /// scheduling.
+  class BackendConfig {
+   public:
+    BackendConfig() {}
+
+    /// Construct config from list of backends.
+    BackendConfig(const std::vector<TNetworkAddress>& backends);
+
+    void AddBackend(const TBackendDescriptor& be_desc);
+    void RemoveBackend(const TBackendDescriptor& be_desc);
+
+    /// Look up the IP address of 'hostname' in the internal backend maps and return
+    /// whether the lookup was successful. If 'hostname' itself is a valid IP address then
+    /// it is copied to 'ip' and true is returned. 'ip' can be NULL if the caller only
+    /// wants to check whether the lookup succeeds. Use this method to resolve datanode
+    /// hostnames to IP addresses during scheduling, to prevent blocking on the OS.
+    bool LookUpBackendIp(const Hostname& hostname, IpAddr* ip) const;
+
+    int NumBackends() const { return backend_map().size(); }
+
+    const BackendMap& backend_map() const { return backend_map_; }
+    const BackendIpAddressMap& backend_ip_map() const { return backend_ip_map_; }
+
+   private:
+    BackendMap backend_map_;
+    BackendIpAddressMap backend_ip_map_;
+  };
+
+  typedef std::shared_ptr<const BackendConfig> BackendConfigPtr;
+
+  /// Internal structure to track scan range assignments for a backend host. This struct
+  /// is used as the heap element in and maintained by AddressableAssignmentHeap.
+  struct BackendAssignmentInfo {
+    /// The number of bytes assigned to a backend host.
+    int64_t assigned_bytes;
+
+    /// Each host gets assigned a random rank to break ties in a random but deterministic
+    /// order per plan node.
+    const int random_rank;
+
+    /// IP address of the backend.
+    IpAddr ip;
+
+    /// Compare two elements of this struct. The key is (assigned_bytes, random_rank).
+    bool operator>(const BackendAssignmentInfo& rhs) const {
+      if (assigned_bytes != rhs.assigned_bytes) {
+        return assigned_bytes > rhs.assigned_bytes;
+      }
+      return random_rank > rhs.random_rank;
+    }
+  };
+
+  /// Heap to compute candidates for scan range assignments. Elements are of type
+  /// BackendAssignmentInfo and track assignment information for each backend. By default
+  /// boost implements a max-heap so we use std::greater<T> to obtain a min-heap. This
+  /// will make the top() element of the heap be the backend with the lowest number of
+  /// assigned bytes and the lowest random rank.
+  typedef boost::heap::binomial_heap<BackendAssignmentInfo,
+      boost::heap::compare<std::greater<BackendAssignmentInfo>>> AssignmentHeap;
+
+  /// Map to look up handles to heap elements to modify heap element keys.
+  typedef boost::unordered_map<IpAddr, AssignmentHeap::handle_type> BackendHandleMap;
+
+  /// Class to store backend information in an addressable heap. In addition to
+  /// AssignmentHeap it can be used to look up heap elements by their IP address and
+  /// update their key. For each plan node we create a new heap, so they are not shared
+  /// between concurrent invocations of the scheduler.
+  class AddressableAssignmentHeap {
+   public:
+    const AssignmentHeap& backend_heap() const { return backend_heap_; }
+    const BackendHandleMap& backend_handles() const { return backend_handles_; }
+
+    void InsertOrUpdate(const IpAddr& ip, int64_t assigned_bytes, int rank);
+
+    // Forward interface for boost::heap
+    decltype(auto) size() const { return backend_heap_.size(); }
+    decltype(auto) top() const { return backend_heap_.top(); }
+
+    // Forward interface for boost::unordered_map
+    decltype(auto) find(const IpAddr& ip) const { return backend_handles_.find(ip); }
+    decltype(auto) end() const { return backend_handles_.end(); }
+
+   private:
+    // Heap to determine next backend.
+    AssignmentHeap backend_heap_;
+    // Maps backend IPs to handles in the heap.
+    BackendHandleMap backend_handles_;
+  };
+
+  /// Class to store context information on assignments during scheduling. It is
+  /// initialized with a copy of the global backend information and assigns a random rank
+  /// to each backend to break ties in cases where multiple backends have been assigned
+  /// the same number of bytes. It tracks the number of assigned bytes, which backends
+  /// have already been used, etc. Objects of this class are created in
+  /// ComputeScanRangeAssignment() and thus don't need to be thread safe.
+  class AssignmentCtx {
+   public:
+    AssignmentCtx(const BackendConfig& backend_config, IntCounter* total_assignments,
+        IntCounter* total_local_assignments);
+
+    /// Among hosts in 'data_locations', select the one with the minimum number of
+    /// assigned bytes. If backends have been assigned equal amounts of work and
+    /// 'break_ties_by_rank' is true, then the backend rank is used to break ties.
+    /// Otherwise the first backend according to their order in 'data_locations' is
+    /// selected.
+    const IpAddr* SelectLocalBackendHost(const std::vector<IpAddr>& data_locations,
+        bool break_ties_by_rank);
+
+    /// Select a backend host for a remote read. If there are unused backend hosts, then
+    /// those will be preferred. Otherwise the one with the lowest number of assigned
+    /// bytes is picked. If backends have been assigned equal amounts of work, then the
+    /// backend rank is used to break ties.
+    const IpAddr* SelectRemoteBackendHost();
+
+    /// Return the next backend that has not been assigned to. This assumes that a
+    /// returned backend will also be assigned to. The caller must make sure that
+    /// HasUnusedBackends() is true.
+    const IpAddr* GetNextUnusedBackendAndIncrement();
+
+    /// Pick a backend in round-robin fashion from multiple backends on a single host.
+    void SelectBackendOnHost(const IpAddr& backend_ip, TBackendDescriptor* backend);
+
+    /// Build a new TScanRangeParams object and append it to the assignment list for the
+    /// tuple (backend, node_id) in 'assignment'. Also, update assignment_heap_ and
+    /// assignment_byte_counters_, increase the counters 'total_assignments_' and
+    /// 'total_local_assignments_'. 'scan_range_locations' contains information about the
+    /// scan range and its replica locations.
+    void RecordScanRangeAssignment(const TBackendDescriptor& backend, PlanNodeId node_id,
+        const vector<TNetworkAddress>& host_list,
+        const TScanRangeLocations& scan_range_locations,
+        FragmentScanRangeAssignment* assignment);
+
+    const BackendConfig& backend_config() const { return backend_config_; }
+    const BackendMap& backend_map() const { return backend_config_.backend_map(); }
+
+    /// Print the assignment and statistics to VLOG_FILE.
+    void PrintAssignment(const FragmentScanRangeAssignment& assignment);
+
+   private:
+    /// A struct to track various counts of assigned bytes during scheduling.
+    struct AssignmentByteCounters {
+      int64_t remote_bytes = 0;
+      int64_t local_bytes = 0;
+      int64_t cached_bytes = 0;
+    };
+
+    /// Used to look up hostnames to IP addresses and IP addresses to backend.
+    const BackendConfig& backend_config_;
+
+    // Addressable heap to select remote backends from. Elements are ordered by the number
+    // of already assigned bytes (and a random rank to break ties).
+    AddressableAssignmentHeap assignment_heap_;
+
+    /// Store a random rank per backend host to break ties between otherwise equivalent
+    /// replicas (e.g., those having the same number of assigned bytes).
+    boost::unordered_map<IpAddr, int> random_backend_rank_;
+
+    // Index into random_backend_order_. It points to the first unused backend and is used
+    // to select unused backends and insert them into the assignment_heap_.
+    int first_unused_backend_idx_;
+
+    /// Store a random permutation of backend hosts to select backends from.
+    std::vector<const BackendMap::value_type*> random_backend_order_;
+
+    /// Track round robin information per backend host.
+    NextBackendPerHost next_backend_per_host_;
+
+    /// Track number of assigned bytes that have been read from cache, locally, or
+    /// remotely.
+    AssignmentByteCounters assignment_byte_counters_;
+
+    /// Pointers to the scheduler's counters.
+    IntCounter* total_assignments_;
+    IntCounter* total_local_assignments_;
+
+    /// Return whether there are backends that have not been assigned a scan range.
+    bool HasUnusedBackends() const;
+
+    /// Return the rank of a backend.
+    int GetBackendRank(const IpAddr& ip) const;
+  };
+
+  /// The scheduler's backend configuration. When receiving changes to the backend
+  /// configuration from the statestore we will make a copy of the stored object, apply
+  /// the updates to the copy and atomically swap the contents of this pointer.
+  BackendConfigPtr backend_config_;
+
+  /// Protect access to backend_config_ which might otherwise be updated asynchronously
+  /// with respect to reads.
+  mutable boost::mutex backend_config_lock_;
+
+  /// Total number of scan ranges assigned to backends during the lifetime of the
+  /// scheduler.
+  int64_t num_assignments_;
 
   /// Map from unique backend id to TBackendDescriptor. Used to track the known backends
   /// from the statestore. It's important to track both the backend ID as well as the
@@ -116,19 +323,16 @@ class SimpleScheduler : public Scheduler {
   /// Webserver for /backends. Not owned by us.
   Webserver* webserver_;
 
-  /// round robin entry in BackendMap for non-local host assignment
-  BackendMap::iterator next_nonlocal_backend_entry_;
-
   /// Pointer to a subscription manager (which we do not own) which is used to register
   /// for dynamic updates to the set of available backends. May be NULL if the set of
   /// backends is fixed.
   StatestoreSubscriber* statestore_subscriber_;
 
-  /// Unique - across the cluster - identifier for this impala backend
-  const std::string backend_id_;
+  /// Unique - across the cluster - identifier for this impala backend.
+  const std::string local_backend_id_;
 
-  /// Describes this backend, including the Impalad service address
-  TBackendDescriptor backend_descriptor_;
+  /// Describe this backend, including the Impalad service address.
+  TBackendDescriptor local_backend_descriptor_;
 
   ThriftSerializer thrift_serializer_;
 
@@ -136,25 +340,23 @@ class SimpleScheduler : public Scheduler {
   IntCounter* total_assignments_;
   IntCounter* total_local_assignments_;
 
-  /// Initialisation metric
-  BooleanProperty* initialised_;
+  /// Initialization metric
+  BooleanProperty* initialized_;
+
   /// Current number of backends
   IntGauge* num_fragment_instances_metric_;
 
-  /// Counts the number of UpdateMembership invocations, to help throttle the logging.
-  uint32_t update_count_;
-
-  /// Protects active_reservations_ and active_client_resources_.
+  /// Protect active_reservations_ and active_client_resources_.
   boost::mutex active_resources_lock_;
 
-  /// Maps from a Llama reservation id to the coordinator of the query using that
+  /// Map from a Llama reservation id to the coordinator of the query using that
   /// reservation. The map is used to cancel queries whose reservation has been preempted.
   /// Entries are added in Schedule() calls that result in granted resource allocations.
   /// Entries are removed in Release().
   typedef boost::unordered_map<TUniqueId, Coordinator*> ActiveReservationsMap;
   ActiveReservationsMap active_reservations_;
 
-  /// Maps from client resource id to the coordinator of the query using that resource.
+  /// Map from client resource id to the coordinator of the query using that resource.
   /// The map is used to cancel queries whose resource(s) have been preempted.
   /// Entries are added in Schedule() calls that result in granted resource allocations.
   /// Entries are removed in Release().
@@ -172,12 +374,20 @@ class SimpleScheduler : public Scheduler {
   /// Used to make admission decisions in 'Schedule()'
   boost::scoped_ptr<AdmissionController> admission_controller_;
 
-  /// Adds the granted reservation and resources to the active_reservations_ and
+  /// Helper methods to access backend_config_ (the shared_ptr, not its contents),
+  /// protecting the access with backend_config_lock_.
+  BackendConfigPtr GetBackendConfig() const;
+  void SetBackendConfig(const BackendConfigPtr& backend_config);
+
+  /// Return a list of all backends registered with the scheduler.
+  void GetAllKnownBackends(BackendList* backends);
+
+  /// Add the granted reservation and resources to the active_reservations_ and
   /// active_client_resources_ maps, respectively.
   void AddToActiveResourceMaps(
       const TResourceBrokerReservationResponse& reservation, Coordinator* coord);
 
-  /// Removes the given reservation and resources from the active_reservations_ and
+  /// Remove the given reservation and resources from the active_reservations_ and
   /// active_client_resources_ maps, respectively.
   void RemoveFromActiveResourceMaps(
       const TResourceBrokerReservationResponse& reservation);
@@ -194,64 +404,126 @@ class SimpleScheduler : public Scheduler {
   void BackendsUrlCallback(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
 
-  /// Determines the pool for a user and query options via request_pool_service_.
-  Status GetRequestPool(const std::string& user,
-      const TQueryOptions& query_options, std::string* pool) const;
+  /// Determine the pool for a user and query options via request_pool_service_.
+  Status GetRequestPool(const std::string& user, const TQueryOptions& query_options,
+      std::string* pool) const;
 
-  /// Computes the assignment of scan ranges to hosts for each scan node in schedule.
-  /// Unpartitioned fragments are assigned to the coord. Populates the schedule's
+  /// Compute the assignment of scan ranges to hosts for each scan node in 'schedule'.
+  /// Unpartitioned fragments are assigned to the coordinator. Populate the schedule's
   /// fragment_exec_params_ with the resulting scan range assignment.
   Status ComputeScanRangeAssignment(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
 
-  /// Does a scan range assignment (returned in 'assignment') based on a list of scan
-  /// range locations for a particular scan node.
-  /// If exec_at_coord is true, all scan ranges will be assigned to the coord node.
-  Status ComputeScanRangeAssignment(PlanNodeId node_id,
-      const TReplicaPreference::type* node_replica_preference, bool node_random_replica,
-      const std::vector<TScanRangeLocations>& locations,
+  /// Process the list of scan ranges of a single plan node and compute scan range
+  /// assignments (returned in 'assignment'). The result is a mapping from hosts to their
+  /// assigned scan ranges per plan node.
+  ///
+  /// If exec_at_coord is true, all scan ranges will be assigned to the coordinator host.
+  /// Otherwise the assignment is computed for each scan range as follows:
+  ///
+  /// Scan ranges refer to data, which is usually replicated on multiple hosts. All scan
+  /// ranges where one of the replica hosts also runs an impala backend are processed
+  /// first. If more than one of the replicas run an impala backend, then the 'memory
+  /// distance' of each backend is considered. The concept of memory distance reflects the
+  /// cost of moving data into the processing backend's main memory. Reading from cached
+  /// replicas is generally considered less costly than reading from a local disk, which
+  /// in turn is cheaper than reading data from a remote node. If multiple backends of the
+  /// same memory distance are found, then the one with the least amount of previously
+  /// assigned work is picked, thus aiming to distribute the work as evenly as possible.
+  ///
+  /// Finally, scan ranges are considered which do not have an impalad backend running on
+  /// any of their data nodes. They will be load-balanced by assigned bytes across all
+  /// backends.
+  ///
+  /// The resulting assignment is influenced by the following query options:
+  ///
+  /// replica_preference:
+  ///   This value is used as a minimum memory distance for all replicas. For example, by
+  ///   setting this to DISK_LOCAL, all cached replicas will be treated as if they were
+  ///   not cached, but local disk replicas. This can help prevent hot-spots by spreading
+  ///   the assignments over more replicas. Allowed values are CACHE_LOCAL (default),
+  ///   DISK_LOCAL and REMOTE.
+  ///
+  /// disable_cached_reads:
+  ///   Setting this value to true is equivalent to setting replica_preference to
+  ///   DISK_LOCAL and takes precedence over replica_preference. The default setting is
+  ///   false.
+  ///
+  /// schedule_random_replica:
+  ///   When equivalent backends with a memory distance of DISK_LOCAL are found for a scan
+  ///   range (same memory distance, same amount of assigned work), then the first one
+  ///   will be picked deterministically. This aims to make better use of OS buffer
+  ///   caches, but can lead to performance bottlenecks on individual hosts. Setting this
+  ///   option to true will randomly change the order in which equivalent replicas are
+  ///   picked for different plan nodes. This helps to compute a more even assignment,
+  ///   with the downside being an increased memory usage for OS buffer caches. The
+  ///   default setting is false. Selection between equivalent replicas with memory
+  ///   distance of CACHE_LOCAL or REMOTE happens based on a random order.
+  ///
+  /// The method takes the following parameters:
+  ///
+  /// backend_config:          Backend configuration to use for scheduling.
+  /// node_id:                 ID of the plan node.
+  /// node_replica_preference: Query hint equivalent to replica_preference.
+  /// node_random_replica:     Query hint equivalent to schedule_random_replica.
+  /// locations:               List of scan ranges to be assigned to backends.
+  /// host_list:               List of hosts, into which 'locations' will index.
+  /// exec_at_coord:           Whether to schedule all scan ranges on the coordinator.
+  /// query_options:           Query options for the current query.
+  /// timer:                   Tracks execution time of ComputeScanRangeAssignment.
+  /// assignment:              Output parameter, to which new assignments will be added.
+  Status ComputeScanRangeAssignment(const BackendConfig& backend_config,
+      PlanNodeId node_id, const TReplicaPreference::type* node_replica_preference,
+      bool node_random_replica, const std::vector<TScanRangeLocations>& locations,
       const std::vector<TNetworkAddress>& host_list, bool exec_at_coord,
-      const TQueryOptions& query_options, FragmentScanRangeAssignment* assignment);
+      const TQueryOptions& query_options, RuntimeProfile::Counter* timer,
+      FragmentScanRangeAssignment* assignment);
 
-  /// Populates fragment_exec_params_ in schedule.
+  /// Populate fragment_exec_params_ in schedule.
   void ComputeFragmentExecParams(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
 
-  /// For each fragment in exec_request, computes hosts on which to run the instances
+  /// For each fragment in exec_request, compute the hosts on which to run the instances
   /// and stores result in fragment_exec_params_.hosts.
   void ComputeFragmentHosts(const TQueryExecRequest& exec_request,
       QuerySchedule* schedule);
 
-  /// Returns the id of the leftmost node of any of the given types in 'plan',
-  /// or INVALID_PLAN_NODE_ID if no such node present.
+  /// Return the id of the leftmost node of any of the given types in 'plan', or
+  /// INVALID_PLAN_NODE_ID if no such node present.
   PlanNodeId FindLeftmostNode(
       const TPlan& plan, const std::vector<TPlanNodeType::type>& types);
 
-  /// Returns the index (w/in exec_request.fragments) of fragment that sends its output
-  /// to exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
-  /// Returns INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node.
-  int FindLeftmostInputFragment(
-      int fragment_idx, const TQueryExecRequest& exec_request);
+  /// Return the index (w/in exec_request.fragments) of fragment that sends its output to
+  /// exec_request.fragment[fragment_idx]'s leftmost ExchangeNode.
+  /// Return INVALID_PLAN_NODE_ID if the leftmost node is not an exchange node.
+  int FindLeftmostInputFragment(int fragment_idx, const TQueryExecRequest& exec_request);
 
-  /// Adds all hosts the given scan is executed on to scan_hosts.
+  /// Add all hosts the given scan is executed on to scan_hosts.
   void GetScanHosts(TPlanNodeId scan_id, const TQueryExecRequest& exec_request,
       const FragmentExecParams& params, std::vector<TNetworkAddress>* scan_hosts);
 
-  /// Returns true if 'plan' contains a node of the given type.
+  /// Return true if 'plan' contains a node of the given type.
   bool ContainsNode(const TPlan& plan, TPlanNodeType::type type);
 
-  /// Returns all ids of nodes in 'plan' of any of the given types.
-  void FindNodes(const TPlan& plan,
-      const std::vector<TPlanNodeType::type>& types, std::vector<TPlanNodeId>* results);
+  /// Return all ids of nodes in 'plan' of any of the given types.
+  void FindNodes(const TPlan& plan, const std::vector<TPlanNodeType::type>& types,
+      std::vector<TPlanNodeId>* results);
 
   /// Returns the index (w/in exec_request.fragments) of fragment that sends its output
   /// to the given exchange in the given fragment index.
   int FindSenderFragment(TPlanNodeId exch_id, int fragment_idx,
       const TQueryExecRequest& exec_request);
 
+  /// Deterministically resolve a host to one of its IP addresses. This method will call
+  /// into the OS, so it can take a long time to return. Use this method to resolve
+  /// hostnames during initialization and while processing statestore updates.
+  static Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
+
   friend class impala::SchedulerWrapper;
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentDeterministicNonCached);
   FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomNonCached);
+  FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomDiskLocal);
+  FRIEND_TEST(SimpleAssignmentTest, ComputeAssignmentRandomRemote);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index bdfb831..a59a31e 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -281,6 +281,22 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_optimize_partition_key_scans(
             iequals(value, "true") || iequals(value, "1"));
         break;
+      case TImpalaQueryOptions::REPLICA_PREFERENCE:
+        if (iequals(value, "cache_local") || iequals(value, "0")) {
+          if (query_options->disable_cached_reads) {
+            return Status("Conflicting settings: DISABLE_CACHED_READS = true and"
+                " REPLICA_PREFERENCE = CACHE_LOCAL");
+          }
+          query_options->__set_replica_preference(TReplicaPreference::CACHE_LOCAL);
+        } else if (iequals(value, "disk_local") || iequals(value, "2")) {
+          query_options->__set_replica_preference(TReplicaPreference::DISK_LOCAL);
+        } else if (iequals(value, "remote") || iequals(value, "4")) {
+          query_options->__set_replica_preference(TReplicaPreference::REMOTE);
+        } else {
+          return Status(Substitute("Invalid replica memory distance preference '$0'."
+              "Valid values are CACHE_LOCAL(0), DISK_LOCAL(2), REMOTE(4)", value));
+        }
+        break;
       case TImpalaQueryOptions::SCHEDULE_RANDOM_REPLICA:
         query_options->__set_schedule_random_replica(
             iequals(value, "true") || iequals(value, "1"));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ad935b6/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4104f2b..5677b56 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -65,6 +65,7 @@ class TQueryOptions;
   QUERY_OPT_FN(seq_compression_mode, SEQ_COMPRESSION_MODE)\
   QUERY_OPT_FN(exec_single_node_rows_threshold, EXEC_SINGLE_NODE_ROWS_THRESHOLD)\
   QUERY_OPT_FN(optimize_partition_key_scans, OPTIMIZE_PARTITION_KEY_SCANS)\
+  QUERY_OPT_FN(replica_preference, REPLICA_PREFERENCE)\
   QUERY_OPT_FN(schedule_random_replica, SCHEDULE_RANDOM_REPLICA)\
   QUERY_OPT_FN(scan_node_codegen_threshold, SCAN_NODE_CODEGEN_THRESHOLD)\
   QUERY_OPT_FN(disable_streaming_preaggregations, DISABLE_STREAMING_PREAGGREGATIONS)\