You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 18:22:25 UTC

[GitHub] gianm closed pull request #5970: [Backport] Coordinator segment balancer max load queue fix

gianm closed pull request #5970: [Backport] Coordinator segment balancer max load queue fix
URL: https://github.com/apache/incubator-druid/pull/5970
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub will not display it otherwise):

diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index cc266d705df..15f13438163 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -20,9 +20,9 @@
 package io.druid.server.coordinator.helper;
 
 import com.google.common.collect.Lists;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.client.ImmutableDruidServer;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.server.coordinator.BalancerSegmentHolder;
 import io.druid.server.coordinator.BalancerStrategy;
 import io.druid.server.coordinator.CoordinatorStats;
@@ -103,31 +103,37 @@ private void balanceTier(
       return;
     }
 
-    final List<ServerHolder> serverHolderList = Lists.newArrayList(servers);
+    final List<ServerHolder> toMoveFrom = Lists.newArrayList(servers);
+    final List<ServerHolder> toMoveTo = Lists.newArrayList(servers);
 
-    if (serverHolderList.size() <= 1) {
+    if (toMoveTo.size() <= 1) {
       log.info("[%s]: One or fewer servers found.  Cannot balance.", tier);
       return;
     }
 
     int numSegments = 0;
-    for (ServerHolder server : serverHolderList) {
-      numSegments += server.getServer().getSegments().size();
+    for (ServerHolder sourceHolder : toMoveFrom) {
+      numSegments += sourceHolder.getServer().getSegments().size();
     }
 
     if (numSegments == 0) {
       log.info("No segments found.  Cannot balance.");
       return;
     }
+
+    final int maxToLoad = params.getCoordinatorDynamicConfig().getMaxSegmentsInNodeLoadingQueue();
     long unmoved = 0L;
     for (int iter = 0; iter < maxSegmentsToMove; iter++) {
-      final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(serverHolderList);
+      if (maxToLoad > 0) {
+        toMoveTo.removeIf(s -> s.getNumberOfSegmentsInQueue() >= maxToLoad);
+      }
+      final BalancerSegmentHolder segmentToMove = strategy.pickSegmentToMove(toMoveFrom);
 
       if (segmentToMove != null && params.getAvailableSegments().contains(segmentToMove.getSegment())) {
-        final ServerHolder holder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), serverHolderList);
+        final ServerHolder destinationHolder = strategy.findNewSegmentHomeBalancer(segmentToMove.getSegment(), toMoveTo);
 
-        if (holder != null) {
-          moveSegment(segmentToMove, holder.getServer(), params);
+        if (destinationHolder != null) {
+          moveSegment(segmentToMove, destinationHolder.getServer(), params);
         } else {
           ++unmoved;
         }
@@ -140,7 +146,7 @@ private void balanceTier(
     stats.addToTieredStat("unmovedCount", tier, unmoved);
     stats.addToTieredStat("movedCount", tier, currentlyMovingSegments.get(tier).size());
     if (params.getCoordinatorDynamicConfig().emitBalancingStats()) {
-      strategy.emitStats(tier, stats, serverHolderList);
+      strategy.emitStats(tier, stats, toMoveFrom);
     }
     log.info(
         "[%s]: Segments Moved: [%d] Segments Let Alone: [%d]",
diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index ec0b841fbcd..84cf5ce7285 100644
--- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -163,7 +163,6 @@ public void tearDown() throws Exception
     balancerStrategyExecutor.shutdownNow();
   }
 
-
   @Test
   public void testMoveToEmptyServerBalancer() throws IOException
   {
@@ -186,7 +185,7 @@ public void testMoveToEmptyServerBalancer() throws IOException
         )
     );
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     )
@@ -197,6 +196,48 @@ public void testMoveToEmptyServerBalancer() throws IOException
     Assert.assertEquals(2, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
   }
 
+  @Test
+  public void testMoveMaxLoadQueueServerBalancer()
+  {
+    mockDruidServer(druidServer1, "1", "normal", 30L, 100L, segments);
+    mockDruidServer(druidServer2, "2", "normal", 0L, 100L, Collections.emptyMap());
+
+    EasyMock.replay(druidServer3);
+    EasyMock.replay(druidServer4);
+
+    // Mock stuff that the coordinator needs
+    mockCoordinator(coordinator);
+
+    BalancerStrategy predefinedPickOrderStrategy = new PredefinedPickOrderBalancerStrategy(
+        balancerStrategy,
+        ImmutableList.of(
+            new BalancerSegmentHolder(druidServer1, segment1),
+            new BalancerSegmentHolder(druidServer1, segment2),
+            new BalancerSegmentHolder(druidServer1, segment3),
+            new BalancerSegmentHolder(druidServer1, segment4)
+        )
+    );
+
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
+        ImmutableList.of(druidServer1, druidServer2),
+        ImmutableList.of(peon1, peon2)
+    )
+        .withBalancerStrategy(predefinedPickOrderStrategy)
+        .withDynamicConfigs(
+            CoordinatorDynamicConfig
+                .builder()
+                .withMaxSegmentsToMove(MAX_SEGMENTS_TO_MOVE)
+                .withMaxSegmentsInNodeLoadingQueue(1)
+                .build()
+        )
+        .build();
+
+    params = new DruidCoordinatorBalancerTester(coordinator).run(params);
+
+    // max to move is 5, all segments on server 1, but only expect to move 1 to server 2 since max node load queue is 1
+    Assert.assertEquals(1, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
+  }
+
   @Test
   public void testMoveSameSegmentTwice() throws Exception
   {
@@ -216,7 +257,7 @@ public void testMoveSameSegmentTwice() throws Exception
         )
     );
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     )
@@ -245,7 +286,7 @@ public void testRun1() throws IOException
     // Mock stuff that the coordinator needs
     mockCoordinator(coordinator);
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(
         ImmutableList.of(druidServer1, druidServer2),
         ImmutableList.of(peon1, peon2)
     ).build();
@@ -254,7 +295,6 @@ public void testRun1() throws IOException
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
   }
 
-
   @Test
   public void testRun2() throws IOException
   {
@@ -267,13 +307,13 @@ public void testRun2() throws IOException
     // Mock stuff that the coordinator needs
     mockCoordinator(coordinator);
 
-    DruidCoordinatorRuntimeParams params = defaullRuntimeParamsBuilder(druidServers, peons).build();
+    DruidCoordinatorRuntimeParams params = defaultRuntimeParamsBuilder(druidServers, peons).build();
 
     params = new DruidCoordinatorBalancerTester(coordinator).run(params);
     Assert.assertTrue(params.getCoordinatorStats().getTieredStat("movedCount", "normal") > 0);
   }
 
-  private DruidCoordinatorRuntimeParams.Builder defaullRuntimeParamsBuilder(
+  private DruidCoordinatorRuntimeParams.Builder defaultRuntimeParamsBuilder(
       List<ImmutableDruidServer> druidServers,
       List<LoadQueuePeon> peons
   )
@@ -393,5 +433,4 @@ public void emitStats(
       delegate.emitStats(tier, stats, serverHolderList);
     }
   }
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org