You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pj...@apache.org on 2020/06/25 11:15:17 UTC

[druid] branch master updated: Fix balancer strategy (#10070)

This is an automated email from the ASF dual-hosted git repository.

pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 422a8af  Fix balancer strategy (#10070)
422a8af is described below

commit 422a8af14e932d4da0cd7b78d4b729884dd25a34
Author: Parag Jain <pj...@apache.org>
AuthorDate: Thu Jun 25 16:45:00 2020 +0530

    Fix balancer strategy (#10070)
    
    * fix server overassignment
    
    * fix random balancer strategy, add more tests
    
    * comment
    
    * added more tests
    
    * fix forbidden apis
    
    * fix typo
---
 .../server/coordinator/CostBalancerStrategy.java   |   9 +-
 .../server/coordinator/RandomBalancerStrategy.java |  14 +-
 .../server/coordinator/BalancerStrategyTest.java   | 124 ++++++++++
 .../druid/server/coordinator/RunRulesTest.java     | 270 ++++++++++++++++++++-
 4 files changed, 406 insertions(+), 11 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 5d656d6..e5e3cb5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -367,7 +367,8 @@ public class CostBalancerStrategy implements BalancerStrategy
       final boolean includeCurrentServer
   )
   {
-    Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
+    final Pair<Double, ServerHolder> noServer = Pair.of(Double.POSITIVE_INFINITY, null);
+    Pair<Double, ServerHolder> bestServer = noServer;
 
     List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
 
@@ -391,7 +392,11 @@ public class CostBalancerStrategy implements BalancerStrategy
           bestServers.add(server);
         }
       }
-
+      // If the best-server list holds a server whose cost of serving the segment is INFINITE, no usable server was
+      // found: return the pair with a null server so that no segment assignment happens.
+      if (bestServers.get(0).lhs.isInfinite()) {
+        return noServer;
+      }
       // Randomly choose a server from the best servers
       bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
     }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
index 72fdedf..de3e46e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
@@ -28,20 +28,22 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 public class RandomBalancerStrategy implements BalancerStrategy
 {
   @Override
   public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
   {
-    if (serverHolders.size() == 1) {
+    // keep only servers with enough available size for this segment that are not already serving it
+    final List<ServerHolder> usableServerHolders = serverHolders.stream().filter(
+        serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment(
+            proposalSegment)
+    ).collect(Collectors.toList());
+    if (usableServerHolders.size() == 0) {
       return null;
     } else {
-      ServerHolder holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
-      while (holder.isServingSegment(proposalSegment)) {
-        holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
-      }
-      return holder;
+      return usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size()));
     }
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java
new file mode 100644
index 0000000..b4d3ac5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class BalancerStrategyTest
+{
+  private final BalancerStrategy balancerStrategy;
+  private DataSegment proposedDataSegment;
+  private List<ServerHolder> serverHolders;
+
+  @Parameterized.Parameters(name = "{index}: BalancerStrategy:{0}")
+  public static Iterable<Object[]> data()
+  {
+    return Arrays.asList(
+        new Object[][]{
+            {new CostBalancerStrategy(Execs.directExecutor())},
+            {new RandomBalancerStrategy()}
+        }
+    );
+  }
+
+  public BalancerStrategyTest(BalancerStrategy balancerStrategy)
+  {
+    this.balancerStrategy = balancerStrategy;
+  }
+
+  @Before
+  public void setUp()
+  {
+    this.proposedDataSegment = new DataSegment(
+        "datasource1",
+        Intervals.utc(0, 1),
+        "",
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        11L
+    );
+  }
+
+
+  @Test
+  public void findNewSegmentHomeReplicatorNotEnoughSpace()
+  {
+    final ServerHolder serverHolder = new ServerHolder(
+        new DruidServer("server1", "host1", null, 10L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
+        new LoadQueuePeonTester());
+    serverHolders = new ArrayList<>();
+    serverHolders.add(serverHolder);
+    final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    // server1 (size 10L) cannot fit the 11L segment, so expect null. NOTE(review): server1 also already serves the segment, which may by itself cause null
+    Assert.assertNull(foundServerHolder);
+  }
+
+  @Test(timeout = 5000L)
+  public void findNewSegmentHomeReplicatorNotEnoughNodesForReplication()
+  {
+    final ServerHolder serverHolder1 = new ServerHolder(
+        new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
+        new LoadQueuePeonTester());
+
+    final ServerHolder serverHolder2 = new ServerHolder(
+        new DruidServer("server2", "host2", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(proposedDataSegment).toImmutableDruidServer(),
+        new LoadQueuePeonTester());
+
+    serverHolders = new ArrayList<>();
+    serverHolders.add(serverHolder1);
+    serverHolders.add(serverHolder2);
+
+    final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    // both servers already serve the segment, so no node is left to host another replica: expect null (and no endless retry loop)
+    Assert.assertNull(foundServerHolder);
+  }
+
+  @Test
+  public void findNewSegmentHomeReplicatorEnoughSpace()
+  {
+    final ServerHolder serverHolder = new ServerHolder(
+        new DruidServer("server1", "host1", null, 1000L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).toImmutableDruidServer(),
+        new LoadQueuePeonTester());
+    serverHolders = new ArrayList<>();
+    serverHolders.add(serverHolder);
+    final ServerHolder foundServerHolder = balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    // server1 has room for the segment and does not serve it yet, so it should be selected
+    Assert.assertEquals(serverHolder, foundServerHolder);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
index 96f38c3..e138111 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalDropRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
@@ -193,17 +194,31 @@ public class RunRulesTest
       BalancerStrategy balancerStrategy
   )
   {
-    return createCoordinatorRuntimeParams(druidCluster)
-        .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+    return makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, usedSegments);
+  }
+
+  private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams(
+      DruidCluster druidCluster,
+      BalancerStrategy balancerStrategy,
+      List<DataSegment> dataSegments
+  )
+  {
+    return createCoordinatorRuntimeParams(druidCluster, dataSegments)
+        .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
         .withBalancerStrategy(balancerStrategy);
   }
 
   private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster)
   {
+    return createCoordinatorRuntimeParams(druidCluster, usedSegments);
+  }
+
+  private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster, List<DataSegment> dataSegments)
+  {
     return CoordinatorRuntimeParamsTestHelpers
         .newBuilder()
         .withDruidCluster(druidCluster)
-        .withUsedSegmentsInTest(usedSegments)
+        .withUsedSegmentsInTest(dataSegments)
         .withDatabaseRuleManager(databaseRuleManager);
   }
 
@@ -1067,6 +1082,255 @@ public class RunRulesTest
     exec.shutdown();
   }
 
+  /**
+   * Tier - __default_tier
+   * Nodes - 2
+   * Replicants - 3
+   * Random balancer strategy should assign nothing, and must not loop forever, since both nodes already serve the segment
+   */
+  @Test(timeout = 5000L)
+  public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes()
+  {
+    mockCoordinator();
+    mockEmptyPeon();
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 3)
+            )
+        )).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DataSegment dataSegment = new DataSegment(
+        "test",
+        Intervals.utc(0, 1),
+        DateTimes.nowUtc().toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+
+    DruidCluster druidCluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(
+            DruidServer.DEFAULT_TIER,
+            new ServerHolder(
+                new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
+                                                                                                                   .toImmutableDruidServer(),
+                mockPeon
+            ),
+            new ServerHolder(
+                new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
+                                                                                                                   .toImmutableDruidServer(),
+                mockPeon
+            )
+        )
+        .build();
+
+    RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+    DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+        .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+        .build();
+
+    DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+    CoordinatorStats stats = afterParams.getCoordinatorStats();
+
+    Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+    EasyMock.verify(mockPeon);
+  }
+
+
+  /**
+   * Tier - __default_tier
+   * Nodes - 1
+   * Replicants - 1
+   * Random balancer strategy should assign the segment (size 1) to the only node (size 1000)
+   */
+  @Test(timeout = 5000L)
+  public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace()
+  {
+    mockCoordinator();
+    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+    EasyMock.expectLastCall().atLeastOnce();
+    mockEmptyPeon();
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, 1)
+            )
+        )).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DataSegment dataSegment = new DataSegment(
+        "test",
+        Intervals.utc(0, 1),
+        DateTimes.nowUtc().toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+
+    DruidCluster druidCluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(
+            DruidServer.DEFAULT_TIER,
+            new ServerHolder(
+                new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+                    .toImmutableDruidServer(),
+                mockPeon
+            )
+        )
+        .build();
+
+    RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+    DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+        .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+        .build();
+
+    DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+    CoordinatorStats stats = afterParams.getCoordinatorStats();
+    Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+    EasyMock.verify(mockPeon);
+  }
+
+  /**
+   * Tier - __default_tier
+   * Nodes - 1
+   * Replicants - 1
+   * Random balancer strategy should not assign anything as the segment (size 11) does not fit on the only node (size 10)
+   */
+  @Test(timeout = 5000L)
+  public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace()
+  {
+    mockCoordinator();
+    mockEmptyPeon();
+    int numReplicants = 1;
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
+            )
+        )).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DataSegment dataSegment = new DataSegment(
+        "test",
+        Intervals.utc(0, 1),
+        DateTimes.nowUtc().toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        11
+    );
+
+    DruidCluster druidCluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(
+            DruidServer.DEFAULT_TIER,
+            new ServerHolder(
+                new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+                    .toImmutableDruidServer(),
+                mockPeon
+            )
+        )
+        .build();
+
+    RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+    DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+        .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+        .build();
+
+    DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+    CoordinatorStats stats = afterParams.getCoordinatorStats();
+    Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
+    Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+    EasyMock.verify(mockPeon);
+  }
+
+  /**
+   * Tier - __default_tier
+   * Nodes - 1
+   * Replicants - 1
+   * Cost balancer strategy should not assign anything as the segment (size 11) does not fit on the only node (size 10)
+   */
+  @Test
+  public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
+  {
+    mockCoordinator();
+    mockEmptyPeon();
+    int numReplicants = 1;
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+        Collections.singletonList(
+            new ForeverLoadRule(
+                ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
+            )
+        )).atLeastOnce();
+    EasyMock.replay(databaseRuleManager);
+
+    DataSegment dataSegment = new DataSegment(
+        "test",
+        Intervals.utc(0, 1),
+        DateTimes.nowUtc().toString(),
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        IndexIO.CURRENT_VERSION_ID,
+        11
+    );
+
+    DruidCluster druidCluster = DruidClusterBuilder
+        .newBuilder()
+        .addTier(
+            DruidServer.DEFAULT_TIER,
+            new ServerHolder(
+                new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+                    .toImmutableDruidServer(),
+                mockPeon
+            )
+        )
+        .build();
+
+    ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+    CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec);
+
+    DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+        .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+        .build();
+
+    DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+    CoordinatorStats stats = afterParams.getCoordinatorStats();
+    Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
+    Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment should fail
+    Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+    Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+    exec.shutdown();
+    EasyMock.verify(mockPeon);
+  }
+
   private void mockCoordinator()
   {
     EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org