Posted to commits@druid.apache.org by kf...@apache.org on 2022/09/21 04:22:06 UTC

[druid] branch master updated: Add test framework to simulate segment loading and balancing (#13074)

This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 0039409817 Add test framework to simulate segment loading and balancing (#13074)
0039409817 is described below

commit 0039409817530bb0eaed0a78549b278e56d51778
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Wed Sep 21 09:51:58 2022 +0530

    Add test framework to simulate segment loading and balancing (#13074)
    
    Fixes #12822
    
    The framework added here makes it easy to write tests that verify the behaviour and interactions
    of the following entities under various conditions:
    - `DruidCoordinator`
    - `HttpLoadQueuePeon`, `LoadQueueTaskMaster`
    - coordinator duties: `BalanceSegments`, `RunRules`, `UnloadUnusedSegments`, etc.
    - datasource retention rules: `LoadRule`, `DropRule`
    
    Changes:
    Add the following main classes:
    - `CoordinatorSimulation` and related interfaces to dictate the behaviour of a simulation
    - `CoordinatorSimulationBuilder` to build a simulation.
    - `BlockingExecutorService` to keep submitted tasks in a queue and execute them
      only when explicitly invoked.
    
    Add tests:
    - `CoordinatorSimulationBaseTest`, `SegmentLoadingTest`, `SegmentBalancingTest`
    - `SegmentLoadingNegativeTest` to contain tests which assert the existing erroneous behaviour
    of segment loading. Once the behaviour is fixed, these tests will be moved to the regular
    `SegmentLoadingTest`.
    
    Please refer to the README.md in `org.apache.druid.server.coordinator.simulate` for more details.
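    
    A test written against this framework looks roughly like the following sketch
    (using helpers from `CoordinatorSimulationBaseTest`; the exact scenario and
    assertions are illustrative):
    
        CoordinatorSimulation sim =
            CoordinatorSimulation.builder()
                                 .withServers(createHistorical(1, Tier.T1, 10_000))
                                 .withSegments(Segments.WIKI_10X1D)
                                 .withRules(DS.WIKI, Load.on(Tier.T1, 1).forever())
                                 .build();
        startSimulation(sim);
        runCoordinatorCycle();
        loadQueuedSegments();
        verifyDatasourceIsFullyLoaded(DS.WIKI);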
---
 .../java/util/metrics/StubServiceEmitter.java      |  20 +-
 .../druid/server/coordinator/DruidCoordinator.java |   8 +
 .../server/coordinator/duty/BalanceSegments.java   |   1 +
 .../server/coordinator/BalanceSegmentsTest.java    |  22 +-
 .../server/coordinator/CreateDataSegments.java     | 135 +++++
 .../druid/server/coordinator/RunRulesTest.java     |  26 +-
 .../simulate/BlockingExecutorService.java          | 237 +++++++++
 .../simulate/CoordinatorSimulation.java            | 102 ++++
 .../simulate/CoordinatorSimulationBaseTest.java    | 307 ++++++++++++
 .../simulate/CoordinatorSimulationBuilder.java     | 554 +++++++++++++++++++++
 .../druid/server/coordinator/simulate/README.md    | 141 ++++++
 .../coordinator/simulate/SegmentBalancingTest.java | 128 +++++
 .../simulate/SegmentLoadingNegativeTest.java       | 260 ++++++++++
 .../coordinator/simulate/SegmentLoadingTest.java   | 192 +++++++
 .../simulate/TestDruidLeaderSelector.java          |  79 +++
 .../simulate/TestMetadataRuleManager.java          | 111 +++++
 .../simulate/TestSegmentLoadingHttpClient.java     | 167 +++++++
 .../simulate/TestSegmentsMetadataManager.java      | 199 ++++++++
 .../simulate/TestServerInventoryView.java          | 210 ++++++++
 .../simulate/WrappingScheduledExecutorService.java | 240 +++++++++
 20 files changed, 3105 insertions(+), 34 deletions(-)

diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 38f715f848..653dc8a08a 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -21,13 +21,15 @@ package org.apache.druid.java.util.metrics;
 
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class StubServiceEmitter extends ServiceEmitter
 {
-  private List<Event> events = new ArrayList<>();
+  private final List<Event> events = new ArrayList<>();
+  private final List<ServiceMetricEvent> metricEvents = new ArrayList<>();
 
   public StubServiceEmitter(String service, String host)
   {
@@ -37,14 +39,28 @@ public class StubServiceEmitter extends ServiceEmitter
   @Override
   public void emit(Event event)
   {
+    if (event instanceof ServiceMetricEvent) {
+      metricEvents.add((ServiceMetricEvent) event);
+    }
     events.add(event);
   }
 
+  /**
+   * Gets all the events emitted since the previous {@link #flush()}.
+   */
   public List<Event> getEvents()
   {
     return events;
   }
 
+  /**
+   * Gets all the metric events emitted since the previous {@link #flush()}.
+   */
+  public List<ServiceMetricEvent> getMetricEvents()
+  {
+    return metricEvents;
+  }
+
   @Override
   public void start()
   {
@@ -53,6 +69,8 @@ public class StubServiceEmitter extends ServiceEmitter
   @Override
   public void flush()
   {
+    events.clear();
+    metricEvents.clear();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 6b1e29d491..df13471aa8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -973,6 +973,14 @@ public class DruidCoordinator
     {
       return duties;
     }
+
+    @Override
+    public String toString()
+    {
+      return "DutiesRunnable{" +
+             "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' +
+             '}';
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index d2a1c4c8da..198a7cf5e8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -92,6 +92,7 @@ public class BalanceSegments implements CoordinatorDuty
   )
   {
 
+    log.info("Balancing segments in tier [%s]", tier);
     if (params.getUsedSegments().size() == 0) {
       log.info("Metadata segments are not available. Cannot balance.");
       // suppress emit zero stats
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index d4a89abb3d..a7c594fe09 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -30,7 +30,6 @@ import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
-import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
@@ -277,9 +276,9 @@ public class BalanceSegmentsTest
     params = new BalanceSegmentsTester(coordinator).run(params);
     EasyMock.verify(strategy);
     Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertThat(
-        peon3.getSegmentsToLoad(),
-        Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment3, segment4)))
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment3, segment4),
+        peon3.getSegmentsToLoad()
     );
   }
 
@@ -289,7 +288,7 @@ public class BalanceSegmentsTest
     DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0);
     params = new BalanceSegmentsTester(coordinator).run(params);
     Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1))));
+    Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad());
   }
 
   @Test
@@ -298,7 +297,7 @@ public class BalanceSegmentsTest
     DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
     params = new BalanceSegmentsTester(coordinator).run(params);
     Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2))));
+    Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad());
   }
 
   /**
@@ -347,9 +346,9 @@ public class BalanceSegmentsTest
     params = new BalanceSegmentsTester(coordinator).run(params);
     EasyMock.verify(strategy);
     Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertThat(
-        peon3.getSegmentsToLoad(),
-        Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2, segment3, segment4)))
+    Assert.assertEquals(
+        ImmutableSet.of(segment2, segment3, segment4),
+        peon3.getSegmentsToLoad()
     );
   }
 
@@ -603,10 +602,7 @@ public class BalanceSegmentsTest
     params = new BalanceSegmentsTester(coordinator).run(params);
     EasyMock.verify(strategy);
     Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
-    Assert.assertThat(
-        peon3.getSegmentsToLoad(),
-        Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3)))
-    );
+    Assert.assertEquals(ImmutableSet.of(segment3), peon3.getSegmentsToLoad());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
new file mode 100644
index 0000000000..8f2c123891
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test utility to create {@link DataSegment}s for a given datasource.
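+ * <p>
+ * Example usage (values are illustrative):
+ * <pre>
+ *   List&lt;DataSegment&gt; segments =
+ *       CreateDataSegments.ofDatasource("wiki")
+ *                         .forIntervals(1, Granularities.DAY)
+ *                         .startingAt("2022-01-01")
+ *                         .withNumPartitions(10)
+ *                         .eachOfSizeInMb(500);
+ * </pre>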
+ */
+public class CreateDataSegments
+{
+  private final String datasource;
+
+  private DateTime startTime;
+  private Granularity granularity;
+  private int numPartitions;
+  private int numIntervals;
+
+  public static CreateDataSegments ofDatasource(String datasource)
+  {
+    return new CreateDataSegments(datasource);
+  }
+
+  private CreateDataSegments(String datasource)
+  {
+    this.datasource = datasource;
+  }
+
+  public CreateDataSegments forIntervals(int numIntervals, Granularity intervalSize)
+  {
+    this.numIntervals = numIntervals;
+    this.granularity = intervalSize;
+    return this;
+  }
+
+  public CreateDataSegments startingAt(String startOfFirstInterval)
+  {
+    this.startTime = DateTimes.of(startOfFirstInterval);
+    return this;
+  }
+
+  public CreateDataSegments withNumPartitions(int numPartitions)
+  {
+    this.numPartitions = numPartitions;
+    return this;
+  }
+
+  public List<DataSegment> eachOfSizeInMb(long sizeMb)
+  {
+    final List<DataSegment> segments = new ArrayList<>();
+
+    int uniqueIdInInterval = 0;
+    DateTime nextStart = startTime;
+    for (int numInterval = 0; numInterval < numIntervals; ++numInterval) {
+      Interval nextInterval = new Interval(nextStart, granularity.increment(nextStart));
+      for (int numPartition = 0; numPartition < numPartitions; ++numPartition) {
+        segments.add(
+            new NumberedDataSegment(
+                datasource,
+                nextInterval,
+                new NumberedShardSpec(numPartition, numPartitions),
+                ++uniqueIdInInterval,
+                sizeMb
+            )
+        );
+      }
+      nextStart = granularity.increment(nextStart);
+    }
+
+    return Collections.unmodifiableList(segments);
+  }
+
+  /**
+   * Simple implementation of DataSegment with a unique integer id to make debugging easier.
+   */
+  private static class NumberedDataSegment extends DataSegment
+  {
+    private final int uniqueId;
+
+    private NumberedDataSegment(
+        String datasource,
+        Interval interval,
+        NumberedShardSpec shardSpec,
+        int uniqueId,
+        long size
+    )
+    {
+      super(
+          datasource,
+          interval,
+          "1",
+          Collections.emptyMap(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          shardSpec,
+          IndexIO.CURRENT_VERSION_ID,
+          size
+      );
+      this.uniqueId = uniqueId;
+    }
+
+    @Override
+    public String toString()
+    {
+      return "{" + getDataSource() + "::" + uniqueId + "}";
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
index dc4a6f0abe..eb3be4c895 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
@@ -41,8 +42,6 @@ import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -81,24 +80,11 @@ public class RunRulesTest
     databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
     segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
 
-    DateTime start = DateTimes.of("2012-01-01");
-    usedSegments = new ArrayList<>();
-    for (int i = 0; i < 24; i++) {
-      usedSegments.add(
-          new DataSegment(
-              "test",
-              new Interval(start, start.plusHours(1)),
-              DateTimes.nowUtc().toString(),
-              new HashMap<>(),
-              new ArrayList<>(),
-              new ArrayList<>(),
-              NoneShardSpec.instance(),
-              IndexIO.CURRENT_VERSION_ID,
-              1
-          )
-      );
-      start = start.plusHours(1);
-    }
+    usedSegments = CreateDataSegments.ofDatasource("test")
+                                     .forIntervals(24, Granularities.HOUR)
+                                     .startingAt("2012-01-01")
+                                     .withNumPartitions(1)
+                                     .eachOfSizeInMb(1);
 
     ruleRunner = new RunRules(new ReplicationThrottler(24, 1, false), coordinator);
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
new file mode 100644
index 0000000000..fc59a6bd9d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor that keeps submitted tasks in a queue until they are explicitly
+ * invoked by calling one of these methods:
+ * <ul>
+ *   <li>{@link #finishNextPendingTask()}</li>
+ *   <li>{@link #finishNextPendingTasks(int)}</li>
+ *   <li>{@link #finishAllPendingTasks()}</li>
+ * </ul>
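+ * <p>
+ * For example (illustrative), a submitted task does not run until explicitly invoked:
+ * <pre>
+ *   BlockingExecutorService exec = new BlockingExecutorService("test-%d");
+ *   Future&lt;Integer&gt; sum = exec.submit(() -> 1 + 1);
+ *   // The task is only queued so far; run it on the current thread
+ *   exec.finishNextPendingTask();   // sum is now complete
+ * </pre>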
+ */
+public class BlockingExecutorService implements ExecutorService
+{
+  private static final Logger log = new Logger(BlockingExecutorService.class);
+
+  private final String nameFormat;
+  private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>();
+
+  public BlockingExecutorService(String nameFormat)
+  {
+    this.nameFormat = nameFormat;
+  }
+
+  public boolean hasPendingTasks()
+  {
+    return !taskQueue.isEmpty();
+  }
+
+  /**
+   * Executes the next pending task on the calling thread itself.
+   */
+  public int finishNextPendingTask()
+  {
+    log.debug("[%s] Executing next pending task", nameFormat);
+    Task<?> task = taskQueue.poll();
+    if (task != null) {
+      task.executeNow();
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Executes the next {@code numTasksToExecute} pending tasks on the calling
+   * thread itself.
+   */
+  public int finishNextPendingTasks(int numTasksToExecute)
+  {
+    log.debug("[%s] Executing %d pending tasks", nameFormat, numTasksToExecute);
+    int executedTaskCount = 0;
+    for (; executedTaskCount < numTasksToExecute; ++executedTaskCount) {
+      Task<?> task = taskQueue.poll();
+      if (task == null) {
+        break;
+      } else {
+        task.executeNow();
+      }
+    }
+    return executedTaskCount;
+  }
+
+  /**
+   * Executes all the remaining pending tasks on the calling thread itself.
+   * <p>
+   * Note: This method can keep running forever if another thread keeps submitting
+   * new tasks to the executor.
+   */
+  public int finishAllPendingTasks()
+  {
+    log.debug("[%s] Executing all pending tasks", nameFormat);
+    Task<?> task;
+    int executedTaskCount = 0;
+    while ((task = taskQueue.poll()) != null) {
+      task.executeNow();
+      ++executedTaskCount;
+    }
+
+    return executedTaskCount;
+  }
+
+  // Task submission operations
+  @Override
+  public <T> Future<T> submit(Callable<T> task)
+  {
+    return addTaskToQueue(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result)
+  {
+    return addTaskToQueue(() -> {
+      task.run();
+      return result;
+    });
+  }
+
+  @Override
+  public Future<?> submit(Runnable task)
+  {
+    return addTaskToQueue(() -> {
+      task.run();
+      return null;
+    });
+  }
+
+  @Override
+  public void execute(Runnable command)
+  {
+    submit(command);
+  }
+
+  private <T> Future<T> addTaskToQueue(Callable<T> callable)
+  {
+    Task<T> task = new Task<>(callable);
+    taskQueue.add(task);
+    return task.future;
+  }
+
+  // Termination operations
+  @Override
+  public void shutdown()
+  {
+    taskQueue.clear();
+  }
+
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    // No task is ever running on a worker thread here; just discard the queue
+    taskQueue.clear();
+    return Collections.emptyList();
+  }
+
+  @Override
+  public boolean isShutdown()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+  {
+    return false;
+  }
+
+  // Unsupported operations
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks,
+      long timeout,
+      TimeUnit unit
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Task that can be invoked to complete the corresponding future.
+   */
+  private static class Task<T>
+  {
+    private final Callable<T> callable;
+    private final CompletableFuture<T> future = new CompletableFuture<>();
+
+    private Task(Callable<T> callable)
+    {
+      this.callable = callable;
+    }
+
+    private void executeNow()
+    {
+      try {
+        T result = callable.call();
+        future.complete(result);
+      }
+      catch (Exception e) {
+        // Fail the future too, so that waiting callers see the error
+        future.completeExceptionally(e);
+        throw new ISE(e, "Error while executing task");
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
new file mode 100644
index 0000000000..6822419f79
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.List;
+
+/**
+ * Runner for a coordinator simulation.
+ */
+public interface CoordinatorSimulation
+{
+  /**
+   * Starts the simulation if not already started.
+   */
+  void start();
+
+  /**
+   * Stops the simulation.
+   */
+  void stop();
+
+  /**
+   * State of the coordinator during the simulation.
+   */
+  CoordinatorState coordinator();
+
+  /**
+   * State of the cluster during the simulation.
+   */
+  ClusterState cluster();
+
+  static CoordinatorSimulationBuilder builder()
+  {
+    return new CoordinatorSimulationBuilder();
+  }
+
+  interface CoordinatorState
+  {
+    /**
+     * Runs a single coordinator cycle.
+     */
+    void runCoordinatorCycle();
+
+    /**
+     * Synchronizes the inventory view maintained by the coordinator with the
+     * actual state of the cluster.
+     */
+    void syncInventoryView();
+
+    /**
+     * Sets the CoordinatorDynamicConfig.
+     */
+    void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
+
+    /**
+     * Gets the inventory view of the specified server as maintained by the
+     * coordinator.
+     */
+    DruidServer getInventoryView(String serverName);
+
+    /**
+     * Returns the metric events emitted in the previous coordinator run.
+     */
+    List<ServiceMetricEvent> getMetricEvents();
+
+    /**
+     * Gets the load percentage of the specified datasource as seen by the coordinator.
+     */
+    double getLoadPercentage(String datasource);
+  }
+
+  interface ClusterState
+  {
+    /**
+     * Finishes load of all the segments that were queued in the previous
+     * coordinator run. Also handles the responses and executes the respective
+     * callbacks on the coordinator.
+     */
+    void loadQueuedSegments();
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
new file mode 100644
index 0000000000..60e8e42824
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest
+    implements CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+{
+  static final double DOUBLE_DELTA = 10e-9;
+
+  private CoordinatorSimulation sim;
+  private final Map<String, List<ServiceMetricEvent>> latestMetricEvents = new HashMap<>();
+
+  @Before
+  public abstract void setUp();
+
+  @After
+  public void tearDown()
+  {
+    if (sim != null) {
+      sim.stop();
+      sim = null;
+    }
+  }
+
+  /**
+   * This must be called to start the simulation and set the correct state.
+   */
+  void startSimulation(CoordinatorSimulation simulation)
+  {
+    this.sim = simulation;
+    simulation.start();
+  }
+
+  @Override
+  public void runCoordinatorCycle()
+  {
+    latestMetricEvents.clear();
+    sim.coordinator().runCoordinatorCycle();
+
+    // Extract the metric values of this run
+    for (ServiceMetricEvent event : sim.coordinator().getMetricEvents()) {
+      latestMetricEvents.computeIfAbsent(event.getMetric(), m -> new ArrayList<>())
+                        .add(event);
+    }
+  }
+
+  @Override
+  public List<ServiceMetricEvent> getMetricEvents()
+  {
+    return sim.coordinator().getMetricEvents();
+  }
+
+  @Override
+  public DruidServer getInventoryView(String serverName)
+  {
+    return sim.coordinator().getInventoryView(serverName);
+  }
+
+  @Override
+  public void syncInventoryView()
+  {
+    sim.coordinator().syncInventoryView();
+  }
+
+  @Override
+  public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+  {
+    sim.coordinator().setDynamicConfig(dynamicConfig);
+  }
+
+  @Override
+  public void loadQueuedSegments()
+  {
+    sim.cluster().loadQueuedSegments();
+  }
+
+  @Override
+  public double getLoadPercentage(String datasource)
+  {
+    return sim.coordinator().getLoadPercentage(datasource);
+  }
+
+  // Verification methods
+  void verifyDatasourceIsFullyLoaded(String datasource)
+  {
+    Assert.assertEquals(100.0, getLoadPercentage(datasource), DOUBLE_DELTA);
+  }
+
+  void verifyNoEvent(String metricName)
+  {
+    Assert.assertTrue(getMetricValues(metricName, null).isEmpty());
+  }
+
+  /**
+   * Verifies the value of the specified metric emitted in the previous run.
+   */
+  void verifyValue(String metricName, Number expectedValue)
+  {
+    verifyValue(metricName, null, expectedValue);
+  }
+
+  /**
+   * Verifies the value of the event corresponding to the specified metric and
+   * dimensionFilters emitted in the previous run.
+   */
+  void verifyValue(String metricName, Map<String, String> dimensionFilters, Number expectedValue)
+  {
+    Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
+  }
+
+  /**
+   * Gets the value of the event corresponding to the specified metric and
+   * dimensionFilters emitted in the previous run.
+   */
+  Number getValue(String metricName, Map<String, String> dimensionFilters)
+  {
+    List<Number> values = getMetricValues(metricName, dimensionFilters);
+    Assert.assertEquals(
+        "Metric must have been emitted exactly once for the given dimensions.",
+        1,
+        values.size()
+    );
+    return values.get(0);
+  }
+
+  private List<Number> getMetricValues(String metricName, Map<String, String> dimensionFilters)
+  {
+    final List<Number> values = new ArrayList<>();
+    final List<ServiceMetricEvent> events = latestMetricEvents.getOrDefault(metricName, Collections.emptyList());
+    final Map<String, String> filters = dimensionFilters == null
+                                        ? Collections.emptyMap() : dimensionFilters;
+    for (ServiceMetricEvent event : events) {
+      final Map<String, Object> userDims = event.getUserDims();
+      boolean match = filters.entrySet().stream()
+                             .allMatch(e -> e.getValue().equals(userDims.get(e.getKey())));
+      if (match) {
+        values.add(event.getValue());
+      }
+    }
+
+    return values;
+  }
+
+  // Utility methods
+
+  /**
+   * Creates a {@link CoordinatorDynamicConfig} with the specified values of:
+   * {@code maxSegmentsToMove, maxSegmentsInNodeLoadingQueue and replicationThrottleLimit}.
+   * The created config always has {@code useBatchedSegmentSampler=true} to avoid
+   * flakiness in tests.
+   *
+   * @see CoordinatorSimulationBaseTest
+   */
+  static CoordinatorDynamicConfig createDynamicConfig(
+      int maxSegmentsToMove,
+      int maxSegmentsInNodeLoadingQueue,
+      int replicationThrottleLimit
+  )
+  {
+    return CoordinatorDynamicConfig.builder()
+                                   .withMaxSegmentsToMove(maxSegmentsToMove)
+                                   .withReplicationThrottleLimit(replicationThrottleLimit)
+                                   .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInNodeLoadingQueue)
+                                   .withUseBatchedSegmentSampler(true)
+                                   .build();
+  }
+
+  /**
+   * Creates a map containing dimension key-values to filter out metric events.
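+   * For example, {@code filter("tier", "hot", "dataSource", "wiki")} matches
+   * events whose {@code tier} is "hot" and whose {@code dataSource} is "wiki"
+   * (dimension names here are illustrative).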
+   */
+  static Map<String, String> filter(String... dimensionValues)
+  {
+    if (dimensionValues.length < 2 || dimensionValues.length % 2 == 1) {
+      throw new IllegalArgumentException("Dimension key-values must be specified in pairs.");
+    }
+
+    final Map<String, String> filters = new HashMap<>();
+    for (int i = 0; i < dimensionValues.length; i += 2) {
+      filters.put(dimensionValues[i], dimensionValues[i + 1]);
+    }
+    return filters;
+  }
+
+  /**
+   * Creates a historical. The {@code uniqueIdInTier} must be correctly specified
+   * as it is used to identify the historical throughout the simulation.
+   */
+  static DruidServer createHistorical(int uniqueIdInTier, String tier, long serverSizeMb)
+  {
+    final String name = tier + "__hist__" + uniqueIdInTier;
+    return new DruidServer(name, name, name, serverSizeMb, ServerType.HISTORICAL, tier, 1);
+  }
+
+  // Utility and constant holder classes
+
+  static class DS
+  {
+    static final String WIKI = "wiki";
+  }
+
+  static class Tier
+  {
+    static final String T1 = "tier_t1";
+    static final String T2 = "tier_t2";
+    static final String T3 = "tier_t3";
+  }
+
+  static class Metric
+  {
+    static final String ASSIGNED_COUNT = "segment/assigned/count";
+    static final String MOVED_COUNT = "segment/moved/count";
+    static final String DROPPED_COUNT = "segment/dropped/count";
+    static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
+  }
+
+  static class Segments
+  {
+    /**
+     * Segments of datasource {@link DS#WIKI}, size 500 MB each,
+     * spanning 1 day containing 10 partitions.
+     */
+    static final List<DataSegment> WIKI_10X1D =
+        CreateDataSegments.ofDatasource(DS.WIKI)
+                          .forIntervals(1, Granularities.DAY)
+                          .startingAt("2022-01-01")
+                          .withNumPartitions(10)
+                          .eachOfSizeInMb(500);
+  }
+
+  /**
+   * Builder for a load rule.
+   */
+  static class Load
+  {
+    private final Map<String, Integer> tieredReplicants = new HashMap<>();
+
+    static Load on(String tier, int numReplicas)
+    {
+      Load load = new Load();
+      load.tieredReplicants.put(tier, numReplicas);
+      return load;
+    }
+
+    Load andOn(String tier, int numReplicas)
+    {
+      tieredReplicants.put(tier, numReplicas);
+      return this;
+    }
+
+    Rule forever()
+    {
+      return new ForeverLoadRule(tieredReplicants);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
new file mode 100644
index 0000000000..29301ea033
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.discovery.ServiceAnnouncer;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Builder for {@link CoordinatorSimulation}.
+ */
+public class CoordinatorSimulationBuilder
+{
+  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
+      .setInjectableValues(
+          new InjectableValues.Std().addValue(
+              DataSegment.PruneSpecsHolder.class,
+              DataSegment.PruneSpecsHolder.DEFAULT
+          )
+      );
+
+  private BalancerStrategyFactory balancerStrategyFactory;
+  private CoordinatorDynamicConfig dynamicConfig =
+      CoordinatorDynamicConfig.builder()
+                              .withUseBatchedSegmentSampler(true)
+                              .build();
+  private List<DruidServer> servers;
+  private List<DataSegment> segments;
+  private final Map<String, List<Rule>> datasourceRules = new HashMap<>();
+  private boolean loadImmediately = false;
+  private boolean autoSyncInventory = true;
+
+  /**
+   * Specifies the balancer strategy to be used.
+   * <p>
+   * Default: "cost" ({@link CostBalancerStrategyFactory})
+   */
+  public CoordinatorSimulationBuilder withBalancer(BalancerStrategyFactory strategyFactory)
+  {
+    this.balancerStrategyFactory = strategyFactory;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder withServers(List<DruidServer> servers)
+  {
+    this.servers = servers;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder withServers(DruidServer... servers)
+  {
+    return withServers(Arrays.asList(servers));
+  }
+
+  public CoordinatorSimulationBuilder withSegments(List<DataSegment> segments)
+  {
+    this.segments = segments;
+    return this;
+  }
+
+  public CoordinatorSimulationBuilder withRules(String datasource, Rule... rules)
+  {
+    this.datasourceRules.put(datasource, Arrays.asList(rules));
+    return this;
+  }
+
+  /**
+   * Specifies whether segments should be loaded as soon as they are queued.
+   * <p>
+   * Default: false
+   */
+  public CoordinatorSimulationBuilder withImmediateSegmentLoading(boolean loadImmediately)
+  {
+    this.loadImmediately = loadImmediately;
+    return this;
+  }
+
+  /**
+   * Specifies whether the inventory view maintained by the coordinator
+   * should be auto-synced as soon as any change is made to the cluster.
+   * <p>
+   * Default: true
+   */
+  public CoordinatorSimulationBuilder withAutoInventorySync(boolean autoSync)
+  {
+    this.autoSyncInventory = autoSync;
+    return this;
+  }
+
+  /**
+   * Specifies the CoordinatorDynamicConfig to be used in the simulation.
+   * <p>
+   * Default values: {@code useBatchedSegmentSampler = true}, other params as
+   * specified in {@link CoordinatorDynamicConfig.Builder}.
+   * <p>
+   * Tests that verify balancing behaviour should set
+   * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+   * Otherwise, the segment sampling is random and can produce repeated values
+   * leading to flakiness in the tests. The simulation sets this field to true by
+   * default.
+   */
+  public CoordinatorSimulationBuilder withDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+  {
+    this.dynamicConfig = dynamicConfig;
+    return this;
+  }
+
+  public CoordinatorSimulation build()
+  {
+    Preconditions.checkArgument(
+        servers != null && !servers.isEmpty(),
+        "Cannot run simulation for an empty cluster"
+    );
+
+    // Prepare the environment
+    final TestServerInventoryView serverInventoryView = new TestServerInventoryView();
+    servers.forEach(serverInventoryView::addServer);
+
+    final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager();
+    if (segments != null) {
+      segments.forEach(segmentManager::addSegment);
+    }
+
+    final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
+    datasourceRules.forEach(
+        (datasource, rules) ->
+            ruleManager.overrideRule(datasource, rules, null)
+    );
+
+    final Environment env = new Environment(
+        serverInventoryView,
+        segmentManager,
+        ruleManager,
+        dynamicConfig,
+        loadImmediately,
+        autoSyncInventory
+    );
+
+    // Build the coordinator
+    final DruidCoordinator coordinator = new DruidCoordinator(
+        env.coordinatorConfig,
+        new ZkPathsConfig(),
+        env.jacksonConfigManager,
+        env.segmentManager,
+        env.coordinatorInventoryView,
+        env.ruleManager,
+        () -> null,
+        env.serviceEmitter,
+        env.executorFactory,
+        null,
+        env.loadQueueTaskMaster,
+        new ServiceAnnouncer.Noop(),
+        null,
+        Collections.emptySet(),
+        null,
+        new CoordinatorCustomDutyGroups(Collections.emptySet()),
+        balancerStrategyFactory != null ? balancerStrategyFactory
+                                        : new CostBalancerStrategyFactory(),
+        env.lookupCoordinatorManager,
+        env.leaderSelector,
+        OBJECT_MAPPER,
+        ZkEnablementConfig.ENABLED
+    );
+
+    return new SimulationImpl(coordinator, env);
+  }
+
+  private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env)
+  {
+    try {
+      return new CachingCostBalancerStrategyFactory(
+          env.coordinatorInventoryView,
+          env.lifecycle,
+          new CachingCostBalancerStrategyConfig()
+      );
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error building balancer strategy");
+    }
+  }
+
+  /**
+   * Implementation of {@link CoordinatorSimulation}.
+   */
+  private static class SimulationImpl implements CoordinatorSimulation,
+      CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+  {
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private final Environment env;
+    private final DruidCoordinator coordinator;
+
+    private SimulationImpl(DruidCoordinator coordinator, Environment env)
+    {
+      this.env = env;
+      this.coordinator = coordinator;
+    }
+
+    @Override
+    public void start()
+    {
+      if (!running.compareAndSet(false, true)) {
+        throw new ISE("Simulation is already running");
+      }
+
+      try {
+        env.setUp();
+        coordinator.start();
+      }
+      catch (Exception e) {
+        throw new ISE(e, "Exception while running simulation");
+      }
+    }
+
+    @Override
+    public void stop()
+    {
+      coordinator.stop();
+      env.leaderSelector.stopBeingLeader();
+      env.tearDown();
+    }
+
+    @Override
+    public CoordinatorState coordinator()
+    {
+      return this;
+    }
+
+    @Override
+    public ClusterState cluster()
+    {
+      return this;
+    }
+
+    @Override
+    public void runCoordinatorCycle()
+    {
+      verifySimulationRunning();
+      env.serviceEmitter.flush();
+
+      // Invoke historical duties and metadata duties
+      env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+    }
+
+    @Override
+    public void syncInventoryView()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.autoSyncInventory,
+          "Cannot invoke syncInventoryView as simulation is running in auto-sync mode."
+      );
+      env.coordinatorInventoryView.sync(env.historicalInventoryView);
+    }
+
+    @Override
+    public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      env.setDynamicConfig(dynamicConfig);
+    }
+
+    @Override
+    public DruidServer getInventoryView(String serverName)
+    {
+      return env.coordinatorInventoryView.getInventoryValue(serverName);
+    }
+
+    @Override
+    public void loadQueuedSegments()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.loadImmediately,
+          "Cannot invoke loadQueuedSegments as simulation is running in immediate loading mode."
+      );
+
+      final BlockingExecutorService loadQueueExecutor = env.executorFactory.loadQueueExecutor;
+      while (loadQueueExecutor.hasPendingTasks()) {
+        // Drain all the items from the load queue executor
+        // This sends at most 1 load/drop request to each server
+        loadQueueExecutor.finishAllPendingTasks();
+
+        // Load all the queued segments, handle their responses and execute callbacks
+        int loadedSegments = env.executorFactory.historicalLoader.finishAllPendingTasks();
+        loadQueueExecutor.finishNextPendingTasks(loadedSegments);
+        env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+      }
+    }
+
+    private void verifySimulationRunning()
+    {
+      if (!running.get()) {
+        throw new ISE("Simulation hasn't been started yet.");
+      }
+    }
+
+    @Override
+    public double getLoadPercentage(String datasource)
+    {
+      return coordinator.getLoadStatus().get(datasource);
+    }
+
+    @Override
+    public List<ServiceMetricEvent> getMetricEvents()
+    {
+      return new ArrayList<>(env.serviceEmitter.getMetricEvents());
+    }
+  }
+
+  /**
+   * Environment for a coordinator simulation.
+   */
+  private static class Environment
+  {
+    private final Lifecycle lifecycle = new Lifecycle("coord-sim");
+
+    // Executors
+    private final ExecutorFactory executorFactory;
+
+    private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
+    private final TestSegmentsMetadataManager segmentManager;
+    private final TestMetadataRuleManager ruleManager;
+    private final TestServerInventoryView historicalInventoryView;
+
+    private final LoadQueueTaskMaster loadQueueTaskMaster;
+    private final StubServiceEmitter serviceEmitter
+        = new StubServiceEmitter("coordinator", "coordinator");
+    private final TestServerInventoryView coordinatorInventoryView;
+
+    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig = new AtomicReference<>();
+    private final JacksonConfigManager jacksonConfigManager;
+    private final LookupCoordinatorManager lookupCoordinatorManager;
+    private final DruidCoordinatorConfig coordinatorConfig;
+    private final boolean loadImmediately;
+    private final boolean autoSyncInventory;
+
+    private final List<Object> mocks = new ArrayList<>();
+
+    private Environment(
+        TestServerInventoryView clusterInventory,
+        TestSegmentsMetadataManager segmentManager,
+        TestMetadataRuleManager ruleManager,
+        CoordinatorDynamicConfig dynamicConfig,
+        boolean loadImmediately,
+        boolean autoSyncInventory
+    )
+    {
+      this.historicalInventoryView = clusterInventory;
+      this.segmentManager = segmentManager;
+      this.ruleManager = ruleManager;
+      this.loadImmediately = loadImmediately;
+      this.autoSyncInventory = autoSyncInventory;
+
+      this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
+          .withCoordinatorStartDelay(new Duration(1L))
+          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
+          .withLoadQueuePeonType("http")
+          .withCoordinatorKillIgnoreDurationToRetain(false)
+          .build();
+
+      this.executorFactory = new ExecutorFactory(loadImmediately);
+      this.coordinatorInventoryView = autoSyncInventory
+                                      ? clusterInventory
+                                      : new TestServerInventoryView();
+      HttpClient httpClient = new TestSegmentLoadingHttpClient(
+          OBJECT_MAPPER,
+          clusterInventory::getChangeHandlerForHost,
+          executorFactory.create(1, ExecutorFactory.HISTORICAL_LOADER)
+      );
+
+      this.loadQueueTaskMaster = new LoadQueueTaskMaster(
+          null,
+          OBJECT_MAPPER,
+          executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
+          executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
+          coordinatorConfig,
+          httpClient,
+          null
+      );
+
+      this.jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+
+      this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
+      mocks.add(jacksonConfigManager);
+      mocks.add(lookupCoordinatorManager);
+    }
+
+    private void setUp() throws Exception
+    {
+      EmittingLogger.registerEmitter(serviceEmitter);
+      historicalInventoryView.setUp();
+      coordinatorInventoryView.setUp();
+      lifecycle.start();
+      executorFactory.setUp();
+      leaderSelector.becomeLeader();
+      EasyMock.replay(mocks.toArray());
+    }
+
+    private void tearDown()
+    {
+      EasyMock.verify(mocks.toArray());
+      executorFactory.tearDown();
+      lifecycle.stop();
+    }
+
+    private void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      this.dynamicConfig.set(dynamicConfig);
+    }
+
+    private JacksonConfigManager mockConfigManager()
+    {
+      final JacksonConfigManager jacksonConfigManager
+          = EasyMock.createMock(JacksonConfigManager.class);
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorDynamicConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(dynamicConfig).anyTimes();
+
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorCompactionConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
+
+      return jacksonConfigManager;
+    }
+  }
+
+  /**
+   * Implementation of {@link ScheduledExecutorFactory} used to create and keep
+   * a handle on the various executors used inside the coordinator.
+   */
+  private static class ExecutorFactory implements ScheduledExecutorFactory
+  {
+    static final String HISTORICAL_LOADER = "historical-loader-%d";
+    static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d";
+    static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d";
+    static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d";
+
+    private final Map<String, BlockingExecutorService> blockingExecutors = new HashMap<>();
+    private final boolean directExecution;
+
+    private BlockingExecutorService historicalLoader;
+    private BlockingExecutorService loadQueueExecutor;
+    private BlockingExecutorService loadCallbackExecutor;
+    private BlockingExecutorService coordinatorRunner;
+
+    private ExecutorFactory(boolean directExecution)
+    {
+      this.directExecution = directExecution;
+    }
+
+    @Override
+    public ScheduledExecutorService create(int corePoolSize, String nameFormat)
+    {
+      boolean isCoordinatorRunner = COORDINATOR_RUNNER.equals(nameFormat);
+
+      // Coordinator running executor must always be blocked
+      final ExecutorService executorService =
+          (directExecution && !isCoordinatorRunner)
+          ? new DirectExecutorService()
+          : blockingExecutors.computeIfAbsent(nameFormat, BlockingExecutorService::new);
+
+      return new WrappingScheduledExecutorService(nameFormat, executorService, !isCoordinatorRunner);
+    }
+
+    private BlockingExecutorService findExecutor(String nameFormat)
+    {
+      return blockingExecutors.get(nameFormat);
+    }
+
+    private void setUp()
+    {
+      coordinatorRunner = findExecutor(COORDINATOR_RUNNER);
+      historicalLoader = findExecutor(HISTORICAL_LOADER);
+      loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR);
+      loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);
+    }
+
+    private void tearDown()
+    {
+      blockingExecutors.values().forEach(BlockingExecutorService::shutdown);
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
new file mode 100644
index 0000000000..a1562c5187
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
@@ -0,0 +1,141 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+# Coordinator simulations
+
+The simulation framework allows developers to recreate arbitrary cluster setups and verify coordinator behaviour. Tests
+written using the framework can also help identify performance bottlenecks or potential bugs in the system and even
+compare different balancing strategies.
+
+As opposed to unit tests, simulations are meant to test the coordinator as a whole and verify the interactions of all
+the underlying parts. In that regard, these simulations resemble integration tests more closely.
+
+## Test targets
+
+The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified
+using simulations:
+
+- `LoadQueuePeon`, `LoadQueueTaskMaster`
+- All coordinator duties, e.g. `BalanceSegments`, `RunRules`
+- All retention rules
+
+## Capabilities
+
+The framework provides control over the following aspects of the setup:
+
+| Input | Details | Actions |
+|-------|---------|---------|
+| cluster | server name, type, tier, size | add a server, remove a server |
+| segment | datasource, interval, version, partition num, size | add/remove from server, mark used/unused, publish new segments |
+| rules | type (foreverLoad, drop, etc.), replica count per tier | set rules for a datasource |
+| configs | coordinator period, load queue type, load queue size, max segments to balance | set or update a config |
+
+The above actions can be performed at any point after building the simulation. So you can even recreate scenarios
+where a server crashes or the retention rules of a datasource change in the middle of a coordinator run, and verify
+the behaviour of the coordinator in these situations, as sketched below.
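+
+For example, a mid-simulation server crash could be modelled with a sketch like the following
+(helper names such as `removeServer` are assumptions based on the base test, not a fixed API):
+
+```java
+// Run one coordinator cycle with both historicals present
+runCoordinatorCycle();
+
+// Simulate a historical crashing mid-simulation
+removeServer(historicalT12);
+
+// The next cycle should reassign the replicas that were lost
+runCoordinatorCycle();
+loadQueuedSegments();
+```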
+
+## Design
+
+1. __Execution__: A tight dependency on time durations, such as the period of a repeating task or the delay before a
+   scheduled one, makes it difficult to reliably reproduce a test scenario and results in flaky tests. Thus, all the
+   executors required for coordinator operations are restricted to two modes of execution (see the sketch after this
+   list):
+    - __immediate__: Execute tasks on the calling thread itself.
+    - __blocked__: Keep tasks in a queue until explicitly invoked.
+2. __Internal dependencies__: In order to allow realistic reproductions of the coordinator behaviour, none of the
+   internal parts of the coordinator have been mocked in the framework and new tests need not mock anything at all.
+3. __External dependencies__: Since these tests are meant to verify the behaviour of only the coordinator, the
+   interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
+    - communication with the metadata store: `SegmentsMetadataManager`, `MetadataRuleManager`
+    - communication with historicals: `HttpClient`, `ServerInventoryView`
+4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
+   modes of inventory update: auto and manual. In auto update mode, any change made to the cluster is immediately
+   reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the
+   cluster state.
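+
+To illustrate the blocked mode, here is a minimal sketch of how `BlockingExecutorService` is meant
+to be used (method names such as `finishAllPendingTasks` are taken from this patch and may evolve):
+
+```java
+BlockingExecutorService executor = new BlockingExecutorService("test-%d");
+
+// Submitted tasks are queued but do not run yet
+executor.submit(() -> System.out.println("pretend to load a segment"));
+
+// Queued tasks execute only when explicitly invoked, keeping the test deterministic
+executor.finishAllPendingTasks();
+```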
+
+## Limitations
+
+- The framework does not expose the coordinator HTTP endpoints.
+- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
+  balancing cost of a segment. Relative values can still be a good indicator when comparing, say, two balancing
+  strategies.
+
+## Usage
+
+Writing a test class:
+
+- Extend `CoordinatorSimulationBaseTest`. This base test exposes methods to get or set the state of the cluster and
+  coordinator during a simulation.
+- Build a simulation using `CoordinatorSimulation.builder()` with specified segments, servers, rules and configs.
+- Start the simulation with `startSimulation(simulation)`.
+- Invoke coordinator runs with `runCoordinatorCycle()`.
+- Verify the emitted metrics and the current cluster state.
+
+Example:
+
+```java
+public class SimpleSimulationTest extends CoordinatorSimulationBaseTest
+{
+  @Test
+  public void testShiftSegmentsToDifferentTier()
+  {
+    // Create segments
+    List<DataSegment> segments =
+        CreateDataSegments.ofDatasource("wiki")
+                          .forIntervals(30, Granularities.DAY)
+                          .startingAt("2022-01-01")
+                          .withNumPartitions(10)
+                          .eachOfSizeInMb(500);
+
+    // Create servers
+    DruidServer historicalTier1 = createHistorical(1, "tier_1", 10_000);
+    DruidServer historicalTier2 = createHistorical(1, "tier_2", 20_000);
+
+    // Build simulation
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withServers(historicalTier1, historicalTier2)
+                             .withSegments(segments)
+                             .withRules("wiki", Load.on("tier_2", 1).forever())
+                             .build();
+
+    // Start the simulation with all segments loaded on tier_1
+    segments.forEach(historicalTier1::addDataSegment);
+    startSimulation(sim);
+
+    // Run a few coordinator cycles
+    int totalLoadedOnT2 = 0;
+    int totalDroppedFromT1 = 0;
+    for (int i = 0; i < 10; ++i) {
+      runCoordinatorCycle();
+      loadQueuedSegments();
+      totalLoadedOnT2 += getValue("segment/assigned/count", filter("tier", "tier_2")).intValue();
+      totalDroppedFromT1 += getValue("segment/dropped/count", filter("tier", "tier_1")).intValue();
+    }
+
+    // Verify that some segments have been loaded/dropped
+    Assert.assertTrue(totalLoadedOnT2 > 0 && totalLoadedOnT2 <= segments.size());
+    Assert.assertTrue(totalDroppedFromT1 > 0 && totalDroppedFromT1 <= segments.size());
+    Assert.assertTrue(totalDroppedFromT1 <= totalLoadedOnT2);
+  }
+}
+```
+
+## More examples
+
+See `org.apache.druid.server.coordinator.simulate.SegmentLoadingTest`
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
new file mode 100644
index 0000000000..77b9820a95
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Coordinator simulation test to verify behaviour of segment balancing.
+ */
+public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
+{
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+
+  private final String datasource = DS.WIKI;
+  private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+  @Override
+  public void setUp()
+  {
+    // Set up 2 historicals in tier T1, size 10 GB each
+    historicalT11 = createHistorical(1, Tier.T1, 10_000);
+    historicalT12 = createHistorical(2, Tier.T1, 10_000);
+  }
+
+  @Test
+  public void testBalancingWithSyncedInventory()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(dynamicConfig)
+                             .withAutoInventorySync(true)
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that segments have been chosen for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+
+    loadQueuedSegments();
+
+    // Verify that segments have now been balanced out
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
+  @Test
+  public void testBalancingOfFullyReplicatedSegment()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+    // historicals = 2(in T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that there are segments in the load queue for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        5
+    );
+
+    runCoordinatorCycle();
+
+    // Verify that segments in the load queue are not considered over-replicated
+    verifyValue(Metric.DROPPED_COUNT, 0L);
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        5
+    );
+
+    // Finish and verify balancing
+    loadQueuedSegments();
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
new file mode 100644
index 0000000000..52dc7c0933
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Contains negative tests that verify existing erroneous behaviour of segment
+ * loading. The underlying issues should be fixed and the modified tests
+ * should be migrated to {@link SegmentLoadingTest}.
+ * <p>
+ * Identified issues:
+ * <a href="https://github.com/apache/druid/issues/12881">Apache #12881</a>
+ */
+public class SegmentLoadingNegativeTest extends CoordinatorSimulationBaseTest
+{
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+  private DruidServer historicalT21;
+
+  private final String datasource = DS.WIKI;
+  private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+  @Override
+  public void setUp()
+  {
+    // Set up historicals in 2 tiers, size 10 GB each
+    historicalT11 = createHistorical(1, Tier.T1, 10_000);
+    historicalT12 = createHistorical(2, Tier.T1, 10_000);
+    historicalT21 = createHistorical(1, Tier.T2, 10_000);
+  }
+
+  /**
+   * Correct behaviour: replicationThrottleLimit should not be violated even if
+   * segment loading is fast.
+   * <p>
+   * Fix Apache #12881 to fix this test.
+   */
+  @Test
+  public void testImmediateLoadingViolatesThrottleLimit()
+  {
+    // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+    // historicals = 2(in T1), segments = 10*1day
+    // replicas = 2(on T1), immediate segment loading
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withImmediateSegmentLoading(true)
+                             .withDynamicConfig(dynamicConfig)
+                             .build();
+
+    // Put the first replica of all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that the number of replicas assigned exceeds the replicationThrottleLimit
+    verifyValue(Metric.ASSIGNED_COUNT, 10L);
+
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(10, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
+  /**
+   * Correct behaviour: The first replica on any tier should not be throttled.
+   * <p>
+   * Fix Apache #12881 to fix this test.
+   */
+  @Test
+  public void testFirstReplicaOnAnyTierIsThrottled()
+  {
+    // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+    // historicals = 1(in T1) + 1(in T2)
+    // replicas = 1(on T1) + 1(on T2)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT21)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(
+                                 datasource,
+                                 Load.on(Tier.T1, 1).andOn(Tier.T2, 1).forever()
+                             )
+                             .build();
+
+    // Put the first replica of all the segments on T1
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that the number of replicas assigned to T2 equals the replicationThrottleLimit
+    verifyValue(
+        Metric.ASSIGNED_COUNT,
+        filter(DruidMetrics.TIER, Tier.T2),
+        2L
+    );
+
+    loadQueuedSegments();
+
+    verifyDatasourceIsFullyLoaded(datasource);
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(2, historicalT21.getTotalSegments());
+  }
+
+  /**
+   * Correct behaviour: Historical should not get overassigned even if loading is fast.
+   * <p>
+   * Fix Apache #12881 to fix this test.
+   */
+  @Test
+  public void testImmediateLoadingOverassignsHistorical()
+  {
+    // historicals = 1(in T1), size 1 GB
+    final DruidServer historicalT11 = createHistorical(1, Tier.T1, 1000);
+
+    // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+    // segments = 10*1day, size 500 MB
+    // strategy = cost, replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withImmediateSegmentLoading(true)
+                             .build();
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // The historical is assigned several segments but loads only up to its capacity
+    verifyValue(Metric.ASSIGNED_COUNT, 10L);
+    Assert.assertEquals(2, historicalT11.getTotalSegments());
+  }
+
+  /**
+   * Correct behaviour: For a fully replicated segment, items that are in the load
+   * queue should get cancelled so that the coordinator does not have to wait
+   * for the loads to finish and then take remedial action.
+   * <p>
+   * Fix Apache #12881 to fix this test case.
+   */
+  @Test
+  public void testLoadOfFullyReplicatedSegmentIsNotCancelled()
+  {
+    // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+    // historicals = 2(in T1), replicas = 2(on T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .build();
+
+    // Put the first replica of all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that there are segments in the load queue
+    verifyValue(Metric.ASSIGNED_COUNT, 10L);
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        10
+    );
+
+    // Put the second replica of all the segments on histT12
+    segments.forEach(historicalT12::addDataSegment);
+
+    runCoordinatorCycle();
+
+    // Verify that the segments are still in the load queue
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        10
+    );
+  }
+
+  /**
+   * Correct behaviour: Balancing should never cause over-replication, even when
+   * the inventory view is not updated.
+   * <p>
+   * Fix Apache #12881 to fix this test.
+   */
+  @Test
+  public void testBalancingWithStaleInventoryCausesOverReplication()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(dynamicConfig)
+                             .withAutoInventorySync(false)
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    syncInventoryView();
+    runCoordinatorCycle();
+
+    // Verify that segments have been chosen for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+
+    loadQueuedSegments();
+
+    // Verify that balancing with a stale inventory has caused over-replication
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
new file mode 100644
index 0000000000..1edeab8a37
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Coordinator simulation test to verify behaviour of segment loading.
+ */
+public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
+{
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+  private DruidServer historicalT21;
+  private DruidServer historicalT22;
+
+  private final String datasource = DS.WIKI;
+  private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+  @Override
+  public void setUp()
+  {
+    // Set up historicals in 2 tiers, size 10 GB each
+    historicalT11 = createHistorical(1, Tier.T1, 10_000);
+    historicalT12 = createHistorical(2, Tier.T1, 10_000);
+
+    historicalT21 = createHistorical(1, Tier.T2, 10_000);
+    historicalT22 = createHistorical(2, Tier.T2, 10_000);
+  }
+
+  @Test
+  public void testSecondReplicaOnAnyTierIsThrottled()
+  {
+    // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+    // historicals = 2(in T1)
+    // replicas = 2(on T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 2).forever())
+                             .withDynamicConfig(dynamicConfig)
+                             .build();
+
+    // Put the first replica of all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that the replicationThrottleLimit is honored
+    verifyValue(Metric.ASSIGNED_COUNT, 2L);
+
+    loadQueuedSegments();
+    Assert.assertEquals(10, historicalT11.getTotalSegments());
+    Assert.assertEquals(2, historicalT12.getTotalSegments());
+  }
+
+  @Test
+  public void testLoadingDoesNotOverassignHistorical()
+  {
+    // historicals = 1(in T1), size 1 GB
+    final DruidServer historicalT11 = createHistorical(1, Tier.T1, 1000);
+
+    // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+    // segments = 10*1day, size 500 MB
+    // strategy = cost, replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withImmediateSegmentLoading(false)
+                             .build();
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that the number of segments assigned is within the historical capacity
+    verifyValue(Metric.ASSIGNED_COUNT, 2L);
+    loadQueuedSegments();
+    Assert.assertEquals(2, historicalT11.getTotalSegments());
+  }
+
+  @Test
+  public void testDropHappensAfterTargetReplicationOnEveryTier()
+  {
+    // maxNonPrimaryReplicants = 33 ensures that all target replicas (4 per segment)
+    // are assigned for some segments in the first run itself (by the pigeonhole principle)
+    CoordinatorDynamicConfig dynamicConfig =
+        CoordinatorDynamicConfig.builder()
+                                .withMaxSegmentsToMove(0)
+                                .withReplicationThrottleLimit(10)
+                                .withMaxNonPrimaryReplicantsToLoad(33)
+                                .build();
+
+    // historicals = 1(in T1) + 2(in T2) + 2(in T3)
+    // segments = 10 * 1day, replicas = 2(T2) + 2(T3)
+    final DruidServer historicalT31 = createHistorical(1, Tier.T3, 10_000);
+    final DruidServer historicalT32 = createHistorical(2, Tier.T3, 10_000);
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T2, 2).andOn(Tier.T3, 2).forever())
+                             .withServers(
+                                 historicalT11,
+                                 historicalT21,
+                                 historicalT22,
+                                 historicalT31,
+                                 historicalT32
+                             )
+                             .build();
+
+    // At the start, T1 has all the segments
+    segments.forEach(historicalT11::addDataSegment);
+
+    // Run 1: Nothing is dropped from T1 but segments are assigned to T2 and T3
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    verifyNoEvent(Metric.DROPPED_COUNT);
+    int totalAssignedInRun1
+        = getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T2)).intValue()
+          + getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T3)).intValue();
+    Assert.assertTrue(totalAssignedInRun1 > 0 && totalAssignedInRun1 < 40);
+
+    // Run 2: Segments still queued, nothing is dropped from T1
+    runCoordinatorCycle();
+    loadQueuedSegments();
+
+    verifyNoEvent(Metric.DROPPED_COUNT);
+    int totalLoadedAfterRun2
+        = historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
+          + historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
+    Assert.assertEquals(totalAssignedInRun1, totalLoadedAfterRun2);
+
+    // Run 3: Some segments have now been loaded;
+    // segments fully replicated on T2 and T3 will be dropped from T1
+    runCoordinatorCycle();
+    loadQueuedSegments();
+
+    int totalDroppedInRun3
+        = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue();
+    Assert.assertTrue(totalDroppedInRun3 > 0 && totalDroppedInRun3 < 10);
+    int totalLoadedAfterRun3
+        = historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
+          + historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
+    Assert.assertEquals(40, totalLoadedAfterRun3);
+
+    // Run 4: All segments are fully replicated on T2 and T3
+    runCoordinatorCycle();
+    loadQueuedSegments();
+
+    int totalDroppedInRun4
+        = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue();
+
+    Assert.assertEquals(10, totalDroppedInRun3 + totalDroppedInRun4);
+    Assert.assertEquals(0, historicalT11.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
new file mode 100644
index 0000000000..d84cbcff6e
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.discovery.DruidLeaderSelector;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestDruidLeaderSelector implements DruidLeaderSelector
+{
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
+  private volatile Listener listener;
+
+  public void becomeLeader()
+  {
+    if (isLeader.compareAndSet(false, true) && listener != null) {
+      listener.becomeLeader();
+    }
+  }
+
+  public void stopBeingLeader()
+  {
+    if (isLeader.compareAndSet(true, false) && listener != null) {
+      listener.stopBeingLeader();
+    }
+  }
+
+  @Nullable
+  @Override
+  public String getCurrentLeader()
+  {
+    return "me";
+  }
+
+  @Override
+  public boolean isLeader()
+  {
+    return isLeader.get();
+  }
+
+  @Override
+  public int localTerm()
+  {
+    return 0;
+  }
+
+  @Override
+  public void registerListener(Listener listener)
+  {
+    this.listener = listener;
+    if (isLeader()) {
+      listener.becomeLeader();
+    }
+  }
+
+  @Override
+  public void unregisterListener()
+  {
+    listener = null;
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java
new file mode 100644
index 0000000000..9ca037b0cf
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.audit.AuditInfo;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestMetadataRuleManager implements MetadataRuleManager
+{
+  private final Map<String, List<Rule>> rules = new HashMap<>();
+
+  private static final String DEFAULT_DATASOURCE = "_default";
+
+  public TestMetadataRuleManager()
+  {
+    rules.put(
+        DEFAULT_DATASOURCE,
+        Collections.singletonList(new ForeverLoadRule(null))
+    );
+  }
+
+  @Override
+  public void start()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void stop()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void poll()
+  {
+    // do nothing
+  }
+
+  @Override
+  public Map<String, List<Rule>> getAllRules()
+  {
+    return rules;
+  }
+
+  @Override
+  public List<Rule> getRules(final String dataSource)
+  {
+    List<Rule> retVal = rules.get(dataSource);
+    return retVal == null ? new ArrayList<>() : retVal;
+  }
+
+  @Override
+  public List<Rule> getRulesWithDefault(final String dataSource)
+  {
+    final List<Rule> retVal = new ArrayList<>();
+    if (rules.get(dataSource) != null) {
+      retVal.addAll(rules.get(dataSource));
+    }
+    if (rules.get(DEFAULT_DATASOURCE) != null) {
+      retVal.addAll(rules.get(DEFAULT_DATASOURCE));
+    }
+    return retVal;
+  }
+
+  @Override
+  public boolean overrideRule(final String dataSource, final List<Rule> newRules, final AuditInfo auditInfo)
+  {
+    rules.put(dataSource, newRules);
+    return true;
+  }
+
+  @Override
+  public int removeRulesForEmptyDatasourcesOlderThan(long timestamp)
+  {
+    return 0;
+  }
+
+  public void removeRulesForDatasource(String dataSource)
+  {
+    if (!DEFAULT_DATASOURCE.equals(dataSource)) {
+      rules.remove(dataSource);
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
new file mode 100644
index 0000000000..0b91e70090
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class TestSegmentLoadingHttpClient implements HttpClient
+{
+  private static final HttpResponseHandler.TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L;
+  private static final DataSegmentChangeCallback NOOP_CALLBACK = () -> {
+  };
+
+  private final ObjectMapper objectMapper;
+  private final Function<String, DataSegmentChangeHandler> hostToHandler;
+
+  private final ListeningScheduledExecutorService executorService;
+
+  public TestSegmentLoadingHttpClient(
+      ObjectMapper objectMapper,
+      Function<String, DataSegmentChangeHandler> hostToHandler,
+      ScheduledExecutorService executorService
+  )
+  {
+    this.objectMapper = objectMapper;
+    this.hostToHandler = hostToHandler;
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler
+  )
+  {
+    return go(request, handler, null);
+  }
+
+  @Override
+  public <Intermediate, Final> ListenableFuture<Final> go(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler,
+      Duration readTimeout
+  )
+  {
+    return executorService.submit(() -> processRequest(request, handler));
+  }
+
+  private <Intermediate, Final> Final processRequest(
+      Request request,
+      HttpResponseHandler<Intermediate, Final> handler
+  )
+  {
+    try {
+      // Fail the request if there is no handler for this host
+      final DataSegmentChangeHandler changeHandler = hostToHandler
+          .apply(request.getUrl().getHost());
+      if (changeHandler == null) {
+        final HttpResponse failureResponse =
+            new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+        failureResponse.setContent(ChannelBuffers.EMPTY_BUFFER);
+        handler.handleResponse(failureResponse, NOOP_TRAFFIC_COP);
+        return (Final) new ByteArrayInputStream(new byte[0]);
+      }
+
+      // Handle change requests and serialize
+      final byte[] serializedContent;
+      try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+        objectMapper.writeValue(baos, processRequest(request, changeHandler));
+        serializedContent = baos.toByteArray();
+      }
+
+      // Set response content and status
+      final HttpResponse response =
+          new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      response.setContent(ChannelBuffers.EMPTY_BUFFER);
+      handler.handleResponse(response, NOOP_TRAFFIC_COP);
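+      // The caller is expected to consume the response via an
+      // InputStream-based handler, so the serialized payload is
+      // returned as a stream rather than as response content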
+      return (Final) new ByteArrayInputStream(serializedContent);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Processes all the changes in the request.
+   */
+  private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> processRequest(
+      Request request,
+      DataSegmentChangeHandler changeHandler
+  ) throws IOException
+  {
+    final List<DataSegmentChangeRequest> changeRequests = objectMapper.readValue(
+        request.getContent().array(),
+        new TypeReference<List<DataSegmentChangeRequest>>()
+        {
+        }
+    );
+
+    return changeRequests
+        .stream()
+        .map(changeRequest -> processRequest(changeRequest, changeHandler))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Processes each DataSegmentChangeRequest using the handler.
+   */
+  private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus processRequest(
+      DataSegmentChangeRequest request,
+      DataSegmentChangeHandler handler
+  )
+  {
+    SegmentLoadDropHandler.Status status;
+    try {
+      request.go(handler, NOOP_CALLBACK);
+      status = SegmentLoadDropHandler.Status.SUCCESS;
+    }
+    catch (Exception e) {
+      status = SegmentLoadDropHandler.Status.failed(e.getMessage());
+    }
+
+    return new SegmentLoadDropHandler
+        .DataSegmentChangeRequestAndStatus(request, status);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
new file mode 100644
index 0000000000..43a96d6007
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TestSegmentsMetadataManager implements SegmentsMetadataManager
+{
+  private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
+  private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
+
+  public void addSegment(DataSegment segment)
+  {
+    segments.put(segment.getId().toString(), segment);
+    usedSegments.put(segment.getId().toString(), segment);
+  }
+
+  public void removeSegment(DataSegment segment)
+  {
+    segments.remove(segment.getId().toString());
+    usedSegments.remove(segment.getId().toString());
+  }
+
+  @Override
+  public void startPollingDatabasePeriodically()
+  {
+    // do nothing
+  }
+
+  @Override
+  public void stopPollingDatabasePeriodically()
+  {
+    // do nothing
+  }
+
+  @Override
+  public boolean isPollingDatabasePeriodically()
+  {
+    return true;
+  }
+
+  @Override
+  public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource)
+  {
+    return 0;
+  }
+
+  @Override
+  public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval)
+  {
+    return 0;
+  }
+
+  @Override
+  public int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean markSegmentAsUsed(String segmentId)
+  {
+    if (!segments.containsKey(segmentId)) {
+      return false;
+    }
+
+    usedSegments.put(segmentId, segments.get(segmentId));
+    return true;
+  }
+
+  @Override
+  public int markAsUnusedAllSegmentsInDataSource(String dataSource)
+  {
+    return 0;
+  }
+
+  @Override
+  public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval)
+  {
+    return 0;
+  }
+
+  @Override
+  public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
+  {
+    int numModifiedSegments = 0;
+    for (SegmentId segmentId : segmentIds) {
+      if (usedSegments.remove(segmentId.toString()) != null) {
+        ++numModifiedSegments;
+      }
+    }
+    return numModifiedSegments;
+  }
+
+  @Override
+  public boolean markSegmentAsUnused(SegmentId segmentId)
+  {
+    return usedSegments.remove(segmentId.toString()) != null;
+  }
+
+  @Nullable
+  @Override
+  public ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSource)
+  {
+    return null;
+  }
+
+  @Override
+  public Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments()
+  {
+    return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments();
+  }
+
+  @Override
+  public Set<SegmentId> getOvershadowedSegments()
+  {
+    return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments();
+  }
+
+  @Override
+  public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
+  {
+    return DataSourcesSnapshot.fromUsedSegments(usedSegments.values(), ImmutableMap.of());
+  }
+
+  @Override
+  public Iterable<DataSegment> iterateAllUsedSegments()
+  {
+    return usedSegments.values();
+  }
+
+  @Override
+  public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+      String datasource,
+      Interval interval,
+      boolean requiresLatest
+  )
+  {
+    VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
+        = getSnapshotOfDataSourcesWithAllUsedSegments().getUsedSegmentsTimelinesPerDataSource().get(datasource);
+    return Optional.fromNullable(usedSegmentsTimeline)
+                   .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(
+                       interval,
+                       Partitions.ONLY_COMPLETE
+                   ));
+  }
+
+  @Override
+  public Set<String> retrieveAllDataSourceNames()
+  {
+    return null;
+  }
+
+  @Override
+  public List<Interval> getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit)
+  {
+    return null;
+  }
+
+  @Override
+  public void poll()
+  {
+    // do nothing
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
new file mode 100644
index 0000000000..fedafc45c9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+public class TestServerInventoryView implements ServerInventoryView
+{
+  private static final Logger log = new Logger(TestServerInventoryView.class);
+
+  private final ConcurrentHashMap<String, DruidServer> servers = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, DataSegmentChangeHandler> segmentChangeHandlers = new ConcurrentHashMap<>();
+
+  private final ConcurrentHashMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
+  private final List<ServerChangeHandler> serverChangeHandlers = new ArrayList<>();
+
+  public void setUp()
+  {
+    segmentCallbacks.forEach(
+        (segmentCallback, executor) ->
+            executor.execute(segmentCallback::segmentViewInitialized)
+    );
+  }
+
+  /**
+   * Synchronizes this inventory view with the given inventory view.
+   */
+  public void sync(ServerInventoryView other)
+  {
+    // Clear the current inventory
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      servers.values().forEach(handler::removeServer);
+    }
+    servers.clear();
+    segmentChangeHandlers.clear();
+
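+    // Register a fresh copy of each server and replay its segments through
+    // the change handler to rebuild this inventory view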
+    for (DruidServer server : other.getInventory()) {
+      addServer(new DruidServer(
+          server.getName(),
+          server.getHostAndPort(),
+          server.getHostAndTlsPort(),
+          server.getMaxSize(),
+          server.getType(),
+          server.getTier(),
+          server.getPriority()
+      ));
+      DataSegmentChangeHandler handler = getChangeHandlerForHost(server.getName());
+      for (DataSegment segment : server.iterateAllSegments()) {
+        handler.addSegment(segment, null);
+      }
+    }
+  }
+
+  public void addServer(DruidServer server)
+  {
+    servers.put(server.getName(), server);
+    segmentChangeHandlers.put(server.getName(), new SegmentChangeHandler(server));
+  }
+
+  public void removeServer(DruidServer server)
+  {
+    servers.remove(server.getName());
+    segmentChangeHandlers.remove(server.getName());
+
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      handler.removeServer(server);
+    }
+  }
+
+  public DataSegmentChangeHandler getChangeHandlerForHost(String serverName)
+  {
+    return segmentChangeHandlers.get(serverName);
+  }
+
+  @Nullable
+  @Override
+  public DruidServer getInventoryValue(String serverKey)
+  {
+    return servers.get(serverKey);
+  }
+
+  @Override
+  public Collection<DruidServer> getInventory()
+  {
+    return Collections.unmodifiableCollection(servers.values());
+  }
+
+  @Override
+  public boolean isStarted()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    DruidServer server = servers.get(serverKey);
+    return server != null && server.getSegment(segment.getId()) != null;
+  }
+
+  @Override
+  public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+  {
+    serverChangeHandlers.add(new ServerChangeHandler(callback, exec));
+  }
+
+  @Override
+  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+  {
+    segmentCallbacks.put(callback, exec);
+  }
+
+  private class SegmentChangeHandler implements DataSegmentChangeHandler
+  {
+    private final DruidServer server;
+
+    private SegmentChangeHandler(DruidServer server)
+    {
+      this.server = server;
+    }
+
+    @Override
+    public void addSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName());
+
+      if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) {
+        server.addDataSegment(segment);
+        segmentCallbacks.forEach(
+            (segmentCallback, executor) -> executor.execute(
+                () -> segmentCallback.segmentAdded(server.getMetadata(), segment)
+            )
+        );
+      } else {
+        throw new ISE(
+            "Not enough free space on server %s. Segment size [%d]. Current free space [%d]",
+            server.getName(),
+            segment.getSize(),
+            server.getMaxSize() - server.getCurrSize()
+        );
+      }
+    }
+
+    @Override
+    public void removeSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Removing segment [%s] from server [%s]", segment.getId(), server.getName());
+      server.removeDataSegment(segment.getId());
+      segmentCallbacks.forEach(
+          (segmentCallback, executor) -> executor.execute(
+              () -> segmentCallback.segmentRemoved(server.getMetadata(), segment)
+          )
+      );
+    }
+  }
+
+  private static class ServerChangeHandler
+  {
+    private final Executor executor;
+    private final ServerRemovedCallback callback;
+
+    private ServerChangeHandler(ServerRemovedCallback callback, Executor executor)
+    {
+      this.callback = callback;
+      this.executor = executor;
+    }
+
+    private void removeServer(DruidServer server)
+    {
+      executor.execute(() -> callback.serverRemoved(server));
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java
new file mode 100644
index 0000000000..334651ee30
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Wraps an {@link ExecutorService} into a {@link ScheduledExecutorService}.
+ * Scheduled tasks are either queued immediately (the delay is ignored) or
+ * skipped altogether, depending on {@code ignoreScheduledTasks}. Periodic
+ * scheduling is not supported.
+ */
+public class WrappingScheduledExecutorService implements ScheduledExecutorService
+{
+  private static final Logger log = new Logger(WrappingScheduledExecutorService.class);
+
+  private final String nameFormat;
+  private final ExecutorService delegate;
+  private final boolean ignoreScheduledTasks;
+
+  public WrappingScheduledExecutorService(
+      String nameFormat,
+      ExecutorService delegate,
+      boolean ignoreScheduledTasks
+  )
+  {
+    this.nameFormat = nameFormat;
+    this.delegate = delegate;
+    this.ignoreScheduledTasks = ignoreScheduledTasks;
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+  {
+    if (ignoreScheduledTasks) {
+      log.debug("[%s] Ignoring scheduled task", nameFormat);
+      return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null));
+    }
+
+    // Ignore the delay and just queue the task
+    return new WrappingScheduledFuture<>(submit(command));
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+  {
+    if (ignoreScheduledTasks) {
+      log.debug("[%s] Ignoring scheduled task", nameFormat);
+      return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null));
+    }
+
+    // Ignore the delay and just queue the task
+    return new WrappingScheduledFuture<>(submit(callable));
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit
+  )
+  {
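+    // Fixed-rate scheduling is not supported by this wrapper; fail fast.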
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit
+  )
+  {
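+    // Fixed-delay scheduling is likewise unsupported.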
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void shutdown()
+  {
+    delegate.shutdown();
+  }
+
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    return delegate.shutdownNow();
+  }
+
+  @Override
+  public boolean isShutdown()
+  {
+    return delegate.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return delegate.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+  {
+    return delegate.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task)
+  {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result)
+  {
+    return delegate.submit(task, result);
+  }
+
+  @Override
+  public Future<?> submit(Runnable task)
+  {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
+  {
+    return delegate.invokeAll(tasks);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit
+  ) throws InterruptedException
+  {
+    return delegate.invokeAll(tasks, timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+  {
+    return delegate.invokeAny(tasks);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    return delegate.invokeAny(tasks, timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command)
+  {
+    delegate.execute(command);
+  }
+
+  /**
+   * Wraps a Future into a ScheduledFuture.
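+   * The wrapped task is queued immediately, so the reported delay is always zero.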
+   */
+  private static class WrappingScheduledFuture<V> implements ScheduledFuture<V>
+  {
+    private final Future<V> future;
+
+    private WrappingScheduledFuture(Future<V> future)
+    {
+      this.future = future;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit)
+    {
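+      // Tasks are queued immediately; there is never a remaining delay.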
+      return 0;
+    }
+
+    @Override
+    public int compareTo(Delayed o)
+    {
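+      // All wrapped futures report zero delay, so they compare as equal.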
+      return 0;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning)
+    {
+      return future.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled()
+    {
+      return future.isCancelled();
+    }
+
+    @Override
+    public boolean isDone()
+    {
+      return future.isDone();
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException
+    {
+      return future.get();
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+      return future.get(timeout, unit);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org