You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/16 01:53:10 UTC

[incubator-druid] Diff for: [GitHub] drcrallen closed pull request #6829: Don't force rebalance of the whole cluster all the time

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 77a89abcdbb..4d52bc9392b 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -766,6 +766,7 @@ Issuing a GET request at the same URL will return the spec that is currently in
 |`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
 |`killPendingSegmentsSkipList`|List of dataSources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated dataSources or a JSON array.|none|
 |`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
+|`balancerNodeLimit`|The maximum number of nodes to consider when rebalancing. When set to a positive value, a random subsample of at most this many nodes is considered for moving segments off of, and another, independently drawn random subsample of at most this many nodes is considered for moving segments onto. A value of 0 means no limit: all nodes are considered.|0 (no limit, all nodes considered)|
 
 To view the audit history of coordinator dynamic config issue a GET request to the URL -
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index c51a04a8dea..31add37fd4c 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -67,6 +67,7 @@
    * See {@link LoadQueuePeon}, {@link org.apache.druid.server.coordinator.rules.LoadRule#run}
    */
   private final int maxSegmentsInNodeLoadingQueue;
+  private final int balancerNodeLimit;
 
   @JsonCreator
   public CoordinatorDynamicConfig(
@@ -85,7 +86,8 @@ public CoordinatorDynamicConfig(
       @JsonProperty("killDataSourceWhitelist") Object killDataSourceWhitelist,
       @JsonProperty("killAllDataSources") boolean killAllDataSources,
       @JsonProperty("killPendingSegmentsSkipList") Object killPendingSegmentsSkipList,
-      @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue
+      @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
+      @JsonProperty("balancerNodeLimit") int balancerNodeLimit
   )
   {
     this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -100,6 +102,7 @@ public CoordinatorDynamicConfig(
     this.killDataSourceWhitelist = parseJsonStringOrArray(killDataSourceWhitelist);
     this.killPendingSegmentsSkipList = parseJsonStringOrArray(killPendingSegmentsSkipList);
     this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+    this.balancerNodeLimit = balancerNodeLimit;
 
     if (this.killAllDataSources && !this.killDataSourceWhitelist.isEmpty()) {
       throw new IAE("can't have killAllDataSources and non-empty killDataSourceWhitelist");
@@ -140,6 +143,11 @@ public static CoordinatorDynamicConfig current(final JacksonConfigManager config
     return Preconditions.checkNotNull(watch(configManager).get(), "Got null config from watcher?!");
   }
 
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
   @JsonProperty
   public long getMillisToWaitBeforeDeleting()
   {
@@ -212,6 +220,12 @@ public int getMaxSegmentsInNodeLoadingQueue()
     return maxSegmentsInNodeLoadingQueue;
   }
 
+  @JsonProperty
+  public int getBalancerNodeLimit()
+  {
+    return balancerNodeLimit;
+  }
+
   @Override
   public String toString()
   {
@@ -240,43 +254,20 @@ public boolean equals(Object o)
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
     CoordinatorDynamicConfig that = (CoordinatorDynamicConfig) o;
-
-    if (millisToWaitBeforeDeleting != that.millisToWaitBeforeDeleting) {
-      return false;
-    }
-    if (mergeBytesLimit != that.mergeBytesLimit) {
-      return false;
-    }
-    if (mergeSegmentsLimit != that.mergeSegmentsLimit) {
-      return false;
-    }
-    if (maxSegmentsToMove != that.maxSegmentsToMove) {
-      return false;
-    }
-    if (replicantLifetime != that.replicantLifetime) {
-      return false;
-    }
-    if (replicationThrottleLimit != that.replicationThrottleLimit) {
-      return false;
-    }
-    if (balancerComputeThreads != that.balancerComputeThreads) {
-      return false;
-    }
-    if (emitBalancingStats != that.emitBalancingStats) {
-      return false;
-    }
-    if (killAllDataSources != that.killAllDataSources) {
-      return false;
-    }
-    if (maxSegmentsInNodeLoadingQueue != that.maxSegmentsInNodeLoadingQueue) {
-      return false;
-    }
-    if (!Objects.equals(killDataSourceWhitelist, that.killDataSourceWhitelist)) {
-      return false;
-    }
-    return Objects.equals(killPendingSegmentsSkipList, that.killPendingSegmentsSkipList);
+    return millisToWaitBeforeDeleting == that.millisToWaitBeforeDeleting &&
+           mergeBytesLimit == that.mergeBytesLimit &&
+           mergeSegmentsLimit == that.mergeSegmentsLimit &&
+           maxSegmentsToMove == that.maxSegmentsToMove &&
+           replicantLifetime == that.replicantLifetime &&
+           replicationThrottleLimit == that.replicationThrottleLimit &&
+           balancerComputeThreads == that.balancerComputeThreads &&
+           emitBalancingStats == that.emitBalancingStats &&
+           killAllDataSources == that.killAllDataSources &&
+           maxSegmentsInNodeLoadingQueue == that.maxSegmentsInNodeLoadingQueue &&
+           balancerNodeLimit == that.balancerNodeLimit &&
+           killDataSourceWhitelist.equals(that.killDataSourceWhitelist) &&
+           killPendingSegmentsSkipList.equals(that.killPendingSegmentsSkipList);
   }
 
   @Override
@@ -292,17 +283,13 @@ public int hashCode()
         balancerComputeThreads,
         emitBalancingStats,
         killAllDataSources,
-        maxSegmentsInNodeLoadingQueue,
         killDataSourceWhitelist,
-        killPendingSegmentsSkipList
+        killPendingSegmentsSkipList,
+        maxSegmentsInNodeLoadingQueue,
+        balancerNodeLimit
     );
   }
 
-  public static Builder builder()
-  {
-    return new Builder();
-  }
-
   public static class Builder
   {
     private static final long DEFAULT_MILLIS_TO_WAIT_BEFORE_DELETING = TimeUnit.MINUTES.toMillis(15);
@@ -315,6 +302,7 @@ public static Builder builder()
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
     private static final boolean DEFAULT_KILL_ALL_DATA_SOURCES = false;
     private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+    private static final int DEFAULT_BALANCER_NODE_LIMIT = 0;
 
     private Long millisToWaitBeforeDeleting;
     private Long mergeBytesLimit;
@@ -328,6 +316,7 @@ public static Builder builder()
     private Boolean killAllDataSources;
     private Object killPendingSegmentsSkipList;
     private Integer maxSegmentsInNodeLoadingQueue;
+    private Integer balancerNodeLimit;
 
     public Builder()
     {
@@ -346,7 +335,8 @@ public Builder(
         @JsonProperty("killDataSourceWhitelist") @Nullable Object killDataSourceWhitelist,
         @JsonProperty("killAllDataSources") @Nullable Boolean killAllDataSources,
         @JsonProperty("killPendingSegmentsSkipList") @Nullable Object killPendingSegmentsSkipList,
-        @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue
+        @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
+        @JsonProperty("balancerNodeLimit") @Nullable Integer balancerNodeLimit
     )
     {
       this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting;
@@ -361,6 +351,7 @@ public Builder(
       this.killDataSourceWhitelist = killDataSourceWhitelist;
       this.killPendingSegmentsSkipList = killPendingSegmentsSkipList;
       this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+      this.balancerNodeLimit = balancerNodeLimit;
     }
 
     public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting)
@@ -429,6 +420,12 @@ public Builder withMaxSegmentsInNodeLoadingQueue(int maxSegmentsInNodeLoadingQue
       return this;
     }
 
+    public Builder withBalancerNodeLimit(int balancerNodeLimit)
+    {
+      this.balancerNodeLimit = balancerNodeLimit;
+      return this;
+    }
+
     public CoordinatorDynamicConfig build()
     {
       return new CoordinatorDynamicConfig(
@@ -445,7 +442,8 @@ public CoordinatorDynamicConfig build()
           killPendingSegmentsSkipList,
           maxSegmentsInNodeLoadingQueue == null
           ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
-          : maxSegmentsInNodeLoadingQueue
+          : maxSegmentsInNodeLoadingQueue,
+          balancerNodeLimit == null ? DEFAULT_BALANCER_NODE_LIMIT : balancerNodeLimit
       );
     }
 
@@ -465,7 +463,8 @@ public CoordinatorDynamicConfig build(CoordinatorDynamicConfig defaults)
           killPendingSegmentsSkipList == null ? defaults.getKillPendingSegmentsSkipList() : killPendingSegmentsSkipList,
           maxSegmentsInNodeLoadingQueue == null
           ? defaults.getMaxSegmentsInNodeLoadingQueue()
-          : maxSegmentsInNodeLoadingQueue
+          : maxSegmentsInNodeLoadingQueue,
+          balancerNodeLimit == null ? defaults.getBalancerNodeLimit() : balancerNodeLimit
       );
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index 129b4535085..dae6fe3e108 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -33,6 +33,7 @@
 import org.apache.druid.server.coordinator.ServerHolder;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -43,6 +44,7 @@
 import java.util.stream.Collectors;
 
 /**
+ *
  */
 public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
 {
@@ -106,8 +108,18 @@ private void balanceTier(
       return;
     }
 
-    final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
-    final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
+    // Sort order is ascending based on available size. We reverse the "from" to shed the largest nodes first
+    final List<ServerHolder> toMoveFrom;
+    final List<ServerHolder> toMoveTo;
+
+    final int nodeLimit = params.getCoordinatorDynamicConfig().getBalancerNodeLimit();
+    if (nodeLimit > 0) {
+      toMoveFrom = Lists.reverse(randomSortedSubList(servers, nodeLimit));
+      toMoveTo = randomSortedSubList(servers, nodeLimit);
+    } else {
+      toMoveFrom = Lists.reverse(Lists.newArrayList(servers));
+      toMoveTo = Lists.newArrayList(servers);
+    }
 
     if (toMoveTo.size() <= 1) {
       log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
@@ -169,7 +181,9 @@ private void balanceTier(
       if (iter >= maxIterations) {
         log.info(
             "Unable to select %d remaining candidate segments out of %d total to balance after %d iterations, ending run.",
-            (maxSegmentsToMove - moved - unmoved), maxSegmentsToMove, iter
+            (maxSegmentsToMove - moved - unmoved),
+            maxSegmentsToMove,
+            iter
         );
         break;
       }
@@ -192,6 +206,13 @@ private void balanceTier(
     );
   }
 
+  static <T> List<T> randomSortedSubList(Iterable<T> in, int limit)
+  {
+    final List<T> toShuffle = Lists.newArrayList(in);
+    Collections.shuffle(toShuffle);
+    return toShuffle.stream().limit(limit).sorted().collect(Collectors.toList());
+  }
+
   protected void moveSegment(
       final BalancerSegmentHolder segment,
       final ImmutableDruidServer toServer,
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 0996058b4ad..61059d3df8e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -1423,6 +1423,7 @@ private CoordinatorDynamicConfig createCoordinatorDynamicConfig()
                                    .withEmitBalancingStats(false)
                                    .withKillDataSourceWhitelist(null)
                                    .withKillAllDataSources(false)
+                                   .withBalancerNodeLimit(0)
                                    .withMaxSegmentsInNodeLoadingQueue(1000)
                                    .build();
   }
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index 002e1977c15..3a79bb3bd6d 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -184,7 +184,7 @@ public void testUpdate()
     Assert.assertEquals(
         current,
         new CoordinatorDynamicConfig
-            .Builder(null, null, null, null, null, null, null, null, null, null, null, null)
+            .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null)
             .build(current)
     );
   }


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org