You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "imply-cheddar (via GitHub)" <gi...@apache.org> on 2023/05/11 23:42:16 UTC

[GitHub] [druid] imply-cheddar commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

imply-cheddar commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1191752099


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java:
##########
@@ -21,6 +21,8 @@
 
 public enum TaskLockType
 {
+  REPLACE, // Meant to be used only with Replacing jobs and timechunk locking
+  APPEND, // Meant to be used only with Appending jobs and timechunk locking

Review Comment:
   Probably makes sense to just add javadoc explanations for each of the lock types at this point.  3 of them have comments to try to explain them, so just make it javadoc and perhaps add more words.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }

Review Comment:
   Enumerations can have methods, given that you've basically created an independent method for each of the enumerations, you might as well add an abstract method to `TaskLockType` and call that.  Let the JVM do the switching for you.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.

Review Comment:
   As I get to these too, I think that this definition/explanation deserves to be on the javadoc for the lock type rahter than hidden here on this one method.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(

Review Comment:
   Instead of `Assert.assertNull` I think the test would be easier to decipher the intent if you were to create a helper method that is like
   
   ```
   assertLockNotGranted()
   ```
   
   It can do a null check inside of itself, but when reading the test, it becomes clear what you are trying to test for without the reader having to figure out how to interpret `assertNull`



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());

Review Comment:
   This also seems like it should be fine, the shared lock was revoked?
   
   Same mis-reading here.  If you are taking the time to have comments, please always make sure that they are in sync.  An incorrect comment is *significantly* worse than no comment at all as it creates confusion and encourages people to assume they know what the code is doing without actually reading it.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1403,6 +1987,57 @@ public void testConflictsWithOverlappingSharedLocks() throws Exception
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    // Tasks to be failed have a group id with the substring "FailingLockAcquisition"
+    // Please refer to NullLockPosseTaskLockbox
+    final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+    taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+    taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+    taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(taskWithFailingLockAcquisition0);
+    testLockbox.add(taskWithFailingLockAcquisition1);
+    testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+    testLockbox.tryLock(taskWithFailingLockAcquisition0,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithFailingLockAcquisition0,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithSuccessfulLockAcquisition,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+    // The tasks must be marked for failure
+    TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+    Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1),
+                        result.getTasksToFail());
+  }

Review Comment:
   Did this test change or just move?  If it just moved, why move it?  Please move it back



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1097,6 +1099,27 @@ public void remove(final Task task)
     }
   }
 
+  public Set<TaskLock> getAllExclusiveLocksForDatasource(final String datasource)

Review Comment:
   I don't see this method being used?  Why was it created?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.

Review Comment:
   > If yes, revoke all of them.
   
   So, if one lock is revokable, then revoke all locks regardless of whether they are revokable or not?  Or, what is that trying to say?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type

Review Comment:
   "SHARED locks are only compatible with other shared locks, incompatible with everything else"
   
   Might be more clear



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          return false;
+      }

Review Comment:
   I think this could be pretty nicely done as an abstract method on `TaskLockType`.  Perhaps
   
   ```
   List<TaskLockPosse> determineLocksToRevoke(List<TaskLockPosse>, LockRequest request)
   ```
   
   ?



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = insertTaskLock(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(appendLock);
+    Assert.assertFalse(appendLock.isRevoked());
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock.revokedCopy(),
+        exclusiveLock.revokedCopy(),
+        appendLock.revokedCopy(),
+        replaceLock.revokedCopy()
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+
+    final Set<TaskLock> expectedActiveLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedActiveLocks, getAllActiveLocks(tasks));
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // No overlapping exclusive lock is compatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Enclosing shared lock of lower priority - compatible
+    final TaskLock sharedLock0 = insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            LOW_PRIORITY,
+            tasks
+    );
+    Assert.assertNotNull(sharedLock0);
+    Assert.assertFalse(sharedLock0.isRevoked());
+
+    // Enclosed shared lock of higher priority - compatible
+    final TaskLock sharedLock1 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock1);
+    Assert.assertFalse(sharedLock1.isRevoked());
+
+    // Partially Overlapping shared lock of equal priority - compatible
+    final TaskLock sharedLock2 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock2);
+    Assert.assertFalse(sharedLock2.isRevoked());
+
+    // Conficting replace locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Conflicting append locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock0,
+        sharedLock1,
+        sharedLock2
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }

Review Comment:
   Looking over these tests, they follow a very similar pattern.  Along with my commentary about wanting it to be simpler to decipher what the tests are validating, I think you can simplify things quite a bit.  You can make a `TaskLockboxValidator` which is a class that each test would instantiate.  That class can maintain the list of tasks (that you call task).  You can add methods on it like
   
   ```
   TaskLock expectLockCreated(Type, Interval, priority);
   
   void expectLockNotGranted(Type, Interval, priority);
   
   void expectLockIsNowRevoked(TaskLock);
   
   void validateExpectedLocks(List<TaskLock>);
   ```
   
   And then if you adjust these tests to be written in terms of that, I think that it will be both easier to understand what the tests are doing *and* simpler to write the tests.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());

Review Comment:
   Oooooh, I see, it's asserting that the lock was granted and the revoked is false...  I totally read it wrong, partially because your comment made me think that you were testing that it was going to be revoked.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);

Review Comment:
   not incompatible means compatible right?



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());

Review Comment:
   Why will this be revoked?  the shared lock that it would conflict with was revoked, in which case this should be fine, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org