You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ch...@apache.org on 2023/05/15 05:38:39 UTC

[druid] branch master updated: Add new lock types: APPEND and REPLACE (#14258)

This is an automated email from the ASF dual-hosted git repository.

cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e9913abbbf Add new lock types: APPEND and REPLACE (#14258)
e9913abbbf is described below

commit e9913abbbfed82c11d1f876f30f24a768267ab48
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Mon May 15 11:08:32 2023 +0530

    Add new lock types: APPEND and REPLACE (#14258)
    
    * Add new lock types: APPEND and REPLACE
---
 .../apache/druid/indexing/common/TaskLockType.java |  29 +-
 .../druid/indexing/overlord/TaskLockbox.java       | 237 ++++++--
 .../druid/indexing/overlord/TaskLockboxTest.java   | 648 +++++++++++++++++----
 3 files changed, 744 insertions(+), 170 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
index b51990cee0..43f79cf265 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
@@ -19,8 +19,35 @@
 
 package org.apache.druid.indexing.common;
 
+/**
+ * 1) A revoked lock is compatible with every lock.
+ * 2) An appending task may use only EXCLUSIVE, SHARED or APPEND locks
+ * 3) A replacing task may use only EXCLUSIVE or REPLACE locks
+ * 4) REPLACE and APPEND locks can only be used with timechunk locking
+ */
 public enum TaskLockType
 {
+  /**
+   * There can be at most one active EXCLUSIVE lock for a given interval.
+   * It cannot co-exist with any other active locks with overlapping intervals.
+   */
+  EXCLUSIVE,
+  /**
+   * There can be any number of active SHARED locks for a given interval.
+   * They can coexist only with other SHARED locks, but not with active locks of other types.
+   */
   SHARED,
-  EXCLUSIVE // taskLocks of this type can be shared by tasks of the same groupId.
+  /**
+   * There can be at most one active REPLACE lock for a given interval.
+   * It can co-exist only with APPEND locks whose intervals are enclosed within its interval,
+   * and is incompatible with all other active locks with overlapping intervals.
+   */
+  REPLACE,
+  /**
+   * There can be any number of active APPEND locks for a given interval.
+   * They can coexist with other APPEND locks,
+   * and with at most one REPLACE lock whose interval encloses that of the APPEND lock.
+   * They are incompatible with all other active locks.
+   */
+  APPEND
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 927b09557f..f84f5f3075 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -614,13 +615,21 @@ public class TaskLockbox
         if (reusablePosses.size() == 0) {
           // case 1) this task doesn't have any lock, but others do
 
-          if (request.getType().equals(TaskLockType.SHARED)
-              && areAllEqualOrHigherPriorityLocksSharedOrRevoked(conflictPosses, request.getPriority())) {
-            // Any number of shared locks can be acquired for the same dataSource and interval
-            // Exclusive locks of equal or greater priority, if present, must already be revoked
-            // Exclusive locks of lower priority can be revoked
-            revokeAllLowerPriorityNonSharedLocks(conflictPosses, request.getPriority());
+          if ((request.getType().equals(TaskLockType.APPEND) || request.getType().equals(TaskLockType.REPLACE))
+              && !request.getGranularity().equals(LockGranularity.TIME_CHUNK)) {
+            // APPEND and REPLACE locks are specific to time chunk locks
+            return null;
+          }
+
+          // First, check if the lock can coexist with its conflicting posses
+          if (canLockCoexist(conflictPosses, request)) {
+            return createNewTaskLockPosse(request);
+          }
+
+          // If not, revoke every incompatible active lock, provided each has a lower priority than the request
+          if (revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, request)) {
             return createNewTaskLockPosse(request);
+
           } else {
             // During a rolling update, tasks of mixed versions can be run at the same time. Old tasks would request
             // timeChunkLocks while new tasks would ask segmentLocks. The below check is to allow for old and new tasks
@@ -637,19 +646,12 @@ public class TaskLockbox
               // We can add a new taskLockPosse.
               return createNewTaskLockPosse(request);
             } else {
-              if (isAllRevocable(conflictPosses, request.getPriority())) {
-                // Revoke all existing locks
-                conflictPosses.forEach(this::revokeLock);
-
-                return createNewTaskLockPosse(request);
-              } else {
-                log.info(
-                    "Cannot create a new taskLockPosse for request[%s] because existing locks[%s] have same or higher priorities",
-                    request,
-                    conflictPosses
-                );
-                return null;
-              }
+              log.info(
+                  "Cannot create a new taskLockPosse for request[%s] because existing locks[%s] have same or higher priorities",
+                  request,
+                  conflictPosses
+              );
+              return null;
             }
           }
         } else if (reusablePosses.size() == 1) {
@@ -1239,43 +1241,190 @@ public class TaskLockbox
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        throw new UOE("Unsupported lock type: " + request.getType());
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    and with at most one REPLACE lock whose interval encloses that of this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with revoked locks and with any number of APPEND locks enclosed within its interval
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if every incompatible active lock is revocable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock that is not an APPEND lock enclosed within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          throw new UOE("Unsupported lock type: " + type);
+      }
+    }
+    for (TaskLockPosse revokablePosse : possesToRevoke) {
+      revokeLock(revokablePosse);
+    }
+    return true;
   }
 
   /**
@@ -1350,6 +1499,8 @@ public class TaskLockbox
     {
       if (taskLock.getType() == request.getType() && taskLock.getGranularity() == request.getGranularity()) {
         switch (taskLock.getType()) {
+          case REPLACE:
+          case APPEND:
           case SHARED:
             if (request instanceof TimeChunkLockRequest) {
               return taskLock.getInterval().contains(request.getInterval())
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index de847d59c0..15e59227c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -75,7 +75,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -93,6 +92,11 @@ public class TaskLockboxTest
   private TaskStorage taskStorage;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
   private TaskLockbox lockbox;
+  private TaskLockboxValidator validator;
+
+  private final int HIGH_PRIORITY = 15;
+  private final int MEDIUM_PRIORITY = 10;
+  private final int LOW_PRIORITY = 5;
 
   @Rule
   public final ExpectedException exception = ExpectedException.none();
@@ -124,6 +128,7 @@ public class TaskLockboxTest
     metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(objectMapper, tablesConfig, derbyConnector);
 
     lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
+    validator = new TaskLockboxValidator(lockbox, taskStorage);
   }
 
   private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
@@ -144,11 +149,13 @@ public class TaskLockboxTest
   }
 
   @Test
-  public void testLock() throws InterruptedException
+  public void testLock() throws Exception
   {
-    Task task = NoopTask.create();
-    lockbox.add(task);
-    Assert.assertNotNull(acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")));
+    validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2015-01-01/2015-01-02"),
+        MEDIUM_PRIORITY
+    );
   }
 
   @Test(expected = IllegalStateException.class)
@@ -169,81 +176,46 @@ public class TaskLockboxTest
   }
 
   @Test
-  public void testTrySharedLock() throws EntryExistsException
+  public void testTrySharedLock() throws Exception
   {
     final Interval interval = Intervals.of("2017-01/2017-02");
-    final List<Task> tasks = new ArrayList<>();
-    final Set<TaskLock> activeLocks = new HashSet<>();
-
-    // Add an exclusive lock entry of the highest priority
-    Task exclusiveHigherPriorityRevokedLockTask = NoopTask.create(100);
-    tasks.add(exclusiveHigherPriorityRevokedLockTask);
-    taskStorage.insert(
-        exclusiveHigherPriorityRevokedLockTask,
-        TaskStatus.running(exclusiveHigherPriorityRevokedLockTask.getId())
-    );
-    lockbox.add(exclusiveHigherPriorityRevokedLockTask);
-    final TaskLock exclusiveRevokedLock = tryTimeChunkLock(
+
+    final TaskLock exclusiveRevokedLock = validator.expectLockCreated(
         TaskLockType.EXCLUSIVE,
-        exclusiveHigherPriorityRevokedLockTask,
-        interval
-    ).getTaskLock();
-
-    // Any equal or lower priority shared lock must fail
-    final Task sharedLockTask = NoopTask.create(100);
-    lockbox.add(sharedLockTask);
-    Assert.assertFalse(tryTimeChunkLock(TaskLockType.SHARED, sharedLockTask, interval).isOk());
-
-    // Revoke existing active exclusive lock
-    lockbox.revokeLock(exclusiveHigherPriorityRevokedLockTask.getId(), exclusiveRevokedLock);
-    Assert.assertEquals(1, getAllLocks(tasks).size());
-    Assert.assertEquals(0, getAllActiveLocks(tasks).size());
-    Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
-    // test creating new shared locks
-    for (int i = 0; i < 3; i++) {
-      final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
-      tasks.add(task);
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      lockbox.add(task);
-      final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
-      Assert.assertNotNull(lock);
-      activeLocks.add(lock);
-    }
-    Assert.assertEquals(4, getAllLocks(tasks).size());
-    Assert.assertEquals(3, getAllActiveLocks(tasks).size());
-    Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
-    // Adding an exclusive task lock of priority 15 should revoke all existing active locks
-    Task exclusiveLowerPriorityLockTask = NoopTask.create(15);
-    tasks.add(exclusiveLowerPriorityLockTask);
-    taskStorage.insert(exclusiveLowerPriorityLockTask, TaskStatus.running(exclusiveLowerPriorityLockTask.getId()));
-    lockbox.add(exclusiveLowerPriorityLockTask);
-    final TaskLock lowerPriorityExclusiveLock = tryTimeChunkLock(
+        interval,
+        HIGH_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        interval,
+        HIGH_PRIORITY
+    );
+
+    validator.revokeLock(exclusiveRevokedLock);
+    validator.expectRevokedLocks(exclusiveRevokedLock);
+
+    final TaskLock lowPrioritySharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        interval,
+        LOW_PRIORITY
+    );
+
+    final TaskLock mediumPriorityExclusiveLock = validator.expectLockCreated(
         TaskLockType.EXCLUSIVE,
-        exclusiveLowerPriorityLockTask,
-        interval
-    ).getTaskLock();
-    activeLocks.clear();
-    activeLocks.add(lowerPriorityExclusiveLock);
-    Assert.assertEquals(5, getAllLocks(tasks).size());
-    Assert.assertEquals(1, getAllActiveLocks(tasks).size());
-    Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
-    // Add new shared locks which revoke the active exclusive task lock
-    activeLocks.clear();
-    for (int i = 3; i < 5; i++) {
-      final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
-      tasks.add(task);
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      lockbox.add(task);
-      final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
-      Assert.assertNotNull(lock);
-      activeLocks.add(lock);
-    }
-    Assert.assertEquals(7, getAllLocks(tasks).size());
-    Assert.assertEquals(2, getAllActiveLocks(tasks).size());
-    Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
+        interval,
+        MEDIUM_PRIORITY
+    );
+    validator.expectActiveLocks(mediumPriorityExclusiveLock);
+    validator.expectRevokedLocks(exclusiveRevokedLock, lowPrioritySharedLock);
+
+    final TaskLock highPrioritySharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        interval,
+        HIGH_PRIORITY
+    );
+    validator.expectActiveLocks(highPrioritySharedLock);
+    validator.expectRevokedLocks(exclusiveRevokedLock, lowPrioritySharedLock, mediumPriorityExclusiveLock);
   }
 
   @Test
@@ -532,8 +504,8 @@ public class TaskLockboxTest
     newBox.syncFromStorage();
 
     final Set<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
-                                                          .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
-                                                          .collect(Collectors.toSet());
+                                                         .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+                                                         .collect(Collectors.toSet());
 
     Assert.assertEquals(
         beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
@@ -1272,6 +1244,380 @@ public class TaskLockboxTest
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage);
+
+    final TaskLock sharedLock = validator.tryTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(sharedLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks(sharedLock, exclusiveLock, appendLock, replaceLock);
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    final TaskLock sharedLock0 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock sharedLock1 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock sharedLock2 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock0, sharedLock1, sharedLock2);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testSharedLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(exclusiveLock);
+
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-01-01/2017-02-01"),
+        MEDIUM_PRIORITY
+    );
+
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-07-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-03-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(sharedLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(exclusiveLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock);
+    validator.expectRevokedLocks(exclusiveLock, replaceLock, appendLock);
+  }
+
+  @Test
+  public void testAppendLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-05-01/2018-01-01"),
+        MEDIUM_PRIORITY
+    );
+
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+
+    // Any append lock can be created, provided that it lies within the interval of the previously created replace lock
+    // This should not revoke any of the existing locks even with a higher priority
+    final TaskLock appendLock0 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        HIGH_PRIORITY
+    );
+
+    final TaskLock appendLock1 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, replaceLock, appendLock0, appendLock1);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testAppendLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-07-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock appendLock0 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-03-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock appendLock1 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-05-01"),
+        HIGH_PRIORITY
+    );
+
+    validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock0, appendLock1);
+    validator.expectRevokedLocks(sharedLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, appendLock0, appendLock1);
+    validator.expectRevokedLocks(sharedLock, exclusiveLock, replaceLock);
+  }
+
+
+  @Test
+  public void testReplaceLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // An append lock can be created for an interval enclosed within the replace lock's.
+    // Also note that the append lock has a higher priority but doesn't revoke the replace lock as it can coexist.
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        HIGH_PRIORITY
+    );
+
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2016-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, appendLock);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testReplaceLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLock appendLock0 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(appendLock0);
+
+    final TaskLock appendLock1 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-03-01"),
+        HIGH_PRIORITY
+    );
+
+    final TaskLock appendLock2 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2018-03-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2016-09-01/2017-03-01"),
+        LOW_PRIORITY
+    );
+
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-04-01/2017-05-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(appendLock1, appendLock2, exclusiveLock, replaceLock, sharedLock);
+    validator.expectRevokedLocks(appendLock0);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(appendLock1, theLock);
+    validator.expectRevokedLocks(appendLock0, appendLock2, exclusiveLock, replaceLock, sharedLock);
+  }
+
   @Test
   public void testGetLockedIntervalsForRevokedLocks() throws Exception
   {
@@ -1360,62 +1706,112 @@ public class TaskLockboxTest
   @Test
   public void testConflictsWithOverlappingSharedLocks() throws Exception
   {
-    final List<Task> tasks = new ArrayList<>();
-
-    final Task conflictingTask = NoopTask.create(10);
-    tasks.add(conflictingTask);
-    lockbox.add(conflictingTask);
-    taskStorage.insert(conflictingTask, TaskStatus.running(conflictingTask.getId()));
-    TaskLock conflictingLock = tryTimeChunkLock(
+    TaskLock conflictingLock = validator.expectLockCreated( // SHARED lock that overlaps the EXCLUSIVE request below
         TaskLockType.SHARED,
-        conflictingTask,
-        Intervals.of("2023-05-01/2023-06-01")
-    ).getTaskLock();
-    Assert.assertNotNull(conflictingLock);
-    Assert.assertFalse(conflictingLock.isRevoked());
-
-    final Task floorTask = NoopTask.create(10);
-    tasks.add(floorTask);
-    lockbox.add(floorTask);
-    taskStorage.insert(floorTask, TaskStatus.running(floorTask.getId()));
-    TaskLock floorLock = tryTimeChunkLock(
+        Intervals.of("2023-05-01/2023-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    TaskLock floorLock = validator.expectLockCreated( // ends (05-27) before the EXCLUSIVE request starts (05-28)
         TaskLockType.SHARED,
-        floorTask,
-        Intervals.of("2023-05-26/2023-05-27")
-    ).getTaskLock();
-    Assert.assertNotNull(floorLock);
-    Assert.assertFalse(floorLock.isRevoked());
-
-    final Task rightOverlapTask = NoopTask.create(10);
-    tasks.add(rightOverlapTask);
-    lockbox.add(rightOverlapTask);
-    taskStorage.insert(rightOverlapTask, TaskStatus.running(rightOverlapTask.getId()));
-    TaskLock rightOverlapLock = tryTimeChunkLock(
-        TaskLockType.EXCLUSIVE,
-        rightOverlapTask,
-        Intervals.of("2023-05-28/2023-06-03")
-    ).getTaskLock();
-    Assert.assertNull(rightOverlapLock);
+        Intervals.of("2023-05-26/2023-05-27"),
+        MEDIUM_PRIORITY
+    );
 
-    Assert.assertEquals(
-        ImmutableSet.of(conflictingLock, floorLock),
-        getAllActiveLocks(tasks)
+    validator.expectLockNotGranted( // requested at the same MEDIUM_PRIORITY as the SHARED locks already held
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2023-05-28/2023-06-03"),
+        MEDIUM_PRIORITY
     );
-  }
 
-  private Set<TaskLock> getAllActiveLocks(List<Task> tasks)
-  {
-    return tasks.stream()
-                .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
-                .filter(taskLock -> !taskLock.isRevoked())
-                .collect(Collectors.toSet());
+    validator.expectActiveLocks(conflictingLock, floorLock); // both SHARED locks are still active, neither revoked
   }
 
-  private Set<TaskLock> getAllLocks(List<Task> tasks)
+
+  private class TaskLockboxValidator
   {
-    return tasks.stream()
-                .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
-                .collect(Collectors.toSet());
+
+    private final List<Task> tasks; // every task created by tryTaskLock, whether or not its lock was granted
+    private final TaskLockbox lockbox; // tasks are added to it and locks revoked through it
+    private final TaskStorage taskStorage; // tasks are inserted here and their locks are read back from here
+    private final Map<TaskLock, String> lockToTaskIdMap; // granted lock -> id of the task that requested it
+
+    TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage) // starts with no tasks and no recorded locks
+    {
+      lockToTaskIdMap = new HashMap<>();
+      tasks = new ArrayList<>();
+      this.lockbox = lockbox;
+      this.taskStorage = taskStorage;
+    }
+
+    public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) throws Exception
+    { // Requests a lock for a newly created task and asserts that a non-revoked lock came back.
+      final TaskLock lock = tryTaskLock(type, interval, priority);
+      Assert.assertNotNull(lock); // the request must have produced a lock
+      Assert.assertFalse(lock.isRevoked());
+      return lock; // returned so the caller can pass it to revokeLock / expectActiveLocks / expectRevokedLocks
+    }
+
+    public void revokeLock(TaskLock lock) // revokes the lock on behalf of the task recorded for it
+    {
+      lockbox.revokeLock(lockToTaskIdMap.get(lock), lock); // id is null for a lock this validator never recorded
+    }
+
+    public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) throws Exception
+    { // Requests a lock for a newly created task and asserts that no lock came back.
+      final TaskLock lock = tryTaskLock(type, interval, priority); // the task itself is still registered and tracked
+      Assert.assertNull(lock);
+    }
+
+    public void expectRevokedLocks(TaskLock... locks) // asserts that the given locks, and no others, are revoked
+    {
+      final Set<TaskLock> allLocks = getAllLocks();
+      final Set<TaskLock> activeLocks = getAllActiveLocks();
+      Assert.assertEquals(locks.length, allLocks.size() - activeLocks.size()); // expected first; revoked = all - active
+      for (TaskLock lock : locks) {
+        Assert.assertTrue(allLocks.contains(lock.revokedCopy())); // the revoked copy of the lock must be stored
+        Assert.assertFalse(activeLocks.contains(lock)); // and the lock as passed in must not be among the active ones
+      }
+    }
+
+    public void expectActiveLocks(TaskLock... locks) // asserts that the given locks, and no others, are active
+    {
+      final Set<TaskLock> allLocks = getAllLocks();
+      final Set<TaskLock> activeLocks = getAllActiveLocks();
+      Assert.assertEquals(locks.length, activeLocks.size()); // expected value first, so failure messages read correctly
+      for (TaskLock lock : locks) {
+        Assert.assertTrue(allLocks.contains(lock)); // stored at all
+        Assert.assertTrue(activeLocks.contains(lock)); // and stored in non-revoked form
+      }
+    }
+
+    private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) throws Exception
+    { // Creates a new NoopTask from the given priority, registers it, and tries to lock the interval for it.
+      final Task task = NoopTask.create(priority);
+      tasks.add(task); // tracked so getAllLocks / getAllActiveLocks can look up its locks later
+      lockbox.add(task);
+      taskStorage.insert(task, TaskStatus.running(task.getId()));
+      TaskLock lock = tryTimeChunkLock(type, task, interval).getTaskLock(); // helper defined outside this class
+      if (lock != null) {
+        lockToTaskIdMap.put(lock, task.getId()); // remembered so revokeLock can find the owning task id
+      }
+      return lock; // may be null; expectLockCreated / expectLockNotGranted assert on that
+    }
+
+    private Set<TaskLock> getAllActiveLocks() // non-revoked locks stored for the tasks created via tryTaskLock
+    {
+      return tasks.stream()
+                  .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+                  .filter(taskLock -> !taskLock.isRevoked()) // drop revoked locks
+                  .collect(Collectors.toSet());
+    }
+
+    private Set<TaskLock> getAllLocks() // all locks, revoked or not, stored for the tasks created via tryTaskLock
+    {
+      return tasks.stream()
+                  .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+                  .collect(Collectors.toSet());
+    }
   }
 
   private static class IntervalLockWithoutPriority extends TimeChunkLock


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org