You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by kf...@apache.org on 2022/09/21 04:22:06 UTC
[druid] branch master updated: Add test framework to simulate segment loading and balancing (#13074)
This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0039409817 Add test framework to simulate segment loading and balancing (#13074)
0039409817 is described below
commit 0039409817530bb0eaed0a78549b278e56d51778
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Wed Sep 21 09:51:58 2022 +0530
Add test framework to simulate segment loading and balancing (#13074)
Fixes #12822
The framework added here makes it easy to write tests that verify the behaviour and interactions
of the following entities under various conditions:
- `DruidCoordinator`
- `HttpLoadQueuePeon`, `LoadQueueTaskMaster`
- coordinator duties: `BalanceSegments`, `RunRules`, `UnloadUnusedSegments`, etc.
- datasource retention rules: `LoadRule`, `DropRule`
Changes:
Add the following main classes:
- `CoordinatorSimulation` and related interfaces to dictate behaviour of simulation
- `CoordinatorSimulationBuilder` to build a simulation.
- `BlockingExecutorService` to keep submitted tasks in queue and execute them
only when explicitly invoked.
Add tests:
- `CoordinatorSimulationBaseTest`, `SegmentLoadingTest`, `SegmentBalancingTest`
- `SegmentLoadingNegativeTest` to contain tests which assert the existing erroneous behaviour
of segment loading. Once the behaviour is fixed, these tests will be moved to the regular
`SegmentLoadingTest`.
Please refer to the README.md in `org.apache.druid.server.coordinator.simulate` for more details.
---
.../java/util/metrics/StubServiceEmitter.java | 20 +-
.../druid/server/coordinator/DruidCoordinator.java | 8 +
.../server/coordinator/duty/BalanceSegments.java | 1 +
.../server/coordinator/BalanceSegmentsTest.java | 22 +-
.../server/coordinator/CreateDataSegments.java | 135 +++++
.../druid/server/coordinator/RunRulesTest.java | 26 +-
.../simulate/BlockingExecutorService.java | 237 +++++++++
.../simulate/CoordinatorSimulation.java | 102 ++++
.../simulate/CoordinatorSimulationBaseTest.java | 307 ++++++++++++
.../simulate/CoordinatorSimulationBuilder.java | 554 +++++++++++++++++++++
.../druid/server/coordinator/simulate/README.md | 141 ++++++
.../coordinator/simulate/SegmentBalancingTest.java | 128 +++++
.../simulate/SegmentLoadingNegativeTest.java | 260 ++++++++++
.../coordinator/simulate/SegmentLoadingTest.java | 192 +++++++
.../simulate/TestDruidLeaderSelector.java | 79 +++
.../simulate/TestMetadataRuleManager.java | 111 +++++
.../simulate/TestSegmentLoadingHttpClient.java | 167 +++++++
.../simulate/TestSegmentsMetadataManager.java | 199 ++++++++
.../simulate/TestServerInventoryView.java | 210 ++++++++
.../simulate/WrappingScheduledExecutorService.java | 240 +++++++++
20 files changed, 3105 insertions(+), 34 deletions(-)
diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
index 38f715f848..653dc8a08a 100644
--- a/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
+++ b/core/src/test/java/org/apache/druid/java/util/metrics/StubServiceEmitter.java
@@ -21,13 +21,15 @@ package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import java.util.ArrayList;
import java.util.List;
public class StubServiceEmitter extends ServiceEmitter
{
- private List<Event> events = new ArrayList<>();
+ private final List<Event> events = new ArrayList<>();
+ private final List<ServiceMetricEvent> metricEvents = new ArrayList<>();
public StubServiceEmitter(String service, String host)
{
@@ -37,14 +39,28 @@ public class StubServiceEmitter extends ServiceEmitter
@Override
public void emit(Event event)
{
+ if (event instanceof ServiceMetricEvent) {
+ metricEvents.add((ServiceMetricEvent) event);
+ }
events.add(event);
}
+ /**
+ * Gets all the events emitted since the previous {@link #flush()}.
+ */
public List<Event> getEvents()
{
return events;
}
+ /**
+ * Gets all the metric events emitted since the previous {@link #flush()}.
+ */
+ public List<ServiceMetricEvent> getMetricEvents()
+ {
+ return metricEvents;
+ }
+
@Override
public void start()
{
@@ -53,6 +69,8 @@ public class StubServiceEmitter extends ServiceEmitter
@Override
public void flush()
{
+ events.clear();
+ metricEvents.clear();
}
@Override
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 6b1e29d491..df13471aa8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -973,6 +973,14 @@ public class DruidCoordinator
{
return duties;
}
+
+ @Override
+ public String toString()
+ {
+ return "DutiesRunnable{" +
+ "dutiesRunnableAlias='" + dutiesRunnableAlias + '\'' +
+ '}';
+ }
}
/**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
index d2a1c4c8da..198a7cf5e8 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java
@@ -92,6 +92,7 @@ public class BalanceSegments implements CoordinatorDuty
)
{
+ log.info("Balancing segments in tier [%s]", tier);
if (params.getUsedSegments().size() == 0) {
log.info("Metadata segments are not available. Cannot balance.");
// suppress emit zero stats
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
index d4a89abb3d..a7c594fe09 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/BalanceSegmentsTest.java
@@ -30,7 +30,6 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
-import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
@@ -277,9 +276,9 @@ public class BalanceSegmentsTest
params = new BalanceSegmentsTester(coordinator).run(params);
EasyMock.verify(strategy);
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertThat(
- peon3.getSegmentsToLoad(),
- Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1, segment3, segment4)))
+ Assert.assertEquals(
+ ImmutableSet.of(segment1, segment3, segment4),
+ peon3.getSegmentsToLoad()
);
}
@@ -289,7 +288,7 @@ public class BalanceSegmentsTest
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(0);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment1))));
+ Assert.assertEquals(ImmutableSet.of(segment1), peon3.getSegmentsToLoad());
}
@Test
@@ -298,7 +297,7 @@ public class BalanceSegmentsTest
DruidCoordinatorRuntimeParams params = setupParamsForDecommissioningMaxPercentOfMaxSegmentsToMove(10);
params = new BalanceSegmentsTester(coordinator).run(params);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertThat(peon3.getSegmentsToLoad(), Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2))));
+ Assert.assertEquals(ImmutableSet.of(segment2), peon3.getSegmentsToLoad());
}
/**
@@ -347,9 +346,9 @@ public class BalanceSegmentsTest
params = new BalanceSegmentsTester(coordinator).run(params);
EasyMock.verify(strategy);
Assert.assertEquals(3L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertThat(
- peon3.getSegmentsToLoad(),
- Matchers.is(Matchers.equalTo(ImmutableSet.of(segment2, segment3, segment4)))
+ Assert.assertEquals(
+ ImmutableSet.of(segment2, segment3, segment4),
+ peon3.getSegmentsToLoad()
);
}
@@ -603,10 +602,7 @@ public class BalanceSegmentsTest
params = new BalanceSegmentsTester(coordinator).run(params);
EasyMock.verify(strategy);
Assert.assertEquals(1L, params.getCoordinatorStats().getTieredStat("movedCount", "normal"));
- Assert.assertThat(
- peon3.getSegmentsToLoad(),
- Matchers.is(Matchers.equalTo(ImmutableSet.of(segment3)))
- );
+ Assert.assertEquals(ImmutableSet.of(segment3), peon3.getSegmentsToLoad());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
new file mode 100644
index 0000000000..8f2c123891
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test utility to create {@link DataSegment}s for a given datasource.
+ */
+public class CreateDataSegments
+{
+  private final String datasource;
+
+  private DateTime startTime;
+  private Granularity granularity;
+  private int numPartitions;
+  private int numIntervals;
+
+  /**
+   * Starts building segments for the given datasource.
+   */
+  public static CreateDataSegments ofDatasource(String datasource)
+  {
+    return new CreateDataSegments(datasource);
+  }
+
+  private CreateDataSegments(String datasource)
+  {
+    this.datasource = datasource;
+  }
+
+  /**
+   * Sets the number of consecutive intervals to create segments for and the
+   * length of each interval.
+   */
+  public CreateDataSegments forIntervals(int numIntervals, Granularity intervalSize)
+  {
+    this.numIntervals = numIntervals;
+    this.granularity = intervalSize;
+    return this;
+  }
+
+  /**
+   * Sets the start of the first interval, parsed with {@link DateTimes#of(String)}.
+   */
+  public CreateDataSegments startingAt(String startOfFirstInterval)
+  {
+    this.startTime = DateTimes.of(startOfFirstInterval);
+    return this;
+  }
+
+  /**
+   * Sets the number of partitions (segments) to create in each interval.
+   */
+  public CreateDataSegments withNumPartitions(int numPartitions)
+  {
+    this.numPartitions = numPartitions;
+    return this;
+  }
+
+  /**
+   * Creates {@code numIntervals * numPartitions} segments, all of the given size.
+   * <p>
+   * The {@code sizeMb} value is passed unconverted as the size of each
+   * {@link DataSegment}, so it should be compared against server sizes that are
+   * expressed in the same unit.
+   *
+   * @return an unmodifiable list of the created segments, ordered by interval and
+   * then by partition number
+   * @throws IllegalStateException if segments are to be created but
+   *                               {@link #startingAt(String)} or
+   *                               {@link #forIntervals(int, Granularity)} was not called
+   */
+  public List<DataSegment> eachOfSizeInMb(long sizeMb)
+  {
+    // Fail fast with a clear message instead of an NPE from granularity.increment()
+    if (numIntervals > 0 && (startTime == null || granularity == null)) {
+      throw new IllegalStateException(
+          "Both startingAt() and forIntervals() must be called before creating segments"
+      );
+    }
+
+    final List<DataSegment> segments = new ArrayList<>();
+
+    // The id is unique across all created segments, it is NOT reset per interval
+    int uniqueId = 0;
+    DateTime intervalStart = startTime;
+    for (int numInterval = 0; numInterval < numIntervals; ++numInterval) {
+      final DateTime intervalEnd = granularity.increment(intervalStart);
+      final Interval interval = new Interval(intervalStart, intervalEnd);
+      for (int partitionNum = 0; partitionNum < numPartitions; ++partitionNum) {
+        segments.add(
+            new NumberedDataSegment(
+                datasource,
+                interval,
+                new NumberedShardSpec(partitionNum, numPartitions),
+                ++uniqueId,
+                sizeMb
+            )
+        );
+      }
+      intervalStart = intervalEnd;
+    }
+
+    return Collections.unmodifiableList(segments);
+  }
+
+  /**
+   * Simple implementation of DataSegment with a unique integer id to make debugging easier.
+   */
+  private static class NumberedDataSegment extends DataSegment
+  {
+    private final int uniqueId;
+
+    private NumberedDataSegment(
+        String datasource,
+        Interval interval,
+        NumberedShardSpec shardSpec,
+        int uniqueId,
+        long size
+    )
+    {
+      super(
+          datasource,
+          interval,
+          "1",
+          Collections.emptyMap(),
+          Collections.emptyList(),
+          Collections.emptyList(),
+          shardSpec,
+          IndexIO.CURRENT_VERSION_ID,
+          size
+      );
+      this.uniqueId = uniqueId;
+    }
+
+    /**
+     * Returns the short form {@code {datasource::uniqueId}} for readable logs
+     * and assertion messages.
+     */
+    @Override
+    public String toString()
+    {
+      return "{" + getDataSource() + "::" + uniqueId + "}";
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
index dc4a6f0abe..eb3be4c895 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/RunRulesTest.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.client.DruidServer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
@@ -41,8 +42,6 @@ import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -81,24 +80,11 @@ public class RunRulesTest
databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
- DateTime start = DateTimes.of("2012-01-01");
- usedSegments = new ArrayList<>();
- for (int i = 0; i < 24; i++) {
- usedSegments.add(
- new DataSegment(
- "test",
- new Interval(start, start.plusHours(1)),
- DateTimes.nowUtc().toString(),
- new HashMap<>(),
- new ArrayList<>(),
- new ArrayList<>(),
- NoneShardSpec.instance(),
- IndexIO.CURRENT_VERSION_ID,
- 1
- )
- );
- start = start.plusHours(1);
- }
+ usedSegments = CreateDataSegments.ofDatasource("test")
+ .forIntervals(24, Granularities.HOUR)
+ .startingAt("2012-01-01")
+ .withNumPartitions(1)
+ .eachOfSizeInMb(1);
ruleRunner = new RunRules(new ReplicationThrottler(24, 1, false), coordinator);
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
new file mode 100644
index 0000000000..fc59a6bd9d
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/BlockingExecutorService.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor that keeps submitted tasks in a queue until they are explicitly
+ * invoked by calling one of these methods:
+ * <ul>
+ * <li>{@link #finishNextPendingTask()}</li>
+ * <li>{@link #finishNextPendingTasks(int)}</li>
+ * <li>{@link #finishAllPendingTasks()}</li>
+ * </ul>
+ */
+public class BlockingExecutorService implements ExecutorService
+{
+  private static final Logger log = new Logger(BlockingExecutorService.class);
+
+  private final String nameFormat;
+  private final Queue<Task<?>> taskQueue = new ConcurrentLinkedQueue<>();
+
+  /**
+   * @param nameFormat name of this executor, used as a prefix in debug logs
+   */
+  public BlockingExecutorService(String nameFormat)
+  {
+    this.nameFormat = nameFormat;
+  }
+
+  /**
+   * @return true if at least one submitted task has not been executed yet
+   */
+  public boolean hasPendingTasks()
+  {
+    return !taskQueue.isEmpty();
+  }
+
+  /**
+   * Executes the next pending task, if any, on the calling thread itself.
+   *
+   * @return 1 if a pending task was executed, 0 if there was none
+   * @throws ISE if the executed task throws an exception
+   */
+  public int finishNextPendingTask()
+  {
+    log.debug("[%s] Executing next pending task", nameFormat);
+    Task<?> task = taskQueue.poll();
+    if (task != null) {
+      task.executeNow();
+      return 1;
+    } else {
+      return 0;
+    }
+  }
+
+  /**
+   * Executes the next {@code numTasksToExecute} pending tasks on the calling
+   * thread itself, stopping early if the queue becomes empty.
+   *
+   * @return number of tasks actually executed
+   * @throws ISE if any executed task throws an exception
+   */
+  public int finishNextPendingTasks(int numTasksToExecute)
+  {
+    log.debug("[%s] Executing %d pending tasks", nameFormat, numTasksToExecute);
+    int executedTaskCount = 0;
+    for (; executedTaskCount < numTasksToExecute; ++executedTaskCount) {
+      Task<?> task = taskQueue.poll();
+      if (task == null) {
+        break;
+      } else {
+        task.executeNow();
+      }
+    }
+    return executedTaskCount;
+  }
+
+  /**
+   * Executes all the remaining pending tasks on the calling thread itself.
+   * <p>
+   * Note: This method can keep running forever if another thread keeps submitting
+   * new tasks to the executor.
+   *
+   * @return number of tasks executed
+   * @throws ISE if any executed task throws an exception
+   */
+  public int finishAllPendingTasks()
+  {
+    log.debug("[%s] Executing all pending tasks", nameFormat);
+    Task<?> task;
+    int executedTaskCount = 0;
+    while ((task = taskQueue.poll()) != null) {
+      task.executeNow();
+      ++executedTaskCount;
+    }
+
+    return executedTaskCount;
+  }
+
+  // Task submission operations: tasks are only queued here, never run immediately
+  @Override
+  public <T> Future<T> submit(Callable<T> task)
+  {
+    return addTaskToQueue(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result)
+  {
+    return addTaskToQueue(() -> {
+      task.run();
+      return result;
+    });
+  }
+
+  @Override
+  public Future<?> submit(Runnable task)
+  {
+    return addTaskToQueue(() -> {
+      task.run();
+      return null;
+    });
+  }
+
+  @Override
+  public void execute(Runnable command)
+  {
+    submit(command);
+  }
+
+  private <T> Future<T> addTaskToQueue(Callable<T> callable)
+  {
+    Task<T> task = new Task<>(callable);
+    taskQueue.add(task);
+    return task.future;
+  }
+
+  // Termination operations
+
+  /**
+   * Discards all pending tasks without executing them. The executor stays
+   * usable afterwards: new tasks are still accepted and {@link #isShutdown()}
+   * keeps returning false.
+   */
+  @Override
+  public void shutdown()
+  {
+    taskQueue.clear();
+  }
+
+  /**
+   * Removes all pending tasks from the queue without executing them.
+   *
+   * @return the tasks that were still pending, as required by the
+   * {@link ExecutorService#shutdownNow()} contract (never null)
+   */
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    final List<Runnable> pendingTasks = new ArrayList<>();
+    for (Task<?> task = taskQueue.poll(); task != null; task = taskQueue.poll()) {
+      pendingTasks.add(task::executeNow);
+    }
+    return pendingTasks;
+  }
+
+  @Override
+  public boolean isShutdown()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+  {
+    return false;
+  }
+
+  // Unsupported operations
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks,
+      long timeout,
+      TimeUnit unit
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Task that can be invoked to complete the corresponding future.
+   */
+  private static class Task<T>
+  {
+    private final Callable<T> callable;
+    private final CompletableFuture<T> future = new CompletableFuture<>();
+
+    private Task(Callable<T> callable)
+    {
+      this.callable = callable;
+    }
+
+    /**
+     * Runs the task on the calling thread and completes its future with the
+     * result. If the task fails, the future is completed exceptionally (so that
+     * callers waiting on it do not hang) and the failure is rethrown.
+     */
+    private void executeNow()
+    {
+      try {
+        T result = callable.call();
+        future.complete(result);
+      }
+      catch (Exception e) {
+        future.completeExceptionally(e);
+        // Use the (Throwable, String) constructor: the (String, Object...)
+        // overload would treat 'e' as a format argument and drop the cause.
+        throw new ISE(e, "Error while executing task");
+      }
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
new file mode 100644
index 0000000000..6822419f79
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulation.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+
+import java.util.List;
+
+/**
+ * Runner for a coordinator simulation.
+ */
+public interface CoordinatorSimulation
+{
+  /**
+   * Creates a fresh builder with which a simulation can be configured.
+   */
+  static CoordinatorSimulationBuilder builder()
+  {
+    return new CoordinatorSimulationBuilder();
+  }
+
+  /**
+   * Starts the simulation, unless it has already been started.
+   */
+  void start();
+
+  /**
+   * Stops the simulation.
+   */
+  void stop();
+
+  /**
+   * Handle to the coordinator side of the running simulation.
+   */
+  CoordinatorState coordinator();
+
+  /**
+   * Handle to the cluster (servers) side of the running simulation.
+   */
+  ClusterState cluster();
+
+  /**
+   * Operations and queries on the simulated coordinator.
+   */
+  interface CoordinatorState
+  {
+    /**
+     * Performs exactly one run of the coordinator.
+     */
+    void runCoordinatorCycle();
+
+    /**
+     * Brings the coordinator's inventory view up to date with what is actually
+     * present on the servers of the cluster.
+     */
+    void syncInventoryView();
+
+    /**
+     * Replaces the current {@link CoordinatorDynamicConfig}.
+     */
+    void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig);
+
+    /**
+     * Returns the coordinator's view of the server with the given name.
+     */
+    DruidServer getInventoryView(String serverName);
+
+    /**
+     * Returns the metric events that were emitted during the last coordinator run.
+     */
+    List<ServiceMetricEvent> getMetricEvents();
+
+    /**
+     * Returns the coordinator's view of the percentage of the given datasource
+     * that is loaded.
+     */
+    double getLoadPercentage(String datasource);
+  }
+
+  /**
+   * Operations on the simulated cluster.
+   */
+  interface ClusterState
+  {
+    /**
+     * Completes loading of every segment queued during the last coordinator run,
+     * then processes the resulting responses and runs the corresponding
+     * callbacks on the coordinator.
+     */
+    void loadQueuedSegments();
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
new file mode 100644
index 0000000000..60e8e42824
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBaseTest.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base test for coordinator simulations.
+ * <p>
+ * Each test must call {@link #startSimulation(CoordinatorSimulation)} to start
+ * the simulation. {@link CoordinatorSimulation#stop()} should not be called as
+ * the simulation is stopped when cleaning up after the test in {@link #tearDown()}.
+ * <p>
+ * Tests that verify balancing behaviour should set
+ * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+ * Otherwise, the segment sampling is random and can produce repeated values
+ * leading to flakiness in the tests. The simulation sets this field to true by
+ * default.
+ */
+public abstract class CoordinatorSimulationBaseTest
+ implements CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+{
+ static final double DOUBLE_DELTA = 10e-9;
+
+ private CoordinatorSimulation sim;
+ private final Map<String, List<ServiceMetricEvent>> latestMetricEvents = new HashMap<>();
+
+ @Before
+ public abstract void setUp();
+
+ @After
+ public void tearDown()
+ {
+ if (sim != null) {
+ sim.stop();
+ sim = null;
+ }
+ }
+
+ /**
+ * This must be called to start the simulation and set the correct state.
+ */
+ void startSimulation(CoordinatorSimulation simulation)
+ {
+ this.sim = simulation;
+ simulation.start();
+ }
+
+ @Override
+ public void runCoordinatorCycle()
+ {
+ latestMetricEvents.clear();
+ sim.coordinator().runCoordinatorCycle();
+
+ // Extract the metric values of this run
+ for (ServiceMetricEvent event : sim.coordinator().getMetricEvents()) {
+ latestMetricEvents.computeIfAbsent(event.getMetric(), m -> new ArrayList<>())
+ .add(event);
+ }
+ }
+
+ @Override
+ public List<ServiceMetricEvent> getMetricEvents()
+ {
+ return sim.coordinator().getMetricEvents();
+ }
+
+ @Override
+ public DruidServer getInventoryView(String serverName)
+ {
+ return sim.coordinator().getInventoryView(serverName);
+ }
+
+ @Override
+ public void syncInventoryView()
+ {
+ sim.coordinator().syncInventoryView();
+ }
+
+ @Override
+ public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+ {
+ sim.coordinator().setDynamicConfig(dynamicConfig);
+ }
+
+ @Override
+ public void loadQueuedSegments()
+ {
+ sim.cluster().loadQueuedSegments();
+ }
+
+ @Override
+ public double getLoadPercentage(String datasource)
+ {
+ return sim.coordinator().getLoadPercentage(datasource);
+ }
+
+ // Verification methods
+ void verifyDatasourceIsFullyLoaded(String datasource)
+ {
+ Assert.assertEquals(100.0, getLoadPercentage(datasource), DOUBLE_DELTA);
+ }
+
+ void verifyNoEvent(String metricName)
+ {
+ Assert.assertTrue(getMetricValues(metricName, null).isEmpty());
+ }
+
+ /**
+ * Verifies the value of the specified metric emitted in the previous run.
+ */
+ void verifyValue(String metricName, Number expectedValue)
+ {
+ verifyValue(metricName, null, expectedValue);
+ }
+
+ /**
+ * Verifies the value of the event corresponding to the specified metric and
+ * dimensionFilters emitted in the previous run.
+ */
+ void verifyValue(String metricName, Map<String, String> dimensionFilters, Number expectedValue)
+ {
+ Assert.assertEquals(expectedValue, getValue(metricName, dimensionFilters));
+ }
+
+ /**
+ * Gets the value of the event corresponding to the specified metric and
+ * dimensionFilters emitted in the previous run.
+ */
+ Number getValue(String metricName, Map<String, String> dimensionFilters)
+ {
+ List<Number> values = getMetricValues(metricName, dimensionFilters);
+ Assert.assertEquals(
+ "Metric must have been emitted exactly once for the given dimensions.",
+ 1,
+ values.size()
+ );
+ return values.get(0);
+ }
+
+ private List<Number> getMetricValues(String metricName, Map<String, String> dimensionFilters)
+ {
+ final List<Number> values = new ArrayList<>();
+ final List<ServiceMetricEvent> events = latestMetricEvents.getOrDefault(metricName, Collections.emptyList());
+ final Map<String, String> filters = dimensionFilters == null
+ ? Collections.emptyMap() : dimensionFilters;
+ for (ServiceMetricEvent event : events) {
+ final Map<String, Object> userDims = event.getUserDims();
+ boolean match = filters.keySet().stream()
+ .map(d -> filters.get(d).equals(userDims.get(d)))
+ .reduce((a, b) -> a && b)
+ .orElse(true);
+ if (match) {
+ values.add(event.getValue());
+ }
+ }
+
+ return values;
+ }
+
+ // Utility methods
+
+ /**
+ * Creates a {@link CoordinatorDynamicConfig} with the specified values of:
+ * {@code maxSegmentsToMove, maxSegmentsInNodeLoadingQueue and replicationThrottleLimit}.
+ * The created config always has {@code useBatchedSegmentSampler=true} to avoid
+ * flakiness in tests.
+ *
+ * @see CoordinatorSimulationBaseTest
+ */
+ static CoordinatorDynamicConfig createDynamicConfig(
+ int maxSegmentsToMove,
+ int maxSegmentsInNodeLoadingQueue,
+ int replicationThrottleLimit
+ )
+ {
+ return CoordinatorDynamicConfig.builder()
+ .withMaxSegmentsToMove(maxSegmentsToMove)
+ .withReplicationThrottleLimit(replicationThrottleLimit)
+ .withMaxSegmentsInNodeLoadingQueue(maxSegmentsInNodeLoadingQueue)
+ .withUseBatchedSegmentSampler(true)
+ .build();
+ }
+
+ /**
+ * Creates a map containing dimension key-values to filter out metric events.
+ */
+ static Map<String, String> filter(String... dimensionValues)
+ {
+ if (dimensionValues.length < 2 || dimensionValues.length % 2 == 1) {
+ throw new IllegalArgumentException("Dimension key-values must be specified in pairs.");
+ }
+
+ final Map<String, String> filters = new HashMap<>();
+ for (int i = 0; i < dimensionValues.length; ) {
+ filters.put(dimensionValues[i], dimensionValues[i + 1]);
+ i += 2;
+ }
+ return filters;
+ }
+
+ /**
+ * Creates a historical. The {@code uniqueIdInTier} must be correctly specified
+ * as it is used to identify the historical throughout the simulation.
+ */
+ static DruidServer createHistorical(int uniqueIdInTier, String tier, long serverSizeMb)
+ {
+ final String name = tier + "__" + "hist__" + uniqueIdInTier;
+ return new DruidServer(name, name, name, serverSizeMb, ServerType.HISTORICAL, tier, 1);
+ }
+
+ // Utility and constant holder classes
+
+ static class DS
+ {
+ static final String WIKI = "wiki";
+ }
+
+ static class Tier
+ {
+ static final String T1 = "tier_t1";
+ static final String T2 = "tier_t2";
+ static final String T3 = "tier_t3";
+ }
+
+ static class Metric
+ {
+ static final String ASSIGNED_COUNT = "segment/assigned/count";
+ static final String MOVED_COUNT = "segment/moved/count";
+ static final String DROPPED_COUNT = "segment/dropped/count";
+ static final String LOAD_QUEUE_COUNT = "segment/loadQueue/count";
+ }
+
+ static class Segments
+ {
+ /**
+ * Segments of datasource {@link DS#WIKI}, size 500 MB each,
+ * spanning 1 day containing 10 partitions.
+ */
+ static final List<DataSegment> WIKI_10X1D =
+ CreateDataSegments.ofDatasource(DS.WIKI)
+ .forIntervals(1, Granularities.DAY)
+ .startingAt("2022-01-01")
+ .withNumPartitions(10)
+ .eachOfSizeInMb(500);
+ }
+
+ /**
+ * Builder for a load rule.
+ */
+ static class Load
+ {
+ private final Map<String, Integer> tieredReplicants = new HashMap<>();
+
+ static Load on(String tier, int numReplicas)
+ {
+ Load load = new Load();
+ load.tieredReplicants.put(tier, numReplicas);
+ return load;
+ }
+
+ Load andOn(String tier, int numReplicas)
+ {
+ tieredReplicants.put(tier, numReplicas);
+ return this;
+ }
+
+ Rule forever()
+ {
+ return new ForeverLoadRule(tieredReplicants);
+ }
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
new file mode 100644
index 0000000000..29301ea033
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/CoordinatorSimulationBuilder.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.common.config.JacksonConfigManager;
+import org.apache.druid.curator.ZkEnablementConfig;
+import org.apache.druid.curator.discovery.ServiceAnnouncer;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.DirectExecutorService;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.BalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyConfig;
+import org.apache.druid.server.coordinator.CachingCostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.server.coordinator.CostBalancerStrategyFactory;
+import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.LoadQueueTaskMaster;
+import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
+import org.apache.druid.server.coordinator.duty.CoordinatorCustomDutyGroups;
+import org.apache.druid.server.coordinator.rules.Rule;
+import org.apache.druid.server.initialization.ZkPathsConfig;
+import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
+import org.apache.druid.timeline.DataSegment;
+import org.easymock.EasyMock;
+import org.joda.time.Duration;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Builder for {@link CoordinatorSimulation}.
+ * <p>
+ * Collaborators of the {@link DruidCoordinator} that would normally talk to
+ * other systems are replaced with in-memory test implementations (server
+ * inventory, segments metadata, rules, segment-loading HTTP client) or EasyMock
+ * mocks. Every executor the coordinator creates through {@link ExecutorFactory}
+ * is either direct or blocking, so a simulation advances only when explicitly
+ * told to.
+ */
+public class CoordinatorSimulationBuilder
+{
+  // Coordinator period and kill period, in millis (passed to Joda Duration(long))
+  private static final long DEFAULT_COORDINATOR_PERIOD = 100L;
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper()
+      .setInjectableValues(
+          new InjectableValues.Std().addValue(
+              DataSegment.PruneSpecsHolder.class,
+              DataSegment.PruneSpecsHolder.DEFAULT
+          )
+      );
+
+  private BalancerStrategyFactory balancerStrategyFactory;
+  private CoordinatorDynamicConfig dynamicConfig =
+      CoordinatorDynamicConfig.builder()
+                              .withUseBatchedSegmentSampler(true)
+                              .build();
+  private List<DruidServer> servers;
+  private List<DataSegment> segments;
+  private final Map<String, List<Rule>> datasourceRules = new HashMap<>();
+  private boolean loadImmediately = false;
+  private boolean autoSyncInventory = true;
+
+  /**
+   * Specifies the balancer strategy to be used.
+   * <p>
+   * Default: "cost" ({@link CostBalancerStrategyFactory})
+   */
+  public CoordinatorSimulationBuilder withBalancer(BalancerStrategyFactory strategyFactory)
+  {
+    this.balancerStrategyFactory = strategyFactory;
+    return this;
+  }
+
+  /**
+   * Specifies the servers of the simulated cluster. At least one server is
+   * required, otherwise {@link #build()} fails.
+   */
+  public CoordinatorSimulationBuilder withServers(List<DruidServer> servers)
+  {
+    this.servers = servers;
+    return this;
+  }
+
+  /**
+   * Varargs variant of {@link #withServers(List)}.
+   */
+  public CoordinatorSimulationBuilder withServers(DruidServer... servers)
+  {
+    return withServers(Arrays.asList(servers));
+  }
+
+  /**
+   * Specifies the segments to be added to the test segments metadata manager.
+   * Optional.
+   */
+  public CoordinatorSimulationBuilder withSegments(List<DataSegment> segments)
+  {
+    this.segments = segments;
+    return this;
+  }
+
+  /**
+   * Sets the retention rules of the given datasource, replacing any rules
+   * previously specified for it on this builder.
+   */
+  public CoordinatorSimulationBuilder withRules(String datasource, Rule... rules)
+  {
+    this.datasourceRules.put(datasource, Arrays.asList(rules));
+    return this;
+  }
+
+  /**
+   * Specifies whether segments should be loaded as soon as they are queued.
+   * <p>
+   * Default: false
+   */
+  public CoordinatorSimulationBuilder withImmediateSegmentLoading(boolean loadImmediately)
+  {
+    this.loadImmediately = loadImmediately;
+    return this;
+  }
+
+  /**
+   * Specifies whether the inventory view maintained by the coordinator
+   * should be auto-synced as soon as any change is made to the cluster.
+   * <p>
+   * Default: true
+   */
+  public CoordinatorSimulationBuilder withAutoInventorySync(boolean autoSync)
+  {
+    this.autoSyncInventory = autoSync;
+    return this;
+  }
+
+  /**
+   * Specifies the CoordinatorDynamicConfig to be used in the simulation.
+   * <p>
+   * Default values: {@code useBatchedSegmentSampler = true}, other params as
+   * specified in {@link CoordinatorDynamicConfig.Builder}.
+   * <p>
+   * Tests that verify balancing behaviour should set
+   * {@link CoordinatorDynamicConfig#useBatchedSegmentSampler()} to true.
+   * Otherwise, the segment sampling is random and can produce repeated values
+   * leading to flakiness in the tests. The simulation sets this field to true by
+   * default.
+   */
+  public CoordinatorSimulationBuilder withDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+  {
+    this.dynamicConfig = dynamicConfig;
+    return this;
+  }
+
+  /**
+   * Builds the simulation. The returned simulation has not been started yet.
+   *
+   * @throws IllegalArgumentException if no servers have been specified
+   */
+  public CoordinatorSimulation build()
+  {
+    Preconditions.checkArgument(
+        servers != null && !servers.isEmpty(),
+        "Cannot run simulation for an empty cluster"
+    );
+
+    // Prepare the environment
+    final TestServerInventoryView serverInventoryView = new TestServerInventoryView();
+    servers.forEach(serverInventoryView::addServer);
+
+    final TestSegmentsMetadataManager segmentManager = new TestSegmentsMetadataManager();
+    if (segments != null) {
+      segments.forEach(segmentManager::addSegment);
+    }
+
+    final TestMetadataRuleManager ruleManager = new TestMetadataRuleManager();
+    datasourceRules.forEach(
+        (datasource, rules) ->
+            ruleManager.overrideRule(datasource, rules, null)
+    );
+
+    final Environment env = new Environment(
+        serverInventoryView,
+        segmentManager,
+        ruleManager,
+        dynamicConfig,
+        loadImmediately,
+        autoSyncInventory
+    );
+
+    // Build the coordinator
+    final DruidCoordinator coordinator = new DruidCoordinator(
+        env.coordinatorConfig,
+        new ZkPathsConfig(),
+        env.jacksonConfigManager,
+        env.segmentManager,
+        env.coordinatorInventoryView,
+        env.ruleManager,
+        () -> null,
+        env.serviceEmitter,
+        env.executorFactory,
+        null,
+        env.loadQueueTaskMaster,
+        new ServiceAnnouncer.Noop(),
+        null,
+        Collections.emptySet(),
+        null,
+        new CoordinatorCustomDutyGroups(Collections.emptySet()),
+        balancerStrategyFactory != null ? balancerStrategyFactory
+                                        : new CostBalancerStrategyFactory(),
+        env.lookupCoordinatorManager,
+        env.leaderSelector,
+        OBJECT_MAPPER,
+        ZkEnablementConfig.ENABLED
+    );
+
+    return new SimulationImpl(coordinator, env);
+  }
+
+  /**
+   * Creates a {@link CachingCostBalancerStrategyFactory} bound to the
+   * coordinator's inventory view and the environment lifecycle.
+   * Not currently used by {@link #build()}.
+   */
+  private BalancerStrategyFactory buildCachingCostBalancerStrategy(Environment env)
+  {
+    try {
+      return new CachingCostBalancerStrategyFactory(
+          env.coordinatorInventoryView,
+          env.lifecycle,
+          new CachingCostBalancerStrategyConfig()
+      );
+    }
+    catch (Exception e) {
+      throw new ISE(e, "Error building balancer strategy");
+    }
+  }
+
+  /**
+   * Implementation of {@link CoordinatorSimulation}.
+   */
+  private static class SimulationImpl implements CoordinatorSimulation,
+      CoordinatorSimulation.CoordinatorState, CoordinatorSimulation.ClusterState
+  {
+    private final AtomicBoolean running = new AtomicBoolean(false);
+
+    private final Environment env;
+    private final DruidCoordinator coordinator;
+
+    private SimulationImpl(DruidCoordinator coordinator, Environment env)
+    {
+      this.env = env;
+      this.coordinator = coordinator;
+    }
+
+    @Override
+    public void start()
+    {
+      if (!running.compareAndSet(false, true)) {
+        throw new ISE("Simulation is already running");
+      }
+
+      try {
+        env.setUp();
+        coordinator.start();
+      }
+      catch (Exception e) {
+        throw new ISE(e, "Exception while running simulation");
+      }
+    }
+
+    /**
+     * Stops the coordinator and tears down the environment. This is a no-op if
+     * the simulation was never started.
+     */
+    @Override
+    public void stop()
+    {
+      // Nothing has been set up before start(). Tearing down here would call
+      // EasyMock.verify() on mocks that were never replayed.
+      if (!running.get()) {
+        return;
+      }
+
+      coordinator.stop();
+      env.leaderSelector.stopBeingLeader();
+      env.tearDown();
+    }
+
+    @Override
+    public CoordinatorState coordinator()
+    {
+      return this;
+    }
+
+    @Override
+    public ClusterState cluster()
+    {
+      return this;
+    }
+
+    @Override
+    public void runCoordinatorCycle()
+    {
+      verifySimulationRunning();
+      env.serviceEmitter.flush();
+
+      // Invoke historical duties and metadata duties
+      env.executorFactory.coordinatorRunner.finishNextPendingTasks(2);
+    }
+
+    @Override
+    public void syncInventoryView()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.autoSyncInventory,
+          "Cannot invoke syncInventoryView as simulation is running in auto-sync mode."
+      );
+      env.coordinatorInventoryView.sync(env.historicalInventoryView);
+    }
+
+    @Override
+    public void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      env.setDynamicConfig(dynamicConfig);
+    }
+
+    @Override
+    public DruidServer getInventoryView(String serverName)
+    {
+      return env.coordinatorInventoryView.getInventoryValue(serverName);
+    }
+
+    @Override
+    public void loadQueuedSegments()
+    {
+      verifySimulationRunning();
+      Preconditions.checkState(
+          !env.loadImmediately,
+          "Cannot invoke loadQueuedSegments as simulation is running in immediate loading mode."
+      );
+
+      final BlockingExecutorService loadQueueExecutor = env.executorFactory.loadQueueExecutor;
+      while (loadQueueExecutor.hasPendingTasks()) {
+        // Drain all the items from the load queue executor
+        // This sends at most 1 load/drop request to each server
+        loadQueueExecutor.finishAllPendingTasks();
+
+        // Load all the queued segments, handle their responses and execute callbacks
+        int loadedSegments = env.executorFactory.historicalLoader.finishAllPendingTasks();
+        loadQueueExecutor.finishNextPendingTasks(loadedSegments);
+        env.executorFactory.loadCallbackExecutor.finishAllPendingTasks();
+      }
+    }
+
+    private void verifySimulationRunning()
+    {
+      if (!running.get()) {
+        throw new ISE("Simulation hasn't been started yet.");
+      }
+    }
+
+    /**
+     * Returns the load percentage the coordinator reports for the datasource.
+     *
+     * @throws ISE if the coordinator reports no load status for the datasource
+     */
+    @Override
+    public double getLoadPercentage(String datasource)
+    {
+      // Avoid a bare NPE from unboxing a missing map entry
+      final Double loadPercentage = coordinator.getLoadStatus().get(datasource);
+      if (loadPercentage == null) {
+        throw new ISE("No load status available for datasource[%s]", datasource);
+      }
+      return loadPercentage;
+    }
+
+    @Override
+    public List<ServiceMetricEvent> getMetricEvents()
+    {
+      return new ArrayList<>(env.serviceEmitter.getMetricEvents());
+    }
+  }
+
+  /**
+   * Environment for a coordinator simulation.
+   */
+  private static class Environment
+  {
+    private final Lifecycle lifecycle = new Lifecycle("coord-sim");
+
+    // Executors
+    private final ExecutorFactory executorFactory;
+
+    private final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector();
+    private final TestSegmentsMetadataManager segmentManager;
+    private final TestMetadataRuleManager ruleManager;
+    private final TestServerInventoryView historicalInventoryView;
+
+    private final LoadQueueTaskMaster loadQueueTaskMaster;
+    private final StubServiceEmitter serviceEmitter
+        = new StubServiceEmitter("coordinator", "coordinator");
+
+    // Same instance as historicalInventoryView in auto-sync mode, a separate
+    // view that must be synced explicitly otherwise
+    private final TestServerInventoryView coordinatorInventoryView;
+
+    // The mocked config manager hands this same reference to the coordinator,
+    // so updates made via setDynamicConfig() are visible to it
+    private final AtomicReference<CoordinatorDynamicConfig> dynamicConfig = new AtomicReference<>();
+    private final JacksonConfigManager jacksonConfigManager;
+    private final LookupCoordinatorManager lookupCoordinatorManager;
+    private final DruidCoordinatorConfig coordinatorConfig;
+    private final boolean loadImmediately;
+    private final boolean autoSyncInventory;
+
+    // EasyMock mocks, replayed in setUp() and verified in tearDown()
+    private final List<Object> mocks = new ArrayList<>();
+
+    private Environment(
+        TestServerInventoryView clusterInventory,
+        TestSegmentsMetadataManager segmentManager,
+        TestMetadataRuleManager ruleManager,
+        CoordinatorDynamicConfig dynamicConfig,
+        boolean loadImmediately,
+        boolean autoSyncInventory
+    )
+    {
+      this.historicalInventoryView = clusterInventory;
+      this.segmentManager = segmentManager;
+      this.ruleManager = ruleManager;
+      this.loadImmediately = loadImmediately;
+      this.autoSyncInventory = autoSyncInventory;
+
+      this.coordinatorConfig = new TestDruidCoordinatorConfig.Builder()
+          .withCoordinatorStartDelay(new Duration(1L))
+          .withCoordinatorPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withCoordinatorKillPeriod(new Duration(DEFAULT_COORDINATOR_PERIOD))
+          .withLoadQueuePeonRepeatDelay(new Duration("PT0S"))
+          .withLoadQueuePeonType("http")
+          .withCoordinatorKillIgnoreDurationToRetain(false)
+          .build();
+
+      this.executorFactory = new ExecutorFactory(loadImmediately);
+      this.coordinatorInventoryView = autoSyncInventory
+                                      ? clusterInventory
+                                      : new TestServerInventoryView();
+      HttpClient httpClient = new TestSegmentLoadingHttpClient(
+          OBJECT_MAPPER,
+          clusterInventory::getChangeHandlerForHost,
+          executorFactory.create(1, ExecutorFactory.HISTORICAL_LOADER)
+      );
+
+      this.loadQueueTaskMaster = new LoadQueueTaskMaster(
+          null,
+          OBJECT_MAPPER,
+          executorFactory.create(1, ExecutorFactory.LOAD_QUEUE_EXECUTOR),
+          executorFactory.create(1, ExecutorFactory.LOAD_CALLBACK_EXECUTOR),
+          coordinatorConfig,
+          httpClient,
+          null
+      );
+
+      this.jacksonConfigManager = mockConfigManager();
+      setDynamicConfig(dynamicConfig);
+
+      this.lookupCoordinatorManager = EasyMock.createNiceMock(LookupCoordinatorManager.class);
+      mocks.add(jacksonConfigManager);
+      mocks.add(lookupCoordinatorManager);
+    }
+
+    private void setUp() throws Exception
+    {
+      EmittingLogger.registerEmitter(serviceEmitter);
+      historicalInventoryView.setUp();
+      coordinatorInventoryView.setUp();
+      lifecycle.start();
+      // Must run after the coordinator and load queue executors have been created
+      executorFactory.setUp();
+      leaderSelector.becomeLeader();
+      EasyMock.replay(mocks.toArray());
+    }
+
+    private void tearDown()
+    {
+      EasyMock.verify(mocks.toArray());
+      executorFactory.tearDown();
+      lifecycle.stop();
+    }
+
+    private void setDynamicConfig(CoordinatorDynamicConfig dynamicConfig)
+    {
+      this.dynamicConfig.set(dynamicConfig);
+    }
+
+    /**
+     * Creates a mock config manager that serves the simulation's dynamic config
+     * reference and an empty compaction config.
+     */
+    private JacksonConfigManager mockConfigManager()
+    {
+      final JacksonConfigManager jacksonConfigManager
+          = EasyMock.createMock(JacksonConfigManager.class);
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorDynamicConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(dynamicConfig).anyTimes();
+
+      EasyMock.expect(
+          jacksonConfigManager.watch(
+              EasyMock.eq(CoordinatorCompactionConfig.CONFIG_KEY),
+              EasyMock.eq(CoordinatorCompactionConfig.class),
+              EasyMock.anyObject()
+          )
+      ).andReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty())).anyTimes();
+
+      return jacksonConfigManager;
+    }
+  }
+
+  /**
+   * Implementation of {@link ScheduledExecutorFactory} used to create and keep
+   * a handle on the various executors used inside the coordinator.
+   */
+  private static class ExecutorFactory implements ScheduledExecutorFactory
+  {
+    // Name formats identify each executor. COORDINATOR_RUNNER presumably has to
+    // match the name format used by DruidCoordinator itself (TODO confirm).
+    static final String HISTORICAL_LOADER = "historical-loader-%d";
+    static final String LOAD_QUEUE_EXECUTOR = "load-queue-%d";
+    static final String LOAD_CALLBACK_EXECUTOR = "load-callback-%d";
+    static final String COORDINATOR_RUNNER = "Coordinator-Exec--%d";
+
+    private final Map<String, BlockingExecutorService> blockingExecutors = new HashMap<>();
+    private final boolean directExecution;
+
+    // Looked up in setUp(); these stay null for executors created in direct mode
+    private BlockingExecutorService historicalLoader;
+    private BlockingExecutorService loadQueueExecutor;
+    private BlockingExecutorService loadCallbackExecutor;
+    private BlockingExecutorService coordinatorRunner;
+
+    private ExecutorFactory(boolean directExecution)
+    {
+      this.directExecution = directExecution;
+    }
+
+    @Override
+    public ScheduledExecutorService create(int corePoolSize, String nameFormat)
+    {
+      boolean isCoordinatorRunner = COORDINATOR_RUNNER.equals(nameFormat);
+
+      // Coordinator running executor must always be blocked
+      final ExecutorService executorService =
+          (directExecution && !isCoordinatorRunner)
+          ? new DirectExecutorService()
+          : blockingExecutors.computeIfAbsent(nameFormat, BlockingExecutorService::new);
+
+      return new WrappingScheduledExecutorService(nameFormat, executorService, !isCoordinatorRunner);
+    }
+
+    private BlockingExecutorService findExecutor(String nameFormat)
+    {
+      return blockingExecutors.get(nameFormat);
+    }
+
+    private void setUp()
+    {
+      coordinatorRunner = findExecutor(COORDINATOR_RUNNER);
+      historicalLoader = findExecutor(HISTORICAL_LOADER);
+      loadQueueExecutor = findExecutor(LOAD_QUEUE_EXECUTOR);
+      loadCallbackExecutor = findExecutor(LOAD_CALLBACK_EXECUTOR);
+    }
+
+    private void tearDown()
+    {
+      blockingExecutors.values().forEach(BlockingExecutorService::shutdown);
+    }
+  }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
new file mode 100644
index 0000000000..a1562c5187
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/README.md
@@ -0,0 +1,141 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+# Coordinator simulations
+
+The simulation framework allows developers to recreate arbitrary cluster setups and verify coordinator behaviour. Tests
+written using the framework can also help identify performance bottlenecks or potential bugs in the system and even
+compare different balancing strategies.
+
+As opposed to unit tests, simulations are meant to test the coordinator as a whole and verify the interactions of all
+the underlying parts. In that regard, these simulations resemble integration tests more closely.
+
+## Test targets
+
+The primary test target is the `DruidCoordinator` itself. The behaviour of the following entities can also be verified
+using simulations:
+
+- `LoadQueuePeon`, `LoadQueueTaskMaster`
+- All coordinator duties, e.g. `BalanceSegments`, `RunRules`
+- All retention rules
+
+## Capabilities
+
+The framework provides control over the following aspects of the setup:
+
+| Input | Details | Actions |
+|-------|---------|---------|
+|cluster | server name, type, tier, size | add a server, remove a server|
+|segment |datasource, interval, version, partition num, size | add/remove from server, mark used/unused, publish new segments|
+|rules | type (foreverLoad, drop, etc.), replica count per tier | set rules for a datasource|
+|configs |coordinator period, load queue type, load queue size, max segments to balance | set or update a config |
+
+The above actions can be performed at any point after building the simulation. So, you could even recreate scenarios
+where during a coordinator run, a server crashes or the retention rules of a datasource change, and verify the behaviour
+of the coordinator in these situations.
+
+## Design
+
+1. __Execution__: A tight dependency on time durations, such as the period of a repeating task or the delay before a
+ scheduled task, makes it difficult to reliably reproduce a test scenario and thus makes tests flaky. To avoid this,
+ every executor required for coordinator operations supports only two modes of execution:
+ - __immediate__: Execute tasks on the calling thread itself.
+ - __blocked__: Keep tasks in a queue until explicitly invoked.
+2. __Internal dependencies__: In order to allow realistic reproductions of the coordinator behaviour, none of the
+ internal parts of the coordinator have been mocked in the framework and new tests need not mock anything at all.
+3. __External dependencies__: Since these tests are meant to verify the behaviour of only the coordinator, the
+ interfaces to communicate with external dependencies have been provided as simple in-memory implementations:
+ - communication with the metadata store: `SegmentsMetadataManager`, `MetadataRuleManager`
+ - communication with historicals: `HttpClient`, `ServerInventoryView`
+4. __Inventory__: The coordinator maintains an inventory view of the cluster state. Simulations can choose from two
+ modes of inventory update - auto and manual. In auto update mode, any change made to the cluster is immediately
+ reflected in the inventory view. In manual update mode, the inventory must be explicitly synchronized with the
+ cluster state.
+
+## Limitations
+
+- The framework does not expose the coordinator HTTP endpoints.
+- It should not be used to verify the absolute values of execution latencies, e.g. the time taken to compute the
+ balancing cost of a segment. However, relative values can still be a good indicator when comparing, say, two
+ balancing strategies.
+
+## Usage
+
+Writing a test class:
+
+- Extend `CoordinatorSimulationBaseTest`. This base test exposes methods to get or set the state of the cluster and
+ coordinator during a simulation.
+- Build a simulation using `CoordinatorSimulation.builder()` with specified segments, servers, rules and configs.
+- Start the simulation with `startSimulation(simulation)`.
+- Invoke coordinator runs with `runCoordinatorCycle()`.
+- Verify emitted metrics and the current cluster state.
+
+Example:
+
+```java
+public class SimpleSimulationTest extends CoordinatorSimulationBaseTest
+{
+  @Override
+  public void setUp()
+  {
+    // Nothing to set up: segments, servers and the simulation are created in the test
+  }
+
+  @Test
+  public void testShiftSegmentsToDifferentTier()
+  {
+    // Create segments
+    List<DataSegment> segments =
+        CreateDataSegments.ofDatasource("wiki")
+                          .forIntervals(30, Granularities.DAY)
+                          .startingAt("2022-01-01")
+                          .withNumPartitions(10)
+                          .eachOfSizeInMb(500);
+
+    // Create servers
+    DruidServer historicalTier1 = createHistorical(1, "tier_1", 10000);
+    DruidServer historicalTier2 = createHistorical(1, "tier_2", 20000);
+
+    // Build simulation
+    CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withServers(historicalTier1, historicalTier2)
+                             .withSegments(segments)
+                             .withRules("wiki", Load.on("tier_2", 1).forever())
+                             .build();
+
+    // Start the simulation with all segments loaded on tier_1
+    segments.forEach(historicalTier1::addDataSegment);
+    startSimulation(sim);
+
+    // Run a few coordinator cycles
+    int totalLoadedOnT2 = 0;
+    int totalDroppedFromT1 = 0;
+    for (int i = 0; i < 10; ++i) {
+      runCoordinatorCycle();
+      loadQueuedSegments();
+      totalLoadedOnT2 += getValue("segment/assigned/count", filter("tier", "tier_2"));
+      totalDroppedFromT1 += getValue("segment/dropped/count", filter("tier", "tier_1"));
+    }
+
+    // Verify that some segments have been loaded/dropped
+    Assert.assertTrue(totalLoadedOnT2 > 0 && totalLoadedOnT2 <= segments.size());
+    Assert.assertTrue(totalDroppedFromT1 > 0 && totalDroppedFromT1 <= segments.size());
+    Assert.assertTrue(totalDroppedFromT1 <= totalLoadedOnT2);
+  }
+}
+```
+
+## More examples
+
+See `SegmentLoadingTest` and `SegmentBalancingTest` in the package `org.apache.druid.server.coordinator.simulate`.
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
new file mode 100644
index 0000000000..77b9820a95
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentBalancingTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Coordinator simulation test to verify behaviour of segment balancing.
+ * <p>
+ * Both tests start with all segments of {@link DS#WIKI} on one of two
+ * historicals in the same tier and expect balancing to split them evenly.
+ */
+public class SegmentBalancingTest extends CoordinatorSimulationBaseTest
+{
+  private DruidServer historicalT11;
+  private DruidServer historicalT12;
+
+  private final String datasource = DS.WIKI;
+  private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+  @Override
+  public void setUp()
+  {
+    // Setup 2 historicals in tier T1, size 10 GB each
+    historicalT11 = createHistorical(1, Tier.T1, 10_000);
+    historicalT12 = createHistorical(2, Tier.T1, 10_000);
+  }
+
+  @Test
+  public void testBalancingWithSyncedInventory()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .withDynamicConfig(dynamicConfig)
+                             .withAutoInventorySync(true)
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that segments have been chosen for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+
+    loadQueuedSegments();
+
+    // Verify that segments have now been balanced out
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+
+  @Test
+  public void testBalancingOfFullyReplicatedSegment()
+  {
+    // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+    CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+    // historicals = 2(T1), replicas = 1(T1)
+    final CoordinatorSimulation sim =
+        CoordinatorSimulation.builder()
+                             .withSegments(segments)
+                             .withServers(historicalT11, historicalT12)
+                             .withDynamicConfig(dynamicConfig)
+                             .withRules(datasource, Load.on(Tier.T1, 1).forever())
+                             .build();
+
+    // Put all the segments on histT11
+    segments.forEach(historicalT11::addDataSegment);
+
+    startSimulation(sim);
+    runCoordinatorCycle();
+
+    // Verify that there are segments in the load queue for balancing
+    verifyValue(Metric.MOVED_COUNT, 5L);
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        5
+    );
+
+    runCoordinatorCycle();
+
+    // Verify that the segments in the load queue are not considered as over-replicated
+    verifyValue(Metric.DROPPED_COUNT, 0L);
+    verifyValue(
+        Metric.LOAD_QUEUE_COUNT,
+        filter(DruidMetrics.SERVER, historicalT12.getName()),
+        5
+    );
+
+    // Finish and verify balancing
+    loadQueuedSegments();
+    Assert.assertEquals(5, historicalT11.getTotalSegments());
+    Assert.assertEquals(5, historicalT12.getTotalSegments());
+    verifyDatasourceIsFullyLoaded(datasource);
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
new file mode 100644
index 0000000000..52dc7c0933
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Contains negative tests that verify existing erroneous behaviour of segment
+ * loading. The underlying issues should be fixed and the modified tests
+ * should be migrated to {@link SegmentLoadingTest}.
+ * <p>
+ * Identified issues:
+ * <a href="https://github.com/apache/druid/issues/12881">Apache #12881</a>
+ */
+public class SegmentLoadingNegativeTest extends CoordinatorSimulationBaseTest
+{
+ private DruidServer historicalT11;
+ private DruidServer historicalT12;
+ private DruidServer historicalT21;
+
+ private final String datasource = DS.WIKI;
+ private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+ @Override
+ public void setUp()
+ {
+ // Set up 2 historicals in tier T1 and 1 in tier T2, size 10 GB each
+ historicalT11 = createHistorical(1, Tier.T1, 10_000);
+ historicalT12 = createHistorical(2, Tier.T1, 10_000);
+ historicalT21 = createHistorical(1, Tier.T2, 10_000);
+ }
+
+ /**
+ * Correct behaviour: replicationThrottleLimit should not be violated even if
+ * segment loading is fast.
+ * <p>
+ * Fix Apache #12881 to fix this test.
+ */
+ @Test
+ public void testImmediateLoadingViolatesThrottleLimit()
+ {
+ // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+ // historicals = 2(in T1), segments = 10*1day
+ // replicas = 2(on T1), immediate segment loading
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1, 2).forever())
+ .withImmediateSegmentLoading(true)
+ .withDynamicConfig(dynamicConfig)
+ .build();
+
+ // Put the first replica of all the segments on histT11
+ segments.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // Verify that number of replicas assigned exceeds the replicationThrottleLimit
+ verifyValue(Metric.ASSIGNED_COUNT, 10L);
+
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(10, historicalT12.getTotalSegments());
+ verifyDatasourceIsFullyLoaded(datasource);
+ }
+
+ /**
+ * Correct behaviour: The first replica on any tier should not be throttled.
+ * <p>
+ * Fix Apache #12881 to fix this test.
+ */
+ @Test
+ public void testFirstReplicaOnAnyTierIsThrottled()
+ {
+ // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+ // historicals = 1(in T1) + 1(in T2)
+ // replicas = 1(on T1) + 1(on T2)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT21)
+ .withDynamicConfig(dynamicConfig)
+ .withRules(
+ datasource,
+ Load.on(Tier.T1, 1).andOn(Tier.T2, 1).forever()
+ )
+ .build();
+
+ // Put the first replica of all the segments on T1
+ segments.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // Verify that the number of replicas assigned to T2 equals the replicationThrottleLimit
+ verifyValue(
+ Metric.ASSIGNED_COUNT,
+ filter(DruidMetrics.TIER, Tier.T2),
+ 2L
+ );
+
+ loadQueuedSegments();
+
+ verifyDatasourceIsFullyLoaded(datasource);
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(2, historicalT21.getTotalSegments());
+ }
+
+ /**
+ * Correct behaviour: Historical should not get overassigned even if loading is fast.
+ * <p>
+ * Fix Apache #12881 to fix this test.
+ */
+ @Test
+ public void testImmediateLoadingOverassignsHistorical()
+ {
+ // historicals = 1(in T1), size 1 GB (distinct from the 10 GB historicalT11 field)
+ final DruidServer smallHistorical = createHistorical(1, Tier.T1, 1000);
+
+ // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+ // segments = 10*1day, size 500 MB
+ // strategy = cost, replicas = 1(T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(smallHistorical)
+ .withDynamicConfig(dynamicConfig)
+ .withRules(datasource, Load.on(Tier.T1, 1).forever())
+ .withImmediateSegmentLoading(true)
+ .build();
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // The historical is assigned all 10 segments but loads only up to its capacity (2)
+ verifyValue(Metric.ASSIGNED_COUNT, 10L);
+ Assert.assertEquals(2, smallHistorical.getTotalSegments());
+ }
+
+ /**
+ * Correct behaviour: For a fully replicated segment, items that are in the load
+ * queue should get cancelled so that the coordinator does not have to wait
+ * for the loads to finish and then take remedial action.
+ * <p>
+ * Fix Apache #12881 to fix this test case.
+ */
+ @Test
+ public void testLoadOfFullyReplicatedSegmentIsNotCancelled()
+ {
+ // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+ // historicals = 2(in T1), replicas = 2(on T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT12)
+ .withDynamicConfig(dynamicConfig)
+ .withRules(datasource, Load.on(Tier.T1, 2).forever())
+ .build();
+
+ // Put the first replica of all the segments on histT11
+ segments.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // Verify that there are segments in the load queue
+ verifyValue(Metric.ASSIGNED_COUNT, 10L);
+ verifyValue(
+ Metric.LOAD_QUEUE_COUNT,
+ filter(DruidMetrics.SERVER, historicalT12.getName()),
+ 10
+ );
+
+ // Put the second replica of all the segments on histT12
+ segments.forEach(historicalT12::addDataSegment);
+
+ runCoordinatorCycle();
+
+ // Verify that the segments are still in the load queue
+ verifyValue(
+ Metric.LOAD_QUEUE_COUNT,
+ filter(DruidMetrics.SERVER, historicalT12.getName()),
+ 10
+ );
+ }
+
+ /**
+ * Correct behaviour: Balancing should never cause over-replication, even when
+ * the inventory view is not updated.
+ * <p>
+ * Fix Apache #12881 to fix this test.
+ */
+ @Test
+ public void testBalancingWithStaleInventoryCausesOverReplication()
+ {
+ // maxSegmentsToMove = 10, unlimited load queue, replicationThrottleLimit = 10
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(10, 0, 10);
+
+ // historicals = 2(T1), replicas = 1(T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1, 1).forever())
+ .withDynamicConfig(dynamicConfig)
+ .withAutoInventorySync(false)
+ .build();
+
+ // Put all the segments on histT11
+ segments.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ syncInventoryView();
+ runCoordinatorCycle();
+
+ // Verify that segments have been chosen for balancing
+ verifyValue(Metric.MOVED_COUNT, 5L);
+
+ loadQueuedSegments();
+
+ // Verify over-replication: histT11 still holds all 10 segments and histT12 has loaded 5
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(5, historicalT12.getTotalSegments());
+ verifyDatasourceIsFullyLoaded(datasource);
+ }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
new file mode 100644
index 0000000000..1edeab8a37
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Coordinator simulation test to verify behaviour of segment loading.
+ */
+public class SegmentLoadingTest extends CoordinatorSimulationBaseTest
+{
+ private DruidServer historicalT11;
+ private DruidServer historicalT12;
+ private DruidServer historicalT21;
+ private DruidServer historicalT22;
+
+ private final String datasource = DS.WIKI;
+ private final List<DataSegment> segments = Segments.WIKI_10X1D;
+
+ @Override
+ public void setUp()
+ {
+ // Set up 2 historicals in each of tiers T1 and T2, size 10 GB each
+ historicalT11 = createHistorical(1, Tier.T1, 10_000);
+ historicalT12 = createHistorical(2, Tier.T1, 10_000);
+
+ historicalT21 = createHistorical(1, Tier.T2, 10_000);
+ historicalT22 = createHistorical(2, Tier.T2, 10_000);
+ }
+
+ @Test
+ public void testSecondReplicaOnAnyTierIsThrottled()
+ {
+ // Disable balancing, infinite load queue size, replicationThrottleLimit = 2
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 2);
+
+ // historicals = 2(in T1)
+ // replicas = 2(on T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(historicalT11, historicalT12)
+ .withRules(datasource, Load.on(Tier.T1, 2).forever())
+ .withDynamicConfig(dynamicConfig)
+ .build();
+
+ // Put the first replica of all the segments on histT11
+ segments.forEach(historicalT11::addDataSegment);
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // Verify that the replicationThrottleLimit is honored
+ verifyValue(Metric.ASSIGNED_COUNT, 2L);
+
+ loadQueuedSegments();
+ Assert.assertEquals(10, historicalT11.getTotalSegments());
+ Assert.assertEquals(2, historicalT12.getTotalSegments());
+ }
+
+ @Test
+ public void testLoadingDoesNotOverassignHistorical()
+ {
+ // historicals = 1(in T1), size 1 GB (distinct from the 10 GB historicalT11 field)
+ final DruidServer smallHistorical = createHistorical(1, Tier.T1, 1000);
+
+ // disable balancing, unlimited load queue, replicationThrottleLimit = 10
+ CoordinatorDynamicConfig dynamicConfig = createDynamicConfig(0, 0, 10);
+
+ // segments = 10*1day, size 500 MB
+ // strategy = cost, replicas = 1(T1)
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withServers(smallHistorical)
+ .withDynamicConfig(dynamicConfig)
+ .withRules(datasource, Load.on(Tier.T1, 1).forever())
+ .withImmediateSegmentLoading(false)
+ .build();
+
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ // Verify that the number of segments assigned is within the historical capacity
+ verifyValue(Metric.ASSIGNED_COUNT, 2L);
+ loadQueuedSegments();
+ Assert.assertEquals(2, smallHistorical.getTotalSegments());
+ }
+
+ @Test
+ public void testDropHappensAfterTargetReplicationOnEveryTier()
+ {
+ // maxNonPrimaryReplicants = 33 ensures that all target replicas (total 4)
+ // are assigned for some segments in the first run itself (pigeon-hole)
+ CoordinatorDynamicConfig dynamicConfig =
+ CoordinatorDynamicConfig.builder()
+ .withMaxSegmentsToMove(0)
+ .withReplicationThrottleLimit(10)
+ .withMaxNonPrimaryReplicantsToLoad(33)
+ .build();
+
+ // historicals = 1(in T1) + 2(in T2) + 2(in T3)
+ // segments = 10 * 1day, replicas = 2(T2) + 2(T3)
+ final DruidServer historicalT31 = createHistorical(1, Tier.T3, 10_000);
+ final DruidServer historicalT32 = createHistorical(2, Tier.T3, 10_000);
+ final CoordinatorSimulation sim =
+ CoordinatorSimulation.builder()
+ .withSegments(segments)
+ .withDynamicConfig(dynamicConfig)
+ .withRules(datasource, Load.on(Tier.T2, 2).andOn(Tier.T3, 2).forever())
+ .withServers(
+ historicalT11,
+ historicalT21,
+ historicalT22,
+ historicalT31,
+ historicalT32
+ )
+ .build();
+
+ // At the start, T1 has all the segments
+ segments.forEach(historicalT11::addDataSegment);
+
+ // Run 1: Nothing is dropped from T1 but things are assigned to T2 and T3
+ startSimulation(sim);
+ runCoordinatorCycle();
+
+ verifyNoEvent(Metric.DROPPED_COUNT);
+ int totalAssignedInRun1
+ = getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T2)).intValue()
+ + getValue(Metric.ASSIGNED_COUNT, filter(DruidMetrics.TIER, Tier.T3)).intValue();
+ Assert.assertTrue(totalAssignedInRun1 > 0 && totalAssignedInRun1 < 40);
+
+ // Run 2: Segments still queued, nothing is dropped from T1
+ runCoordinatorCycle();
+ loadQueuedSegments();
+
+ verifyNoEvent(Metric.DROPPED_COUNT);
+ int totalLoadedAfterRun2
+ = historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
+ + historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
+ Assert.assertEquals(totalAssignedInRun1, totalLoadedAfterRun2);
+
+ // Run 3: Some segments have been loaded
+ // segments fully replicated on T2 and T3 will now be dropped from T1
+ runCoordinatorCycle();
+ loadQueuedSegments();
+
+ int totalDroppedInRun3
+ = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue();
+ Assert.assertTrue(totalDroppedInRun3 > 0 && totalDroppedInRun3 < 10);
+ int totalLoadedAfterRun3
+ = historicalT21.getTotalSegments() + historicalT22.getTotalSegments()
+ + historicalT31.getTotalSegments() + historicalT32.getTotalSegments();
+ Assert.assertEquals(40, totalLoadedAfterRun3);
+
+ // Run 4: All segments are fully replicated on T2 and T3
+ runCoordinatorCycle();
+ loadQueuedSegments();
+
+ int totalDroppedInRun4
+ = getValue(Metric.DROPPED_COUNT, filter(DruidMetrics.TIER, Tier.T1)).intValue();
+
+ Assert.assertEquals(10, totalDroppedInRun3 + totalDroppedInRun4);
+ Assert.assertEquals(0, historicalT11.getTotalSegments());
+ verifyDatasourceIsFullyLoaded(datasource);
+ }
+
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
new file mode 100644
index 0000000000..d84cbcff6e
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestDruidLeaderSelector.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.discovery.DruidLeaderSelector;
+
+import javax.annotation.Nullable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test implementation of {@link DruidLeaderSelector} whose leadership is toggled
+ * explicitly by the test via {@link #becomeLeader()} and {@link #stopBeingLeader()}.
+ */
+public class TestDruidLeaderSelector implements DruidLeaderSelector
+{
+ private final AtomicBoolean isLeader = new AtomicBoolean(false);
+ private volatile Listener listener;
+
+ /**
+ * Makes this node the leader. The registered listener, if any, is notified
+ * only when this call actually changes the state from non-leader to leader.
+ */
+ public void becomeLeader()
+ {
+ // Read the volatile field once so that a concurrent unregisterListener()
+ // cannot null it out between the null check and the callback.
+ final Listener currentListener = listener;
+ if (isLeader.compareAndSet(false, true) && currentListener != null) {
+ currentListener.becomeLeader();
+ }
+ }
+
+ /**
+ * Revokes leadership. The registered listener, if any, is notified only when
+ * this call actually changes the state from leader to non-leader.
+ */
+ public void stopBeingLeader()
+ {
+ final Listener currentListener = listener;
+ if (isLeader.compareAndSet(true, false) && currentListener != null) {
+ currentListener.stopBeingLeader();
+ }
+ }
+
+ /**
+ * Returns the fixed placeholder {@code "me"}, irrespective of leadership state.
+ */
+ @Nullable
+ @Override
+ public String getCurrentLeader()
+ {
+ return "me";
+ }
+
+ @Override
+ public boolean isLeader()
+ {
+ return isLeader.get();
+ }
+
+ /**
+ * Always returns 0; this selector does not track leadership terms.
+ */
+ @Override
+ public int localTerm()
+ {
+ return 0;
+ }
+
+ /**
+ * Registers the listener, replacing any previous one. If this node is already
+ * the leader, the listener is notified immediately.
+ */
+ @Override
+ public void registerListener(Listener listener)
+ {
+ this.listener = listener;
+ if (isLeader()) {
+ listener.becomeLeader();
+ }
+ }
+
+ @Override
+ public void unregisterListener()
+ {
+ listener = null;
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java
new file mode 100644
index 0000000000..9ca037b0cf
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestMetadataRuleManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.audit.AuditInfo;
+import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
+import org.apache.druid.server.coordinator.rules.Rule;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link MetadataRuleManager} for coordinator simulations. Starts with a
+ * single load-forever rule registered under the {@code _default} datasource.
+ */
+public class TestMetadataRuleManager implements MetadataRuleManager
+{
+ private static final String DEFAULT_DATASOURCE = "_default";
+
+ /**
+ * Rules keyed by datasource name. Nothing is persisted, so the lifecycle and
+ * poll methods are no-ops.
+ */
+ private final Map<String, List<Rule>> rules;
+
+ public TestMetadataRuleManager()
+ {
+ rules = new HashMap<>();
+ rules.put(DEFAULT_DATASOURCE, Collections.singletonList(new ForeverLoadRule(null)));
+ }
+
+ @Override
+ public void start()
+ {
+ // nothing to start: rules are held in memory
+ }
+
+ @Override
+ public void stop()
+ {
+ // nothing to stop: rules are held in memory
+ }
+
+ @Override
+ public void poll()
+ {
+ // nothing to poll: there is no backing metadata store
+ }
+
+ /**
+ * Returns the live backing map (not a copy).
+ */
+ @Override
+ public Map<String, List<Rule>> getAllRules()
+ {
+ return rules;
+ }
+
+ /**
+ * Returns the rules registered for the datasource, or a new empty list if none.
+ */
+ @Override
+ public List<Rule> getRules(final String dataSource)
+ {
+ final List<Rule> datasourceRules = rules.get(dataSource);
+ if (datasourceRules == null) {
+ return new ArrayList<>();
+ }
+ return datasourceRules;
+ }
+
+ /**
+ * Returns a new list holding the datasource's own rules followed by the
+ * {@code _default} rules.
+ */
+ @Override
+ public List<Rule> getRulesWithDefault(final String dataSource)
+ {
+ final List<Rule> combined = new ArrayList<>();
+ for (String key : new String[]{dataSource, DEFAULT_DATASOURCE}) {
+ final List<Rule> rulesForKey = rules.get(key);
+ if (rulesForKey != null) {
+ combined.addAll(rulesForKey);
+ }
+ }
+ return combined;
+ }
+
+ /**
+ * Replaces the rules of the given datasource. The audit info is ignored.
+ */
+ @Override
+ public boolean overrideRule(final String dataSource, final List<Rule> newRules, final AuditInfo auditInfo)
+ {
+ rules.put(dataSource, newRules);
+ return true;
+ }
+
+ @Override
+ public int removeRulesForEmptyDatasourcesOlderThan(long timestamp)
+ {
+ // rule cleanup is not simulated
+ return 0;
+ }
+
+ /**
+ * Removes the rules of a datasource. The {@code _default} rules are never removed.
+ */
+ public void removeRulesForDatasource(String dataSource)
+ {
+ if (DEFAULT_DATASOURCE.equals(dataSource)) {
+ return;
+ }
+ rules.remove(dataSource);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
new file mode 100644
index 0000000000..0b91e70090
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentLoadingHttpClient.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.java.util.http.client.HttpClient;
+import org.apache.druid.java.util.http.client.Request;
+import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.joda.time.Duration;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * {@link HttpClient} that serves segment load/drop requests in-process: each request
+ * is deserialized into {@link DataSegmentChangeRequest}s and applied, on the given
+ * executor, to the {@link DataSegmentChangeHandler} registered for the target host.
+ */
+public class TestSegmentLoadingHttpClient implements HttpClient
+{
+ private static final HttpResponseHandler.TrafficCop NOOP_TRAFFIC_COP = checkNum -> 0L;
+ private static final DataSegmentChangeCallback NOOP_CALLBACK = () -> {
+ };
+
+ private final ObjectMapper objectMapper;
+ private final Function<String, DataSegmentChangeHandler> hostToHandler;
+
+ private final ListeningScheduledExecutorService executorService;
+
+ public TestSegmentLoadingHttpClient(
+ ObjectMapper objectMapper,
+ Function<String, DataSegmentChangeHandler> hostToHandler,
+ ScheduledExecutorService executorService
+ )
+ {
+ this.objectMapper = objectMapper;
+ this.hostToHandler = hostToHandler;
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ }
+
+ @Override
+ public <Intermediate, Final> ListenableFuture<Final> go(
+ Request request,
+ HttpResponseHandler<Intermediate, Final> handler
+ )
+ {
+ return go(request, handler, null);
+ }
+
+ /**
+ * Submits the request to the executor. The read timeout is ignored.
+ */
+ @Override
+ public <Intermediate, Final> ListenableFuture<Final> go(
+ Request request,
+ HttpResponseHandler<Intermediate, Final> handler,
+ Duration readTimeout
+ )
+ {
+ return executorService.submit(() -> processRequest(request, handler));
+ }
+
+ /**
+ * Applies all change requests in {@code request} and returns the serialized list
+ * of per-request statuses as an InputStream. If no handler is registered for the
+ * host, a NOT_FOUND response is reported and an empty stream is returned.
+ * <p>
+ * The result is cast to {@code Final}, so this assumes the caller's response
+ * handler produces an InputStream (NOTE(review): confirm for every caller).
+ * Only {@code handleResponse} is invoked on the handler; {@code done} is not.
+ */
+ @SuppressWarnings("unchecked")
+ private <Intermediate, Final> Final processRequest(
+ Request request,
+ HttpResponseHandler<Intermediate, Final> handler
+ )
+ {
+ try {
+ // Fail the request if there is no handler for this host
+ final DataSegmentChangeHandler changeHandler = hostToHandler
+ .apply(request.getUrl().getHost());
+ if (changeHandler == null) {
+ final HttpResponse failureResponse =
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
+ failureResponse.setContent(ChannelBuffers.EMPTY_BUFFER);
+ handler.handleResponse(failureResponse, NOOP_TRAFFIC_COP);
+ return (Final) new ByteArrayInputStream(new byte[0]);
+ }
+
+ // Handle change requests and serialize
+ final byte[] serializedContent;
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ objectMapper.writeValue(baos, handleChangeRequests(request, changeHandler));
+ serializedContent = baos.toByteArray();
+ }
+
+ // Report OK; the payload is returned as a stream rather than set on the response
+ final HttpResponse response =
+ new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+ response.setContent(ChannelBuffers.EMPTY_BUFFER);
+ handler.handleResponse(response, NOOP_TRAFFIC_COP);
+ return (Final) new ByteArrayInputStream(serializedContent);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Deserializes the list of change requests in the request body and applies
+ * each of them using the given handler.
+ */
+ private List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> handleChangeRequests(
+ Request request,
+ DataSegmentChangeHandler changeHandler
+ ) throws IOException
+ {
+ // Read only the readable region of a duplicate buffer: works for any ChannelBuffer
+ // (array-backed or not) and leaves the request's own reader index untouched.
+ final List<DataSegmentChangeRequest> changeRequests = objectMapper.readValue(
+ new ChannelBufferInputStream(request.getContent().duplicate()),
+ new TypeReference<List<DataSegmentChangeRequest>>()
+ {
+ }
+ );
+
+ return changeRequests
+ .stream()
+ .map(changeRequest -> handleChangeRequest(changeRequest, changeHandler))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Applies a single change request and reports SUCCESS, or a failed status
+ * carrying the exception message if the handler threw.
+ */
+ private SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus handleChangeRequest(
+ DataSegmentChangeRequest request,
+ DataSegmentChangeHandler handler
+ )
+ {
+ SegmentLoadDropHandler.Status status;
+ try {
+ request.go(handler, NOOP_CALLBACK);
+ status = SegmentLoadDropHandler.Status.SUCCESS;
+ }
+ catch (Exception e) {
+ status = SegmentLoadDropHandler.Status.failed(e.getMessage());
+ }
+
+ return new SegmentLoadDropHandler
+ .DataSegmentChangeRequestAndStatus(request, status);
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
new file mode 100644
index 0000000000..43a96d6007
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.client.DataSourcesSnapshot;
+import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory {@link SegmentsMetadataManager} for coordinator simulations. Segments
+ * added via {@link #addSegment} are immediately marked as used; no database is polled.
+ */
+public class TestSegmentsMetadataManager implements SegmentsMetadataManager
+{
+ // All known segments (used and unused), keyed by segment ID string
+ private final ConcurrentMap<String, DataSegment> segments = new ConcurrentHashMap<>();
+ // Subset of the known segments that are currently marked as used
+ private final ConcurrentMap<String, DataSegment> usedSegments = new ConcurrentHashMap<>();
+
+ public void addSegment(DataSegment segment)
+ {
+ segments.put(segment.getId().toString(), segment);
+ usedSegments.put(segment.getId().toString(), segment);
+ }
+
+ public void removeSegment(DataSegment segment)
+ {
+ segments.remove(segment.getId().toString());
+ usedSegments.remove(segment.getId().toString());
+ }
+
+ @Override
+ public void startPollingDatabasePeriodically()
+ {
+ // no-op: there is no database to poll
+ }
+
+ @Override
+ public void stopPollingDatabasePeriodically()
+ {
+ // no-op: there is no database to poll
+ }
+
+ @Override
+ public boolean isPollingDatabasePeriodically()
+ {
+ return true;
+ }
+
+ @Override
+ public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource)
+ {
+ // not simulated
+ return 0;
+ }
+
+ @Override
+ public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval)
+ {
+ // not simulated
+ return 0;
+ }
+
+ @Override
+ public int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)
+ {
+ // not simulated
+ return 0;
+ }
+
+ @Override
+ public boolean markSegmentAsUsed(String segmentId)
+ {
+ // Single lookup instead of containsKey() + get(), which could race with removeSegment()
+ final DataSegment segment = segments.get(segmentId);
+ if (segment == null) {
+ return false;
+ }
+
+ usedSegments.put(segmentId, segment);
+ return true;
+ }
+
+ @Override
+ public int markAsUnusedAllSegmentsInDataSource(String dataSource)
+ {
+ // not simulated
+ return 0;
+ }
+
+ @Override
+ public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval)
+ {
+ // not simulated
+ return 0;
+ }
+
+ @Override
+ public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
+ {
+ int numModifiedSegments = 0;
+ for (SegmentId segmentId : segmentIds) {
+ if (usedSegments.remove(segmentId.toString()) != null) {
+ ++numModifiedSegments;
+ }
+ }
+ return numModifiedSegments;
+ }
+
+ @Override
+ public boolean markSegmentAsUnused(SegmentId segmentId)
+ {
+ return usedSegments.remove(segmentId.toString()) != null;
+ }
+
+ @Nullable
+ @Override
+ public ImmutableDruidDataSource getImmutableDataSourceWithUsedSegments(String dataSource)
+ {
+ // Derived from the same snapshot as the other getters instead of always returning null
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSource(dataSource);
+ }
+
+ @Override
+ public Collection<ImmutableDruidDataSource> getImmutableDataSourcesWithAllUsedSegments()
+ {
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getDataSourcesWithAllUsedSegments();
+ }
+
+ @Override
+ public Set<SegmentId> getOvershadowedSegments()
+ {
+ return getSnapshotOfDataSourcesWithAllUsedSegments().getOvershadowedSegments();
+ }
+
+ /**
+ * Builds a fresh snapshot from the currently used segments on every call.
+ */
+ @Override
+ public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
+ {
+ return DataSourcesSnapshot.fromUsedSegments(usedSegments.values(), ImmutableMap.of());
+ }
+
+ @Override
+ public Iterable<DataSegment> iterateAllUsedSegments()
+ {
+ return usedSegments.values();
+ }
+
+ @Override
+ public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+ String datasource,
+ Interval interval,
+ boolean requiresLatest
+ )
+ {
+ VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
+ = getSnapshotOfDataSourcesWithAllUsedSegments().getUsedSegmentsTimelinesPerDataSource().get(datasource);
+ return Optional.fromNullable(usedSegmentsTimeline)
+ .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(
+ interval,
+ Partitions.ONLY_COMPLETE
+ ));
+ }
+
+ /**
+ * Returns the names of all datasources that have at least one known segment,
+ * whether used or unused. Never returns null.
+ */
+ @Override
+ public Set<String> retrieveAllDataSourceNames()
+ {
+ return segments.values()
+ .stream()
+ .map(DataSegment::getDataSource)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Unused-segment intervals are not simulated; returns an empty list rather
+ * than null so that callers can iterate the result safely.
+ */
+ @Override
+ public List<Interval> getUnusedSegmentIntervals(String dataSource, DateTime maxEndTime, int limit)
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void poll()
+ {
+ // no-op: there is no database to poll
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
new file mode 100644
index 0000000000..fedafc45c9
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.client.ServerInventoryView;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.DataSegmentChangeCallback;
+import org.apache.druid.server.coordination.DataSegmentChangeHandler;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/**
+ * In-memory {@link ServerInventoryView} used by coordinator simulations.
+ * <p>
+ * Servers are registered explicitly via {@link #addServer}. Segments are loaded onto or dropped
+ * from a server through the {@link DataSegmentChangeHandler} returned by
+ * {@link #getChangeHandlerForHost}, and every such change is forwarded to each registered
+ * segment callback on that callback's own executor.
+ */
+public class TestServerInventoryView implements ServerInventoryView
+{
+  private static final Logger log = new Logger(TestServerInventoryView.class);
+
+  /** Server name -> server. */
+  private final ConcurrentHashMap<String, DruidServer> servers = new ConcurrentHashMap<>();
+  /** Server name -> handler that applies segment loads and drops to that server. */
+  private final ConcurrentHashMap<String, DataSegmentChangeHandler> segmentChangeHandlers = new ConcurrentHashMap<>();
+
+  /** Registered segment callbacks, each mapped to the executor it must be invoked on. */
+  private final ConcurrentHashMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();
+  /** Registered server-removed callbacks. */
+  private final List<ServerChangeHandler> serverChangeHandlers = new ArrayList<>();
+
+  /**
+   * Notifies every registered segment callback that the segment view has been initialized.
+   */
+  public void setUp()
+  {
+    segmentCallbacks.forEach(
+        (segmentCallback, executor) ->
+            executor.execute(segmentCallback::segmentViewInitialized)
+    );
+  }
+
+  /**
+   * Synchronizes this inventory view with the given inventory view.
+   * <p>
+   * Every current server is first removed, firing the server-removed callbacks. Then a copy of each
+   * server in {@code other} is added and populated with that server's segments, which fires the
+   * segment-added callbacks. An {@link ISE} may be thrown if a server's segments exceed its max size.
+   */
+  public void sync(ServerInventoryView other)
+  {
+    // Clear the current inventory
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      servers.values().forEach(handler::removeServer);
+    }
+    servers.clear();
+    segmentChangeHandlers.clear();
+
+    for (DruidServer server : other.getInventory()) {
+      // Copy only the server's identity and capacity; its segments are replayed below through
+      // the change handler so that segment callbacks are notified of each one.
+      addServer(new DruidServer(
+          server.getName(),
+          server.getHostAndPort(),
+          server.getHostAndTlsPort(),
+          server.getMaxSize(),
+          server.getType(),
+          server.getTier(),
+          server.getPriority()
+      ));
+      DataSegmentChangeHandler handler = getChangeHandlerForHost(server.getName());
+      for (DataSegment segment : server.iterateAllSegments()) {
+        handler.addSegment(segment, null);
+      }
+    }
+  }
+
+  /**
+   * Registers a server (keyed by its name) together with a change handler for its segments.
+   * An existing server with the same name is replaced.
+   */
+  public void addServer(DruidServer server)
+  {
+    servers.put(server.getName(), server);
+    segmentChangeHandlers.put(server.getName(), new SegmentChangeHandler(server));
+  }
+
+  /**
+   * Unregisters a server and notifies every server-removed callback on its executor.
+   */
+  public void removeServer(DruidServer server)
+  {
+    servers.remove(server.getName());
+    segmentChangeHandlers.remove(server.getName());
+
+    for (ServerChangeHandler handler : serverChangeHandlers) {
+      handler.removeServer(server);
+    }
+  }
+
+  /**
+   * Returns the handler that loads and drops segments on the named server,
+   * or null if no server with that name is registered.
+   */
+  public DataSegmentChangeHandler getChangeHandlerForHost(String serverName)
+  {
+    return segmentChangeHandlers.get(serverName);
+  }
+
+  @Nullable
+  @Override
+  public DruidServer getInventoryValue(String serverKey)
+  {
+    return servers.get(serverKey);
+  }
+
+  @Override
+  public Collection<DruidServer> getInventory()
+  {
+    return Collections.unmodifiableCollection(servers.values());
+  }
+
+  @Override
+  public boolean isStarted()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
+  {
+    DruidServer server = servers.get(serverKey);
+    return server != null && server.getSegment(segment.getId()) != null;
+  }
+
+  @Override
+  public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
+  {
+    serverChangeHandlers.add(new ServerChangeHandler(callback, exec));
+  }
+
+  @Override
+  public void registerSegmentCallback(Executor exec, SegmentCallback callback)
+  {
+    segmentCallbacks.put(callback, exec);
+  }
+
+  /**
+   * Applies segment loads and drops to a single server and notifies the segment callbacks.
+   * The {@link DataSegmentChangeCallback} arguments are ignored.
+   */
+  private class SegmentChangeHandler implements DataSegmentChangeHandler
+  {
+    private final DruidServer server;
+
+    private SegmentChangeHandler(DruidServer server)
+    {
+      this.server = server;
+    }
+
+    /**
+     * Adds the segment to the server if it has enough free space and then fires segment-added callbacks.
+     *
+     * @throws ISE if the server's free space is smaller than the segment size
+     */
+    @Override
+    public void addSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Adding segment [%s] to server [%s]", segment.getId(), server.getName());
+
+      if (server.getMaxSize() - server.getCurrSize() >= segment.getSize()) {
+        server.addDataSegment(segment);
+        segmentCallbacks.forEach(
+            (segmentCallback, executor) -> executor.execute(
+                () -> segmentCallback.segmentAdded(server.getMetadata(), segment)
+            )
+        );
+      } else {
+        throw new ISE(
+            "Not enough free space on server %s. Segment size [%d]. Current free space [%d]",
+            server.getName(),
+            segment.getSize(),
+            server.getMaxSize() - server.getCurrSize()
+        );
+      }
+    }
+
+    /**
+     * Removes the segment from the server and fires segment-removed callbacks.
+     */
+    @Override
+    public void removeSegment(
+        DataSegment segment,
+        @Nullable DataSegmentChangeCallback callback
+    )
+    {
+      log.debug("Removing segment [%s] from server [%s]", segment.getId(), server.getName());
+      server.removeDataSegment(segment.getId());
+      // Listeners must be told the segment was dropped; notifying segmentAdded here would make
+      // a drop look like a fresh load to anything tracking segment locations.
+      segmentCallbacks.forEach(
+          (segmentCallback, executor) -> executor.execute(
+              () -> segmentCallback.segmentRemoved(server.getMetadata(), segment)
+          )
+      );
+    }
+  }
+
+  /**
+   * Pairs a server-removed callback with the executor on which it must be invoked.
+   */
+  private static class ServerChangeHandler
+  {
+    private final Executor executor;
+    private final ServerRemovedCallback callback;
+
+    private ServerChangeHandler(ServerRemovedCallback callback, Executor executor)
+    {
+      this.callback = callback;
+      this.executor = executor;
+    }
+
+    private void removeServer(DruidServer server)
+    {
+      executor.execute(() -> callback.serverRemoved(server));
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java
new file mode 100644
index 0000000000..334651ee30
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/WrappingScheduledExecutorService.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.simulate;
+
+import org.apache.druid.java.util.common.logger.Logger;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Wraps an {@link ExecutorService} into a {@link ScheduledExecutorService}.
+ */
+public class WrappingScheduledExecutorService implements ScheduledExecutorService
+{
+  private static final Logger log = new Logger(WrappingScheduledExecutorService.class);
+
+  /** Label used only in log messages to identify this executor. */
+  private final String nameFormat;
+  private final ExecutorService delegate;
+  /** If true, tasks passed to the one-shot {@code schedule} methods are dropped instead of queued. */
+  private final boolean ignoreScheduledTasks;
+
+  /**
+   * @param nameFormat           label used in log messages
+   * @param delegate             executor that actually runs (or queues) submitted tasks
+   * @param ignoreScheduledTasks if true, tasks passed to {@code schedule(...)} are discarded and an
+   *                             already-completed future is returned
+   */
+  public WrappingScheduledExecutorService(
+      String nameFormat,
+      ExecutorService delegate,
+      boolean ignoreScheduledTasks
+  )
+  {
+    this.nameFormat = nameFormat;
+    this.delegate = delegate;
+    this.ignoreScheduledTasks = ignoreScheduledTasks;
+  }
+
+  /**
+   * Submits the command to the delegate immediately, ignoring {@code delay}. If scheduled tasks are
+   * ignored, the command is dropped and a completed future is returned instead.
+   */
+  @Override
+  public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
+  {
+    if (ignoreScheduledTasks) {
+      log.debug("[%s] Ignoring scheduled task", nameFormat);
+      return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null));
+    }
+
+    // Ignore the delay and just queue the task
+    return new WrappingScheduledFuture<>(submit(command));
+  }
+
+  /**
+   * Submits the callable to the delegate immediately, ignoring {@code delay}. If scheduled tasks are
+   * ignored, the callable is dropped and a future already completed with null is returned instead.
+   */
+  @Override
+  public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
+  {
+    if (ignoreScheduledTasks) {
+      log.debug("[%s] Ignoring scheduled task", nameFormat);
+      return new WrappingScheduledFuture<>(CompletableFuture.completedFuture(null));
+    }
+
+    // Ignore the delay and just queue the task
+    return new WrappingScheduledFuture<>(submit(callable));
+  }
+
+  /** Periodic scheduling is not supported by this wrapper. */
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      Runnable command,
+      long initialDelay,
+      long period,
+      TimeUnit unit
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /** Periodic scheduling is not supported by this wrapper. */
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      Runnable command,
+      long initialDelay,
+      long delay,
+      TimeUnit unit
+  )
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  // All remaining ExecutorService methods forward directly to the delegate.
+
+  @Override
+  public void shutdown()
+  {
+    delegate.shutdown();
+  }
+
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    return delegate.shutdownNow();
+  }
+
+  @Override
+  public boolean isShutdown()
+  {
+    return delegate.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return delegate.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+  {
+    return delegate.awaitTermination(timeout, unit);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task)
+  {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result)
+  {
+    return delegate.submit(task, result);
+  }
+
+  @Override
+  public Future<?> submit(Runnable task)
+  {
+    return delegate.submit(task);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException
+  {
+    return delegate.invokeAll(tasks);
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit
+  ) throws InterruptedException
+  {
+    return delegate.invokeAll(tasks, timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException
+  {
+    return delegate.invokeAny(tasks);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException
+  {
+    return delegate.invokeAny(tasks, timeout, unit);
+  }
+
+  @Override
+  public void execute(Runnable command)
+  {
+    delegate.execute(command);
+  }
+
+  /**
+   * Wraps a Future into a ScheduledFuture. Because delays are ignored, the wrapped task is always
+   * reported as having zero remaining delay.
+   */
+  private static class WrappingScheduledFuture<V> implements ScheduledFuture<V>
+  {
+    private final Future<V> future;
+
+    private WrappingScheduledFuture(Future<V> future)
+    {
+      this.future = future;
+    }
+
+    @Override
+    public long getDelay(TimeUnit unit)
+    {
+      return 0;
+    }
+
+    /**
+     * Orders by remaining delay, as the {@link Delayed} contract requires. This future always reports
+     * zero delay, so it compares equal to other zero-delay futures and before any that are still waiting.
+     */
+    @Override
+    public int compareTo(Delayed o)
+    {
+      return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning)
+    {
+      return future.cancel(mayInterruptIfRunning);
+    }
+
+    @Override
+    public boolean isCancelled()
+    {
+      return future.isCancelled();
+    }
+
+    @Override
+    public boolean isDone()
+    {
+      return future.isDone();
+    }
+
+    @Override
+    public V get() throws InterruptedException, ExecutionException
+    {
+      return future.get();
+    }
+
+    @Override
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+      return future.get(timeout, unit);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org