Posted to commits@druid.apache.org by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org> on 2023/05/02 06:08:33 UTC

[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #13197: Segment loading: Allow cancellation and prioritization of load queue items

github-code-scanning[bot] commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r1182120694


##########
server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java:
##########
@@ -117,397 +99,276 @@
   @Before
   public void setUp()
   {
-    EmittingLogger.registerEmitter(EMITTER);
-    EMITTER.start();
-    throttler = EasyMock.createMock(ReplicationThrottler.class);
-
     exec = MoreExecutors.listeningDecorator(Execs.multiThreaded(1, "LoadRuleTest-%d"));
     balancerStrategy = new CostBalancerStrategyFactory().createBalancerStrategy(exec);
     cachingCostBalancerStrategy = new CachingCostBalancerStrategy(ClusterCostCache.builder().build(), exec);
 
     mockBalancerStrategy = EasyMock.createMock(BalancerStrategy.class);
+    loadQueueManager = new SegmentLoadQueueManager(null, null, null);
   }
 
   @After
-  public void tearDown() throws Exception
+  public void tearDown()
   {
     exec.shutdown();
-    EMITTER.close();
   }
 
   @Test
   public void testLoad()
   {
-    EasyMock.expect(throttler.canCreateReplicant(EasyMock.anyString())).andReturn(true).anyTimes();
-
     final LoadQueuePeon mockPeon = createEmptyPeon();
-    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
+    mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
 
-    LoadRule rule = createLoadRule(ImmutableMap.of(
-        "hot", 1,
-        DruidServer.DEFAULT_TIER, 2
-    ));
-
-    final DataSegment segment = createDataSegment("foo");
-
-    throttler.registerReplicantCreation(DruidServer.DEFAULT_TIER, segment.getId(), "hostNorm");
-    EasyMock.expectLastCall().once();
-
     if (!useRoundRobinAssignment) {
-      EasyMock.expect(mockBalancerStrategy.findNewSegmentHomeReplicator(EasyMock.anyObject(), EasyMock.anyObject()))
+      EasyMock.expect(mockBalancerStrategy.findServerToLoadSegment(EasyMock.anyObject(), EasyMock.anyObject()))
               .andDelegateTo(balancerStrategy)
-              .times(3);
+              .times(2);
     }
+    EasyMock.replay(mockPeon, mockBalancerStrategy);
 
-    EasyMock.replay(throttler, mockPeon, mockBalancerStrategy);
-
-    DruidCluster druidCluster = DruidClusterBuilder
-        .newBuilder()
-        .addTier(
-            "hot",
-            new ServerHolder(
-                new DruidServer("serverHot", "hostHot", null, 1000, ServerType.HISTORICAL, "hot", 1)
-                    .toImmutableDruidServer(),
-                mockPeon
-            )
-        )
-        .addTier(
-            DruidServer.DEFAULT_TIER,
-            new ServerHolder(
-                new DruidServer(
-                    "serverNorm",
-                    "hostNorm",
-                    null,
-                    1000,
-                    ServerType.HISTORICAL,
-                    DruidServer.DEFAULT_TIER,
-                    0
-                ).toImmutableDruidServer(),
-                mockPeon
-            )
-        )
+    DruidCluster druidCluster = DruidCluster
+        .builder()
+        .addTier(Tier.T1, createServerHolder(Tier.T1, mockPeon, false))
+        .addTier(Tier.T2, createServerHolder(Tier.T2, mockPeon, false))
         .build();
 
-    CoordinatorStats stats = rule.run(null, makeCoordinatorRuntimeParams(druidCluster, segment), segment);
+    final DataSegment segment = createDataSegment(DS_WIKI);
+    LoadRule rule = loadForever(ImmutableMap.of(Tier.T1, 1, Tier.T2, 2));
+    CoordinatorRunStats stats = runRuleAndGetStats(rule, segment, druidCluster);
+
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T1, DS_WIKI));
+    Assert.assertEquals(1L, stats.getSegmentStat(Stats.Segments.ASSIGNED, Tier.T2, DS_WIKI));
+
+    EasyMock.verify(mockPeon, mockBalancerStrategy);
+  }
 
-    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, "hot"));
-    Assert.assertEquals(1L, stats.getTieredStat(LoadRule.ASSIGNED_COUNT, DruidServer.DEFAULT_TIER));
+  private CoordinatorRunStats runRuleAndGetStats(
+      LoadRule rule,
+      DataSegment segment,
+      DruidCluster cluster
+  )
+  {
+    return runRuleAndGetStats(rule, segment, makeCoordinatorRuntimeParams(cluster, segment));
+  }
 
-    EasyMock.verify(throttler, mockPeon, mockBalancerStrategy);
+  private CoordinatorRunStats runRuleAndGetStats(
+      LoadRule rule,
+      DataSegment segment,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    final StrategicSegmentAssigner segmentAssigner = params.getSegmentAssigner();
+    rule.run(segment, segmentAssigner);
+    return segmentAssigner.getStats();
   }
 
   private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams(
       DruidCluster druidCluster,
       DataSegment... usedSegments
   )
   {
-    return CoordinatorRuntimeParamsTestHelpers
-        .newBuilder()
-        .withDruidCluster(druidCluster)
-        .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster, false))
-        .withReplicationManager(throttler)
-        .withDynamicConfigs(
-            CoordinatorDynamicConfig
-                .builder()
-                .withUseRoundRobinSegmentAssignment(useRoundRobinAssignment)
-                .build()
-        )
-        .withRoundRobinServerSelector(useRoundRobinAssignment ? new RoundRobinServerSelector(druidCluster) : null)
-        .withBalancerStrategy(mockBalancerStrategy)
-        .withUsedSegmentsInTest(usedSegments)
-        .build();
+    return makeCoordinatorRuntimeParams(druidCluster, false, usedSegments);
   }
 
-  private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParamsWithLoadReplicationOnTimeout(
+  private DruidCoordinatorRuntimeParams makeCoordinatorRuntimeParams(
       DruidCluster druidCluster,
+      boolean replicateAfterLoadTimeout,

Review Comment:
   ## Useless parameter
   
   The parameter 'replicateAfterLoadTimeout' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4912)
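
   A minimal standalone sketch of the usual fix (hypothetical names, not the
   test code above): delete the dead parameter so call sites cannot keep
   passing a flag that silently has no effect, or wire it through if it was
   meant to change behavior.

      public class DeadParameterExample
      {
        // Before: the boolean is accepted but never read, so callers carry
        // a flag that cannot change the result.
        static int replicasBefore(int required, boolean replicateAfterLoadTimeout)
        {
          return Math.max(required, 1);
        }

        // After: the dead parameter is removed and callers drop the argument.
        static int replicasAfter(int required)
        {
          return Math.max(required, 1);
        }

        public static void main(String[] args)
        {
          System.out.println(replicasBefore(2, true)); // prints 2
          System.out.println(replicasAfter(2));        // prints 2
        }
      }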



##########
server/src/main/java/org/apache/druid/server/coordinator/StrategicSegmentAssigner.java:
##########
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.balancer.BalancerStrategy;
+import org.apache.druid.server.coordinator.loadqueue.SegmentAction;
+import org.apache.druid.server.coordinator.loadqueue.SegmentLoadQueueManager;
+import org.apache.druid.server.coordinator.rules.SegmentActionHandler;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Used by the coordinator in each run for segment loading, dropping, balancing
+ * and broadcasting.
+ * <p>
+ * An instance of this class is freshly created for each coordinator run.
+ */
+public class StrategicSegmentAssigner implements SegmentActionHandler
+{
+  private static final EmittingLogger log = new EmittingLogger(StrategicSegmentAssigner.class);
+
+  private final SegmentLoadQueueManager loadQueueManager;
+  private final DruidCluster cluster;
+  private final CoordinatorRunStats stats;
+  private final SegmentReplicantLookup replicantLookup;
+  private final ReplicationThrottler replicationThrottler;
+  private final RoundRobinServerSelector serverSelector;
+  private final BalancerStrategy strategy;
+
+  private final boolean useRoundRobinAssignment;
+
+  private final Set<String> tiersWithNoServer = new HashSet<>();
+
+  public StrategicSegmentAssigner(
+      SegmentLoadQueueManager loadQueueManager,
+      DruidCluster cluster,
+      BalancerStrategy strategy,
+      CoordinatorDynamicConfig dynamicConfig,
+      CoordinatorRunStats stats
+  )
+  {
+    this.stats = stats;
+    this.cluster = cluster;
+    this.strategy = strategy;
+    this.loadQueueManager = loadQueueManager;
+    this.replicantLookup = SegmentReplicantLookup.make(cluster);
+    this.replicationThrottler = createReplicationThrottler(dynamicConfig);
+    this.useRoundRobinAssignment = dynamicConfig.isUseRoundRobinSegmentAssignment();
+    this.serverSelector = useRoundRobinAssignment ? new RoundRobinServerSelector(cluster) : null;
+  }
+
+  public CoordinatorRunStats getStats()
+  {
+    return stats;
+  }
+
+  public SegmentReplicantLookup getReplicantLookup()
+  {
+    return replicantLookup;
+  }
+
+  public void makeAlerts()
+  {
+    if (!tiersWithNoServer.isEmpty()) {
+      log.makeAlert("Tiers [%s] have no servers! Check your cluster configuration.", tiersWithNoServer).emit();
+    }
+  }
+
+  /**
+   * Moves the given segment from the source server to an eligible destination
+   * server.
+   * <p>
+   * An eligible destination server must:
+   * <ul>
+   *   <li>be present in the given list of destination servers</li>
+   *   <li>belong to the same tier as the source server</li>
+   *   <li>not already be serving or loading a replica of the segment</li>
+   *   <li>have enough space to load the segment</li>
+   * </ul>
+   * <p>
+   * The segment is not moved if:
+   * <ul>
+   *   <li>there is no eligible destination server, or</li>
+   *   <li>the segment is already optimally placed (only possible when the source server is not decommissioning), or</li>
+   *   <li>some other error occurs</li>
+   * </ul>
+   */
+  public boolean moveSegment(
+      DataSegment segment,
+      ServerHolder sourceServer,
+      List<ServerHolder> destinationServers
+  )
+  {
+    final String tier = sourceServer.getServer().getTier();
+    final List<ServerHolder> eligibleDestinationServers =
+        destinationServers.stream()
+                          .filter(s -> s.getServer().getTier().equals(tier))
+                          .filter(s -> s.canLoadSegment(segment))
+                          .collect(Collectors.toList());
+
+    if (eligibleDestinationServers.isEmpty()) {
+      incrementStat(Error.NO_ELIGIBLE_SERVER_FOR_MOVE, segment, tier);
+      return false;
+    }
+
+    // If the source server is not decommissioning, move can be skipped if the
+    // segment is already optimally placed
+    if (!sourceServer.isDecommissioning()) {
+      eligibleDestinationServers.add(sourceServer);
+    }
+
+    final ServerHolder destination =
+        strategy.findDestinationServerToMoveSegment(segment, sourceServer, eligibleDestinationServers);
+
+    if (destination == null || destination.getServer().equals(sourceServer.getServer())) {
+      incrementStat(Error.MOVE_SKIPPED_OPTIMALLY_PLACED, segment, tier);
+      return false;
+    } else if (moveSegment(segment, sourceServer, destination)) {
+      incrementStat(Stats.Segments.MOVED, segment, tier);
+      return true;
+    } else {
+      incrementStat(Error.MOVE_FAILED, segment, tier);
+      return false;
+    }
+  }
+
+  /**
+   * Moves the given segment from serverA to serverB.
+   */
+  private boolean moveSegment(DataSegment segment, ServerHolder serverA, ServerHolder serverB)
+  {
+    final String tier = serverA.getServer().getTier();
+    if (serverA.isLoadingSegment(segment)) {
+      // Cancel the load on serverA and load on serverB instead
+      if (serverA.cancelOperation(SegmentAction.LOAD, segment)) {
+        int loadedCountOnTier = replicantLookup.getServedReplicas(segment.getId(), tier);
+        return loadSegment(segment, serverB, loadedCountOnTier >= 1);
+      }
+
+      // Could not cancel load, let the segment load on serverA and count it as unmoved
+      return false;
+    } else if (serverA.isServingSegment(segment)) {
+      return loadQueueManager.moveSegment(segment, serverA, serverB);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void updateSegmentReplicasInTiers(DataSegment segment, Map<String, Integer> tierToReplicaCount)
+  {
+    // Identify empty tiers and determine total required replicas
+    final AtomicInteger requiredTotalReplicas = new AtomicInteger(0);
+    final Set<String> allTiers = Sets.newHashSet(cluster.getTierNames());
+    tierToReplicaCount.forEach((tier, requiredReplicas) -> {
+      reportTierCapacityStats(segment, requiredReplicas, tier);
+      replicantLookup.setRequiredReplicas(segment.getId(), false, tier, requiredReplicas);
+      if (allTiers.contains(tier)) {
+        requiredTotalReplicas.addAndGet(requiredReplicas);
+      } else {
+        tiersWithNoServer.add(tier);
+      }
+    });
+
+    final int totalOverReplication =
+        replicantLookup.getTotalServedReplicas(segment.getId()) - requiredTotalReplicas.get();
+
+    // Update replicas in every tier
+    int totalDropsQueued = 0;
+    for (String tier : allTiers) {
+      totalDropsQueued += updateReplicasInTier(
+          segment,
+          tier,
+          tierToReplicaCount.getOrDefault(tier, 0),
+          totalOverReplication - totalDropsQueued
+      );
+    }
+  }
+
+  /**
+   * Queues load or drop operations on this tier based on the required
+   * number of replicas and the current state.
+   * <p>
+   * The {@code maxReplicasToDrop} helps to maintain the required level of
+   * replication in the cluster. This ensures that segment read concurrency does
+   * not suffer during a tier shift or load rule change.
+   * <p>
+   * Returns the number of new drop operations queued on this tier.
+   */
+  private int updateReplicasInTier(
+      DataSegment segment,
+      String tier,
+      int requiredReplicas,
+      int maxReplicasToDrop
+  )
+  {
+    final int projectedReplicas = replicantLookup.getProjectedReplicas(segment.getId(), tier);
+    final int movingReplicas = replicantLookup.getMovingReplicas(segment.getId(), tier);
+    final boolean shouldCancelMoves = requiredReplicas == 0 && movingReplicas > 0;
+
+    // Check if there is any action required on this tier
+    if (projectedReplicas == requiredReplicas && !shouldCancelMoves) {
+      return 0;
+    }
+
+    final SegmentTierStatus segmentStatus =
+        new SegmentTierStatus(segment, cluster.getHistoricalsByTier(tier));
+
+    // Cancel all moves in this tier if it does not need to have replicas
+    if (shouldCancelMoves) {
+      cancelOperations(SegmentAction.MOVE_TO, movingReplicas, segment, segmentStatus);
+      cancelOperations(SegmentAction.MOVE_FROM, movingReplicas, segment, segmentStatus);
+    }
+
+    // Cancel drops and queue loads if the projected count is below the requirement
+    if (projectedReplicas < requiredReplicas) {
+      int replicaDeficit = requiredReplicas - projectedReplicas;
+      int cancelledDrops =
+          cancelOperations(SegmentAction.DROP, replicaDeficit, segment, segmentStatus);
+
+      // Cancelled drops can be counted as loaded replicas, thus reducing deficit
+      int numReplicasToLoad = replicaDeficit - cancelledDrops;
+      if (numReplicasToLoad > 0) {
+        boolean isAlreadyLoadedOnTier = replicantLookup.getServedReplicas(segment.getId(), tier)
+                                        + cancelledDrops >= 1;
+        int numLoadsQueued = loadReplicas(numReplicasToLoad, segment, tier, segmentStatus, isAlreadyLoadedOnTier);
+        incrementStat(Stats.Segments.ASSIGNED, segment, tier, numLoadsQueued);
+      }
+    }
+
+    // Cancel loads and queue drops if the projected count exceeds the requirement
+    if (projectedReplicas > requiredReplicas) {
+      int replicaSurplus = projectedReplicas - requiredReplicas;
+      int cancelledLoads =
+          cancelOperations(SegmentAction.LOAD, replicaSurplus, segment, segmentStatus);
+
+      int numReplicasToDrop = Math.min(replicaSurplus - cancelledLoads, maxReplicasToDrop);
+      if (numReplicasToDrop > 0) {
+        int dropsQueuedOnTier = dropReplicas(numReplicasToDrop, segment, tier, segmentStatus);
+        incrementStat(Stats.Segments.DROPPED, segment, tier, dropsQueuedOnTier);
+        return dropsQueuedOnTier;
+      }
+    }
+
+    return 0;
+  }
+
+  private void reportTierCapacityStats(DataSegment segment, int requiredReplicas, String tier)
+  {
+    final RowKey rowKey = RowKey.forTier(tier);
+    stats.updateMax(Stats.Tier.REPLICATION_FACTOR, rowKey, requiredReplicas);
+    stats.add(Stats.Tier.REQUIRED_CAPACITY, rowKey, segment.getSize() * requiredReplicas);
+  }
+
+  @Override
+  public void broadcastSegment(DataSegment segment)
+  {
+    final Object2IntOpenHashMap<String> tierToRequiredReplicas = new Object2IntOpenHashMap<>();
+    for (ServerHolder server : cluster.getAllServers()) {
+      // Ignore servers which are not broadcast targets
+      if (!server.getServer().getType().isSegmentBroadcastTarget()) {
+        continue;
+      }
+
+      // Drop from decommissioning servers and load on active servers
+      final String tier = server.getServer().getTier();
+      if (server.isDecommissioning()) {
+        boolean dropQueued = dropBroadcastSegment(segment, server);
+        incrementStat(Stats.Segments.DROPPED_BROADCAST, segment, tier, dropQueued ? 1 : 0);
+      } else if (loadBroadcastSegment(segment, server)) {
+        tierToRequiredReplicas.addTo(tier, 1);
+        incrementStat(Stats.Segments.ASSIGNED_BROADCAST, segment, tier);
+      } else {
+        tierToRequiredReplicas.addTo(tier, 1);
+      }
+    }
+
+    // Update required replica counts
+    tierToRequiredReplicas.object2IntEntrySet().fastForEach(
+        entry -> replicantLookup
+            .setRequiredReplicas(segment.getId(), true, entry.getKey(), entry.getIntValue())
+    );
+  }
+
+  @Override
+  public void deleteSegment(DataSegment segment)
+  {
+    loadQueueManager.deleteSegment(segment);
+    stats.addToDatasourceStat(Stats.Segments.DELETED, segment.getDataSource(), 1);
+  }
+
+  /**
+   * Loads the broadcast segment if it is not loaded on the given server.
+   * Returns true only if the segment was successfully queued for load on the server.
+   */
+  private boolean loadBroadcastSegment(DataSegment segment, ServerHolder server)
+  {
+    if (server.isServingSegment(segment) || server.isLoadingSegment(segment)) {
+      return false;
+    } else if (server.isDroppingSegment(segment)) {
+      return server.cancelOperation(SegmentAction.DROP, segment);
+    }
+
+    if (server.canLoadSegment(segment) && loadSegment(segment, server, false)) {
+      return true;
+    } else {
+      log.makeAlert("Could not assign broadcast segment for datasource [%s]", segment.getDataSource())
+         .addData("segmentId", segment.getId())
+         .addData("segmentSize", segment.getSize())
+         .addData("hostName", server.getServer().getHost())
+         .addData("availableSize", server.getAvailableSize())
+         .emit();
+      return false;
+    }
+  }
+
+  /**
+   * Drops the broadcast segment if it is loaded on the given server.
+   * Returns true only if the segment was successfully queued for drop on the server.
+   */
+  private boolean dropBroadcastSegment(DataSegment segment, ServerHolder server)
+  {
+    if (server.isLoadingSegment(segment)) {
+      return server.cancelOperation(SegmentAction.LOAD, segment);
+    } else if (server.isServingSegment(segment)) {
+      return loadQueueManager.dropSegment(segment, server);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Queues drop of {@code numToDrop} replicas of the segment from a tier.
+   * Tries to drop replicas first from decommissioning servers and then from
+   * active servers.
+   * <p>
+   * Returns the number of successfully queued drop operations.
+   */
+  private int dropReplicas(
+      final int numToDrop,
+      DataSegment segment,
+      String tier,
+      SegmentTierStatus segmentStatus
+  )
+  {
+    if (numToDrop <= 0) {
+      return 0;
+    }
+
+    final List<ServerHolder> eligibleServers = segmentStatus.getServersEligibleToDrop();
+    if (eligibleServers.isEmpty()) {
+      incrementStat(Error.NO_ELIGIBLE_SERVER_FOR_DROP, segment, tier);
+      return 0;
+    }
+
+    // Keep eligible servers sorted by most full first
+    final TreeSet<ServerHolder> eligibleLiveServers = new TreeSet<>(Comparator.reverseOrder());
+    final TreeSet<ServerHolder> eligibleDyingServers = new TreeSet<>(Comparator.reverseOrder());
+    for (ServerHolder server : eligibleServers) {
+      if (server.isDecommissioning()) {
+        eligibleDyingServers.add(server);
+      } else {
+        eligibleLiveServers.add(server);
+      }
+    }
+
+    // Drop as many replicas as possible from decommissioning servers
+    int remainingNumToDrop = numToDrop;
+    int numDropsQueued =
+        dropReplicasFromServers(remainingNumToDrop, segment, eligibleDyingServers.iterator(), tier);
+
+    // Drop replicas from active servers if required
+    if (numToDrop > numDropsQueued) {
+      remainingNumToDrop = numToDrop - numDropsQueued;
+      Iterator<ServerHolder> serverIterator =
+          (useRoundRobinAssignment || eligibleLiveServers.size() >= remainingNumToDrop)
+          ? eligibleLiveServers.iterator()
+          : strategy.pickServersToDrop(segment, eligibleLiveServers);
+      numDropsQueued += dropReplicasFromServers(remainingNumToDrop, segment, serverIterator, tier);
+    }
+
+    return numDropsQueued;
+  }
+
+  /**
+   * Queues drop of {@code numToDrop} replicas of the segment from the servers.
+   * Returns the number of successfully queued drop operations.
+   */
+  private int dropReplicasFromServers(
+      int numToDrop,
+      DataSegment segment,
+      Iterator<ServerHolder> serverIterator,
+      String tier
+  )
+  {
+    int numDropsQueued = 0;
+    while (numToDrop > numDropsQueued && serverIterator.hasNext()) {
+      ServerHolder holder = serverIterator.next();
+      boolean dropped = loadQueueManager.dropSegment(segment, holder);
+
+      if (dropped) {
+        ++numDropsQueued;
+      } else {
+        incrementStat(Error.DROP_FAILED, segment, tier);
+      }
+    }
+
+    return numDropsQueued;
+  }
+
+  /**
+   * Queues load of {@code numToLoad} replicas of the segment on a tier.
+   */
+  private int loadReplicas(
+      int numToLoad,
+      DataSegment segment,
+      String tier,
+      SegmentTierStatus segmentStatus,
+      boolean isAlreadyLoadedOnTier
+  )
+  {
+    // Do not assign replicas if tier is already busy loading some
+    if (isAlreadyLoadedOnTier && replicationThrottler.isTierLoadingReplicas(tier)) {
+      return 0;
+    }
+
+    final List<ServerHolder> eligibleServers = segmentStatus.getServersEligibleToLoad();
+    if (eligibleServers.isEmpty()) {
+      incrementStat(Error.NO_ELIGIBLE_SERVER_FOR_LOAD, segment, tier);
+      return 0;
+    }
+
+    final Iterator<ServerHolder> serverIterator =
+        useRoundRobinAssignment
+        ? serverSelector.getServersInTierToLoadSegment(tier, segment)
+        : strategy.findServerToLoadSegment(segment, eligibleServers);
+    if (!serverIterator.hasNext()) {
+      incrementStat(Error.NO_STRATEGIC_SERVER_FOR_LOAD, segment, tier);
+      return 0;
+    }
+
+    // Load the replicas on this tier
+    int numLoadsQueued = 0;
+    while (numLoadsQueued < numToLoad && serverIterator.hasNext()) {
+      numLoadsQueued += loadSegment(segment, serverIterator.next(), isAlreadyLoadedOnTier)
+                        ? 1 : 0;
+    }
+
+    return numLoadsQueued;
+  }
+
+  private boolean loadSegment(DataSegment segment, ServerHolder server, boolean isAlreadyLoadedOnTier)
+  {
+    final String tier = server.getServer().getTier();
+    if (isAlreadyLoadedOnTier && !replicationThrottler.canAssignReplica(tier)) {
+      incrementStat(Error.REPLICA_THROTTLED, segment, tier);
+      return false;
+    }
+
+    final SegmentAction action = isAlreadyLoadedOnTier ? SegmentAction.REPLICATE : SegmentAction.LOAD;
+    final boolean assigned = loadQueueManager.loadSegment(segment, server, action);
+
+    if (!assigned) {
+      incrementStat(Error.LOAD_FAILED, segment, tier);
+    } else if (isAlreadyLoadedOnTier) {
+      replicationThrottler.incrementAssignedReplicas(tier);
+    }
+
+    return assigned;
+  }
+
+  private ReplicationThrottler createReplicationThrottler(CoordinatorDynamicConfig dynamicConfig)
+  {
+    final Set<String> tiersLoadingReplicas = new HashSet<>();
+
+    cluster.getHistoricals().forEach(
+        (tier, historicals) -> {
+          int numLoadingReplicas = historicals.stream().mapToInt(ServerHolder::getNumLoadingReplicas).sum();
+          if (numLoadingReplicas > 0) {
+            log.info(
+                "Tier [%s] will not be assigned replicas as it is already loading [%d] replicas.",
+                tier, numLoadingReplicas
+            );
+            tiersLoadingReplicas.add(tier);
+          }
+        }
+    );
+    return new ReplicationThrottler(
+        tiersLoadingReplicas,
+        dynamicConfig.getReplicationThrottleLimit(),
+        dynamicConfig.getMaxNonPrimaryReplicantsToLoad()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getMaxNonPrimaryReplicantsToLoad](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4909)
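
   When the deprecated getter must still be honored for backwards
   compatibility, one common pattern (sketched below with stand-in types,
   not the actual Druid classes) is to confine the call to a single
   @SuppressWarnings-annotated accessor, so the deprecation warning lives in
   exactly one place until the option is removed.

      public class LegacyConfigShim
      {
        /** Stand-in for the dynamic config; only the getter's deprecation matters here. */
        static class DynamicConfig
        {
          @Deprecated
          int getMaxNonPrimaryReplicantsToLoad()
          {
            return Integer.MAX_VALUE;
          }
        }

        /**
         * Single choke point for the deprecated getter; delete this method
         * together with the config option.
         */
        @SuppressWarnings("deprecation")
        static int maxNonPrimaryReplicantsToLoad(DynamicConfig config)
        {
          return config.getMaxNonPrimaryReplicantsToLoad();
        }

        public static void main(String[] args)
        {
          System.out.println(maxNonPrimaryReplicantsToLoad(new DynamicConfig()));
        }
      }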



##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.balancer;
+
+import com.google.common.collect.Lists;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.ServerHolder;
+import org.apache.druid.server.coordinator.StrategicSegmentAssigner;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Balances segments within the servers of a tier using the balancer strategy.
+ * Segments are prioritized for move in the following order:
+ * <ul>
+ *   <li>Segments loaded on decommissioning servers</li>
+ *   <li>Segments loading on active servers</li>
+ *   <li>Segments loaded on active servers</li>
+ * </ul>
+ */
+public class TierSegmentBalancer
+{
+  private static final EmittingLogger log = new EmittingLogger(TierSegmentBalancer.class);
+
+  private final String tier;
+  private final DruidCoordinatorRuntimeParams params;
+  private final StrategicSegmentAssigner segmentAssigner;
+
+  private final BalancerStrategy strategy;
+  private final CoordinatorDynamicConfig dynamicConfig;
+  private final CoordinatorRunStats runStats;
+
+  private final Set<ServerHolder> allServers;
+  private final List<ServerHolder> activeServers;
+  private final List<ServerHolder> decommissioningServers;
+  private final int totalMaxSegmentsToMove;
+
+  private final int movingSegmentCount;
+
+  public TierSegmentBalancer(
+      String tier,
+      Set<ServerHolder> servers,
+      DruidCoordinatorRuntimeParams params
+  )
+  {
+    this.tier = tier;
+    this.params = params;
+    this.segmentAssigner = params.getSegmentAssigner();
+
+    this.strategy = params.getBalancerStrategy();
+    this.dynamicConfig = params.getCoordinatorDynamicConfig();
+    this.totalMaxSegmentsToMove = dynamicConfig.getMaxSegmentsToMove();
+    this.runStats = segmentAssigner.getStats();
+
+    Map<Boolean, List<ServerHolder>> partitions =
+        servers.stream().collect(Collectors.partitioningBy(ServerHolder::isDecommissioning));
+    this.decommissioningServers = partitions.get(true);
+    this.activeServers = partitions.get(false);
+    this.allServers = servers;
+
+    this.movingSegmentCount = activeServers.stream().mapToInt(ServerHolder::getNumMovingSegments).sum();
+  }
+
+  public void run()
+  {
+    if (activeServers.isEmpty() || (activeServers.size() <= 1 && decommissioningServers.isEmpty())) {
+      log.warn(
+          "Skipping balance for tier [%s] with [%d] active servers and [%d] decomissioning servers.",
+          tier, activeServers.size(), decommissioningServers.size()
+      );
+      return;
+    }
+
+    log.info(
+        "Moving max [%d] segments in tier [%s] with [%d] active servers and"
+        + " [%d] decommissioning servers. There are [%d] segments already in queue.",
+        totalMaxSegmentsToMove, tier, activeServers.size(), decommissioningServers.size(), movingSegmentCount
+    );
+
+    // Move segments from decommissioning to active servers
+    int movedDecommSegments = 0;
+    if (!decommissioningServers.isEmpty()) {
+      int maxDecommPercentToMove = dynamicConfig.getDecommissioningMaxPercentOfMaxSegmentsToMove();
+      int maxDecommSegmentsToMove = (int) Math.ceil(totalMaxSegmentsToMove * (maxDecommPercentToMove / 100.0));
+      movedDecommSegments +=
+          moveSegmentsFromTo(decommissioningServers, activeServers, maxDecommSegmentsToMove);
+      log.info(
+          "Moved [%d] segments out of max [%d (%d%%)] from decommissioning to active servers in tier [%s].",
+          movedDecommSegments, maxDecommSegmentsToMove, maxDecommPercentToMove, tier
+      );
+    }
+
+    // Move segments across active servers
+    int maxGeneralSegmentsToMove = totalMaxSegmentsToMove - movedDecommSegments;
+    int movedGeneralSegments =
+        moveSegmentsFromTo(activeServers, activeServers, maxGeneralSegmentsToMove);
+    log.info(
+        "Moved [%d] segments out of max [%d] between active servers in tier [%s].",
+        movedGeneralSegments, maxGeneralSegmentsToMove, tier
+    );
+
+    if (dynamicConfig.emitBalancingStats()) {
+      strategy.emitStats(tier, runStats, Lists.newArrayList(allServers));
+    }
+  }
+
+  private int moveSegmentsFromTo(
+      List<ServerHolder> sourceServers,
+      List<ServerHolder> destServers,
+      int maxSegmentsToMove
+  )
+  {
+    if (maxSegmentsToMove <= 0 || sourceServers.isEmpty() || destServers.isEmpty()) {
+      return 0;
+    }
+
+    // Always move loading segments first as it is a cheaper operation
+    Iterator<BalancerSegmentHolder> pickedSegments
+        = pickSegmentsFrom(sourceServers, maxSegmentsToMove, true);
+    int movedCount = moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
+
+    // Move loaded segments only if tier is not already busy moving segments
+    if (movingSegmentCount <= 0) {
+      maxSegmentsToMove -= movedCount;
+      pickedSegments = pickSegmentsFrom(sourceServers, maxSegmentsToMove, false);
+      movedCount += moveSegmentsTo(destServers, pickedSegments, maxSegmentsToMove);
+    }
+
+    return movedCount;
+  }
+
+  private Iterator<BalancerSegmentHolder> pickSegmentsFrom(
+      List<ServerHolder> sourceServers,
+      int maxSegmentsToPick,
+      boolean pickLoadingSegments
+  )
+  {
+    if (maxSegmentsToPick <= 0 || sourceServers.isEmpty()) {
+      return Collections.emptyIterator();
+    } else if (dynamicConfig.useBatchedSegmentSampler()) {
+      return strategy.pickSegmentsToMove(
+          sourceServers,
+          params.getBroadcastDatasources(),
+          maxSegmentsToPick,
+          pickLoadingSegments
+      );
+    } else if (pickLoadingSegments) {
+      // non-batched sampler cannot pick loading segments
+      return Collections.emptyIterator();
+    } else {
+      return strategy.pickSegmentsToMove(
+          sourceServers,
+          params.getBroadcastDatasources(),
+          dynamicConfig.getPercentOfSegmentsToConsiderPerMove()
+      );

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [BalancerStrategy.pickSegmentsToMove](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4910)
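
   The hunk above only reaches the deprecated overload when
   useBatchedSegmentSampler() is off, i.e. as a compatibility fallback. A
   sketch of that dispatch shape with stand-in types (not the real
   BalancerStrategy interface), keeping the legacy call in one suppressible
   spot:

      import java.util.Collections;
      import java.util.Iterator;
      import java.util.List;

      public class SamplerDispatchSketch
      {
        interface Strategy
        {
          Iterator<String> pickBatched(List<String> servers, int max);

          @Deprecated
          Iterator<String> pickNonBatched(List<String> servers, double percent);
        }

        @SuppressWarnings("deprecation")
        static Iterator<String> pickSegments(
            Strategy strategy,
            List<String> servers,
            boolean useBatchedSampler,
            int max,
            double percent
        )
        {
          if (servers.isEmpty() || max <= 0) {
            return Collections.emptyIterator();
          }
          // Prefer the batched sampler; the deprecated percent-based path
          // survives only for clusters that have not switched over yet.
          return useBatchedSampler
                 ? strategy.pickBatched(servers, max)
                 : strategy.pickNonBatched(servers, percent);
        }
      }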



##########
server/src/main/java/org/apache/druid/server/coordinator/balancer/TierSegmentBalancer.java:
##########
@@ -0,0 +1,252 @@
+  private Iterator<BalancerSegmentHolder> pickSegmentsFrom(
+      List<ServerHolder> sourceServers,
+      int maxSegmentsToPick,
+      boolean pickLoadingSegments
+  )
+  {
+    if (maxSegmentsToPick <= 0 || sourceServers.isEmpty()) {
+      return Collections.emptyIterator();
+    } else if (dynamicConfig.useBatchedSegmentSampler()) {
+      return strategy.pickSegmentsToMove(
+          sourceServers,
+          params.getBroadcastDatasources(),
+          maxSegmentsToPick,
+          pickLoadingSegments
+      );
+    } else if (pickLoadingSegments) {
+      // non-batched sampler cannot pick loading segments
+      return Collections.emptyIterator();
+    } else {
+      return strategy.pickSegmentsToMove(
+          sourceServers,
+          params.getBroadcastDatasources(),
+          dynamicConfig.getPercentOfSegmentsToConsiderPerMove()

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [CoordinatorDynamicConfig.getPercentOfSegmentsToConsiderPerMove](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4911)
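
   For context on why this knob was likely deprecated (a hedged sketch with
   illustrative numbers, not measured Druid behavior): a percentage of all
   segments grows with cluster size, while the batched sampler caps the
   candidates considered per balancer run.

      public class SamplerCostSketch
      {
        // Percent-based policy: work scales with total segment count.
        static int percentPolicy(int totalSegments, double percent)
        {
          return (int) Math.ceil(totalSegments * percent / 100.0);
        }

        // Batched policy: work is bounded by a fixed cap.
        static int batchedPolicy(int totalSegments, int maxSegmentsToPick)
        {
          return Math.min(totalSegments, maxSegmentsToPick);
        }

        public static void main(String[] args)
        {
          System.out.println(percentPolicy(1_000_000, 1.0)); // 10000 candidates
          System.out.println(batchedPolicy(1_000_000, 100)); // 100 candidates
        }
      }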



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org