You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pj...@apache.org on 2020/06/25 11:15:17 UTC
[druid] branch master updated: Fix balancer strategy (#10070)
This is an automated email from the ASF dual-hosted git repository.
pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 422a8af Fix balancer strategy (#10070)
422a8af is described below
commit 422a8af14e932d4da0cd7b78d4b729884dd25a34
Author: Parag Jain <pj...@apache.org>
AuthorDate: Thu Jun 25 16:45:00 2020 +0530
Fix balancer strategy (#10070)
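* fix server overassignment (CostBalancerStrategy returns no server when even the best candidate's cost is infinite)
* fix random balancer strategy (only pick servers with enough available size that do not already serve the segment), add more tests
* comment
* added more tests
* fix forbidden apis
* fix typo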
---
.../server/coordinator/CostBalancerStrategy.java | 9 +-
.../server/coordinator/RandomBalancerStrategy.java | 14 +-
.../server/coordinator/BalancerStrategyTest.java | 124 ++++++++++
.../druid/server/coordinator/RunRulesTest.java | 270 ++++++++++++++++++++-
4 files changed, 406 insertions(+), 11 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 5d656d6..e5e3cb5 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -367,7 +367,8 @@ public class CostBalancerStrategy implements BalancerStrategy
final boolean includeCurrentServer
)
{
- Pair<Double, ServerHolder> bestServer = Pair.of(Double.POSITIVE_INFINITY, null);
+ final Pair<Double, ServerHolder> noServer = Pair.of(Double.POSITIVE_INFINITY, null);
+ Pair<Double, ServerHolder> bestServer = noServer;
List<ListenableFuture<Pair<Double, ServerHolder>>> futures = new ArrayList<>();
@@ -391,7 +392,11 @@ public class CostBalancerStrategy implements BalancerStrategy
bestServers.add(server);
}
}
-
+ // If even the best server's cost of serving the segment is infinite, no usable server was found:
+ // return noServer (whose server is null) so that the segment is not assigned.
+ if (bestServers.get(0).lhs.isInfinite()) {
+ return noServer;
+ }
// Randomly choose a server from the best servers
bestServer = bestServers.get(ThreadLocalRandom.current().nextInt(bestServers.size()));
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
index 72fdedf..de3e46e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/RandomBalancerStrategy.java
@@ -28,20 +28,22 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
public class RandomBalancerStrategy implements BalancerStrategy
{
+ /**
+ * Chooses a server to load another replica of {@code proposalSegment}: a uniformly random pick among the servers
+ * whose available size is at least the segment's size and that are not already serving the segment.
+ *
+ * @return the chosen server, or {@code null} if no server qualifies
+ */
@Override
public ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment, List<ServerHolder> serverHolders)
{
- if (serverHolders.size() == 1) {
+ // filter out servers whose available size is less than required for this segment and those already serving this segment
+ // Filtering up front, rather than re-drawing until a non-serving server turns up, guarantees termination even when
+ // every server already serves the segment.
+ final List<ServerHolder> usableServerHolders = serverHolders.stream().filter(
+ serverHolder -> serverHolder.getAvailableSize() >= proposalSegment.getSize() && !serverHolder.isServingSegment(
+ proposalSegment)
+ ).collect(Collectors.toList());
+ if (usableServerHolders.size() == 0) {
return null;
} else {
- ServerHolder holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
- while (holder.isServingSegment(proposalSegment)) {
- holder = serverHolders.get(ThreadLocalRandom.current().nextInt(serverHolders.size()));
- }
- return holder;
+ return usableServerHolders.get(ThreadLocalRandom.current().nextInt(usableServerHolders.size()));
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java
new file mode 100644
index 0000000..b4d3ac5
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalancerStrategyTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Checks the {@link BalancerStrategy#findNewSegmentHomeReplicator} contract against every strategy listed in
+ * {@link #data()}: a server is only returned when it has room for the segment and does not already serve it.
+ */
+@RunWith(Parameterized.class)
+public class BalancerStrategyTest
+{
+  private final BalancerStrategy balancerStrategy;
+  private DataSegment proposedDataSegment;
+
+  @Parameterized.Parameters(name = "{index}: BalancerStrategy:{0}")
+  public static Iterable<Object[]> data()
+  {
+    return Arrays.asList(
+        new Object[][]{
+            {new CostBalancerStrategy(Execs.directExecutor())},
+            {new RandomBalancerStrategy()}
+        }
+    );
+  }
+
+  public BalancerStrategyTest(BalancerStrategy balancerStrategy)
+  {
+    this.balancerStrategy = balancerStrategy;
+  }
+
+  @Before
+  public void setUp()
+  {
+    // An 11-byte segment: one byte more than the 10-byte historical used by the "not enough space" test.
+    this.proposedDataSegment = new DataSegment(
+        "datasource1",
+        Intervals.utc(0, 1),
+        "",
+        new HashMap<>(),
+        new ArrayList<>(),
+        new ArrayList<>(),
+        NoneShardSpec.instance(),
+        0,
+        11L
+    );
+  }
+
+  @Test
+  public void findNewSegmentHomeReplicatorNotEnoughSpace()
+  {
+    // The server does not hold the segment yet, so capacity (10 bytes < 11 bytes) is the only reason to reject it.
+    final List<ServerHolder> serverHolders = new ArrayList<>();
+    serverHolders.add(makeServerHolder("server1", "host1", 10L, false));
+    final ServerHolder foundServerHolder =
+        balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    Assert.assertNull(foundServerHolder);
+  }
+
+  @Test(timeout = 5000L)
+  public void findNewSegmentHomeReplicatorNotEnoughNodesForReplication()
+  {
+    // Both servers have plenty of room but already serve the segment, so none can host another replica.
+    // The timeout guards against a strategy that keeps re-drawing servers instead of giving up.
+    final List<ServerHolder> serverHolders = new ArrayList<>();
+    serverHolders.add(makeServerHolder("server1", "host1", 1000L, true));
+    serverHolders.add(makeServerHolder("server2", "host2", 1000L, true));
+    final ServerHolder foundServerHolder =
+        balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    Assert.assertNull(foundServerHolder);
+  }
+
+  @Test
+  public void findNewSegmentHomeReplicatorEnoughSpace()
+  {
+    // The only server has room and does not serve the segment yet, so it must be selected.
+    final ServerHolder serverHolder = makeServerHolder("server1", "host1", 1000L, false);
+    final List<ServerHolder> serverHolders = new ArrayList<>();
+    serverHolders.add(serverHolder);
+    final ServerHolder foundServerHolder =
+        balancerStrategy.findNewSegmentHomeReplicator(proposedDataSegment, serverHolders);
+    Assert.assertEquals(serverHolder, foundServerHolder);
+  }
+
+  /**
+   * Creates a historical in the default tier with the given max size, backed by a fresh {@link LoadQueuePeonTester}.
+   *
+   * @param servesProposedSegment whether the server should already be serving {@link #proposedDataSegment}
+   */
+  private ServerHolder makeServerHolder(String name, String host, long maxSize, boolean servesProposedSegment)
+  {
+    final DruidServer server =
+        new DruidServer(name, host, null, maxSize, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0);
+    if (servesProposedSegment) {
+      server.addDataSegment(proposedDataSegment);
+    }
+    return new ServerHolder(server.toImmutableDruidServer(), new LoadQueuePeonTester());
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
index 96f38c3..e138111 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
@@ -37,6 +37,7 @@ import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalDropRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@@ -193,17 +194,31 @@ public class RunRulesTest
BalancerStrategy balancerStrategy
)
{
- return createCoordinatorRuntimeParams(druidCluster)
- .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
+ return makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, usedSegments);
+ }
+
+ /**
+ * Builds runtime params for {@code druidCluster} with {@code dataSegments} as the used segments, a segment
+ * replicant lookup computed from that same cluster, and the given balancer strategy.
+ */
+ private DruidCoordinatorRuntimeParams.Builder makeCoordinatorRuntimeParams(
+ DruidCluster druidCluster,
+ BalancerStrategy balancerStrategy,
+ List<DataSegment> dataSegments
+ )
+ {
+ return createCoordinatorRuntimeParams(druidCluster, dataSegments)
+ .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
.withBalancerStrategy(balancerStrategy);
}
+ /**
+ * Builds runtime params for {@code druidCluster} with {@code usedSegments} as the used segments.
+ */
private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster)
{
+ return createCoordinatorRuntimeParams(druidCluster, usedSegments);
+ }
+
+ /**
+ * Builds runtime params for {@code druidCluster} with {@code dataSegments} as the used segments and the test's
+ * {@code databaseRuleManager} as the rule source.
+ */
+ private DruidCoordinatorRuntimeParams.Builder createCoordinatorRuntimeParams(DruidCluster druidCluster, List<DataSegment> dataSegments)
+ {
return CoordinatorRuntimeParamsTestHelpers
.newBuilder()
.withDruidCluster(druidCluster)
- .withUsedSegmentsInTest(usedSegments)
+ .withUsedSegmentsInTest(dataSegments)
.withDatabaseRuleManager(databaseRuleManager);
}
@@ -1067,6 +1082,255 @@ public class RunRulesTest
exec.shutdown();
}
+ /**
+ * Tier - __default_tier
+ * Nodes - 2
+ * Replicants - 3
+ * Both nodes already serve the segment, so no node is left for another replica. The random balancer strategy
+ * must assign nothing, and must return rather than loop forever searching for a node that does not serve it.
+ */
+ @Test(timeout = 5000L)
+ public void testTwoNodesOneTierThreeReplicantsRandomStrategyNotEnoughNodes()
+ {
+ mockCoordinator();
+ mockEmptyPeon();
+
+ // Ask for 3 replicas in the default tier, which has only 2 nodes.
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Collections.singletonList(
+ new ForeverLoadRule(
+ ImmutableMap.of(DruidServer.DEFAULT_TIER, 3)
+ )
+ )).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DataSegment dataSegment = new DataSegment(
+ "test",
+ Intervals.utc(0, 1),
+ DateTimes.nowUtc().toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 1
+ );
+
+ // Both historicals already hold the segment, and both share the same peon mock.
+ DruidCluster druidCluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ DruidServer.DEFAULT_TIER,
+ new ServerHolder(
+ new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
+ .toImmutableDruidServer(),
+ mockPeon
+ ),
+ new ServerHolder(
+ new DruidServer("server2", "host2", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0).addDataSegment(dataSegment)
+ .toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ .build();
+
+ RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+ DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+ .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+ .build();
+
+ DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+
+ // Nothing was assigned, and no unassigned count/size stats were recorded for any tier.
+ Assert.assertEquals(0L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
+ Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+ Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+ EasyMock.verify(mockPeon);
+ }
+
+
+ /**
+ * Tier - __default_tier
+ * Nodes - 1
+ * Replicants - 1
+ * Random balancer strategy should select the only node, which has room for the segment and does not serve it yet
+ */
+ @Test(timeout = 5000L)
+ public void testOneNodesOneTierOneReplicantRandomStrategyEnoughSpace()
+ {
+ mockCoordinator();
+ // The peon must receive at least one loadSegment call, i.e. the segment is actually sent to the node.
+ mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().atLeastOnce();
+ mockEmptyPeon();
+
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Collections.singletonList(
+ new ForeverLoadRule(
+ ImmutableMap.of(DruidServer.DEFAULT_TIER, 1)
+ )
+ )).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DataSegment dataSegment = new DataSegment(
+ "test",
+ Intervals.utc(0, 1),
+ DateTimes.nowUtc().toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 1
+ );
+
+ // A single empty 1000-byte historical: the 1-byte segment fits.
+ DruidCluster druidCluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ DruidServer.DEFAULT_TIER,
+ new ServerHolder(
+ new DruidServer("server1", "host1", null, 1000, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+ .toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ .build();
+
+ RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+ DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+ .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+ .build();
+
+ DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+ // Exactly one assignment in the default tier, with no unassigned count/size stats recorded.
+ Assert.assertEquals(1L, stats.getTieredStat("assignedCount", DruidServer.DEFAULT_TIER));
+ Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+ Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+ EasyMock.verify(mockPeon);
+ }
+
+ /**
+ * Tier - __default_tier
+ * Nodes - 1
+ * Replicants - 1
+ * Random balancer strategy should not assign anything as there is not enough space
+ * (an 11-byte segment against a 10-byte historical)
+ */
+ @Test(timeout = 5000L)
+ public void testOneNodesOneTierOneReplicantRandomStrategyNotEnoughSpace()
+ {
+ mockCoordinator();
+ mockEmptyPeon();
+ int numReplicants = 1;
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Collections.singletonList(
+ new ForeverLoadRule(
+ ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
+ )
+ )).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ // Segment size 11: one byte more than the historical below can hold.
+ DataSegment dataSegment = new DataSegment(
+ "test",
+ Intervals.utc(0, 1),
+ DateTimes.nowUtc().toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 11
+ );
+
+ DruidCluster druidCluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ DruidServer.DEFAULT_TIER,
+ new ServerHolder(
+ new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+ .toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ .build();
+
+ RandomBalancerStrategy balancerStrategy = new RandomBalancerStrategy();
+
+ DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+ .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+ .build();
+
+ DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+ // The tier's required capacity is still reported even though nothing could be assigned.
+ Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
+ Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment failed
+ Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+ Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+ EasyMock.verify(mockPeon);
+ }
+
+ /**
+ * Tier - __default_tier
+ * Nodes - 1
+ * Replicants - 1
+ * Cost balancer strategy should not assign anything as there is not enough space
+ */
+ @Test
+ public void testOneNodesOneTierOneReplicantCostBalancerStrategyNotEnoughSpace()
+ {
+ mockCoordinator();
+ mockEmptyPeon();
+ int numReplicants = 1;
+ EasyMock.expect(databaseRuleManager.getRulesWithDefault(EasyMock.anyObject())).andReturn(
+ Collections.singletonList(
+ new ForeverLoadRule(
+ ImmutableMap.of(DruidServer.DEFAULT_TIER, numReplicants)
+ )
+ )).atLeastOnce();
+ EasyMock.replay(databaseRuleManager);
+
+ DataSegment dataSegment = new DataSegment(
+ "test",
+ Intervals.utc(0, 1),
+ DateTimes.nowUtc().toString(),
+ new HashMap<>(),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ NoneShardSpec.instance(),
+ IndexIO.CURRENT_VERSION_ID,
+ 11
+ );
+
+ DruidCluster druidCluster = DruidClusterBuilder
+ .newBuilder()
+ .addTier(
+ DruidServer.DEFAULT_TIER,
+ new ServerHolder(
+ new DruidServer("server1", "host1", null, 10, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0)
+ .toImmutableDruidServer(),
+ mockPeon
+ )
+ )
+ .build();
+
+ ListeningExecutorService exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
+ CostBalancerStrategy balancerStrategy = new CostBalancerStrategy(exec);
+
+ DruidCoordinatorRuntimeParams params = makeCoordinatorRuntimeParams(druidCluster, balancerStrategy, Collections.singletonList(dataSegment))
+ .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+ .build();
+
+ DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
+ CoordinatorStats stats = afterParams.getCoordinatorStats();
+ Assert.assertEquals(dataSegment.getSize() * numReplicants, stats.getTieredStat(LoadRule.REQUIRED_CAPACITY, DruidServer.DEFAULT_TIER));
+ Assert.assertTrue(stats.getTiers("assignedCount").isEmpty()); // since primary assignment should fail
+ Assert.assertTrue(stats.getTiers("unassignedCount").isEmpty());
+ Assert.assertTrue(stats.getTiers("unassignedSize").isEmpty());
+
+ exec.shutdown();
+ EasyMock.verify(mockPeon);
+ }
+
private void mockCoordinator()
{
EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org