Posted to commits@druid.apache.org by ma...@apache.org on 2021/06/21 00:22:26 UTC
[druid] branch master updated: Temporarily skip compaction for locked intervals (#11190)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f0b105e Temporarily skip compaction for locked intervals (#11190)
f0b105e is described below
commit f0b105ec63657643c85d82686ae1dea3e588a5f5
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Mon Jun 21 05:51:59 2021 +0530
Temporarily skip compaction for locked intervals (#11190)
* Add overlord API /lockedIntervals. Skip compaction for locked intervals
* Revert formatting changes
* Add license info
* Fix checkstyle
* Remove invalid API invocation
* Fix checkstyle
* Add DatasourceIntervalsTest
* Fix checkstyle
* Remove LockedIntervalsResponse
* Add integration tests for lockedIntervals
* Add ITAutoCompactionLockContentionTest
* Add config druid.coordinator.compaction.skipLockedIntervals
* Add test for TaskQueue
---
.../druid/indexing/overlord/TaskLockbox.java | 61 +++
.../indexing/overlord/TaskStorageQueryAdapter.java | 22 +-
.../indexing/overlord/http/OverlordResource.java | 14 +
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +-
.../druid/indexing/overlord/TaskLockboxTest.java | 139 +++++++
.../druid/indexing/overlord/TaskQueueTest.java | 36 ++
.../overlord/http/OverlordResourceTest.java | 60 +++
.../druid/indexing/overlord/http/OverlordTest.java | 2 +-
.../clients/OverlordResourceTestClient.java | 26 ++
.../apache/druid/testing/utils/CompactionUtil.java | 77 ++++
.../duty/ITAutoCompactionLockContentionTest.java | 418 +++++++++++++++++++++
.../druid/tests/indexer/AbstractIndexerTest.java | 11 +-
.../tests/indexer/AbstractStreamIndexingTest.java | 9 +-
.../apache/druid/tests/indexer/ITIndexerTest.java | 42 +++
.../client/indexing/HttpIndexingServiceClient.java | 23 ++
.../client/indexing/IndexingServiceClient.java | 12 +
.../server/coordinator/DruidCoordinatorConfig.java | 7 +
.../server/coordinator/duty/CompactSegments.java | 68 +++-
.../client/indexing/NoopIndexingServiceClient.java | 6 +
.../coordinator/DruidCoordinatorConfigTest.java | 3 +
.../coordinator/duty/CompactSegmentsTest.java | 133 ++++++-
21 files changed, 1144 insertions(+), 27 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index ddb764d..d33d96d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -674,6 +674,67 @@ public class TaskLockbox
}
}
+ /**
+ * Gets a List of Intervals locked by higher priority tasks for each datasource.
+ * Here, Segment Locks are being treated the same as Time Chunk Locks, i.e.
+ * a Task with a Segment Lock is assumed to lock a whole Interval and not just
+ * the corresponding Segment.
+ *
+ * @param minTaskPriority Minimum task priority for each datasource. Only the
+ * Intervals that are locked by Tasks with equal or
+ * higher priority than this are returned. Locked intervals
+ * for datasources that are not present in this Map are
+ * not returned.
+ * @return Map from Datasource to List of Intervals locked by Tasks that have
+ * priority greater than or equal to the {@code minTaskPriority} for that datasource.
+ */
+ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+ // Take a lock and populate the maps
+ giant.lock();
+ try {
+ running.forEach(
+ (datasource, datasourceLocks) -> {
+ // If this datasource is not requested, do not proceed
+ if (!minTaskPriority.containsKey(datasource)) {
+ return;
+ }
+
+ datasourceLocks.forEach(
+ (startTime, startTimeLocks) -> startTimeLocks.forEach(
+ (interval, taskLockPosses) -> taskLockPosses.forEach(
+ taskLockPosse -> {
+ if (taskLockPosse.getTaskLock().isRevoked()) {
+ // Do not proceed if the lock is revoked
+ return;
+ } else if (taskLockPosse.getTaskLock().getPriority() == null
+ || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
+ // Do not proceed if the lock has a priority strictly less than the minimum
+ return;
+ }
+
+ datasourceToIntervals
+ .computeIfAbsent(datasource, k -> new HashSet<>())
+ .add(interval);
+ })
+ )
+ );
+ }
+ );
+ }
+ finally {
+ giant.unlock();
+ }
+
+ return datasourceToIntervals.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> new ArrayList<>(entry.getValue())
+ ));
+ }
+
public void unlock(final Task task, final Interval interval)
{
unlock(task, interval, null);
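
For illustration, here is a minimal standalone sketch of the filtering rules that the new TaskLockbox#getLockedIntervals applies: ignore datasources that were not requested, ignore revoked locks, and ignore locks whose priority is strictly below the requested minimum. This is not part of the patch; LockRecord is an illustrative stand-in for a TaskLockPosse.

    import org.joda.time.Interval;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class LockedIntervalsSketch
    {
      // Illustrative stand-in for a TaskLockPosse
      static class LockRecord
      {
        final String datasource;
        final Interval interval;
        final Integer priority; // may be null, like TaskLock#getPriority
        final boolean revoked;

        LockRecord(String datasource, Interval interval, Integer priority, boolean revoked)
        {
          this.datasource = datasource;
          this.interval = interval;
          this.priority = priority;
          this.revoked = revoked;
        }
      }

      static Map<String, List<Interval>> getLockedIntervals(
          List<LockRecord> locks,
          Map<String, Integer> minTaskPriority
      )
      {
        final Map<String, Set<Interval>> intervals = new HashMap<>();
        for (LockRecord lock : locks) {
          if (!minTaskPriority.containsKey(lock.datasource)) {
            continue; // datasource was not requested
          }
          if (lock.revoked) {
            continue; // revoked locks do not block compaction
          }
          if (lock.priority == null || lock.priority < minTaskPriority.get(lock.datasource)) {
            continue; // lock priority is strictly below the requested minimum
          }
          intervals.computeIfAbsent(lock.datasource, ds -> new HashSet<>()).add(lock.interval);
        }

        final Map<String, List<Interval>> result = new HashMap<>();
        intervals.forEach((ds, set) -> result.put(ds, new ArrayList<>(set)));
        return result;
      }
    }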
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 6a12b40..4ccd925 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -29,10 +29,12 @@ import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Duration;
+import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -41,11 +43,13 @@ import java.util.Set;
public class TaskStorageQueryAdapter
{
private final TaskStorage storage;
+ private final TaskLockbox taskLockbox;
@Inject
- public TaskStorageQueryAdapter(TaskStorage storage)
+ public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox)
{
this.storage = storage;
+ this.taskLockbox = taskLockbox;
}
public List<Task> getActiveTasks()
@@ -53,6 +57,22 @@ public class TaskStorageQueryAdapter
return storage.getActiveTasks();
}
+ /**
+ * Gets a List of Intervals locked by higher priority tasks for each datasource.
+ *
+ * @param minTaskPriority Minimum task priority for each datasource. Only the
+ * Intervals that are locked by Tasks with equal or
+ * higher priority than this are returned. Locked intervals
+ * for datasources that are not present in this Map are
+ * not returned.
+ * @return Map from Datasource to List of Intervals locked by Tasks that have
+ * priority greater than or equal to the {@code minTaskPriority} for that datasource.
+ */
+ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ return taskLockbox.getLockedIntervals(minTaskPriority);
+ }
+
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
return storage.getActiveTaskInfo(dataSource);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index bc01e55..0076eb0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -227,6 +227,20 @@ public class OverlordResource
}
}
+ @POST
+ @Path("/lockedIntervals")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(StateResourceFilter.class)
+ public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ if (minTaskPriority == null || minTaskPriority.isEmpty()) {
+ return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();
+ }
+
+ // Build the response
+ return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
+ }
+
@GET
@Path("/task/{taskid}")
@Produces(MediaType.APPLICATION_JSON)
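
For reference, a hedged sketch of how an external client might call the new endpoint. The full path /druid/indexer/v1/lockedIntervals appears in the HttpIndexingServiceClient change below; the Overlord address and the datasource name here are assumptions.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class LockedIntervalsClientSketch
    {
      public static void main(String[] args) throws Exception
      {
        // Request body: minimum task priority per datasource. Intervals locked
        // by tasks of equal or higher priority are returned.
        final String body = "{\"wikipedia\": 25}";

        final HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8090/druid/indexer/v1/lockedIntervals")) // assumed Overlord address
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();

        final HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());

        // Expected shape: {"wikipedia": ["2021-06-01T00:00:00.000Z/2021-06-02T00:00:00.000Z"]}
        System.out.println(response.body());
      }
    }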
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 7c41ba9..613e0ac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -475,7 +475,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
throw new RE("Unknown task storage type [%s]", taskStorageType);
}
}
- tsqa = new TaskStorageQueryAdapter(taskStorage);
+ tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
return taskStorage;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 6cb19ed..4b20ba4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -70,8 +70,10 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -1119,6 +1121,143 @@ public class TaskLockboxTest
);
}
+ @Test
+ public void testGetLockedIntervals()
+ {
+ // Acquire locks for task1
+ final Task task1 = NoopTask.create("ds1");
+ lockbox.add(task1);
+
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ task1,
+ Intervals.of("2017-01-01/2017-02-01")
+ );
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ task1,
+ Intervals.of("2017-04-01/2017-05-01")
+ );
+
+ // Acquire locks for task2
+ final Task task2 = NoopTask.create("ds2");
+ lockbox.add(task2);
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ task2,
+ Intervals.of("2017-03-01/2017-04-01")
+ );
+
+ // Verify the locked intervals
+ final Map<String, Integer> minTaskPriority = new HashMap<>();
+ minTaskPriority.put(task1.getDataSource(), 10);
+ minTaskPriority.put(task2.getDataSource(), 10);
+ final Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+ Assert.assertEquals(2, lockedIntervals.size());
+
+ Assert.assertEquals(
+ Arrays.asList(
+ Intervals.of("2017-01-01/2017-02-01"),
+ Intervals.of("2017-04-01/2017-05-01")
+ ),
+ lockedIntervals.get(task1.getDataSource())
+ );
+
+ Assert.assertEquals(
+ Collections.singletonList(
+ Intervals.of("2017-03-01/2017-04-01")),
+ lockedIntervals.get(task2.getDataSource())
+ );
+ }
+
+ @Test
+ public void testGetLockedIntervalsForLowPriorityTask() throws Exception
+ {
+ // Acquire lock for a low priority task
+ final Task lowPriorityTask = NoopTask.create(5);
+ lockbox.add(lowPriorityTask);
+ taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ lowPriorityTask,
+ Intervals.of("2017/2018")
+ );
+
+ final Map<String, Integer> minTaskPriority = new HashMap<>();
+ minTaskPriority.put(lowPriorityTask.getDataSource(), 10);
+
+ Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+ Assert.assertTrue(lockedIntervals.isEmpty());
+ }
+
+ @Test
+ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
+ {
+ // Acquire lock for a low priority task
+ final Task task = NoopTask.create(5);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ task,
+ Intervals.of("2017/2018")
+ );
+
+ final Map<String, Integer> minTaskPriority = new HashMap<>();
+ minTaskPriority.put(task.getDataSource(), 5);
+
+ Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+ Assert.assertEquals(1, lockedIntervals.size());
+ Assert.assertEquals(
+ Collections.singletonList(Intervals.of("2017/2018")),
+ lockedIntervals.get(task.getDataSource())
+ );
+ }
+
+ @Test
+ public void testGetLockedIntervalsForRevokedLocks() throws Exception
+ {
+ // Acquire lock for a low priority task
+ final Task lowPriorityTask = NoopTask.create(5);
+ lockbox.add(lowPriorityTask);
+ taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ lowPriorityTask,
+ Intervals.of("2017/2018")
+ );
+
+ final Map<String, Integer> minTaskPriority = new HashMap<>();
+ minTaskPriority.put(lowPriorityTask.getDataSource(), 1);
+
+ Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+ Assert.assertEquals(1, lockedIntervals.size());
+ Assert.assertEquals(
+ Collections.singletonList(
+ Intervals.of("2017/2018")),
+ lockedIntervals.get(lowPriorityTask.getDataSource())
+ );
+
+ // Revoke the lowPriorityTask
+ final Task highPriorityTask = NoopTask.create(10);
+ lockbox.add(highPriorityTask);
+ tryTimeChunkLock(
+ TaskLockType.EXCLUSIVE,
+ highPriorityTask,
+ Intervals.of("2017-05-01/2017-06-01")
+ );
+
+ // Verify the locked intervals
+ minTaskPriority.put(highPriorityTask.getDataSource(), 1);
+ lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+ Assert.assertEquals(1, lockedIntervals.size());
+ Assert.assertEquals(
+ Collections.singletonList(
+ Intervals.of("2017-05-01/2017-06-01")),
+ lockedIntervals.get(highPriorityTask.getDataSource())
+ );
+ }
+
private Set<TaskLock> getAllLocks(List<Task> tasks)
{
return tasks.stream()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index bdeef05..2dbd9d1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -115,6 +117,40 @@ public class TaskQueueTest extends IngestionTestBase
}
@Test
+ public void testShutdownReleasesTaskLock() throws Exception
+ {
+ final TaskActionClientFactory actionClientFactory = createActionClientFactory();
+ final TaskQueue taskQueue = new TaskQueue(
+ new TaskLockConfig(),
+ new TaskQueueConfig(null, null, null, null),
+ new DefaultTaskConfig(),
+ getTaskStorage(),
+ new SimpleTaskRunner(actionClientFactory),
+ actionClientFactory,
+ getLockbox(),
+ new NoopServiceEmitter()
+ );
+ taskQueue.setActive(true);
+
+ // Create a Task and add it to the TaskQueue
+ final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M"));
+ taskQueue.add(task);
+
+ // Acquire a lock for the Task
+ getLockbox().lock(
+ task,
+ new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, task.interval, null)
+ );
+ final List<TaskLock> locksForTask = getLockbox().findLocksForTask(task);
+ Assert.assertEquals(1, locksForTask.size());
+ Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
+
+ // Verify that locks are removed on calling shutdown
+ taskQueue.shutdown(task.getId(), "Shutdown Task");
+ Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
+ }
+
+ @Test
public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException
{
final TaskActionClientFactory actionClientFactory = createActionClientFactory();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e41a6c9..c475166 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -19,6 +19,8 @@
package org.apache.druid.indexing.overlord.http;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -43,6 +45,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
@@ -70,6 +73,7 @@ import org.junit.rules.ExpectedException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -1060,6 +1064,62 @@ public class OverlordResourceTest
}
@Test
+ public void testGetLockedIntervals() throws Exception
+ {
+ final Map<String, Integer> minTaskPriority = Collections.singletonMap("ds1", 0);
+ final Map<String, List<Interval>> expectedLockedIntervals = Collections.singletonMap(
+ "ds1",
+ Arrays.asList(
+ Intervals.of("2012-01-01/2012-01-02"),
+ Intervals.of("2012-01-02/2012-01-03")
+ )
+ );
+
+ EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority))
+ .andReturn(expectedLockedIntervals);
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
+ Assert.assertEquals(200, response.getStatus());
+
+ final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+ Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
+ jsonMapper.writeValueAsString(response.getEntity()),
+ new TypeReference<Map<String, List<Interval>>>()
+ {
+ }
+ );
+
+ Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
+ }
+
+ @Test
+ public void testGetLockedIntervalsWithEmptyBody()
+ {
+ EasyMock.replay(
+ taskRunner,
+ taskMaster,
+ taskStorageQueryAdapter,
+ indexerMetadataStorageAdapter,
+ req,
+ workerTaskRunnerQueryAdapter
+ );
+
+ Response response = overlordResource.getDatasourceLockedIntervals(null);
+ Assert.assertEquals(400, response.getStatus());
+
+ response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
+ Assert.assertEquals(400, response.getStatus());
+ }
+
+ @Test
public void testShutdownTask()
{
// This is disabled since OverlordResource.doShutdown is annotated with TaskResourceFilter
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 5209954..108e2c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -216,7 +216,7 @@ public class OverlordTest
}
Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
- final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage);
+ final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff
overlordResource = new OverlordResource(
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 6e40ee2..184f514 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -44,6 +44,7 @@ import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Interval;
import java.net.URL;
import java.util.ArrayList;
@@ -246,6 +247,31 @@ public class OverlordResourceTestClient
}
}
+ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ try {
+ String jsonBody = jsonMapper.writeValueAsString(minTaskPriority);
+
+ StatusResponseHolder response = httpClient.go(
+ new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals"))
+ .setContent(
+ "application/json",
+ StringUtils.toUtf8(jsonBody)
+ ),
+ StatusResponseHandler.getInstance()
+ ).get();
+ return jsonMapper.readValue(
+ response.getContent(),
+ new TypeReference<Map<String, List<Interval>>>()
+ {
+ }
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public void waitUntilTaskCompletes(final String taskID)
{
waitUntilTaskCompletes(taskID, ITRetryUtil.DEFAULT_RETRY_SLEEP, ITRetryUtil.DEFAULT_RETRY_COUNT);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
new file mode 100644
index 0000000..5ccd5f4
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.joda.time.Period;
+
+/**
+ * Contains utility methods for Compaction.
+ */
+public class CompactionUtil
+{
+
+ private CompactionUtil()
+ {
+ // no instantiation
+ }
+
+ public static DataSourceCompactionConfig createCompactionConfig(
+ String fullDatasourceName,
+ Integer maxRowsPerSegment,
+ Period skipOffsetFromLatest
+ )
+ {
+ return new DataSourceCompactionConfig(
+ fullDatasourceName,
+ null,
+ null,
+ null,
+ skipOffsetFromLatest,
+ new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ new MaxSizeSplitHintSpec(null, 1),
+ new DynamicPartitionsSpec(maxRowsPerSegment, null),
+ null,
+ null,
+ null,
+ null,
+ null,
+ 1,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 1
+ ),
+ null,
+ new UserCompactionTaskIOConfig(true),
+ null
+ );
+ }
+
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
new file mode 100644
index 0000000..84c1bf5
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.testing.clients.CompactionResourceTestClient;
+import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.CompactionUtil;
+import org.apache.druid.testing.utils.EventSerializer;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KafkaUtil;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Integration test to verify behaviour when there is lock contention between
+ * compaction tasks and ongoing stream ingestion tasks.
+ */
+@Test(groups = {TestNGGroup.COMPACTION})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingServiceTest
+{
+ private static final Logger LOG = new Logger(ITAutoCompactionLockContentionTest.class);
+
+ @Inject
+ private CompactionResourceTestClient compactionResource;
+
+ private GeneratedTestConfig generatedTestConfig;
+ private StreamGenerator streamGenerator;
+
+ private String fullDatasourceName;
+
+ @DataProvider
+ public static Object[] getParameters()
+ {
+ return new Object[]{false, true};
+ }
+
+ @BeforeClass
+ public void setupClass() throws Exception
+ {
+ doBeforeClass();
+ }
+
+ @BeforeMethod
+ public void setup() throws Exception
+ {
+ generatedTestConfig = new GeneratedTestConfig(
+ Specs.PARSER_TYPE,
+ getResourceAsString(Specs.INPUT_FORMAT_PATH)
+ );
+ fullDatasourceName = generatedTestConfig.getFullDatasourceName();
+ final EventSerializer serializer = jsonMapper.readValue(
+ getResourceAsStream(Specs.SERIALIZER_PATH),
+ EventSerializer.class
+ );
+ streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100);
+ }
+
+ @Override
+ public String getTestNamePrefix()
+ {
+ return "autocompact_lock_contention";
+ }
+
+ @Test(dataProvider = "getParameters")
+ public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) throws Exception
+ {
+ if (shouldSkipTest(transactionEnabled)) {
+ return;
+ }
+
+ try (
+ final Closeable closer = createResourceCloser(generatedTestConfig);
+ final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+ ) {
+ // Start supervisor
+ final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+ .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+ generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+ LOG.info("supervisorSpec: [%s]", taskSpec);
+
+ // Generate data for minutes 1, 2 and 3
+ final Interval minute1 = Intervals.of("2000-01-01T01:01:00Z/2000-01-01T01:02:00Z");
+ final long rowsForMinute1 = generateData(minute1, streamEventWriter);
+
+ final Interval minute2 = Intervals.of("2000-01-01T01:02:00Z/2000-01-01T01:03:00Z");
+ long rowsForMinute2 = generateData(minute2, streamEventWriter);
+
+ final Interval minute3 = Intervals.of("2000-01-01T01:03:00Z/2000-01-01T01:04:00Z");
+ final long rowsForMinute3 = generateData(minute3, streamEventWriter);
+
+ // Wait for data to be ingested for all the minutes
+ ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
+
+ // Wait for the segments to be loaded and interval locks to be released
+ ensureLockedIntervals();
+ ensureSegmentsLoaded();
+
+ // 2 segments for each minute, total 6
+ ensureSegmentsCount(6);
+
+ // Generate more data for minute2 so that it gets locked
+ rowsForMinute2 += generateData(minute2, streamEventWriter);
+ ensureLockedIntervals(minute2);
+
+ // Trigger auto compaction
+ submitAndVerifyCompactionConfig();
+ compactionResource.forceTriggerAutoCompaction();
+
+ // Wait for segments to be loaded
+ ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
+ ensureLockedIntervals();
+ ensureSegmentsLoaded();
+
+ // Verify that minute1 and minute3 have been compacted
+ ensureCompactionTaskCount(2);
+ verifyCompactedIntervals(minute1, minute3);
+
+ // Trigger auto compaction again
+ compactionResource.forceTriggerAutoCompaction();
+
+ // Verify that all the segments are now compacted
+ ensureCompactionTaskCount(3);
+ ensureSegmentsLoaded();
+ verifyCompactedIntervals(minute1, minute2, minute3);
+ ensureSegmentsCount(3);
+ }
+ }
+
+ /**
+ * Retries until the segment count is as expected.
+ */
+ private void ensureSegmentsCount(int numExpectedSegments)
+ {
+ ITRetryUtil.retryUntilTrue(
+ () -> {
+ List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
+ StringBuilder sb = new StringBuilder();
+ segments.forEach(
+ seg -> sb.append("{")
+ .append(seg.getId())
+ .append(", ")
+ .append(seg.getSize())
+ .append("}, ")
+ );
+ LOG.info("Found Segments: %s", sb);
+ LOG.info("Current metadata segment count: %d, expected: %d", segments.size(), numExpectedSegments);
+ return segments.size() == numExpectedSegments;
+ },
+ "Segment count check"
+ );
+ }
+
+ /**
+ * Verifies that the given intervals have been compacted.
+ */
+ private void verifyCompactedIntervals(Interval... compactedIntervals)
+ {
+ List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
+ List<DataSegment> observedCompactedSegments = new ArrayList<>();
+ Set<Interval> observedCompactedIntervals = new HashSet<>();
+ for (DataSegment segment : segments) {
+ if (segment.getLastCompactionState() != null) {
+ observedCompactedSegments.add(segment);
+ observedCompactedIntervals.add(segment.getInterval());
+ }
+ }
+
+ Set<Interval> expectedCompactedIntervals = new HashSet<>(Arrays.asList(compactedIntervals));
+ Assert.assertEquals(observedCompactedIntervals, expectedCompactedIntervals);
+
+ DynamicPartitionsSpec expectedPartitionSpec = new DynamicPartitionsSpec(
+ Specs.MAX_ROWS_PER_SEGMENT,
+ Long.MAX_VALUE
+ );
+ for (DataSegment compactedSegment : observedCompactedSegments) {
+ Assert.assertNotNull(compactedSegment.getLastCompactionState());
+ Assert.assertEquals(
+ compactedSegment.getLastCompactionState().getPartitionsSpec(),
+ expectedPartitionSpec
+ );
+ }
+ }
+
+ /**
+ * Generates data points for the specified interval.
+ *
+ * @return Number of rows generated.
+ */
+ private long generateData(Interval interval, StreamEventWriter streamEventWriter)
+ {
+ long rowCount = streamGenerator.run(
+ generatedTestConfig.getStreamName(),
+ streamEventWriter,
+ 10,
+ interval.getStart()
+ );
+ LOG.info("Generated %d Rows for Interval [%s]", rowCount, interval);
+
+ return rowCount;
+ }
+
+ /**
+ * Retries until segments have been loaded.
+ */
+ private void ensureSegmentsLoaded()
+ {
+ ITRetryUtil.retryUntilTrue(
+ () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+ "Segment Loading"
+ );
+ }
+
+ /**
+ * Retries until the specified Intervals are locked for the current datasource.
+ * If no interval has been specified, retries until no interval is locked.
+ */
+ private void ensureLockedIntervals(Interval... intervals)
+ {
+ final Map<String, Integer> minTaskPriority = Collections.singletonMap(fullDatasourceName, 0);
+ final List<Interval> lockedIntervals = new ArrayList<>();
+ ITRetryUtil.retryUntilTrue(
+ () -> {
+ lockedIntervals.clear();
+
+ Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(minTaskPriority);
+ if (allIntervals.containsKey(fullDatasourceName)) {
+ lockedIntervals.addAll(allIntervals.get(fullDatasourceName));
+ }
+
+ LOG.info("Locked intervals: %s", lockedIntervals);
+ return intervals.length == lockedIntervals.size();
+ },
+ "Verify Locked Intervals"
+ );
+
+ Assert.assertEquals(lockedIntervals, Arrays.asList(intervals));
+ }
+
+ /**
+ * Checks if a test should be skipped based on whether transactions are enabled or not.
+ */
+ private boolean shouldSkipTest(boolean testEnableTransaction)
+ {
+ Map<String, String> kafkaTestProps = KafkaUtil
+ .getAdditionalKafkaTestConfigFromProperties(config);
+ boolean configEnableTransaction = Boolean.parseBoolean(
+ kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+ );
+
+ return configEnableTransaction != testEnableTransaction;
+ }
+
+ /**
+ * Submits a compaction config for the current datasource.
+ */
+ private void submitAndVerifyCompactionConfig() throws Exception
+ {
+ final DataSourceCompactionConfig compactionConfig = CompactionUtil
+ .createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
+ compactionResource.updateCompactionTaskSlot(0.5, 10);
+ compactionResource.submitCompactionConfig(compactionConfig);
+
+ // Wait for compaction config to persist
+ Thread.sleep(2000);
+
+ // Verify that the compaction config is updated correctly.
+ CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+ DataSourceCompactionConfig observedCompactionConfig = null;
+ for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+ if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
+ observedCompactionConfig = dataSourceCompactionConfig;
+ }
+ }
+ Assert.assertEquals(observedCompactionConfig, compactionConfig);
+
+ observedCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
+ Assert.assertEquals(observedCompactionConfig, compactionConfig);
+ }
+
+ /**
+ * Checks if the given TaskResponseObject represents a Compaction Task.
+ */
+ private boolean isCompactionTask(TaskResponseObject taskResponse)
+ {
+ return "compact".equalsIgnoreCase(taskResponse.getType());
+ }
+
+ /**
+ * Retries until the total number of complete compaction tasks is as expected.
+ */
+ private void ensureCompactionTaskCount(int expectedCount)
+ {
+ LOG.info("Verifying compaction task count. Expected: %d", expectedCount);
+ ITRetryUtil.retryUntilTrue(
+ () -> getCompactionTaskCount() == expectedCount,
+ "Compaction Task Count"
+ );
+ }
+
+ /**
+ * Gets the number of complete compaction tasks.
+ */
+ private long getCompactionTaskCount()
+ {
+ List<TaskResponseObject> incompleteTasks = indexer
+ .getUncompletedTasksForDataSource(fullDatasourceName);
+ List<TaskResponseObject> completeTasks = indexer
+ .getCompleteTasksForDataSource(fullDatasourceName);
+
+ printTasks(incompleteTasks, "Incomplete");
+ printTasks(completeTasks, "Complete");
+
+ return completeTasks.stream().filter(this::isCompactionTask).count();
+ }
+
+ private void printTasks(List<TaskResponseObject> tasks, String taskState)
+ {
+ StringBuilder sb = new StringBuilder();
+ tasks.forEach(
+ task -> sb.append("{")
+ .append(task.getType())
+ .append(", ")
+ .append(task.getStatus())
+ .append(", ")
+ .append(task.getCreatedTime())
+ .append("}, ")
+ );
+ LOG.info("%s Tasks: %s", taskState, sb);
+ }
+
+ /**
+ * Retries until the total row count is as expected.
+ */
+ private void ensureRowCount(long totalRows)
+ {
+ LOG.info("Verifying Row Count. Expected: %s", totalRows);
+ ITRetryUtil.retryUntilTrue(
+ () ->
+ totalRows == this.queryHelper.countRows(
+ fullDatasourceName,
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
+ StringUtils.format(
+ "dataSource[%s] consumed [%,d] events, expected [%,d]",
+ fullDatasourceName,
+ this.queryHelper.countRows(
+ fullDatasourceName,
+ Intervals.ETERNITY,
+ name -> new LongSumAggregatorFactory(name, "count")
+ ),
+ totalRows
+ )
+ );
+ }
+
+ /**
+ * Constants for test specs.
+ */
+ private static class Specs
+ {
+ static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + "/csv/serializer/serializer.json";
+ static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + "/csv/input_format/input_format.json";
+ static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
+
+ static final int MAX_ROWS_PER_SEGMENT = 10000;
+ }
+
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index 6c5067b..f8fd41a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -64,7 +64,7 @@ public abstract class AbstractIndexerTest
protected TestQueryHelper queryHelper;
@Inject
- private IntegrationTestingConfig config;
+ protected IntegrationTestingConfig config;
protected Closeable unloader(final String dataSource)
{
@@ -101,7 +101,7 @@ public abstract class AbstractIndexerTest
unloadAndKillData(dataSource, first, last);
}
- protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+ protected String submitIndexTask(String indexTask, final String fullDatasourceName) throws Exception
{
String taskSpec = getResourceAsString(indexTask);
taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
@@ -112,6 +112,13 @@ public abstract class AbstractIndexerTest
);
final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for loading index task %s", taskID);
+
+ return taskID;
+ }
+
+ protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+ {
+ final String taskID = submitIndexTask(indexTask, fullDatasourceName);
indexer.waitUntilTaskCompletes(taskID);
ITRetryUtil.retryUntilTrue(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 3c9ecda..72fbef2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -98,9 +98,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
@Inject
private DruidClusterAdminClient druidClusterAdminClient;
- @Inject
- private IntegrationTestingConfig config;
-
private StreamAdminClient streamAdminClient;
abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
@@ -171,7 +168,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
return map;
}
- private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+ protected Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
{
return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
}
@@ -647,7 +644,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
- private class GeneratedTestConfig
+ protected class GeneratedTestConfig
{
private final String streamName;
private final String fullDatasourceName;
@@ -655,7 +652,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private Function<String, String> streamIngestionPropsTransform;
private Function<String, String> streamQueryPropsTransform;
- GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
+ public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
{
streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ef24182..ec9c753 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -28,10 +29,16 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.Interval;
+import org.testng.Assert;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;
import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.function.Function;
@Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
@@ -65,6 +72,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
+ private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
+
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
@@ -294,4 +303,37 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
);
}
}
+
+ @Test
+ public void testGetLockedIntervals() throws Exception
+ {
+ final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
+ try (final Closeable ignored = unloader(datasourceName)) {
+ // Submit an Indexing Task
+ submitIndexTask(INDEX_TASK, datasourceName);
+
+ // Wait until it acquires a lock
+ final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
+ final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
+ ITRetryUtil.retryUntilFalse(
+ () -> {
+ lockedIntervals.clear();
+ lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+ return lockedIntervals.isEmpty();
+ },
+ "Verify Intervals are Locked"
+ );
+
+ // Verify the locked intervals for this datasource
+ Assert.assertEquals(lockedIntervals.size(), 1);
+ Assert.assertEquals(
+ lockedIntervals.get(datasourceName),
+ Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
+ );
+
+ waitForAllTasksToCompleteForDataSource(datasourceName);
+ }
+
+ }
+
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index d8f8f35..44ba61c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -336,6 +337,28 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
}
@Override
+ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ try {
+ final StringFullResponseHolder responseHolder = druidLeaderClient.go(
+ druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals")
+ .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(minTaskPriority))
+ );
+
+ final Map<String, List<Interval>> response = jsonMapper.readValue(
+ responseHolder.getContent(),
+ new TypeReference<Map<String, List<Interval>>>()
+ {
+ }
+ );
+ return response == null ? Collections.emptyMap() : response;
+ }
+ catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
public SamplerResponse sample(SamplerSpec samplerSpec)
{
try {
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 14dfcfe..c379077 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -66,5 +66,17 @@ public interface IndexingServiceClient
@Nullable
TaskPayloadResponse getTaskPayload(String taskId);
+ /**
+ * Gets a List of Intervals locked by higher priority tasks for each datasource.
+ *
+ * @param minTaskPriority Minimum task priority for each datasource. Only the
+ * Intervals that are locked by Tasks with equal or
+ * higher priority than this are returned. Locked intervals
+ * for datasources that are not present in this Map are
+ * not returned.
+ * @return Map from Datasource to List of Intervals locked by Tasks that have
+ * priority greater than or equal to the {@code minTaskPriority} for that datasource.
+ */
+ Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority);
+
SamplerResponse sample(SamplerSpec samplerSpec);
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index c82419c..14e3ce0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -132,4 +132,11 @@ public abstract class DruidCoordinatorConfig
{
return 1;
}
+
+ @Config("druid.coordinator.compaction.skipLockedIntervals")
+ public boolean getCompactionSkipLockedIntervals()
+ {
+ return true;
+ }
+
}
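
The new flag defaults to true, so locked intervals are skipped out of the box. To restore the previous behaviour, an operator could disable it in the Coordinator's runtime properties; the property name comes from the @Config annotation above, and the test change further below exercises exactly this setting.

    druid.coordinator.compaction.skipLockedIntervals=false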
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 5d31751..e48731b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.CompactionStatistics;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -79,6 +80,7 @@ public class CompactSegments implements CoordinatorDuty
private static final Logger LOG = new Logger(CompactSegments.class);
private final CompactionSegmentSearchPolicy policy;
+ private final boolean skipLockedIntervals;
private final IndexingServiceClient indexingServiceClient;
// This variable is updated by the Coordinator thread executing duties and
@@ -87,13 +89,17 @@ public class CompactSegments implements CoordinatorDuty
@Inject
public CompactSegments(
+ DruidCoordinatorConfig config,
ObjectMapper objectMapper,
IndexingServiceClient indexingServiceClient
)
{
this.policy = new NewestSegmentFirstPolicy(objectMapper);
this.indexingServiceClient = indexingServiceClient;
+ this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
autoCompactionSnapshotPerDataSource.set(new HashMap<>());
+
+ LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
}
@Override
@@ -114,9 +120,10 @@ public class CompactSegments implements CoordinatorDuty
.stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final List<TaskStatusPlus> compactionTasks = filterNonCompactionTasks(indexingServiceClient.getActiveTasks());
- // dataSource -> list of intervals of compaction tasks
- final Map<String, List<Interval>> compactionTaskIntervals = Maps.newHashMapWithExpectedSize(
- compactionConfigList.size());
+
+ // dataSource -> list of intervals for which compaction will be skipped in this run
+ final Map<String, List<Interval>> intervalsToSkipCompaction = new HashMap<>();
+
int numEstimatedNonCompleteCompactionTasks = 0;
for (TaskStatusPlus status : compactionTasks) {
final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
@@ -144,7 +151,7 @@ public class CompactSegments implements CoordinatorDuty
}
// Skip interval as the current active compaction task is good
final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
- compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+ intervalsToSkipCompaction.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
// Since we keep the current active compaction task running, we count the active task slots
numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask(
compactionTaskQuery.getTuningConfig()
@@ -154,8 +161,18 @@ public class CompactSegments implements CoordinatorDuty
}
}
+ // Skip all the intervals locked by higher priority tasks for each datasource
+ // This must be done after the invalid compaction tasks are cancelled
+ // in the loop above so that their intervals are not considered locked
+ getLockedIntervalsToSkip(compactionConfigList).forEach(
+ (dataSource, intervals) ->
+ intervalsToSkipCompaction
+ .computeIfAbsent(dataSource, ds -> new ArrayList<>())
+ .addAll(intervals)
+ );
+
final CompactionSegmentIterator iterator =
- policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);
+ policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
final int compactionTaskCapacity = (int) Math.min(
indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
@@ -209,6 +226,47 @@ public class CompactSegments implements CoordinatorDuty
}
/**
+ * Gets a List of Intervals locked by higher priority tasks for each datasource.
+ * Since compaction tasks submitted for these Intervals would have to wait anyway,
+ * we skip these Intervals until the next compaction run.
+ * <p>
+ * For now, Segment Locks are being treated the same as Time Chunk Locks even
+ * though they lock only a Segment and not the entire Interval. Thus,
+ * a compaction task will not be submitted for an Interval if
+ * <ul>
+ * <li>either the whole Interval is locked by a higher priority Task</li>
+ * <li>or there is at least one Segment in the Interval that is locked by a
+ * higher priority Task</li>
+ * </ul>
+ */
+ private Map<String, List<Interval>> getLockedIntervalsToSkip(
+ List<DataSourceCompactionConfig> compactionConfigs
+ )
+ {
+ if (!skipLockedIntervals) {
+ LOG.info("Not skipping any locked interval for Compaction");
+ return new HashMap<>();
+ }
+
+ final Map<String, Integer> minTaskPriority = compactionConfigs
+ .stream()
+ .collect(
+ Collectors.toMap(
+ DataSourceCompactionConfig::getDataSource,
+ DataSourceCompactionConfig::getTaskPriority
+ )
+ );
+ final Map<String, List<Interval>> datasourceToLockedIntervals =
+ new HashMap<>(indexingServiceClient.getLockedIntervals(minTaskPriority));
+ LOG.debug(
+ "Skipping the following intervals for Compaction as they are currently locked: %s",
+ datasourceToLockedIntervals
+ );
+
+ return datasourceToLockedIntervals;
+ }
+
+ /**
* Returns the maximum number of task slots used by one compaction task at any time when the task is issued with
* the given tuningConfig.
*/
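
The actual skipping happens inside the segment search policy via policy.reset above. As a rough sketch of the intent only (not the policy's real implementation), a candidate compaction interval would be dropped when it overlaps any locked interval for its datasource:

    import org.joda.time.Interval;

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    class SkipLockedIntervalsSketch
    {
      // Returns true if the candidate interval should be skipped in this run,
      // i.e. it overlaps an interval locked by a higher priority task.
      static boolean shouldSkip(
          String dataSource,
          Interval candidate,
          Map<String, List<Interval>> intervalsToSkipCompaction
      )
      {
        final List<Interval> locked =
            intervalsToSkipCompaction.getOrDefault(dataSource, Collections.emptyList());
        return locked.stream().anyMatch(candidate::overlaps);
      }
    }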
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 600b4be..af307a4 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -108,6 +108,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
}
@Override
+ public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
public SamplerResponse sample(SamplerSpec samplerSpec)
{
return new SamplerResponse(0, 0, Collections.emptyList());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index ad56341..084e632 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -47,6 +47,7 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(0, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
+ Assert.assertTrue(config.getCompactionSkipLockedIntervals());
//with non-defaults
Properties props = new Properties();
@@ -60,6 +61,7 @@ public class DruidCoordinatorConfigTest
props.setProperty("druid.coordinator.kill.pendingSegments.on", "true");
props.setProperty("druid.coordinator.load.timeout", "PT1s");
props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
+ props.setProperty("druid.coordinator.compaction.skipLockedIntervals", "false");
factory = Config.createFactory(props);
config = factory.build(DruidCoordinatorConfig.class);
@@ -72,5 +74,6 @@ public class DruidCoordinatorConfigTest
Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay());
+ Assert.assertFalse(config.getCompactionSkipLockedIntervals());
}
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 90b3b5d..a1932dd 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -64,6 +64,7 @@ import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
@@ -114,6 +115,7 @@ import java.util.stream.Collectors;
public class CompactSegmentsTest
{
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+ private static final DruidCoordinatorConfig COORDINATOR_CONFIG = Mockito.mock(DruidCoordinatorConfig.class);
private static final String DATA_SOURCE_PREFIX = "dataSource_";
private static final int PARTITION_PER_TIME_INTERVAL = 4;
// Each dataSource starts with 440 byte, 44 segments, and 11 intervals needing compaction
@@ -188,6 +190,7 @@ public class CompactSegmentsTest
dataSources = DataSourcesSnapshot
.fromUsedSegments(allSegments, ImmutableMap.of())
.getUsedSegmentsTimelinesPerDataSource();
+ Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(true);
}
private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition)
@@ -227,7 +230,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
final Supplier<String> expectedVersionSupplier = new Supplier<String>()
{
@@ -305,7 +308,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -397,7 +400,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -490,7 +493,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
// Before any compaction, we do not have any snapshot of compactions
Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -551,7 +554,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -563,7 +566,7 @@ public class CompactSegmentsTest
public void testCompactWithoutGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -619,7 +622,7 @@ public class CompactSegmentsTest
public void testCompactWithNotNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -671,7 +674,7 @@ public class CompactSegmentsTest
public void testCompactWithNullIOConfig()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -723,7 +726,7 @@ public class CompactSegmentsTest
public void testCompactWithGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
final String dataSource = DATA_SOURCE_PREFIX + 0;
compactionConfigs.add(
@@ -818,7 +821,7 @@ public class CompactSegmentsTest
Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
compactionConfigs.add(
new DataSourceCompactionConfig(
@@ -884,7 +887,7 @@ public class CompactSegmentsTest
final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
leaderClient.start();
final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
- final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+ final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -892,6 +895,99 @@ public class CompactSegmentsTest
Assert.assertEquals(2, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
}
+ @Test
+ public void testRunWithLockedIntervals()
+ {
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+ // Lock all intervals for dataSource_1 and dataSource_2
+ final String datasource1 = DATA_SOURCE_PREFIX + 1;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource1, k -> new ArrayList<>())
+ .add(Intervals.of("2017/2018"));
+
+ final String datasource2 = DATA_SOURCE_PREFIX + 2;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource2, k -> new ArrayList<>())
+ .add(Intervals.of("2017/2018"));
+
+ // Lock all intervals but one for dataSource_0
+ final String datasource0 = DATA_SOURCE_PREFIX + 0;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource0, k -> new ArrayList<>())
+ .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01"));
+
+ // Verify that locked intervals are skipped and only one compaction task
+ // is submitted for dataSource_0
+ CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
+ Assert.assertEquals(1, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ Assert.assertEquals(1, leaderClient.submittedCompactionTasks.size());
+
+ final ClientCompactionTaskQuery compactionTask = leaderClient.submittedCompactionTasks.get(0);
+ Assert.assertEquals(datasource0, compactionTask.getDataSource());
+ Assert.assertEquals(
+ Intervals.of("2017-01-01T00:00:00/2017-01-01T12:00:00"),
+ compactionTask.getIoConfig().getInputSpec().getInterval()
+ );
+ }
+
+ @Test
+ public void testRunWithLockedIntervalsNoSkip()
+ {
+ Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(false);
+
+ final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+ leaderClient.start();
+ HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+ // Lock all intervals for all the dataSources
+ final String datasource0 = DATA_SOURCE_PREFIX + 0;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource0, k -> new ArrayList<>())
+ .add(Intervals.of("2017/2018"));
+
+ final String datasource1 = DATA_SOURCE_PREFIX + 1;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource1, k -> new ArrayList<>())
+ .add(Intervals.of("2017/2018"));
+
+ final String datasource2 = DATA_SOURCE_PREFIX + 2;
+ leaderClient.lockedIntervals
+ .computeIfAbsent(datasource2, k -> new ArrayList<>())
+ .add(Intervals.of("2017/2018"));
+
+ // Verify that no locked intervals are skipped
+ CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+ int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3;
+ final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots);
+ Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+ Assert.assertEquals(3, leaderClient.submittedCompactionTasks.size());
+
+ // Verify that tasks are submitted for the latest interval of each dataSource
+ final Map<String, Interval> datasourceToInterval = new HashMap<>();
+ leaderClient.submittedCompactionTasks.forEach(
+ task -> datasourceToInterval.put(
+ task.getDataSource(), task.getIoConfig().getInputSpec().getInterval()));
+ Assert.assertEquals(
+ Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+ datasourceToInterval.get(datasource0)
+ );
+ Assert.assertEquals(
+ Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+ datasourceToInterval.get(datasource1)
+ );
+ Assert.assertEquals(
+ Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+ datasourceToInterval.get(datasource2)
+ );
+ }
+
private void verifySnapshot(
CompactSegments compactSegments,
AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
@@ -1183,6 +1279,12 @@ public class CompactSegmentsTest
{
private final ObjectMapper jsonMapper;
+ // Map from datasource to the list of intervals locked in that datasource
+ private final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
+
+ // List of submitted compaction tasks for verification in the tests
+ private final List<ClientCompactionTaskQuery> submittedCompactionTasks = new ArrayList<>();
+
private int compactVersionSuffix = 0;
private TestDruidLeaderClient(ObjectMapper jsonMapper)
@@ -1209,6 +1311,8 @@ public class CompactSegmentsTest
|| urlString.contains("/druid/indexer/v1/pendingTasks")
|| urlString.contains("/druid/indexer/v1/runningTasks")) {
return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
+ } else if (urlString.contains("/druid/indexer/v1/lockedIntervals")) {
+ return handleLockedIntervals();
} else {
throw new IAE("Cannot handle request for url[%s]", request.getUrl());
}
@@ -1252,6 +1356,8 @@ public class CompactSegmentsTest
throw new IAE("Cannot run non-compaction task");
}
final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
+ submittedCompactionTasks.add(compactionTaskQuery);
+
final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(
compactionTaskQuery.getDataSource()
@@ -1269,6 +1375,11 @@ public class CompactSegmentsTest
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
}
+ private StringFullResponseHolder handleLockedIntervals() throws IOException
+ {
+ return createStringFullResponseHolder(jsonMapper.writeValueAsString(lockedIntervals));
+ }
+
private void compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
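
To round out the change, a hedged usage sketch of the new client method. The
getLockedIntervals signature is the one this patch adds to
IndexingServiceClient, and /druid/indexer/v1/lockedIntervals is the endpoint
the test stub above answers; the datasource name and the minimum priority are
illustrative assumptions (25 is commonly the default compaction task priority).

    import java.util.List;
    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.druid.client.indexing.IndexingServiceClient;
    import org.joda.time.Interval;

    public final class LockedIntervalsExample
    {
      // Queries the Overlord (via /druid/indexer/v1/lockedIntervals) for the
      // intervals locked by tasks of priority >= 25 on "wikipedia"; both the
      // datasource name and the priority value are assumed for this sketch.
      public static void printLockedIntervals(IndexingServiceClient client)
      {
        final Map<String, Integer> minTaskPriority = ImmutableMap.of("wikipedia", 25);
        final Map<String, List<Interval>> locked = client.getLockedIntervals(minTaskPriority);
        // Datasources absent from the request map are never included in the response.
        locked.forEach(
            (dataSource, intervals) ->
                System.out.println(dataSource + " -> " + intervals)
        );
      }
    }
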
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org