Posted to commits@impala.apache.org by jo...@apache.org on 2019/08/14 23:10:36 UTC

[impala] branch master updated: IMPALA-8685, IMPALA-8677: Use consistent scheduling for small clusters

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 33475a3  IMPALA-8685,IMPALA-8677: Use consistent scheduling for small clusters
33475a3 is described below

commit 33475a3e2c482681f1bab307055b75b0643ecdee
Author: Joe McDonnell <jo...@cloudera.com>
AuthorDate: Tue Aug 6 17:50:56 2019 -0700

    IMPALA-8685,IMPALA-8677: Use consistent scheduling for small clusters
    
    In the original change for consistent scheduling, if a cluster has
    fewer nodes than the number of remote executor candidates, then
    the scheduler falls back to using the old SelectRemoteExecutor().
    SelectRemoteExecutor() considers all backends and picks the backend
    with the least assigned bytes; to break ties, it uses randomness.
    This means that clusters with fewer backends than
    num_remote_executor_candidates do not have consistent placement.
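
    As a rough sketch of the fallback behavior described above (based only
    on this description, not on the actual SelectRemoteExecutor() code; the
    Backend struct and the assigned-bytes bookkeeping are illustrative
    stand-ins):

      #include <cstdint>
      #include <limits>
      #include <random>
      #include <string>
      #include <vector>

      struct Backend {
        std::string ip;
        int64_t assigned_bytes;  // Bytes of scan ranges already assigned here.
      };

      // Pick the backend with the least assigned bytes, breaking ties randomly.
      // The random tie-break means repeated scheduling runs can land the same
      // scan range on different backends, i.e. placement is not consistent.
      // Precondition: 'backends' is non-empty.
      size_t SelectLeastLoaded(const std::vector<Backend>& backends,
          std::mt19937* prng) {
        std::vector<size_t> min_idx;
        int64_t min_bytes = std::numeric_limits<int64_t>::max();
        for (size_t i = 0; i < backends.size(); ++i) {
          if (backends[i].assigned_bytes < min_bytes) {
            min_bytes = backends[i].assigned_bytes;
            min_idx.clear();
          }
          if (backends[i].assigned_bytes == min_bytes) min_idx.push_back(i);
        }
        std::uniform_int_distribution<size_t> tie_break(0, min_idx.size() - 1);
        return min_idx[tie_break(*prng)];
      }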
    
    For the file handle cache (the original user of consistent
    placement), this is not a major problem. However, for data caching,
    it can result in slower warm up of the data cache and greater
    duplication of the same data across different nodes.
    
    This changes the algorithm to use consistent placement even for
    small clusters (num nodes <= num_remote_executor_candidates).
    To make this more predictable, it increases the maximum number
    of iterations.
    
    This also changes GetRemoteExecutorCandidates() to return the
    candidates in the order in which they were selected. A set is still
    used to detect duplicate backends, but the vector of distinct
    backends is now built directly as candidates are selected rather
    than by iterating over the set afterwards.
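
    In outline, the new selection loop works like the sketch below, which
    mirrors the scheduler.cc change in the diff (GetCandidates, the modulo
    mapping onto 'all_backends', and std::mt19937 are simplified stand-ins
    for the real hash-ring lookup and the pcg32 PRNG seeded from the file
    split hash):

      #include <cstdint>
      #include <random>
      #include <string>
      #include <unordered_set>
      #include <vector>

      // Same iteration cap per distinct candidate as in the patch.
      static const int MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE = 8;

      // Returns up to 'num_candidates' distinct backends, in the order in
      // which they were first selected. Precondition: 'all_backends' is
      // non-empty.
      std::vector<std::string> GetCandidates(uint32_t file_split_hash,
          int num_candidates, const std::vector<std::string>& all_backends) {
        std::vector<std::string> candidates;   // Distinct backends, in selection order.
        std::unordered_set<std::string> seen;  // Used only to detect duplicates.
        std::mt19937 prng(file_split_hash);    // Deterministic for a given file split.
        int max_iterations = num_candidates * MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE;
        for (int i = 0; i < max_iterations; ++i) {
          // Stand-in for the hash-ring lookup: map the next pseudo-random value
          // onto one of the backends.
          const std::string& backend = all_backends[prng() % all_backends.size()];
          // insert() reports whether this backend is new; only new backends are
          // appended, so 'candidates' preserves the selection order.
          if (seen.insert(backend).second) candidates.push_back(backend);
          if (candidates.size() == static_cast<size_t>(num_candidates)) break;
        }
        return candidates;
      }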
    
    Testing:
     - Modify the scheduler-test backend test to verify that small
       clusters use consistent scheduling.
    
    Change-Id: Icfdb2cc53d7206e316ea8a1cc28ad443f246f741
    Reviewed-on: http://gerrit.cloudera.org:8080/14026
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/scheduling/scheduler-test.cc | 66 +++++++++++++++++++++----------------
 be/src/scheduling/scheduler.cc      | 65 +++++++++++++++++++++++++++---------
 2 files changed, 87 insertions(+), 44 deletions(-)

diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 8a1a541..1347457 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -227,31 +227,43 @@ TEST_F(SchedulerTest, RemoteExecutorCandidates) {
   // of 5, 6, and 7.
   schema.AddSingleBlockTable("T1", {5, 6, 7});
 
-  // Test a range of number of remote executor candidates, including cases where the
-  // number of remote executor candidates exceeds the number of Impala nodes.
-  for (int num_candidates = 1; num_candidates <= num_impala_nodes + 1; ++num_candidates) {
-    Plan plan(schema);
-    plan.AddTableScan("T1");
-    plan.SetRandomReplica(true);
-    plan.SetNumRemoteExecutorCandidates(num_candidates);
-
-    Result result(plan);
-    SchedulerWrapper scheduler(plan);
-    for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
-
-    ASSERT_EQ(100, result.NumAssignments());
-    EXPECT_EQ(100, result.NumTotalAssignments());
-    EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
-    if (num_candidates < num_impala_nodes) {
-      EXPECT_EQ(num_candidates, result.NumDistinctBackends());
-    } else {
-      EXPECT_EQ(num_impala_nodes, result.NumDistinctBackends());
-    }
-    EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
-    // If there is only one remote executor candidate, then all scan ranges will be
-    // assigned to one backend.
-    if (num_candidates == 1) {
-      EXPECT_EQ(result.MinNumAssignedBytesPerHost(), 100 * Block::DEFAULT_BLOCK_SIZE);
+  // Test a range of number of remote executor candidates with both true and false for
+  // schedule_random_replica. This includes cases where the number of remote executor
+  // candidates exceeds the number of Impala nodes.
+  for (bool schedule_random_replica : {true, false}) {
+    for (int num_candidates = 1; num_candidates <= num_impala_nodes + 2;
+         ++num_candidates) {
+      Plan plan(schema);
+      plan.AddTableScan("T1");
+      plan.SetRandomReplica(schedule_random_replica);
+      plan.SetNumRemoteExecutorCandidates(num_candidates);
+
+      Result result(plan);
+      SchedulerWrapper scheduler(plan);
+      for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
+
+      ASSERT_EQ(100, result.NumAssignments());
+      EXPECT_EQ(100, result.NumTotalAssignments());
+      EXPECT_EQ(100 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
+
+      if (schedule_random_replica && num_candidates > 1) {
+        if (num_candidates < num_impala_nodes) {
+          EXPECT_EQ(num_candidates, result.NumDistinctBackends());
+        } else {
+          // Since this scenario still uses the consistent placement algorithm, there is
+          // no guarantee that the scan range will be placed on all the nodes. But
+          // it should be placed on almost all of the nodes.
+          EXPECT_GE(result.NumDistinctBackends(), num_impala_nodes - 1);
+        }
+        EXPECT_GE(result.MinNumAssignedBytesPerHost(), Block::DEFAULT_BLOCK_SIZE);
+      } else {
+        // If schedule_random_replica is false, then the scheduler will pick the first
+        // candidate (as none of the backends have any assignments). This means that all
+        // the iterations will assign work to the same backend. This is also true when
+        // the number of remote executor candidates is one.
+        EXPECT_EQ(result.NumDistinctBackends(), 1);
+        EXPECT_EQ(result.MinNumAssignedBytesPerHost(), 100 * Block::DEFAULT_BLOCK_SIZE);
+      }
     }
   }
 }
@@ -375,9 +387,7 @@ TEST_F(SchedulerTest, RemoteExecutorCandidateConsistency) {
   Plan plan(schema);
   plan.AddTableScan("T1");
   plan.SetRandomReplica(false);
-  // TODO: Consistent scheduling is only completely consistent to removing an unused
-  // node when the number of executor candidates is 1. See IMPALA-8677.
-  plan.SetNumRemoteExecutorCandidates(1);
+  plan.SetNumRemoteExecutorCandidates(3);
 
   Result result_base(plan);
   SchedulerWrapper scheduler(plan);
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 467694b..7e15bc7 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -56,6 +56,16 @@ static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.to
 static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 
+// Consistent scheduling requires picking up to k distinct candidates out of n nodes.
+// Since each iteration can pick a node that it already picked (i.e. it is sampling with
+// replacement), it may need more than k iterations to pick k distinct candidates.
+// There is also no guaranteed bound on the number of iterations. To protect against
+// bugs and large numbers of iterations, we limit the number of iterations. This constant
+// determines the number of iterations per distinct candidate allowed. Eight iterations
+// per distinct candidate provides a very high probability of actually getting k distinct
+// candidates. See GetRemoteExecutorCandidates() for a deeper description.
+static const int MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE = 8;
+
 Scheduler::Scheduler(MetricGroup* metrics, RequestPoolService* request_pool_service)
   : metrics_(metrics->GetOrCreateChildGroup("scheduler")),
     request_pool_service_(request_pool_service) {
@@ -542,7 +552,8 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
   } // End of for loop over scan ranges.
 
   // Assign remote scans to executors.
-  int num_remote_executor_candidates = query_options.num_remote_executor_candidates;
+  int num_remote_executor_candidates =
+      min(query_options.num_remote_executor_candidates, executor_group.NumExecutors());
   for (const TScanRangeLocationList* scan_range_locations : remote_scan_range_locations) {
     DCHECK(!exec_at_coord);
     const IpAddr* executor_ip;
@@ -551,13 +562,10 @@ Status Scheduler::ComputeScanRangeAssignment(const ExecutorConfig& executor_conf
     // 1. When enabled by setting 'num_remote_executor_candidates' > 0
     // AND
     // 2. This is an HDFS file split
-    // AND
-    // 3. The number of remote executor candidates is less than the number of backends.
     // Otherwise, fall back to the normal method of selecting executors for remote
     // ranges, which allows for execution on any backend.
     if (scan_range_locations->scan_range.__isset.hdfs_file_split &&
-        num_remote_executor_candidates > 0 &&
-        num_remote_executor_candidates < executor_group.NumExecutors()) {
+        num_remote_executor_candidates > 0) {
       assignment_ctx.GetRemoteExecutorCandidates(
           &scan_range_locations->scan_range.hdfs_file_split,
           num_remote_executor_candidates, &remote_executor_candidates);
@@ -765,13 +773,14 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
     vector<IpAddr>* remote_executor_candidates) {
   // This should be given an empty vector
   DCHECK_EQ(remote_executor_candidates->size(), 0);
-  // This function should not be used when 'num_candidates' exceeds the number
-  // of executors.
-  DCHECK_LT(num_candidates, executor_group_.NumExecutors());
+  // This function should not be called with 'num_candidates' exceeding the number of
+  // executors.
+  DCHECK_LE(num_candidates, executor_group_.NumExecutors());
   // Two different hashes of the filename can result in the same executor.
   // The function should return distinct executors, so it may need to do more hashes
   // than 'num_candidates'.
-  set<IpAddr> distinct_backends;
+  unordered_set<IpAddr> distinct_backends;
+  distinct_backends.reserve(num_candidates);
   // Generate multiple hashes of the file split by using the hash as a seed to a PRNG.
   // Note: The hash includes the partition path hash, the filename (relative to the
   // partition directory), and the offset. The offset is used to allow very large files
@@ -781,18 +790,42 @@ void Scheduler::AssignmentCtx::GetRemoteExecutorCandidates(
       hdfs_file_split->relative_path.length(), hash);
   hash = HashUtil::Hash(&hdfs_file_split->offset, sizeof(hdfs_file_split->offset), hash);
   pcg32 prng(hash);
-  // To avoid any problem scenarios, limit the total number of iterations
-  int max_iterations = num_candidates * 3;
+  // The function should return distinct executors, so it may need to do more hashes
+  // than 'num_candidates'. To avoid any problem scenarios, limit the total number of
+  // iterations. The number of iterations is set to a reasonably high level, because
+  // on average the loop short circuits considerably earlier. Using a higher number of
+  // iterations is useful for smaller clusters where we are using this function to get
+  // all the backends in a consistent order rather than picking a consistent subset.
+  // Suppose there are three nodes and the number of remote executor candidates is three.
+  // One can calculate the probability of picking three distinct executors in at most
+  // n iterations. For n=3, the second pick must not overlap the first (probability 2/3),
+  // and the third pick must not be either the first or second (probability 1/3). So:
+  // P(3) = 1*(2/3)*(1/3)=2/9
+  // The probability that it is done in at most n+1 steps is the probability that
+  // it completed in n steps combined with the probability that it completes in the n+1st
+  // step. In order to complete in the n+1st step, the previous n steps must not have
+  // all landed on a single backend (probability (1/3)^(n-1)) and this step must not land
+  // on the two backends already chosen (probability 1/3). So, the recursive step is:
+  // P(n+1) = P(n) + (1 - P(n))*(1-(1/3)^(n-1))*(1/3)
+  // Here are some example probabilities:
+  // Probability of completing in at most 5 iterations: 0.6284
+  // Probability of completing in at most 10 iterations: 0.9506
+  // Probability of completing in at most 15 iterations: 0.9935
+  // Probability of completing in at most 20 iterations: 0.9991
+  int max_iterations = num_candidates * MAX_ITERATIONS_PER_EXECUTOR_CANDIDATE;
   for (int i = 0; i < max_iterations; ++i) {
     // Look up nearest IpAddr
     const IpAddr* executor_addr = executor_group_.GetHashRing()->GetNode(prng());
     DCHECK(executor_addr != nullptr);
-    distinct_backends.insert(*executor_addr);
+    auto insert_ret = distinct_backends.insert(*executor_addr);
+    // The return type of unordered_set.insert() is a pair<iterator, bool> where the
+    // second element is whether this was a new element. If this is a new element,
+    // add this element to the return vector.
+    if (insert_ret.second) {
+      remote_executor_candidates->push_back(*executor_addr);
+    }
     // Short-circuit if we reach the appropriate number of replicas
-    if (distinct_backends.size() == num_candidates) break;
-  }
-  for (const IpAddr& addr : distinct_backends) {
-    remote_executor_candidates->push_back(addr);
+    if (remote_executor_candidates->size() == num_candidates) break;
   }
 }
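
The probabilities quoted in the comment above can be reproduced by evaluating
the recursion it gives. The following stand-alone sketch does that for the
three-node, three-candidate case (the 2/9 base case and the recursive step are
taken directly from the comment):

    // P(3) = 2/9
    // P(n+1) = P(n) + (1 - P(n)) * (1 - (1/3)^(n-1)) * (1/3)
    #include <cmath>
    #include <cstdio>

    int main() {
      // With 3 nodes and 3 candidates, at least 3 iterations are needed, so
      // P(3) is the base case.
      double p = 2.0 / 9.0;
      for (int n = 3; n < 20; ++n) {
        // Probability of having collected all 3 distinct backends within n+1 iterations.
        p = p + (1.0 - p) * (1.0 - std::pow(1.0 / 3.0, n - 1)) * (1.0 / 3.0);
        if ((n + 1) % 5 == 0) printf("P(at most %2d iterations) = %.4f\n", n + 1, p);
      }
      return 0;
    }

This prints 0.6284, 0.9506, 0.9935 and 0.9991 for 5, 10, 15 and 20 iterations,
matching the values in the comment. By the same estimate, the new limit of
eight iterations per candidate (24 iterations for three candidates) leaves well
under a 0.1% chance of not collecting all three distinct backends.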