You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/06/21 00:22:26 UTC

[druid] branch master updated: Temporarily skip compaction for locked intervals (#11190)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f0b105e  Temporarily skip compaction for locked intervals (#11190)
f0b105e is described below

commit f0b105ec63657643c85d82686ae1dea3e588a5f5
Author: Kashif Faraz <ka...@gmail.com>
AuthorDate: Mon Jun 21 05:51:59 2021 +0530

    Temporarily skip compaction for locked intervals (#11190)
    
    * Add overlord API /lockedIntervals. Skip compaction for locked intervals
    
    * Revert formatting changes
    
    * Add license info
    
    * Fix checkstyle
    
    * Remove invalid API invocation
    
    * Fix checkstyle
    
    * Add DatasourceIntervalsTest
    
    * Fix checkstyle
    
    * Remove LockedIntervalsResponse
    
    * Add integration tests for lockedIntervals
    
    * Add ITAutoCompactionLockContentionTest
    
    * Add config druid.coordinator.compaction.skipLockedIntervals
    
    * Add test for TaskQueue
---
 .../druid/indexing/overlord/TaskLockbox.java       |  61 +++
 .../indexing/overlord/TaskStorageQueryAdapter.java |  22 +-
 .../indexing/overlord/http/OverlordResource.java   |  14 +
 .../druid/indexing/overlord/TaskLifecycleTest.java |   2 +-
 .../druid/indexing/overlord/TaskLockboxTest.java   | 139 +++++++
 .../druid/indexing/overlord/TaskQueueTest.java     |  36 ++
 .../overlord/http/OverlordResourceTest.java        |  60 +++
 .../druid/indexing/overlord/http/OverlordTest.java |   2 +-
 .../clients/OverlordResourceTestClient.java        |  26 ++
 .../apache/druid/testing/utils/CompactionUtil.java |  77 ++++
 .../duty/ITAutoCompactionLockContentionTest.java   | 418 +++++++++++++++++++++
 .../druid/tests/indexer/AbstractIndexerTest.java   |  11 +-
 .../tests/indexer/AbstractStreamIndexingTest.java  |   9 +-
 .../apache/druid/tests/indexer/ITIndexerTest.java  |  42 +++
 .../client/indexing/HttpIndexingServiceClient.java |  23 ++
 .../client/indexing/IndexingServiceClient.java     |  12 +
 .../server/coordinator/DruidCoordinatorConfig.java |   7 +
 .../server/coordinator/duty/CompactSegments.java   |  68 +++-
 .../client/indexing/NoopIndexingServiceClient.java |   6 +
 .../coordinator/DruidCoordinatorConfigTest.java    |   3 +
 .../coordinator/duty/CompactSegmentsTest.java      | 133 ++++++-
 21 files changed, 1144 insertions(+), 27 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index ddb764d..d33d96d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -674,6 +674,67 @@ public class TaskLockbox
     }
   }
 
+  /**
+   * Gets a List of Intervals locked by higher priority tasks for each datasource.
+   * Here, Segment Locks are being treated the same as Time Chunk Locks i.e.
+   * a Task with a Segment Lock is assumed to lock a whole Interval and not just
+   * the corresponding Segment.
+   *
+   * @param minTaskPriority Minimum task priority for each datasource. Only the
+   *                        Intervals that are locked by Tasks with equal or
+   *                        higher priority than this are returned. Locked intervals
+   *                        for datasources that are not present in this Map are
+   *                        not returned.
+   * @return Map from Datasource to List of Intervals locked by Tasks that have
+   * priority greater than or equal to the {@code minTaskPriority} for that datasource.
+   */
+  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
+
+    // Take a lock and populate the maps
+    giant.lock();
+    try {
+      running.forEach(
+          (datasource, datasourceLocks) -> {
+            // If this datasource is not requested, do not proceed
+            if (!minTaskPriority.containsKey(datasource)) {
+              return;
+            }
+
+            datasourceLocks.forEach(
+                (startTime, startTimeLocks) -> startTimeLocks.forEach(
+                    (interval, taskLockPosses) -> taskLockPosses.forEach(
+                        taskLockPosse -> {
+                          if (taskLockPosse.getTaskLock().isRevoked()) {
+                            // Do not proceed if the lock is revoked
+                            return;
+                          } else if (taskLockPosse.getTaskLock().getPriority() == null
+                                     || taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
+                            // Do not proceed if the lock has a priority strictly less than the minimum
+                            return;
+                          }
+
+                          datasourceToIntervals
+                              .computeIfAbsent(datasource, k -> new HashSet<>())
+                              .add(interval);
+                        })
+                )
+            );
+          }
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+
+    return datasourceToIntervals.entrySet().stream()
+                                .collect(Collectors.toMap(
+                                    Map.Entry::getKey,
+                                    entry -> new ArrayList<>(entry.getValue())
+                                ));
+  }
+
   public void unlock(final Task task, final Interval interval)
   {
     unlock(task, interval, null);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 6a12b40..4ccd925 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -29,10 +29,12 @@ import org.apache.druid.indexing.common.actions.TaskAction;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Duration;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -41,11 +43,13 @@ import java.util.Set;
 public class TaskStorageQueryAdapter
 {
   private final TaskStorage storage;
+  private final TaskLockbox taskLockbox;
 
   @Inject
-  public TaskStorageQueryAdapter(TaskStorage storage)
+  public TaskStorageQueryAdapter(TaskStorage storage, TaskLockbox taskLockbox)
   {
     this.storage = storage;
+    this.taskLockbox = taskLockbox;
   }
 
   public List<Task> getActiveTasks()
@@ -53,6 +57,22 @@ public class TaskStorageQueryAdapter
     return storage.getActiveTasks();
   }
 
+  /**
+   * Gets a List of Intervals locked by higher priority tasks for each datasource.
+   *
+   * @param minTaskPriority Minimum task priority for each datasource. Only the
+   *                        Intervals that are locked by Tasks with equal or
+   *                        higher priority than this are returned. Locked intervals
+   *                        for datasources that are not present in this Map are
+   *                        not returned.
+   * @return Map from Datasource to List of Intervals locked by Tasks that have
+   * priority greater than or equal to the {@code minTaskPriority} for that datasource.
+   */
+  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    return taskLockbox.getLockedIntervals(minTaskPriority);
+  }
+
   public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
     return storage.getActiveTaskInfo(dataSource);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index bc01e55..0076eb0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -227,6 +227,20 @@ public class OverlordResource
     }
   }
 
+  @POST
+  @Path("/lockedIntervals")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(StateResourceFilter.class)
+  public Response getDatasourceLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    if (minTaskPriority == null || minTaskPriority.isEmpty()) {
+      return Response.status(Status.BAD_REQUEST).entity("No Datasource provided").build();
+    }
+
+    // Build the response
+    return Response.ok(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)).build();
+  }
+
   @GET
   @Path("/task/{taskid}")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 7c41ba9..613e0ac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -475,7 +475,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
         throw new RE("Unknown task storage type [%s]", taskStorageType);
       }
     }
-    tsqa = new TaskStorageQueryAdapter(taskStorage);
+    tsqa = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
     return taskStorage;
   }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 6cb19ed..4b20ba4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -70,8 +70,10 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -1119,6 +1121,143 @@ public class TaskLockboxTest
     );
   }
 
+  @Test
+  public void testGetLockedIntervals()
+  {
+    // Acquire locks for task1
+    final Task task1 = NoopTask.create("ds1");
+    lockbox.add(task1);
+
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        task1,
+        Intervals.of("2017-01-01/2017-02-01")
+    );
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        task1,
+        Intervals.of("2017-04-01/2017-05-01")
+    );
+
+    // Acquire locks for task2
+    final Task task2 = NoopTask.create("ds2");
+    lockbox.add(task2);
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        task2,
+        Intervals.of("2017-03-01/2017-04-01")
+    );
+
+    // Verify the locked intervals
+    final Map<String, Integer> minTaskPriority = new HashMap<>();
+    minTaskPriority.put(task1.getDataSource(), 10);
+    minTaskPriority.put(task2.getDataSource(), 10);
+    final Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+    Assert.assertEquals(2, lockedIntervals.size());
+
+    Assert.assertEquals(
+        Arrays.asList(
+            Intervals.of("2017-01-01/2017-02-01"),
+            Intervals.of("2017-04-01/2017-05-01")
+        ),
+        lockedIntervals.get(task1.getDataSource())
+    );
+
+    Assert.assertEquals(
+        Collections.singletonList(
+            Intervals.of("2017-03-01/2017-04-01")),
+        lockedIntervals.get(task2.getDataSource())
+    );
+  }
+
+  @Test
+  public void testGetLockedIntervalsForLowPriorityTask() throws Exception
+  {
+    // Acquire lock for a low priority task
+    final Task lowPriorityTask = NoopTask.create(5);
+    lockbox.add(lowPriorityTask);
+    taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        lowPriorityTask,
+        Intervals.of("2017/2018")
+    );
+
+    final Map<String, Integer> minTaskPriority = new HashMap<>();
+    minTaskPriority.put(lowPriorityTask.getDataSource(), 10);
+
+    Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+    Assert.assertTrue(lockedIntervals.isEmpty());
+  }
+
+  @Test
+  public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
+  {
+    // Acquire lock for a low priority task
+    final Task task = NoopTask.create(5);
+    lockbox.add(task);
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        task,
+        Intervals.of("2017/2018")
+    );
+
+    final Map<String, Integer> minTaskPriority = new HashMap<>();
+    minTaskPriority.put(task.getDataSource(), 5);
+
+    Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+    Assert.assertEquals(1, lockedIntervals.size());
+    Assert.assertEquals(
+        Collections.singletonList(Intervals.of("2017/2018")),
+        lockedIntervals.get(task.getDataSource())
+    );
+  }
+
+  @Test
+  public void testGetLockedIntervalsForRevokedLocks() throws Exception
+  {
+    // Acquire lock for a low priority task
+    final Task lowPriorityTask = NoopTask.create(5);
+    lockbox.add(lowPriorityTask);
+    taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        lowPriorityTask,
+        Intervals.of("2017/2018")
+    );
+
+    final Map<String, Integer> minTaskPriority = new HashMap<>();
+    minTaskPriority.put(lowPriorityTask.getDataSource(), 1);
+
+    Map<String, List<Interval>> lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+    Assert.assertEquals(1, lockedIntervals.size());
+    Assert.assertEquals(
+        Collections.singletonList(
+            Intervals.of("2017/2018")),
+        lockedIntervals.get(lowPriorityTask.getDataSource())
+    );
+
+    // Revoke the lowPriorityTask
+    final Task highPriorityTask = NoopTask.create(10);
+    lockbox.add(highPriorityTask);
+    tryTimeChunkLock(
+        TaskLockType.EXCLUSIVE,
+        highPriorityTask,
+        Intervals.of("2017-05-01/2017-06-01")
+    );
+
+    // Verify the locked intervals
+    minTaskPriority.put(highPriorityTask.getDataSource(), 1);
+    lockedIntervals = lockbox.getLockedIntervals(minTaskPriority);
+    Assert.assertEquals(1, lockedIntervals.size());
+    Assert.assertEquals(
+        Collections.singletonList(
+            Intervals.of("2017-05-01/2017-06-01")),
+        lockedIntervals.get(highPriorityTask.getDataSource())
+    );
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index bdeef05..2dbd9d1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -115,6 +117,40 @@ public class TaskQueueTest extends IngestionTestBase
   }
 
   @Test
+  public void testShutdownReleasesTaskLock() throws Exception
+  {
+    final TaskActionClientFactory actionClientFactory = createActionClientFactory();
+    final TaskQueue taskQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(null, null, null, null),
+        new DefaultTaskConfig(),
+        getTaskStorage(),
+        new SimpleTaskRunner(actionClientFactory),
+        actionClientFactory,
+        getLockbox(),
+        new NoopServiceEmitter()
+    );
+    taskQueue.setActive(true);
+
+    // Create a Task and add it to the TaskQueue
+    final TestTask task = new TestTask("t1", Intervals.of("2021-01/P1M"));
+    taskQueue.add(task);
+
+    // Acquire a lock for the Task
+    getLockbox().lock(
+        task,
+        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, task, task.interval, null)
+    );
+    final List<TaskLock> locksForTask = getLockbox().findLocksForTask(task);
+    Assert.assertEquals(1, locksForTask.size());
+    Assert.assertEquals(task.interval, locksForTask.get(0).getInterval());
+
+    // Verify that locks are removed on calling shutdown
+    taskQueue.shutdown(task.getId(), "Shutdown Task");
+    Assert.assertTrue(getLockbox().findLocksForTask(task).isEmpty());
+  }
+
+  @Test
   public void testSetUseLineageBasedSegmentAllocationByDefault() throws EntryExistsException
   {
     final TaskActionClientFactory actionClientFactory = createActionClientFactory();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e41a6c9..c475166 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.indexing.overlord.http;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -43,6 +45,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.server.security.Access;
@@ -70,6 +73,7 @@ import org.junit.rules.ExpectedException;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -1060,6 +1064,62 @@ public class OverlordResourceTest
   }
 
   @Test
+  public void testGetLockedIntervals() throws Exception
+  {
+    final Map<String, Integer> minTaskPriority = Collections.singletonMap("ds1", 0);
+    final Map<String, List<Interval>> expectedLockedIntervals = Collections.singletonMap(
+        "ds1",
+        Arrays.asList(
+            Intervals.of("2012-01-01/2012-01-02"),
+            Intervals.of("2012-01-02/2012-01-03")
+        )
+    );
+
+    EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority))
+            .andReturn(expectedLockedIntervals);
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority);
+    Assert.assertEquals(200, response.getStatus());
+
+    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
+    Map<String, List<Interval>> observedLockedIntervals = jsonMapper.readValue(
+        jsonMapper.writeValueAsString(response.getEntity()),
+        new TypeReference<Map<String, List<Interval>>>()
+        {
+        }
+    );
+
+    Assert.assertEquals(expectedLockedIntervals, observedLockedIntervals);
+  }
+
+  @Test
+  public void testGetLockedIntervalsWithEmptyBody()
+  {
+    EasyMock.replay(
+        taskRunner,
+        taskMaster,
+        taskStorageQueryAdapter,
+        indexerMetadataStorageAdapter,
+        req,
+        workerTaskRunnerQueryAdapter
+    );
+
+    Response response = overlordResource.getDatasourceLockedIntervals(null);
+    Assert.assertEquals(400, response.getStatus());
+
+    response = overlordResource.getDatasourceLockedIntervals(Collections.emptyMap());
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
   public void testShutdownTask()
   {
     // This is disabled since OverlordResource.doShutdown is annotated with TaskResourceFilter
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 5209954..108e2c8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -216,7 +216,7 @@ public class OverlordTest
     }
     Assert.assertEquals(taskMaster.getCurrentLeader(), druidNode.getHostAndPort());
 
-    final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage);
+    final TaskStorageQueryAdapter taskStorageQueryAdapter = new TaskStorageQueryAdapter(taskStorage, taskLockbox);
     final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
     // Test Overlord resource stuff
     overlordResource = new OverlordResource(
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
index 6e40ee2..184f514 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java
@@ -44,6 +44,7 @@ import org.apache.druid.testing.guice.TestClient;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.joda.time.Interval;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -246,6 +247,31 @@ public class OverlordResourceTestClient
     }
   }
 
+  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    try {
+      String jsonBody = jsonMapper.writeValueAsString(minTaskPriority);
+
+      StatusResponseHolder response = httpClient.go(
+          new Request(HttpMethod.POST, new URL(getIndexerURL() + "lockedIntervals"))
+              .setContent(
+                  "application/json",
+                  StringUtils.toUtf8(jsonBody)
+              ),
+          StatusResponseHandler.getInstance()
+      ).get();
+      return jsonMapper.readValue(
+          response.getContent(),
+          new TypeReference<Map<String, List<Interval>>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public void waitUntilTaskCompletes(final String taskID)
   {
     waitUntilTaskCompletes(taskID, ITRetryUtil.DEFAULT_RETRY_SLEEP, ITRetryUtil.DEFAULT_RETRY_COUNT);
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
new file mode 100644
index 0000000..5ccd5f4
--- /dev/null
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.utils;
+
+import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
+import org.joda.time.Period;
+
+/**
+ * Contains utility methods for Compaction.
+ */
+public class CompactionUtil
+{
+
+  private CompactionUtil()
+  {
+    // no instantiation
+  }
+
+  public static DataSourceCompactionConfig createCompactionConfig(
+      String fullDatasourceName,
+      Integer maxRowsPerSegment,
+      Period skipOffsetFromLatest
+  )
+  {
+    return new DataSourceCompactionConfig(
+        fullDatasourceName,
+        null,
+        null,
+        null,
+        skipOffsetFromLatest,
+        new UserCompactionTaskQueryTuningConfig(
+            null,
+            null,
+            null,
+            new MaxSizeSplitHintSpec(null, 1),
+            new DynamicPartitionsSpec(maxRowsPerSegment, null),
+            null,
+            null,
+            null,
+            null,
+            null,
+            1,
+            null,
+            null,
+            null,
+            null,
+            null,
+            1
+        ),
+        null,
+        new UserCompactionTaskIOConfig(true),
+        null
+    );
+  }
+
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
new file mode 100644
index 0000000..84c1bf5
--- /dev/null
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionLockContentionTest.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.tests.coordinator.duty;
+
+import com.google.inject.Inject;
+import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
+import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.testing.clients.CompactionResourceTestClient;
+import org.apache.druid.testing.clients.TaskResponseObject;
+import org.apache.druid.testing.guice.DruidTestModuleFactory;
+import org.apache.druid.testing.utils.CompactionUtil;
+import org.apache.druid.testing.utils.EventSerializer;
+import org.apache.druid.testing.utils.ITRetryUtil;
+import org.apache.druid.testing.utils.KafkaUtil;
+import org.apache.druid.testing.utils.StreamEventWriter;
+import org.apache.druid.testing.utils.StreamGenerator;
+import org.apache.druid.testing.utils.WikipediaStreamEventStreamGenerator;
+import org.apache.druid.tests.TestNGGroup;
+import org.apache.druid.tests.indexer.AbstractKafkaIndexingServiceTest;
+import org.apache.druid.tests.indexer.AbstractStreamIndexingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Integration Test to verify behaviour when there is a lock contention between
+ * compaction tasks and on-going stream ingestion tasks.
+ */
+@Test(groups = {TestNGGroup.COMPACTION})
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITAutoCompactionLockContentionTest extends AbstractKafkaIndexingServiceTest
+{
+  private static final Logger LOG = new Logger(ITAutoCompactionLockContentionTest.class);
+
+  @Inject
+  private CompactionResourceTestClient compactionResource;
+
+  private GeneratedTestConfig generatedTestConfig;
+  private StreamGenerator streamGenerator;
+
+  private String fullDatasourceName;
+
+  @DataProvider
+  public static Object[] getParameters()
+  {
+    return new Object[]{false, true};
+  }
+
+  @BeforeClass
+  public void setupClass() throws Exception
+  {
+    doBeforeClass();
+  }
+
+  @BeforeMethod
+  public void setup() throws Exception
+  {
+    generatedTestConfig = new GeneratedTestConfig(
+        Specs.PARSER_TYPE,
+        getResourceAsString(Specs.INPUT_FORMAT_PATH)
+    );
+    fullDatasourceName = generatedTestConfig.getFullDatasourceName();
+    final EventSerializer serializer = jsonMapper.readValue(
+        getResourceAsStream(Specs.SERIALIZER_PATH),
+        EventSerializer.class
+    );
+    streamGenerator = new WikipediaStreamEventStreamGenerator(serializer, 6, 100);
+  }
+
+  @Override
+  public String getTestNamePrefix()
+  {
+    return "autocompact_lock_contention";
+  }
+
+  @Test(dataProvider = "getParameters")
+  public void testAutoCompactionSkipsLockedIntervals(boolean transactionEnabled) throws Exception
+  {
+    if (shouldSkipTest(transactionEnabled)) {
+      return;
+    }
+
+    try (
+        final Closeable closer = createResourceCloser(generatedTestConfig);
+        final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
+    ) {
+      // Start supervisor
+      final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
+                                                 .apply(getResourceAsString(SUPERVISOR_SPEC_TEMPLATE_PATH));
+      generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
+      LOG.info("supervisorSpec: [%s]", taskSpec);
+
+      // Generate data for minutes 1, 2 and 3
+      final Interval minute1 = Intervals.of("2000-01-01T01:01:00Z/2000-01-01T01:02:00Z");
+      final long rowsForMinute1 = generateData(minute1, streamEventWriter);
+
+      final Interval minute2 = Intervals.of("2000-01-01T01:02:00Z/2000-01-01T01:03:00Z");
+      long rowsForMinute2 = generateData(minute2, streamEventWriter);
+
+      final Interval minute3 = Intervals.of("2000-01-01T01:03:00Z/2000-01-01T01:04:00Z");
+      final long rowsForMinute3 = generateData(minute3, streamEventWriter);
+
+      // Wait for data to be ingested for all the minutes
+      ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
+
+      // Wait for the segments to be loaded and interval locks to be released
+      ensureLockedIntervals();
+      ensureSegmentsLoaded();
+
+      // 2 segments for each minute, total 6
+      ensureSegmentsCount(6);
+
+      // Generate more data for minute2 so that it gets locked
+      rowsForMinute2 += generateData(minute2, streamEventWriter);
+      ensureLockedIntervals(minute2);
+
+      // Trigger auto compaction
+      submitAndVerifyCompactionConfig();
+      compactionResource.forceTriggerAutoCompaction();
+
+      // Wait for segments to be loaded
+      ensureRowCount(rowsForMinute1 + rowsForMinute2 + rowsForMinute3);
+      ensureLockedIntervals();
+      ensureSegmentsLoaded();
+
+      // Verify that minute1 and minute3 have been compacted
+      ensureCompactionTaskCount(2);
+      verifyCompactedIntervals(minute1, minute3);
+
+      // Trigger auto compaction again
+      compactionResource.forceTriggerAutoCompaction();
+
+      // Verify that all the segments are now compacted
+      ensureCompactionTaskCount(3);
+      ensureSegmentsLoaded();
+      verifyCompactedIntervals(minute1, minute2, minute3);
+      ensureSegmentsCount(3);
+    }
+  }
+
+  /**
+   * Retries until the segment count is as expected.
+   */
+  private void ensureSegmentsCount(int numExpectedSegments)
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
+          StringBuilder sb = new StringBuilder();
+          segments.forEach(
+              seg -> sb.append("{")
+                       .append(seg.getId())
+                       .append(", ")
+                       .append(seg.getSize())
+                       .append("}, ")
+          );
+          LOG.info("Found Segments: %s", sb);
+          LOG.info("Current metadata segment count: %d, expected: %d", segments.size(), numExpectedSegments);
+          return segments.size() == numExpectedSegments;
+        },
+        "Segment count check"
+    );
+  }
+
+  /**
+   * Verifies that the given intervals have been compacted.
+   */
+  private void verifyCompactedIntervals(Interval... compactedIntervals)
+  {
+    List<DataSegment> segments = coordinator.getFullSegmentsMetadata(fullDatasourceName);
+    List<DataSegment> observedCompactedSegments = new ArrayList<>();
+    Set<Interval> observedCompactedIntervals = new HashSet<>();
+    for (DataSegment segment : segments) {
+      if (segment.getLastCompactionState() != null) {
+        observedCompactedSegments.add(segment);
+        observedCompactedIntervals.add(segment.getInterval());
+      }
+    }
+
+    Set<Interval> expectedCompactedIntervals = new HashSet<>(Arrays.asList(compactedIntervals));
+    Assert.assertEquals(observedCompactedIntervals, expectedCompactedIntervals);
+
+    DynamicPartitionsSpec expectedPartitionSpec = new DynamicPartitionsSpec(
+        Specs.MAX_ROWS_PER_SEGMENT,
+        Long.MAX_VALUE
+    );
+    for (DataSegment compactedSegment : observedCompactedSegments) {
+      Assert.assertNotNull(compactedSegment.getLastCompactionState());
+      Assert.assertEquals(
+          compactedSegment.getLastCompactionState().getPartitionsSpec(),
+          expectedPartitionSpec
+      );
+    }
+  }
+
+  /**
+   * Generates data points for the specified interval.
+   *
+   * @return Number of rows generated.
+   */
+  private long generateData(Interval interval, StreamEventWriter streamEventWriter)
+  {
+    long rowCount = streamGenerator.run(
+        generatedTestConfig.getStreamName(),
+        streamEventWriter,
+        10,
+        interval.getStart()
+    );
+    LOG.info("Generated %d Rows for Interval [%s]", rowCount, interval);
+
+    return rowCount;
+  }
+
+  /**
+   * Retries until segments have been loaded.
+   */
+  private void ensureSegmentsLoaded()
+  {
+    ITRetryUtil.retryUntilTrue(
+        () -> coordinator.areSegmentsLoaded(fullDatasourceName),
+        "Segment Loading"
+    );
+  }
+
+  /**
+   * Retries until the specified Intervals are locked for the current datasource.
+   * If no interval has been specified, retries until no interval is locked
+   */
+  private void ensureLockedIntervals(Interval... intervals)
+  {
+    final Map<String, Integer> minTaskPriority = Collections.singletonMap(fullDatasourceName, 0);
+    final List<Interval> lockedIntervals = new ArrayList<>();
+    ITRetryUtil.retryUntilTrue(
+        () -> {
+          lockedIntervals.clear();
+
+          Map<String, List<Interval>> allIntervals = indexer.getLockedIntervals(minTaskPriority);
+          if (allIntervals.containsKey(fullDatasourceName)) {
+            lockedIntervals.addAll(allIntervals.get(fullDatasourceName));
+          }
+
+          LOG.info("Locked intervals: %s", lockedIntervals);
+          return intervals.length == lockedIntervals.size();
+        },
+        "Verify Locked Intervals"
+    );
+
+    Assert.assertEquals(lockedIntervals, Arrays.asList(intervals));
+  }
+
+  /**
+   * Checks if a test should be skipped based on whether transaction is enabled or not.
+   */
+  private boolean shouldSkipTest(boolean testEnableTransaction)
+  {
+    Map<String, String> kafkaTestProps = KafkaUtil
+        .getAdditionalKafkaTestConfigFromProperties(config);
+    boolean configEnableTransaction = Boolean.parseBoolean(
+        kafkaTestProps.getOrDefault(KafkaUtil.TEST_CONFIG_TRANSACTION_ENABLED, "false")
+    );
+
+    return configEnableTransaction != testEnableTransaction;
+  }
+
+  /**
+   * Submits a compaction config for the current datasource.
+   */
+  private void submitAndVerifyCompactionConfig() throws Exception
+  {
+    final DataSourceCompactionConfig compactionConfig = CompactionUtil
+        .createCompactionConfig(fullDatasourceName, Specs.MAX_ROWS_PER_SEGMENT, Period.ZERO);
+    compactionResource.updateCompactionTaskSlot(0.5, 10);
+    compactionResource.submitCompactionConfig(compactionConfig);
+
+    // Wait for compaction config to persist
+    Thread.sleep(2000);
+
+    // Verify that the compaction config is updated correctly.
+    CoordinatorCompactionConfig coordinatorCompactionConfig = compactionResource.getCoordinatorCompactionConfigs();
+    DataSourceCompactionConfig observedCompactionConfig = null;
+    for (DataSourceCompactionConfig dataSourceCompactionConfig : coordinatorCompactionConfig.getCompactionConfigs()) {
+      if (dataSourceCompactionConfig.getDataSource().equals(fullDatasourceName)) {
+        observedCompactionConfig = dataSourceCompactionConfig;
+      }
+    }
+    Assert.assertEquals(observedCompactionConfig, compactionConfig);
+
+    observedCompactionConfig = compactionResource.getDataSourceCompactionConfig(fullDatasourceName);
+    Assert.assertEquals(observedCompactionConfig, compactionConfig);
+  }
+
+  /**
+   * Checks if the given TaskResponseObject represents a Compaction Task.
+   */
+  private boolean isCompactionTask(TaskResponseObject taskResponse)
+  {
+    return "compact".equalsIgnoreCase(taskResponse.getType());
+  }
+
+  /**
+   * Retries until the total number of complete compaction tasks is as expected.
+   */
+  private void ensureCompactionTaskCount(int expectedCount)
+  {
+    LOG.info("Verifying compaction task count. Expected: %d", expectedCount);
+    ITRetryUtil.retryUntilTrue(
+        () -> getCompactionTaskCount() == expectedCount,
+        "Compaction Task Count"
+    );
+  }
+
+  /**
+   * Gets the number of complete compaction tasks.
+   */
+  private long getCompactionTaskCount()
+  {
+    List<TaskResponseObject> incompleteTasks = indexer
+        .getUncompletedTasksForDataSource(fullDatasourceName);
+    List<TaskResponseObject> completeTasks = indexer
+        .getCompleteTasksForDataSource(fullDatasourceName);
+
+    printTasks(incompleteTasks, "Incomplete");
+    printTasks(completeTasks, "Complete");
+
+    return completeTasks.stream().filter(this::isCompactionTask).count();
+  }
+
+  private void printTasks(List<TaskResponseObject> tasks, String taskState)
+  {
+    StringBuilder sb = new StringBuilder();
+    tasks.forEach(
+        task -> sb.append("{")
+                  .append(task.getType())
+                  .append(", ")
+                  .append(task.getStatus())
+                  .append(", ")
+                  .append(task.getCreatedTime())
+                  .append("}, ")
+    );
+    LOG.info("%s Tasks: %s", taskState, sb);
+  }
+
+  /**
+   * Retries until the total row count is as expected.
+   */
+  private void ensureRowCount(long totalRows)
+  {
+    LOG.info("Verifying Row Count. Expected: %s", totalRows);
+    ITRetryUtil.retryUntilTrue(
+        () ->
+            totalRows == this.queryHelper.countRows(
+                fullDatasourceName,
+                Intervals.ETERNITY,
+                name -> new LongSumAggregatorFactory(name, "count")
+            ),
+        StringUtils.format(
+            "dataSource[%s] consumed [%,d] events, expected [%,d]",
+            fullDatasourceName,
+            this.queryHelper.countRows(
+                fullDatasourceName,
+                Intervals.ETERNITY,
+                name -> new LongSumAggregatorFactory(name, "count")
+            ),
+            totalRows
+        )
+    );
+  }
+
+  /**
+   * Constants for test specs.
+   */
+  private static class Specs
+  {
+    static final String SERIALIZER_PATH = DATA_RESOURCE_ROOT + "/csv/serializer/serializer.json";
+    static final String INPUT_FORMAT_PATH = DATA_RESOURCE_ROOT + "/csv/input_format/input_format.json";
+    static final String PARSER_TYPE = AbstractStreamIndexingTest.INPUT_FORMAT;
+
+    static final int MAX_ROWS_PER_SEGMENT = 10000;
+  }
+
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
index 6c5067b..f8fd41a 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java
@@ -64,7 +64,7 @@ public abstract class AbstractIndexerTest
   protected TestQueryHelper queryHelper;
 
   @Inject
-  private IntegrationTestingConfig config;
+  protected IntegrationTestingConfig config;
 
   protected Closeable unloader(final String dataSource)
   {
@@ -101,7 +101,7 @@ public abstract class AbstractIndexerTest
     unloadAndKillData(dataSource, first, last);
   }
 
-  protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+  protected String submitIndexTask(String indexTask, final String fullDatasourceName) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
@@ -112,6 +112,13 @@ public abstract class AbstractIndexerTest
     );
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
+
+    return taskID;
+  }
+
+  protected void loadData(String indexTask, final String fullDatasourceName) throws Exception
+  {
+    final String taskID = submitIndexTask(indexTask, fullDatasourceName);
     indexer.waitUntilTaskCompletes(taskID);
 
     ITRetryUtil.retryUntilTrue(
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 3c9ecda..72fbef2 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -98,9 +98,6 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   @Inject
   private DruidClusterAdminClient druidClusterAdminClient;
 
-  @Inject
-  private IntegrationTestingConfig config;
-
   private StreamAdminClient streamAdminClient;
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
@@ -171,7 +168,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     return map;
   }
 
-  private Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
+  protected Closeable createResourceCloser(GeneratedTestConfig generatedTestConfig)
   {
     return Closer.create().register(() -> doMethodTeardown(generatedTestConfig));
   }
@@ -647,7 +644,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
-  private class GeneratedTestConfig
+  protected class GeneratedTestConfig
   {
     private final String streamName;
     private final String fullDatasourceName;
@@ -655,7 +652,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     private Function<String, String> streamIngestionPropsTransform;
     private Function<String, String> streamQueryPropsTransform;
 
-    GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
+    public GeneratedTestConfig(String parserType, String parserOrInputFormat) throws Exception
     {
       streamName = getTestNamePrefix() + "_index_test_" + UUID.randomUUID();
       String datasource = getTestNamePrefix() + "_indexing_service_test_" + UUID.randomUUID();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ef24182..ec9c753 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.tests.indexer;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.inject.Inject;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -28,10 +29,16 @@ import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
 import org.apache.druid.testing.guice.DruidTestModuleFactory;
 import org.apache.druid.testing.utils.ITRetryUtil;
 import org.apache.druid.tests.TestNGGroup;
+import org.joda.time.Interval;
+import org.testng.Assert;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
 import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 
 @Test(groups = {TestNGGroup.BATCH_INDEX, TestNGGroup.QUICKSTART_COMPATIBLE})
@@ -65,6 +72,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json";
   private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test";
 
+  private static final String GET_LOCKED_INTERVALS = "wikipedia_index_get_locked_intervals_test";
+
   private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_PAUSED =
       CoordinatorDynamicConfig.builder().withPauseCoordination(true).build();
   private static final CoordinatorDynamicConfig DYNAMIC_CONFIG_DEFAULT =
@@ -294,4 +303,37 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       );
     }
   }
+
+  @Test
+  public void testGetLockedIntervals() throws Exception
+  {
+    final String datasourceName = GET_LOCKED_INTERVALS + config.getExtraDatasourceNameSuffix();
+    try (final Closeable ignored = unloader(datasourceName)) {
+      // Submit an Indexing Task
+      submitIndexTask(INDEX_TASK, datasourceName);
+
+      // Wait until it acquires a lock
+      final Map<String, Integer> minTaskPriority = Collections.singletonMap(datasourceName, 0);
+      final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
+      ITRetryUtil.retryUntilFalse(
+          () -> {
+            lockedIntervals.clear();
+            lockedIntervals.putAll(indexer.getLockedIntervals(minTaskPriority));
+            return lockedIntervals.isEmpty();
+          },
+          "Verify Intervals are Locked"
+      );
+
+      // Verify the locked intervals for this datasource
+      Assert.assertEquals(lockedIntervals.size(), 1);
+      Assert.assertEquals(
+          lockedIntervals.get(datasourceName),
+          Collections.singletonList(Intervals.of("2013-08-31/2013-09-02"))
+      );
+
+      waitForAllTasksToCompleteForDataSource(datasourceName);
+    }
+
+  }
+
 }
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index d8f8f35..44ba61c 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -44,6 +44,7 @@ import javax.ws.rs.core.MediaType;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -336,6 +337,28 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
+  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    try {
+      final StringFullResponseHolder responseHolder = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/lockedIntervals")
+                           .setContent(MediaType.APPLICATION_JSON, jsonMapper.writeValueAsBytes(minTaskPriority))
+      );
+
+      final Map<String, List<Interval>> response = jsonMapper.readValue(
+          responseHolder.getContent(),
+          new TypeReference<Map<String, List<Interval>>>()
+          {
+          }
+      );
+      return response == null ? Collections.emptyMap() : response;
+    }
+    catch (IOException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
   public SamplerResponse sample(SamplerSpec samplerSpec)
   {
     try {
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 14dfcfe..c379077 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -66,5 +66,17 @@ public interface IndexingServiceClient
   @Nullable
   TaskPayloadResponse getTaskPayload(String taskId);
 
+  /**
+   * Gets a List of Intervals locked by higher priority tasks for each datasource.
+   *
+   * @param minTaskPriority Minimum task priority for each datasource. Only the
+   *                        Intervals that are locked by Tasks higher than this
+   *                        priority are returned. Tasks for datasources that
+   *                        are not present in this Map are not returned.
+   * @return Map from Datasource to List of Intervals locked by Tasks that have
+   * priority strictly greater than the {@code minTaskPriority} for that datasource.
+   */
+  Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority);
+
   SamplerResponse sample(SamplerSpec samplerSpec);
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
index c82419c..14e3ce0 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorConfig.java
@@ -132,4 +132,11 @@ public abstract class DruidCoordinatorConfig
   {
     return 1;
   }
+
+  @Config("druid.coordinator.compaction.skipLockedIntervals")
+  public boolean getCompactionSkipLockedIntervals()
+  {
+    return true;
+  }
+
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index 5d31751..e48731b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -38,6 +38,7 @@ import org.apache.druid.server.coordinator.CompactionStatistics;
 import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
@@ -79,6 +80,7 @@ public class CompactSegments implements CoordinatorDuty
   private static final Logger LOG = new Logger(CompactSegments.class);
 
   private final CompactionSegmentSearchPolicy policy;
+  private final boolean skipLockedIntervals;
   private final IndexingServiceClient indexingServiceClient;
 
   // This variable is updated by the Coordinator thread executing duties and
@@ -87,13 +89,17 @@ public class CompactSegments implements CoordinatorDuty
 
   @Inject
   public CompactSegments(
+      DruidCoordinatorConfig config,
       ObjectMapper objectMapper,
       IndexingServiceClient indexingServiceClient
   )
   {
     this.policy = new NewestSegmentFirstPolicy(objectMapper);
     this.indexingServiceClient = indexingServiceClient;
+    this.skipLockedIntervals = config.getCompactionSkipLockedIntervals();
     autoCompactionSnapshotPerDataSource.set(new HashMap<>());
+
+    LOG.info("Scheduling compaction with skipLockedIntervals [%s]", skipLockedIntervals);
   }
 
   @Override
@@ -114,9 +120,10 @@ public class CompactSegments implements CoordinatorDuty
             .stream()
             .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
         final List<TaskStatusPlus> compactionTasks = filterNonCompactionTasks(indexingServiceClient.getActiveTasks());
-        // dataSource -> list of intervals of compaction tasks
-        final Map<String, List<Interval>> compactionTaskIntervals = Maps.newHashMapWithExpectedSize(
-            compactionConfigList.size());
+
+        // dataSource -> list of intervals for which compaction will be skipped in this run
+        final Map<String, List<Interval>> intervalsToSkipCompaction = new HashMap<>();
+
         int numEstimatedNonCompleteCompactionTasks = 0;
         for (TaskStatusPlus status : compactionTasks) {
           final TaskPayloadResponse response = indexingServiceClient.getTaskPayload(status.getId());
@@ -144,7 +151,7 @@ public class CompactSegments implements CoordinatorDuty
             }
             // Skip interval as the current active compaction task is good
             final Interval interval = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
-            compactionTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
+            intervalsToSkipCompaction.computeIfAbsent(status.getDataSource(), k -> new ArrayList<>()).add(interval);
             // Since we keep the current active compaction task running, we count the active task slots
             numEstimatedNonCompleteCompactionTasks += findMaxNumTaskSlotsUsedByOneCompactionTask(
                 compactionTaskQuery.getTuningConfig()
@@ -154,8 +161,18 @@ public class CompactSegments implements CoordinatorDuty
           }
         }
 
+        // Skip all the intervals locked by higher priority tasks for each datasource
+        // This must be done after the invalid compaction tasks are cancelled
+        // in the loop above so that their intervals are not considered locked
+        getLockedIntervalsToSkip(compactionConfigList).forEach(
+            (dataSource, intervals) ->
+                intervalsToSkipCompaction
+                    .computeIfAbsent(dataSource, ds -> new ArrayList<>())
+                    .addAll(intervals)
+        );
+
         final CompactionSegmentIterator iterator =
-            policy.reset(compactionConfigs, dataSources, compactionTaskIntervals);
+            policy.reset(compactionConfigs, dataSources, intervalsToSkipCompaction);
 
         final int compactionTaskCapacity = (int) Math.min(
             indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(),
@@ -209,6 +226,47 @@ public class CompactSegments implements CoordinatorDuty
   }
 
   /**
+   * Gets a List of Intervals locked by higher priority tasks for each datasource.
+   * Since compaction tasks submitted for these Intervals would have to wait anyway,
+   * we skip these Intervals until the next compaction run.
+   * <p>
+   * For now, Segment Locks are being treated the same as Time Chunk Locks even
+   * though they lock only a Segment and not the entire Interval. Thus,
+   * a compaction task will not be submitted for an Interval if
+   * <ul>
+   *   <li>either the whole Interval is locked by a higher priority Task</li>
+   *   <li>or there is atleast one Segment in the Interval that is locked by a
+   *   higher priority Task</li>
+   * </ul>
+   */
+  private Map<String, List<Interval>> getLockedIntervalsToSkip(
+      List<DataSourceCompactionConfig> compactionConfigs
+  )
+  {
+    if (!skipLockedIntervals) {
+      LOG.info("Not skipping any locked interval for Compaction");
+      return new HashMap<>();
+    }
+
+    final Map<String, Integer> minTaskPriority = compactionConfigs
+        .stream()
+        .collect(
+            Collectors.toMap(
+                DataSourceCompactionConfig::getDataSource,
+                DataSourceCompactionConfig::getTaskPriority
+            )
+        );
+    final Map<String, List<Interval>> datasourceToLockedIntervals =
+        new HashMap<>(indexingServiceClient.getLockedIntervals(minTaskPriority));
+    LOG.debug(
+        "Skipping the following intervals for Compaction as they are currently locked: %s",
+        datasourceToLockedIntervals
+    );
+
+    return datasourceToLockedIntervals;
+  }
+
+  /**
    * Returns the maximum number of task slots used by one compaction task at any time when the task is issued with
    * the given tuningConfig.
    */
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 600b4be..af307a4 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -108,6 +108,12 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   }
 
   @Override
+  public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
+  {
+    return Collections.emptyMap();
+  }
+
+  @Override
   public SamplerResponse sample(SamplerSpec samplerSpec)
   {
     return new SamplerResponse(0, 0, Collections.emptyList());
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index ad56341..084e632 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -47,6 +47,7 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(0, config.getCoordinatorKillMaxSegments());
     Assert.assertEquals(new Duration(15 * 60 * 1000), config.getLoadTimeoutDelay());
     Assert.assertEquals(Duration.millis(50), config.getLoadQueuePeonRepeatDelay());
+    Assert.assertTrue(config.getCompactionSkipLockedIntervals());
 
     //with non-defaults
     Properties props = new Properties();
@@ -60,6 +61,7 @@ public class DruidCoordinatorConfigTest
     props.setProperty("druid.coordinator.kill.pendingSegments.on", "true");
     props.setProperty("druid.coordinator.load.timeout", "PT1s");
     props.setProperty("druid.coordinator.loadqueuepeon.repeatDelay", "PT0.100s");
+    props.setProperty("druid.coordinator.compaction.skipLockedIntervals", "false");
 
     factory = Config.createFactory(props);
     config = factory.build(DruidCoordinatorConfig.class);
@@ -72,5 +74,6 @@ public class DruidCoordinatorConfigTest
     Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments());
     Assert.assertEquals(new Duration("PT1s"), config.getLoadTimeoutDelay());
     Assert.assertEquals(Duration.millis(100), config.getLoadQueuePeonRepeatDelay());
+    Assert.assertFalse(config.getCompactionSkipLockedIntervals());
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 90b3b5d..a1932dd 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -64,6 +64,7 @@ import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
 import org.apache.druid.server.coordinator.CoordinatorRuntimeParamsTestHelpers;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
 import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
@@ -114,6 +115,7 @@ import java.util.stream.Collectors;
 public class CompactSegmentsTest
 {
   private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
+  private static final DruidCoordinatorConfig COORDINATOR_CONFIG = Mockito.mock(DruidCoordinatorConfig.class);
   private static final String DATA_SOURCE_PREFIX = "dataSource_";
   private static final int PARTITION_PER_TIME_INTERVAL = 4;
   // Each dataSource starts with 440 byte, 44 segments, and 11 intervals needing compaction
@@ -188,6 +190,7 @@ public class CompactSegmentsTest
     dataSources = DataSourcesSnapshot
         .fromUsedSegments(allSegments, ImmutableMap.of())
         .getUsedSegmentsTimelinesPerDataSource();
+    Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(true);
   }
 
   private DataSegment createSegment(String dataSource, int startDay, boolean beforeNoon, int partition)
@@ -227,7 +230,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     final Supplier<String> expectedVersionSupplier = new Supplier<String>()
     {
@@ -305,7 +308,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -397,7 +400,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -490,7 +493,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     // Before any compaction, we do not have any snapshot of compactions
     Map<String, AutoCompactionSnapshot> autoCompactionSnapshots = compactSegments.getAutoCompactionSnapshot();
@@ -551,7 +554,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     final CoordinatorStats stats = doCompactSegments(compactSegments, 3);
     Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -563,7 +566,7 @@ public class CompactSegmentsTest
   public void testCompactWithoutGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -619,7 +622,7 @@ public class CompactSegmentsTest
   public void testCompactWithNotNullIOConfig()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -671,7 +674,7 @@ public class CompactSegmentsTest
   public void testCompactWithNullIOConfig()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -723,7 +726,7 @@ public class CompactSegmentsTest
   public void testCompactWithGranularitySpec()
   {
     final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     final String dataSource = DATA_SOURCE_PREFIX + 0;
     compactionConfigs.add(
@@ -818,7 +821,7 @@ public class CompactSegmentsTest
     Mockito.when(mockIndexingServiceClient.getActiveTasks()).thenReturn(ImmutableList.of(runningConflictCompactionTask));
     Mockito.when(mockIndexingServiceClient.getTaskPayload(ArgumentMatchers.eq(conflictTaskId))).thenReturn(runningConflictCompactionTaskPayload);
 
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, mockIndexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, mockIndexingServiceClient);
     final List<DataSourceCompactionConfig> compactionConfigs = new ArrayList<>();
     compactionConfigs.add(
         new DataSourceCompactionConfig(
@@ -884,7 +887,7 @@ public class CompactSegmentsTest
     final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
     leaderClient.start();
     final HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
-    final CompactSegments compactSegments = new CompactSegments(JSON_MAPPER, indexingServiceClient);
+    final CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
 
     final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
     Assert.assertEquals(4, stats.getGlobalStat(CompactSegments.AVAILABLE_COMPACTION_TASK_SLOT));
@@ -892,6 +895,99 @@ public class CompactSegmentsTest
     Assert.assertEquals(2, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
   }
 
+  @Test
+  public void testRunWithLockedIntervals()
+  {
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+    // Lock all intervals for dataSource_1 and dataSource_2
+    final String datasource1 = DATA_SOURCE_PREFIX + 1;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource1, k -> new ArrayList<>())
+        .add(Intervals.of("2017/2018"));
+
+    final String datasource2 = DATA_SOURCE_PREFIX + 2;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource2, k -> new ArrayList<>())
+        .add(Intervals.of("2017/2018"));
+
+    // Lock all intervals but one for dataSource_0
+    final String datasource0 = DATA_SOURCE_PREFIX + 0;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource0, k -> new ArrayList<>())
+        .add(Intervals.of("2017-01-01T13:00:00Z/2017-02-01"));
+
+    // Verify that locked intervals are skipped and only one compaction task
+    // is submitted for dataSource_0
+    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(2), 4);
+    Assert.assertEquals(1, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+    Assert.assertEquals(1, leaderClient.submittedCompactionTasks.size());
+
+    final ClientCompactionTaskQuery compactionTask = leaderClient.submittedCompactionTasks.get(0);
+    Assert.assertEquals(datasource0, compactionTask.getDataSource());
+    Assert.assertEquals(
+        Intervals.of("2017-01-01T00:00:00/2017-01-01T12:00:00"),
+        compactionTask.getIoConfig().getInputSpec().getInterval()
+    );
+  }
+
+  @Test
+  public void testRunWithLockedIntervalsNoSkip()
+  {
+    Mockito.when(COORDINATOR_CONFIG.getCompactionSkipLockedIntervals()).thenReturn(false);
+
+    final TestDruidLeaderClient leaderClient = new TestDruidLeaderClient(JSON_MAPPER);
+    leaderClient.start();
+    HttpIndexingServiceClient indexingServiceClient = new HttpIndexingServiceClient(JSON_MAPPER, leaderClient);
+
+    // Lock all intervals for all the dataSources
+    final String datasource0 = DATA_SOURCE_PREFIX + 0;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource0, k -> new ArrayList<>())
+        .add(Intervals.of("2017/2018"));
+
+    final String datasource1 = DATA_SOURCE_PREFIX + 1;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource1, k -> new ArrayList<>())
+        .add(Intervals.of("2017/2018"));
+
+    final String datasource2 = DATA_SOURCE_PREFIX + 2;
+    leaderClient.lockedIntervals
+        .computeIfAbsent(datasource2, k -> new ArrayList<>())
+        .add(Intervals.of("2017/2018"));
+
+    // Verify that no locked intervals are skipped
+    CompactSegments compactSegments = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingServiceClient);
+    int maxTaskSlots = partitionsSpec instanceof SingleDimensionPartitionsSpec ? 5 : 3;
+    final CoordinatorStats stats = doCompactSegments(compactSegments, createCompactionConfigs(1), maxTaskSlots);
+    Assert.assertEquals(3, stats.getGlobalStat(CompactSegments.COMPACTION_TASK_COUNT));
+    Assert.assertEquals(3, leaderClient.submittedCompactionTasks.size());
+    leaderClient.submittedCompactionTasks.forEach(task -> {
+      System.out.println(task.getDataSource() + " : " + task.getIoConfig().getInputSpec().getInterval());
+    });
+
+    // Verify that tasks are submitted for the latest interval of each dataSource
+    final Map<String, Interval> datasourceToInterval = new HashMap<>();
+    leaderClient.submittedCompactionTasks.forEach(
+        task -> datasourceToInterval.put(
+            task.getDataSource(), task.getIoConfig().getInputSpec().getInterval()));
+    Assert.assertEquals(
+        Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+        datasourceToInterval.get(datasource0)
+    );
+    Assert.assertEquals(
+        Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+        datasourceToInterval.get(datasource1)
+    );
+    Assert.assertEquals(
+        Intervals.of("2017-01-09T00:00:00Z/2017-01-09T12:00:00Z"),
+        datasourceToInterval.get(datasource2)
+    );
+  }
+
   private void verifySnapshot(
       CompactSegments compactSegments,
       AutoCompactionSnapshot.AutoCompactionScheduleStatus scheduleStatus,
@@ -1183,6 +1279,12 @@ public class CompactSegmentsTest
   {
     private final ObjectMapper jsonMapper;
 
+    // Map from Task Id to the intervals locked by that task
+    private final Map<String, List<Interval>> lockedIntervals = new HashMap<>();
+
+    // List of submitted compaction tasks for verification in the tests
+    private final List<ClientCompactionTaskQuery> submittedCompactionTasks = new ArrayList<>();
+
     private int compactVersionSuffix = 0;
 
     private TestDruidLeaderClient(ObjectMapper jsonMapper)
@@ -1209,6 +1311,8 @@ public class CompactSegmentsTest
                  || urlString.contains("/druid/indexer/v1/pendingTasks")
                  || urlString.contains("/druid/indexer/v1/runningTasks")) {
         return createStringFullResponseHolder(jsonMapper.writeValueAsString(Collections.emptyList()));
+      } else if (urlString.contains(("/druid/indexer/v1/lockedIntervals"))) {
+        return handleLockedIntervals();
       } else {
         throw new IAE("Cannot handle request for url[%s]", request.getUrl());
       }
@@ -1252,6 +1356,8 @@ public class CompactSegmentsTest
         throw new IAE("Cannot run non-compaction task");
       }
       final ClientCompactionTaskQuery compactionTaskQuery = (ClientCompactionTaskQuery) taskQuery;
+      submittedCompactionTasks.add(compactionTaskQuery);
+
       final Interval intervalToCompact = compactionTaskQuery.getIoConfig().getInputSpec().getInterval();
       final VersionedIntervalTimeline<String, DataSegment> timeline = dataSources.get(
           compactionTaskQuery.getDataSource()
@@ -1269,6 +1375,11 @@ public class CompactSegmentsTest
       return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
     }
 
+    private StringFullResponseHolder handleLockedIntervals() throws IOException
+    {
+      return createStringFullResponseHolder(jsonMapper.writeValueAsString(lockedIntervals));
+    }
+
     private void compactSegments(
         VersionedIntervalTimeline<String, DataSegment> timeline,
         List<DataSegment> segments,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org