You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/29 06:36:38 UTC

[druid] branch master updated: Fix over-replication caused by balancing when inventory is not updated yet (#13114)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ce5f55e5ce Fix over-replication caused by balancing when inventory is not updated yet (#13114)
ce5f55e5ce is described below

commit ce5f55e5ce00d876277424ea0724b70a330315b3
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Thu Sep 29 12:06:23 2022 +0530

    Fix over-replication caused by balancing when inventory is not updated yet (#13114)
    
    * Add coordinator test framework
    
    * Remove outdated changes
    
    * Add more tests
    
    * Add option to auto-sync inventory
    
    * Minor cleanup
    
    * Fix inspections
    
    * Add README for simulations, add SegmentLoadingNegativeTest
    
    * Fix over-replication from balancing
    
    * Fix README
    
    * Cleanup unnecessary fields from DruidCoordinator
    
    * Add a test
    
    * Fix DruidCoordinatorTest
    
    * Remove unused import
    
    * Fix CuratorDruidCoordinatorTest
    
    * Remove test log4j2.xml
---
 .../druid/indexing/overlord/http/OverlordTest.java |  21 +--
 .../server/coordinator/CuratorLoadQueuePeon.java   |  66 ++++----
 .../druid/server/coordinator/DruidCoordinator.java |  60 +++----
 .../server/coordinator/HttpLoadQueuePeon.java      |   8 +-
 .../druid/server/coordinator/LoadPeonCallback.java |   2 +-
 .../server/coordinator/LoadQueueTaskMaster.java    |   9 +-
 .../server/coordinator/duty/BalanceSegments.java   |  25 +--
 .../coordinator/duty/UnloadUnusedSegments.java     |   2 +-
 .../druid/server/coordinator/rules/LoadRule.java   |   5 +-
 .../discovery/LatchableServiceAnnouncer.java       |  64 ++++++++
 .../server/coordinator/BalanceSegmentsTester.java  |   2 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   | 112 +++----------
 .../server/coordinator/DruidCoordinatorTest.java   | 181 +++------------------
 .../server/coordinator/HttpLoadQueuePeonTest.java  |  16 +-
 .../server/coordinator/LoadQueuePeonTest.java      |  24 +--
 .../simulate/CoordinatorSimulation.java            |   5 +
 .../simulate/CoordinatorSimulationBaseTest.java    |   6 +
 .../simulate/CoordinatorSimulationBuilder.java     |  44 +++--
 .../druid/server/coordinator/simulate/README.md    |   4 +-
 .../coordinator/simulate/SegmentBalancingTest.java |  55 ++++++-
 .../simulate/SegmentLoadingNegativeTest.java       |  40 -----
 21 files changed, 299 insertions(+), 452 deletions(-)

diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 02a71f8560..d0652d3aa6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -32,7 +32,7 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.TestingServer;
 import org.apache.curator.test.Timing;
 import org.apache.druid.curator.PotentiallyGzippedCompressionProvider;
-import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
+import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
@@ -180,22 +180,9 @@ public class OverlordTest
         taskStorage,
         taskActionClientFactory,
         druidNode,
-        new TaskRunnerFactory<MockTaskRunner>()
-        {
-          @Override
-          public MockTaskRunner build()
-          {
-            return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches);
-          }
-        },
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            announcementLatch.countDown();
-          }
-        },
+        (TaskRunnerFactory<MockTaskRunner>) () ->
+            new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches),
+        new LatchableServiceAnnouncer(announcementLatch, null),
         new CoordinatorOverlordServiceConfig(null, null),
         serviceEmitter,
         supervisorManager,
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index ff7e2fcc39..24c96e19d2 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -66,8 +66,6 @@ import java.util.concurrent.atomic.AtomicLong;
 public class CuratorLoadQueuePeon extends LoadQueuePeon
 {
   private static final EmittingLogger log = new EmittingLogger(CuratorLoadQueuePeon.class);
-  private static final int DROP = 0;
-  private static final int LOAD = 1;
 
   private final CuratorFramework curator;
   private final String basePath;
@@ -185,7 +183,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   @Override
   public void loadSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
   {
-    SegmentHolder segmentHolder = new SegmentHolder(segment, LOAD, Collections.singletonList(callback));
+    SegmentHolder segmentHolder = new SegmentHolder(segment, Action.LOAD, Collections.singletonList(callback));
     final SegmentHolder existingHolder = segmentsToLoad.putIfAbsent(segment, segmentHolder);
     if (existingHolder != null) {
       existingHolder.addCallback(callback);
@@ -199,7 +197,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   @Override
   public void dropSegment(final DataSegment segment, @Nullable final LoadPeonCallback callback)
   {
-    SegmentHolder segmentHolder = new SegmentHolder(segment, DROP, Collections.singletonList(callback));
+    SegmentHolder segmentHolder = new SegmentHolder(segment, Action.DROP, Collections.singletonList(callback));
     final SegmentHolder existingHolder = segmentsToDrop.putIfAbsent(segment, segmentHolder);
     if (existingHolder != null) {
       existingHolder.addCallback(callback);
@@ -240,7 +238,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
         log.debug(
             "ZKNode created for server to [%s] %s [%s]",
             basePath,
-            segmentHolder.getType() == LOAD ? "load" : "drop",
+            segmentHolder.getAction(),
             segmentHolder.getSegmentIdentifier()
         );
         final ScheduledFuture<?> nodeDeletedCheck = scheduleNodeDeletedCheck(path);
@@ -251,7 +249,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
                   // Cancel the check node deleted task since we have already
                   // been notified by the zk watcher
                   nodeDeletedCheck.cancel(true);
-                  entryRemoved(segmentHolder, watchedEvent.getPath());
+                  onZkNodeDeleted(segmentHolder, watchedEvent.getPath());
                   break;
                 default:
                   // do nothing
@@ -259,9 +257,8 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
             }
         ).forPath(path);
 
+        // Cleanup watcher to avoid memory leak if we missed the NodeDeleted event
         if (stat == null) {
-          final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
-
           // Create a node and then delete it to remove the registered watcher.  This is a work-around for
           // a zookeeper race condition.  Specifically, when you set a watcher, it fires on the next event
           // that happens for that node.  If no events happen, the watcher stays registered forever.
@@ -275,15 +272,16 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
           //
           // We do not create the existence watcher first, because then it will fire when we create the
           // node and we'll have the same race when trying to refresh that watcher.
+          final byte[] noopPayload = jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
           curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
-          entryRemoved(segmentHolder, path);
+          onZkNodeDeleted(segmentHolder, path);
         }
       }
       catch (KeeperException.NodeExistsException ne) {
         // This is expected when historicals haven't yet picked up processing this segment and coordinator
         // tries reassigning it to the same node.
         log.warn(ne, "ZK node already exists because segment change request hasn't yet been processed");
-        failAssign(segmentHolder, true);
+        failAssign(segmentHolder, true, null);
       }
       catch (Exception e) {
         failAssign(segmentHolder, false, e);
@@ -300,13 +298,15 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
                 failAssign(
                     segmentHolder,
                     true,
-                    new ISE("Failing this %s operation since it timed out and %s was never removed! These segments might still get processed",
-                            segmentHolder.getType() == DROP ? "DROP" : "LOAD",
-                            path
+                    new ISE(
+                        "%s operation timed out and [%s] was never removed! "
+                        + "These segments may still get processed.",
+                        segmentHolder.getAction(),
+                        path
                     )
                 );
               } else {
-                log.debug("%s detected to be removed. ", path);
+                log.debug("Path [%s] has been removed.", path);
               }
             }
             catch (Exception e) {
@@ -322,7 +322,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
 
   private void actionCompleted(SegmentHolder segmentHolder)
   {
-    switch (segmentHolder.getType()) {
+    switch (segmentHolder.getAction()) {
       case LOAD:
         // When load failed a segment will be removed from the segmentsToLoad twice and
         // null value will be returned at the second time in which case queueSize may be negative.
@@ -339,7 +339,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       default:
         throw new UnsupportedOperationException();
     }
-    executeCallbacks(segmentHolder);
+    executeCallbacks(segmentHolder, true);
   }
 
 
@@ -352,12 +352,12 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   public void stop()
   {
     for (SegmentHolder holder : segmentsToDrop.values()) {
-      executeCallbacks(holder);
+      executeCallbacks(holder, false);
     }
     segmentsToDrop.clear();
 
     for (SegmentHolder holder : segmentsToLoad.values()) {
-      executeCallbacks(holder);
+      executeCallbacks(holder, false);
     }
     segmentsToLoad.clear();
 
@@ -366,7 +366,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
     failedAssignCount.set(0);
   }
 
-  private void entryRemoved(SegmentHolder segmentHolder, String path)
+  private void onZkNodeDeleted(SegmentHolder segmentHolder, String path)
   {
     if (!ZKPaths.getNodeFromPath(path).equals(segmentHolder.getSegmentIdentifier())) {
       log.warn(
@@ -381,16 +381,11 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
     log.debug(
         "Server[%s] done processing %s of segment [%s]",
         basePath,
-        segmentHolder.getType() == LOAD ? "load" : "drop",
+        segmentHolder.getAction(),
         path
     );
   }
 
-  private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout)
-  {
-    failAssign(segmentHolder, handleTimeout, null);
-  }
-
   private void failAssign(SegmentHolder segmentHolder, boolean handleTimeout, Exception e)
   {
     if (e != null) {
@@ -404,33 +399,38 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       // needs to take this into account.
       log.debug(
           "Skipping segment removal from [%s] queue, since ZK Node still exists!",
-          segmentHolder.getType() == DROP ? "DROP" : "LOAD"
+          segmentHolder.getAction()
       );
       timedOutSegments.add(segmentHolder.getSegment());
-      executeCallbacks(segmentHolder);
+      executeCallbacks(segmentHolder, false);
     } else {
       // This may have failed for a different reason and so act like it was completed.
       actionCompleted(segmentHolder);
     }
   }
 
+  private enum Action
+  {
+    LOAD, DROP
+  }
+
   private static class SegmentHolder
   {
     private final DataSegment segment;
     private final DataSegmentChangeRequest changeRequest;
-    private final int type;
+    private final Action type;
     // Guaranteed to store only non-null elements
     private final List<LoadPeonCallback> callbacks = new ArrayList<>();
 
     private SegmentHolder(
         DataSegment segment,
-        int type,
+        Action type,
         Collection<LoadPeonCallback> callbacksParam
     )
     {
       this.segment = segment;
       this.type = type;
-      this.changeRequest = (type == LOAD)
+      this.changeRequest = (type == Action.LOAD)
                            ? new SegmentChangeRequestLoad(segment)
                            : new SegmentChangeRequestDrop(segment);
       Iterator<LoadPeonCallback> itr = callbacksParam.iterator();
@@ -447,7 +447,7 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
       return segment;
     }
 
-    public int getType()
+    public Action getAction()
     {
       return type;
     }
@@ -491,10 +491,10 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
     }
   }
 
-  private void executeCallbacks(SegmentHolder holder)
+  private void executeCallbacks(SegmentHolder holder, boolean success)
   {
     for (LoadPeonCallback callback : holder.snapshotCallbacks()) {
-      callBackExecutor.submit(() -> callback.execute());
+      callBackExecutor.submit(() -> callback.execute(success));
     }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index df13471aa8..ab025c0da3 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -27,13 +27,10 @@ import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Inject;
-import com.google.inject.Provider;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMaps;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2LongMap;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
@@ -43,7 +40,6 @@ import org.apache.druid.client.ServerInventoryView;
 import org.apache.druid.client.coordinator.Coordinator;
 import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.curator.ZkEnablementConfig;
 import org.apache.druid.curator.discovery.ServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.guice.ManageLifecycle;
@@ -79,7 +75,6 @@ import org.apache.druid.server.coordinator.duty.RunRules;
 import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
-import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.initialization.jetty.ServiceUnavailableException;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
 import org.apache.druid.timeline.DataSegment;
@@ -137,15 +132,11 @@ public class DruidCoordinator
 
   private final Object lock = new Object();
   private final DruidCoordinatorConfig config;
-  private final ZkPathsConfig zkPaths;
   private final JacksonConfigManager configManager;
   private final SegmentsMetadataManager segmentsMetadataManager;
   private final ServerInventoryView serverInventoryView;
   private final MetadataRuleManager metadataRuleManager;
 
-  @Nullable // Null if zk is disabled
-  private final CuratorFramework curator;
-
   private final ServiceEmitter emitter;
   private final IndexingServiceClient indexingServiceClient;
   private final ScheduledExecutorService exec;
@@ -177,12 +168,10 @@ public class DruidCoordinator
   @Inject
   public DruidCoordinator(
       DruidCoordinatorConfig config,
-      ZkPathsConfig zkPaths,
       JacksonConfigManager configManager,
       SegmentsMetadataManager segmentsMetadataManager,
       ServerInventoryView serverInventoryView,
       MetadataRuleManager metadataRuleManager,
-      Provider<CuratorFramework> curatorProvider,
       ServiceEmitter emitter,
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
@@ -195,18 +184,15 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       @Coordinator DruidLeaderSelector coordLeaderSelector,
-      ObjectMapper objectMapper,
-      ZkEnablementConfig zkEnablementConfig
+      ObjectMapper objectMapper
   )
   {
     this(
         config,
-        zkPaths,
         configManager,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        curatorProvider,
         emitter,
         scheduledExecutorFactory,
         indexingServiceClient,
@@ -220,19 +206,16 @@ public class DruidCoordinator
         factory,
         lookupCoordinatorManager,
         coordLeaderSelector,
-        objectMapper,
-        zkEnablementConfig
+        objectMapper
     );
   }
 
   DruidCoordinator(
       DruidCoordinatorConfig config,
-      ZkPathsConfig zkPaths,
       JacksonConfigManager configManager,
       SegmentsMetadataManager segmentsMetadataManager,
       ServerInventoryView serverInventoryView,
       MetadataRuleManager metadataRuleManager,
-      Provider<CuratorFramework> curatorProvider,
       ServiceEmitter emitter,
       ScheduledExecutorFactory scheduledExecutorFactory,
       IndexingServiceClient indexingServiceClient,
@@ -246,22 +229,15 @@ public class DruidCoordinator
       BalancerStrategyFactory factory,
       LookupCoordinatorManager lookupCoordinatorManager,
       DruidLeaderSelector coordLeaderSelector,
-      ObjectMapper objectMapper,
-      ZkEnablementConfig zkEnablementConfig
+      ObjectMapper objectMapper
   )
   {
     this.config = config;
-    this.zkPaths = zkPaths;
     this.configManager = configManager;
 
     this.segmentsMetadataManager = segmentsMetadataManager;
     this.serverInventoryView = serverInventoryView;
     this.metadataRuleManager = metadataRuleManager;
-    if (zkEnablementConfig.isEnabled()) {
-      this.curator = curatorProvider.get();
-    } else {
-      this.curator = null;
-    }
     this.emitter = emitter;
     this.indexingServiceClient = indexingServiceClient;
     this.taskMaster = taskMaster;
@@ -440,7 +416,7 @@ public class DruidCoordinator
     if (segment == null) {
       log.makeAlert(new IAE("Can not move null DataSegment"), "Exception moving null segment").emit();
       if (callback != null) {
-        callback.execute();
+        callback.execute(false);
       }
       throw new ISE("Cannot move null DataSegment");
     }
@@ -483,13 +459,10 @@ public class DruidCoordinator
         );
       }
 
-      final String toLoadQueueSegPath =
-          ZKPaths.makePath(zkPaths.getLoadQueuePath(), toServer.getName(), segmentId.toString());
-
-      final LoadPeonCallback loadPeonCallback = () -> {
+      final LoadPeonCallback loadPeonCallback = success -> {
         dropPeon.unmarkSegmentToDrop(segmentToLoad);
         if (callback != null) {
-          callback.execute();
+          callback.execute(success);
         }
       };
 
@@ -499,14 +472,23 @@ public class DruidCoordinator
       try {
         loadPeon.loadSegment(
             segmentToLoad,
-            () -> {
+            success -> {
+              // Drop segment only if:
+              // (1) segment load was successful on toServer
+              // AND (2) segment not already queued for drop on fromServer
+              // AND (3a) loading is http-based
+              //     OR (3b) inventory shows segment loaded on toServer
+
+              // Do not check the inventory with http loading as the HTTP
+              // response is enough to determine load success or failure
               try {
-                if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) &&
-                    (curator == null || curator.checkExists().forPath(toLoadQueueSegPath) == null) &&
-                    !dropPeon.getSegmentsToDrop().contains(segment)) {
+                if (success
+                    && !dropPeon.getSegmentsToDrop().contains(segment)
+                    && (taskMaster.isHttpLoading()
+                     || serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment))) {
                   dropPeon.dropSegment(segment, loadPeonCallback);
                 } else {
-                  loadPeonCallback.execute();
+                  loadPeonCallback.execute(success);
                 }
               }
               catch (Exception e) {
@@ -523,7 +505,7 @@ public class DruidCoordinator
     catch (Exception e) {
       log.makeAlert(e, "Exception moving segment %s", segmentId).emit();
       if (callback != null) {
-        callback.execute();
+        callback.execute(false);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index 4a171e51a1..e970c56c4e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -375,7 +375,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
             serverId,
             segment.getId()
         );
-        callback.execute();
+        callback.execute(false);
         return;
       }
 
@@ -401,7 +401,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
             serverId,
             segment.getId()
         );
-        callback.execute();
+        callback.execute(false);
         return;
       }
       SegmentHolder holder = segmentsToDrop.get(segment);
@@ -536,7 +536,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       callBackExecutor.execute(() -> {
         for (LoadPeonCallback callback : callbacks) {
           if (callback != null) {
-            callback.execute();
+            callback.execute(true);
           }
         }
       });
@@ -557,7 +557,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       callBackExecutor.execute(() -> {
         for (LoadPeonCallback callback : callbacks) {
           if (callback != null) {
-            callback.execute();
+            callback.execute(false);
           }
         }
       });
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/LoadPeonCallback.java b/server/src/main/java/org/apache/druid/server/coordinator/LoadPeonCallback.java
index 67ff8f387f..d53d878e27 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/LoadPeonCallback.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/LoadPeonCallback.java
@@ -30,5 +30,5 @@ public interface LoadPeonCallback
    * is important to take extra measures to ensure that whatever side effects they expect to happen upon success
    * have happened. Coordinator will have a complete and correct view of the cluster in the next run period.
    */
-  void execute();
+  void execute(boolean success);
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java
index e4f22832e9..88c74dea0f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/LoadQueueTaskMaster.java
@@ -42,6 +42,7 @@ public class LoadQueueTaskMaster
   private final DruidCoordinatorConfig config;
   private final HttpClient httpClient;
   private final ZkPathsConfig zkPaths;
+  private final boolean httpLoading;
 
   public LoadQueueTaskMaster(
       Provider<CuratorFramework> curatorFrameworkProvider,
@@ -60,11 +61,12 @@ public class LoadQueueTaskMaster
     this.config = config;
     this.httpClient = httpClient;
     this.zkPaths = zkPaths;
+    this.httpLoading = "http".equalsIgnoreCase(config.getLoadQueuePeonType());
   }
 
   public LoadQueuePeon giveMePeon(ImmutableDruidServer server)
   {
-    if ("http".equalsIgnoreCase(config.getLoadQueuePeonType())) {
+    if (httpLoading) {
       return new HttpLoadQueuePeon(server.getURL(), jsonMapper, httpClient, config, peonExec, callbackExec);
     } else {
       return new CuratorLoadQueuePeon(
@@ -77,4 +79,9 @@ public class LoadQueueTaskMaster
       );
     }
   }
+
+  public boolean isHttpLoading()
+  {
+    return httpLoading;
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index 198a7cf5e8..41871dd039 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -22,7 +22,6 @@ package org.apache.druid.server.coordinator.duty;
 import com.google.common.collect.Lists;
 import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.Pair;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordinator.BalancerSegmentHolder;
 import org.apache.druid.server.coordinator.BalancerStrategy;
@@ -293,26 +292,18 @@ public class BalanceSegments implements CoordinatorDuty
         new ServerHolder(toServer, toPeon).getAvailableSize() > segmentToMove.getSize()) {
       log.debug("Moving [%s] from [%s] to [%s]", segmentId, fromServer.getName(), toServer.getName());
 
-      LoadPeonCallback callback = null;
+      ConcurrentMap<SegmentId, BalancerSegmentHolder> movingSegments =
+          currentlyMovingSegments.get(toServer.getTier());
+      movingSegments.put(segmentId, segment);
+      final LoadPeonCallback callback = moveSuccess -> movingSegments.remove(segmentId);
       try {
-        ConcurrentMap<SegmentId, BalancerSegmentHolder> movingSegments =
-            currentlyMovingSegments.get(toServer.getTier());
-        movingSegments.put(segmentId, segment);
-        callback = () -> movingSegments.remove(segmentId);
-        coordinator.moveSegment(
-            params,
-            fromServer,
-            toServer,
-            segmentToMove,
-            callback
-        );
+        coordinator
+            .moveSegment(params, fromServer, toServer, segmentToMove, callback);
         return true;
       }
       catch (Exception e) {
-        log.makeAlert(e, StringUtils.format("[%s] : Moving exception", segmentId)).emit();
-        if (callback != null) {
-          callback.execute();
-        }
+        log.makeAlert(e, "[%s] : Moving exception", segmentId).emit();
+        callback.execute(false);
       }
     }
     return false;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
index bd8b2c30d5..75bcb8fe17 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/UnloadUnusedSegments.java
@@ -136,7 +136,7 @@ public class UnloadUnusedSegments implements CoordinatorDuty
           LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
 
           if (!queuePeon.getSegmentsToDrop().contains(segment)) {
-            queuePeon.dropSegment(segment, () -> {});
+            queuePeon.dropSegment(segment, success -> {});
             stats.addToTieredStat("unneededCount", server.getTier(), 1);
             log.info(
                 "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
index 9e6dd04a38..3b20cf0913 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/LoadRule.java
@@ -377,7 +377,10 @@ public abstract class LoadRule implements Rule
           holder.getServer().getTier(),
           getReplicationLogString()
       );
-      holder.getPeon().loadSegment(segment, () -> throttler.unregisterReplicantCreation(tier, segmentId));
+      holder.getPeon().loadSegment(
+          segment,
+          loadSuccess -> throttler.unregisterReplicantCreation(tier, segmentId)
+      );
     }
 
     return numToAssign;
diff --git a/server/src/test/java/org/apache/druid/curator/discovery/LatchableServiceAnnouncer.java b/server/src/test/java/org/apache/druid/curator/discovery/LatchableServiceAnnouncer.java
new file mode 100644
index 0000000000..61e77ba256
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/curator/discovery/LatchableServiceAnnouncer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.curator.discovery;
+
+import org.apache.druid.server.DruidNode;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A test service announcer that counts down the corresponding latches upon
+ * invocation of {@link #announce(DruidNode)} and {@link #unannounce(DruidNode)}.
+ */
+public class LatchableServiceAnnouncer implements ServiceAnnouncer
+{
+  private final CountDownLatch announceLatch;
+  private final CountDownLatch unannounceLatch;
+
+  /**
+   * Creates a new {@link LatchableServiceAnnouncer} with the given countdown
+   * latches for announce and unannounce actions.
+   */
+  public LatchableServiceAnnouncer(
+      @Nullable CountDownLatch announceLatch,
+      @Nullable CountDownLatch unannounceLatch
+  )
+  {
+    this.announceLatch = announceLatch;
+    this.unannounceLatch = unannounceLatch;
+  }
+
+  @Override
+  public void announce(DruidNode node)
+  {
+    if (announceLatch != null) {
+      announceLatch.countDown();
+    }
+  }
+
+  @Override
+  public void unannounce(DruidNode node)
+  {
+    if (unannounceLatch != null) {
+      unannounceLatch.countDown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTester.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTester.java
index 9a4166be74..011b01cf67 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTester.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTester.java
@@ -59,7 +59,7 @@ public class BalanceSegmentsTester extends BalanceSegments
       try {
         final LoadQueuePeon loadPeon = params.getLoadManagementPeons().get(toServerName);
 
-        loadPeon.loadSegment(segment.getSegment(), () -> {});
+        loadPeon.loadSegment(segment.getSegment(), success -> {});
 
         final LoadQueuePeon dropPeon = params.getLoadManagementPeons().get(fromServerName);
         dropPeon.markSegmentToDrop(segment.getSegment());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 64ffe8d5fd..926bccc53e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -38,14 +38,12 @@ import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.CuratorTestBase;
 import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
+import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.concurrent.Execs;
-import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
 import org.apache.druid.metadata.MetadataRuleManager;
 import org.apache.druid.metadata.SegmentsMetadataManager;
 import org.apache.druid.segment.TestHelper;
@@ -91,19 +89,16 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
   private DataSourcesSnapshot dataSourcesSnapshot;
   private DruidCoordinatorRuntimeParams coordinatorRuntimeParams;
 
-  private ScheduledExecutorFactory scheduledExecutorFactory;
   private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
   private LoadQueuePeon sourceLoadQueuePeon;
   private LoadQueuePeon destinationLoadQueuePeon;
-  private MetadataRuleManager metadataRuleManager;
   private CountDownLatch leaderAnnouncerLatch;
   private CountDownLatch leaderUnannouncerLatch;
   private PathChildrenCache sourceLoadQueueChildrenCache;
   private PathChildrenCache destinationLoadQueueChildrenCache;
   private DruidCoordinatorConfig druidCoordinatorConfig;
-  private ObjectMapper objectMapper;
   private JacksonConfigManager configManager;
-  private DruidNode druidNode;
+
   private static final String SEGPATH = "/druid/segments";
   private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
   private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
@@ -123,8 +118,8 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
   private final ObjectMapper jsonMapper;
   private final ZkPathsConfig zkPathsConfig;
 
-  private ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
-  private ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
+  private final ScheduledExecutorService peonExec = Execs.scheduledSingleThreaded("Master-PeonExec--%d");
+  private final ExecutorService callbackExec = Execs.multiThreaded(4, "LoadQueuePeon-callbackexec--%d");
 
   public CuratorDruidCoordinatorTest()
   {
@@ -139,7 +134,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
 
-    metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
     configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
     EasyMock.expect(
         configManager.watch(
@@ -154,7 +148,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
             EasyMock.anyObject(Class.class),
             EasyMock.anyObject()
         )
-    ).andReturn(new AtomicReference(CoordinatorCompactionConfig.empty())).anyTimes();
+    ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
     EasyMock.replay(configManager);
 
     setupServerAndCurator();
@@ -164,7 +158,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     curator.create().creatingParentsIfNeeded().forPath(SOURCE_LOAD_PATH);
     curator.create().creatingParentsIfNeeded().forPath(DESTINATION_LOAD_PATH);
 
-    objectMapper = new DefaultObjectMapper();
+    final ObjectMapper objectMapper = new DefaultObjectMapper();
     druidCoordinatorConfig = new TestDruidCoordinatorConfig.Builder()
         .withCoordinatorStartDelay(new Duration(COORDINATOR_START_DELAY))
         .withCoordinatorPeriod(new Duration(COORDINATOR_PERIOD))
@@ -203,57 +197,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         callbackExec,
         druidCoordinatorConfig
     );
-    druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
     loadManagementPeons = new ConcurrentHashMap<>();
-    scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor();
     leaderAnnouncerLatch = new CountDownLatch(1);
     leaderUnannouncerLatch = new CountDownLatch(1);
-    coordinator = new DruidCoordinator(
-        druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
-        configManager,
-        segmentsMetadataManager,
-        baseView,
-        metadataRuleManager,
-        () -> curator,
-        new NoopServiceEmitter(),
-        scheduledExecutorFactory,
-        null,
-        null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
-        druidNode,
-        loadManagementPeons,
-        null,
-        null,
-        new CoordinatorCustomDutyGroups(ImmutableSet.of()),
-        new CostBalancerStrategyFactory(),
-        EasyMock.createNiceMock(LookupCoordinatorManager.class),
-        new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
-    );
   }
 
   @After
@@ -286,8 +232,6 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
 
     segmentRemovedLatch = new CountDownLatch(0);
 
-    CountDownLatch destCountdown = new CountDownLatch(1);
-    CountDownLatch srcCountdown = new CountDownLatch(1);
     setupView();
 
     DruidServer source = new DruidServer(
@@ -347,6 +291,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     // these child watchers are used to simulate actions of historicals, announcing a segment on noticing a load queue
     // for the destination and unannouncing from source server when noticing a drop request
 
+    CountDownLatch srcCountdown = new CountDownLatch(1);
     sourceLoadQueueChildrenCache.getListenable().addListener(
         (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
           if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
@@ -358,6 +303,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         }
     );
 
+    CountDownLatch destCountdown = new CountDownLatch(1);
     destinationLoadQueueChildrenCache.getListenable().addListener(
         (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
           if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
@@ -393,6 +339,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
 
     EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
     EasyMock.replay(dataSourcesSnapshot);
+
     coordinator.moveSegment(
         coordinatorRuntimeParams,
         source.toImmutableDruidServer(),
@@ -510,42 +457,22 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     sourceLoadQueuePeon.start();
     destinationLoadQueuePeon.start();
 
+    final LoadQueueTaskMaster loadQueueTaskMaster = EasyMock.createMock(LoadQueueTaskMaster.class);
+    EasyMock.expect(loadQueueTaskMaster.isHttpLoading()).andReturn(false).anyTimes();
+    EasyMock.replay(loadQueueTaskMaster);
+
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         configManager,
         segmentsMetadataManager,
         baseView,
-        metadataRuleManager,
-        () -> curator,
+        EasyMock.createNiceMock(MetadataRuleManager.class),
         new NoopServiceEmitter(),
-        scheduledExecutorFactory,
+        (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor(),
         null,
-        null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
-        druidNode,
+        loadQueueTaskMaster,
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
+        new DruidNode("hey", "what", false, 1234, null, true, false),
         loadManagementPeons,
         null,
         null,
@@ -553,8 +480,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
   }
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 7b2840d0e2..980c336aa2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -38,8 +38,7 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.CuratorTestBase;
 import org.apache.druid.curator.CuratorUtils;
-import org.apache.druid.curator.ZkEnablementConfig;
-import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
+import org.apache.druid.curator.discovery.LatchableServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
@@ -64,7 +63,6 @@ import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRul
 import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
-import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -85,7 +83,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -106,6 +103,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private DruidServer druidServer;
   private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
   private LoadQueuePeon loadQueuePeon;
+  private LoadQueueTaskMaster loadQueueTaskMaster;
   private MetadataRuleManager metadataRuleManager;
   private CountDownLatch leaderAnnouncerLatch;
   private CountDownLatch leaderUnannouncerLatch;
@@ -113,7 +111,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private DruidCoordinatorConfig druidCoordinatorConfig;
   private ObjectMapper objectMapper;
   private DruidNode druidNode;
-  private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
+  private final LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
 
   @Before
   public void setUp() throws Exception
@@ -124,6 +122,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     coordinatorRuntimeParams = EasyMock.createNiceMock(DruidCoordinatorRuntimeParams.class);
     metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
+    loadQueueTaskMaster = EasyMock.createMock(LoadQueueTaskMaster.class);
     JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
     EasyMock.expect(
         configManager.watch(
@@ -131,14 +130,14 @@ public class DruidCoordinatorTest extends CuratorTestBase
             EasyMock.anyObject(Class.class),
             EasyMock.anyObject()
         )
-    ).andReturn(new AtomicReference(CoordinatorDynamicConfig.builder().build())).anyTimes();
+    ).andReturn(new AtomicReference<>(CoordinatorDynamicConfig.builder().build())).anyTimes();
     EasyMock.expect(
         configManager.watch(
             EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
             EasyMock.anyObject(Class.class),
             EasyMock.anyObject()
         )
-    ).andReturn(new AtomicReference(CoordinatorCompactionConfig.empty())).anyTimes();
+    ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
     EasyMock.replay(configManager);
     setupServerAndCurator();
     curator.start();
@@ -170,51 +169,20 @@ public class DruidCoordinatorTest extends CuratorTestBase
     loadQueuePeon.start();
     druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
     loadManagementPeons = new ConcurrentHashMap<>();
-    scheduledExecutorFactory = new ScheduledExecutorFactory()
-    {
-      @Override
-      public ScheduledExecutorService create(int corePoolSize, final String nameFormat)
-      {
-        return Executors.newSingleThreadScheduledExecutor();
-      }
-    };
+    scheduledExecutorFactory = (corePoolSize, nameFormat) -> Executors.newSingleThreadScheduledExecutor();
     leaderAnnouncerLatch = new CountDownLatch(1);
     leaderUnannouncerLatch = new CountDownLatch(1);
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         configManager,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        () -> curator,
         serviceEmitter,
         scheduledExecutorFactory,
         null,
-        null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
+        loadQueueTaskMaster,
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
         druidNode,
         loadManagementPeons,
         null,
@@ -223,8 +191,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
   }
 
@@ -302,6 +269,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.expect(serverInventoryView.isSegmentLoadedByServer("to", segment)).andReturn(true).once();
     EasyMock.replay(serverInventoryView);
 
+    EasyMock.expect(loadQueueTaskMaster.isHttpLoading()).andReturn(false).anyTimes();
+    EasyMock.replay(loadQueueTaskMaster);
+
     mockCoordinatorRuntimeParams();
 
     coordinator.moveSegment(
@@ -313,10 +283,10 @@ public class DruidCoordinatorTest extends CuratorTestBase
     );
 
     LoadPeonCallback loadCallback = loadCallbackCapture.getValue();
-    loadCallback.execute();
+    loadCallback.execute(true);
 
     LoadPeonCallback dropCallback = dropCallbackCapture.getValue();
-    dropCallback.execute();
+    dropCallback.execute(true);
 
     EasyMock.verify(druidServer, druidServer2, loadQueuePeon, serverInventoryView, metadataRuleManager);
     EasyMock.verify(coordinatorRuntimeParams);
@@ -729,12 +699,10 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
     DruidCoordinator c = new DruidCoordinator(
         druidCoordinatorConfig,
-        null,
         configManager,
         null,
         null,
         null,
-        () -> null,
         null,
         scheduledExecutorFactory,
         null,
@@ -747,8 +715,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         null,
         null,
         null,
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
 
     DruidCoordinator.DutiesRunnable duty = c.new DutiesRunnable(Collections.emptyList(), 0, "TEST");
@@ -785,39 +752,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
     CoordinatorCustomDutyGroups emptyCustomDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of());
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         null,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        () -> curator,
         serviceEmitter,
         scheduledExecutorFactory,
         null,
         null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
         druidNode,
         loadManagementPeons,
         ImmutableSet.of(),
@@ -826,8 +769,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
     // Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
     List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
@@ -850,39 +792,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
     CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(group));
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         null,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        () -> curator,
         serviceEmitter,
         scheduledExecutorFactory,
         null,
         null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
         druidNode,
         loadManagementPeons,
         ImmutableSet.of(),
@@ -891,8 +809,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
     // Since CompactSegments is not enabled in Custom Duty Group, then CompactSegments must be created in IndexingServiceDuties
     List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
@@ -924,39 +841,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
     CoordinatorCustomDutyGroups customDutyGroups = new CoordinatorCustomDutyGroups(ImmutableSet.of(compactSegmentCustomGroup));
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         null,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        () -> curator,
         serviceEmitter,
         scheduledExecutorFactory,
         null,
         null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
         druidNode,
         loadManagementPeons,
         ImmutableSet.of(),
@@ -965,8 +858,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
     // Since CompactSegments is enabled in Custom Duty Group, then CompactSegments must not be created in IndexingServiceDuties
     List<CoordinatorDuty> indexingDuties = coordinator.makeIndexingServiceDuties();
@@ -1058,39 +950,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
     coordinator = new DruidCoordinator(
         druidCoordinatorConfig,
-        new ZkPathsConfig()
-        {
-
-          @Override
-          public String getBase()
-          {
-            return "druid";
-          }
-        },
         configManager,
         segmentsMetadataManager,
         serverInventoryView,
         metadataRuleManager,
-        () -> curator,
         serviceEmitter,
         scheduledExecutorFactory,
         null,
         null,
-        new NoopServiceAnnouncer()
-        {
-          @Override
-          public void announce(DruidNode node)
-          {
-            // count down when this coordinator becomes the leader
-            leaderAnnouncerLatch.countDown();
-          }
-
-          @Override
-          public void unannounce(DruidNode node)
-          {
-            leaderUnannouncerLatch.countDown();
-          }
-        },
+        new LatchableServiceAnnouncer(leaderAnnouncerLatch, leaderUnannouncerLatch),
         druidNode,
         loadManagementPeons,
         null,
@@ -1099,8 +967,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new CostBalancerStrategyFactory(),
         EasyMock.createNiceMock(LookupCoordinatorManager.class),
         new TestDruidLeaderSelector(),
-        null,
-        ZkEnablementConfig.ENABLED
+        null
     );
     coordinator.start();
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
index e270fa1b1d..a1a1f06e05 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -104,10 +104,10 @@ public class HttpLoadQueuePeonTest
         segment4.getId(), new CountDownLatch(1)
     );
 
-    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getId()).countDown());
-    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getId()).countDown());
-    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getId()).countDown());
-    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getId()).countDown());
+    httpLoadQueuePeon.dropSegment(segment1, success -> latches.get(segment1.getId()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, success -> latches.get(segment2.getId()).countDown());
+    httpLoadQueuePeon.dropSegment(segment3, success -> latches.get(segment3.getId()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, success -> latches.get(segment4.getId()).countDown());
 
     latches.get(segment1.getId()).await();
     latches.get(segment2.getId()).await();
@@ -141,13 +141,13 @@ public class HttpLoadQueuePeonTest
         segment4.getId(), new CountDownLatch(1)
     );
 
-    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getId()).countDown());
-    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getId()).countDown());
+    httpLoadQueuePeon.dropSegment(segment1, success -> latches.get(segment1.getId()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, success -> latches.get(segment2.getId()).countDown());
     latches.get(segment1.getId()).await();
     latches.get(segment2.getId()).await();
     httpLoadQueuePeon.stop();
-    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getId()).countDown());
-    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getId()).countDown());
+    httpLoadQueuePeon.dropSegment(segment3, success -> latches.get(segment3.getId()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, success -> latches.get(segment4.getId()).countDown());
     latches.get(segment3.getId()).await();
     latches.get(segment4.getId()).await();
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
index 3bb8fcda25..6410f9e5ef 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/LoadQueuePeonTest.java
@@ -206,14 +206,14 @@ public class LoadQueuePeonTest extends CuratorTestBase
     for (final DataSegment segment : segmentToDrop) {
       loadQueuePeon.dropSegment(
           segment,
-          () -> segmentDroppedSignals.get(segment.getId()).countDown()
+          success -> segmentDroppedSignals.get(segment.getId()).countDown()
       );
     }
 
     for (final DataSegment segment : segmentToLoad) {
       loadQueuePeon.loadSegment(
           segment,
-          () -> segmentLoadedSignals.get(segment.getId()).countDown()
+          success -> segmentLoadedSignals.get(segment.getId()).countDown()
       );
     }
 
@@ -292,14 +292,7 @@ public class LoadQueuePeonTest extends CuratorTestBase
 
     loadQueuePeon.loadSegment(
         segment,
-        new LoadPeonCallback()
-        {
-          @Override
-          public void execute()
-          {
-            segmentLoadedSignal.countDown();
-          }
-        }
+        success -> segmentLoadedSignal.countDown()
     );
 
     Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
@@ -359,14 +352,9 @@ public class LoadQueuePeonTest extends CuratorTestBase
 
     loadQueuePeon.loadSegment(
         segment,
-        new LoadPeonCallback()
-        {
-          @Override
-          public void execute()
-          {
-            segmentLoadedSignal.countDown();
-            delayedSegmentLoadedSignal.countDown();
-          }
+        success -> {
+          segmentLoadedSignal.countDown();
+          delayedSegmentLoadedSignal.countDown();
         }
     );
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
index 6822419f79..0c2af979bb 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -98,5 +98,10 @@ public interface CoordinatorSimulation
      * callbacks on the coordinator.
      */
     void loadQueuedSegments();
+
+    /**
+     * Removes the specified server from the cluster.
+     */
+    void removeServer(DruidServer server);
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
index 60e8e42824..ee0ed6cc26 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -123,6 +123,12 @@ public abstract class CoordinatorSimulationBaseTest
     sim.cluster().loadQueuedSegments();
   }
 
+  @Override
+  public void removeServer(DruidServer server)
+  {
+    sim.cluster().removeServer(server);
+  }
+
   @Override
   public double getLoadPercentage(String datasource)
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
index 29301ea033..a5e02e9898 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.curator.ZkEnablementConfig;
 import org.apache.druid.curator.discovery.ServiceAnnouncer;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
@@ -47,7 +46,6 @@ import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
 import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
 import org.apache.druid.server.coordinator.rules.Rule;
-import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
 import org.apache.druid.timeline.DataSegment;
 import org.easymock.EasyMock;
@@ -198,12 +196,10 @@ public class CoordinatorSimulationBuilder
     // Build the coordinator
     final DruidCoordinator coordinator = new DruidCoordinator(
         env.coordinatorConfig,
-        new ZkPathsConfig(),
         env.jacksonConfigManager,
         env.segmentManager,
         env.coordinatorInventoryView,
         env.ruleManager,
-        () -> null,
         env.serviceEmitter,
         env.executorFactory,
         null,
@@ -217,8 +213,7 @@ public class CoordinatorSimulationBuilder
                                         : new CostBalancerStrategyFactory(),
         env.lookupCoordinatorManager,
         env.leaderSelector,
-        OBJECT_MAPPER,
-        ZkEnablementConfig.ENABLED
+        OBJECT_MAPPER
     );
 
     return new SimulationImpl(coordinator, env);
@@ -309,7 +304,7 @@ public class CoordinatorSimulationBuilder
           !env.autoSyncInventory,
           "Cannot invoke syncInventoryView as simulation is running in auto-sync mode."
       );
-      env.coordinatorInventoryView.sync(env.historicalInventoryView);
+      env.coordinatorInventoryView.sync(env.inventory);
     }
 
     @Override
@@ -346,6 +341,12 @@ public class CoordinatorSimulationBuilder
       }
     }
 
+    @Override
+    public void removeServer(DruidServer server)
+    {
+      env.inventory.removeServer(server);
+    }
+
     private void verifySimulationRunning()
     {
       if (!running.get()) {
@@ -372,24 +373,35 @@ public class CoordinatorSimulationBuilder
   private static class Environment
   {
     private final Lifecycle lifecycle = new Lifecycle("coord-sim");
+    private final StubServiceEmitter serviceEmitter
+        = new StubServiceEmitter("coordinator", "coordinator");
+    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig
+        = new AtomicReference<>();
+    private final TestDruidLeaderSelector leaderSelector
+        = new TestDruidLeaderSelector();
 
-    // Executors
     private final ExecutorFactory executorFactory;
-
-    private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
     private final TestSegmentsMetadataManager segmentManager;
     private final TestMetadataRuleManager ruleManager;
-    private final TestServerInventoryView historicalInventoryView;
 
     private final LoadQueueTaskMaster loadQueueTaskMaster;
-    private final StubServiceEmitter serviceEmitter
-        = new StubServiceEmitter("coordinator", "coordinator");
+
+    /**
+     * Represents the current inventory of all servers (typically historicals)
+     * actually present in the cluster.
+     */
+    private final TestServerInventoryView inventory;
+
+    /**
+     * Represents the view of the cluster inventory as seen by the coordinator.
+     * When {@code autoSyncInventory=true}, this is the same as {@link #inventory}.
+     */
     private final TestServerInventoryView coordinatorInventoryView;
 
-    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig = new AtomicReference<>();
     private final JacksonConfigManager jacksonConfigManager;
     private final LookupCoordinatorManager lookupCoordinatorManager;
     private final DruidCoordinatorConfig coordinatorConfig;
+
     private final boolean loadImmediately;
     private final boolean autoSyncInventory;
 
@@ -404,7 +416,7 @@ public class CoordinatorSimulationBuilder
         boolean autoSyncInventory
     )
     {
-      this.historicalInventoryView = clusterInventory;
+      this.inventory = clusterInventory;
       this.segmentManager = segmentManager;
       this.ruleManager = ruleManager;
       this.loadImmediately = loadImmediately;
@@ -450,7 +462,7 @@ public class CoordinatorSimulationBuilder
     private void setUp() throws Exception
     {
       EmittingLogger.registerEmitter(serviceEmitter);
-      historicalInventoryView.setUp();
+      inventory.setUp();
       coordinatorInventoryView.setUp();
       lifecycle.start();
       executorFactory.setUp();
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
index a1562c5187..16c7ee185b 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
@@ -31,7 +31,7 @@ the underlying parts. In that regard, these simulations resemble integration tes
 The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified
 using simulations:
 
-- `LoadQueuePeon`, `LoadQueueTaskMaster`
+- `HttpLoadQueuePeon`, `LoadQueueTaskMaster`
 - All coordinator duties, e.g. `BalanceSegments`, `RunRules`
 - All retention rules
 
@@ -63,6 +63,7 @@ of the coordinator in these situations.
    interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
     - communication with metadata store: `SegmentMetadataManager`, `MetadataRuleManager`
     - communication with historicals: `HttpClient`, `ServerInventoryView`
+    - `CuratorFramework`: provided as a mock, because simulating `CuratorLoadQueuePeon` is not supported yet
 4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
    modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately
    reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the
@@ -74,6 +75,7 @@ of the coordinator in these situations.
 - It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
   balancing cost of a segment. But the relative values can still be a good indicator while doing comparisons between,
   say two balancing strategies.
+- It does not support simulation of the ZooKeeper-based `CuratorLoadQueuePeon`.
 
 ## Usage
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
index 77b9820a95..a2c4ef6422 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -48,10 +48,21 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
   }
 
   @Test
-  public void testBalancingWithSyncedInventory()
+  public void testBalancingDoesNotOverReplicate()
   {
-    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
-    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+    testBalancingWithAutoSyncInventory(true);
+  }
+
+  @Test
+  public void testBalancingWithStaleViewDoesNotOverReplicate()
+  {
+    testBalancingWithAutoSyncInventory(false);
+  }
+
+  private void testBalancingWithAutoSyncInventory(boolean autoSyncInventory)
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, no replication
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 0);
 
     // historicals = 2(T1), replicas = 1(T1)
     final CoordinatorSimulation sim =
@@ -60,13 +71,16 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
                              .withServers(historicalT11, historicalT12)
                              .withRules(datasource, Load.on(Tier.T1, 1).forever())
                              .withDynamicConfig(dynamicConfig)
-                             .withAutoInventorySync(true)
+                             .withAutoInventorySync(autoSyncInventory)
                              .build();
 
     // Put all the segments on histT11
     segments.forEach(historicalT11::addDataSegment);
 
     startSimulation(sim);
+    if (!autoSyncInventory) {
+      syncInventoryView();
+    }
     runCoordinatorCycle();
 
     // Verify that segments have been chosen for balancing
@@ -80,6 +94,39 @@ public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
     verifyDatasourceIsFullyLoaded(datasource);
   }
 
+  @Test
+  public void testDropDoesNotHappenWhenLoadFails()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, no replication
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 0);
+
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(dynamicConfig)
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that segments have been chosen for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+
+    removeServer(historicalT12);
+    loadQueuedSegments();
+
+    // Verify that no segment has been loaded or dropped
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(0, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
   @Test
   public void testBalancingOfFullyReplicatedSegment()
   {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
index 52dc7c0933..b118819e3c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
@@ -217,44 +217,4 @@ public class SegmentLoadingNegativeTest extends CoordinatorSimulationBaseTest
     );
   }
 
-  /**
-   * Correct behaviour: Balancing should never cause over-replication, even when
-   * the inventory view is not updated.
-   * <p>
-   * Fix Apache #12881 to fix this test.
-   */
-  @Test
-  public void testBalancingWithStaleInventoryCausesOverReplication()
-  {
-    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
-    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
-
-    // historicals = 2(T1), replicas = 1(T1)
-    final CoordinatorSimulation sim =
-        CoordinatorSimulation.builder()
-                             .withSegments(segments)
-                             .withServers(historicalT11, historicalT12)
-                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
-                             .withDynamicConfig(dynamicConfig)
-                             .withAutoInventorySync(false)
-                             .build();
-
-    // Put all the segments on histT11
-    segments.forEach(historicalT11::addDataSegment);
-
-    startSimulation(sim);
-    syncInventoryView();
-    runCoordinatorCycle();
-
-    // Verify that segments have been chosen for balancing
-    verifyValue(Metric.MOVED_COUNT, 5L);
-
-    loadQueuedSegments();
-
-    // Verify that segments have now been balanced out
-    Assert.assertEquals(10, historicalT11.getTotalSegments());
-    Assert.assertEquals(5, historicalT12.getTotalSegments());
-    verifyDatasourceIsFullyLoaded(datasource);
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org