Posted to commits@druid.apache.org by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org> on 2023/05/11 11:37:10 UTC

[GitHub] [druid] AmatyaAvadhanula opened a new pull request, #14258: Add new lock types: APPEND and REPLACE

AmatyaAvadhanula opened a new pull request, #14258:
URL: https://github.com/apache/druid/pull/14258

   Introduce two new lock types: APPEND and REPLACE
   
   ### Description
   
   Timechunk locking currently supports two lock types:
   - An interval may have at most one EXCLUSIVE lock, and no other lock may coexist with it. This is useful when replacing the data within an interval.
   - An interval may have any number of SHARED locks of different priorities, with any overlap. No other type of lock may coexist with them. This allows several appending jobs to run concurrently on the same interval.
   
   This means that, with timechunk locking, an interval can be either replaced or appended to at any given time, but not both.
   
   
   We introduce two new lock types in this PR to lay the foundation for a replace job to run concurrently with multiple appending jobs for an interval. These locks are called REPLACE and APPEND and are meant to be used for replacing and appending jobs respectively, and may only be used with TIMECHUNK locking.
   `Please note that this PR doesn't enable the usage of these locks.`
   
   - An interval may have at most one REPLACE lock, together with any number of APPEND locks whose intervals are completely enclosed by the REPLACE lock's interval.
   - An interval may have any number of APPEND locks, and each of these may coexist with at most one REPLACE lock, whose interval completely encloses the APPEND lock's interval.
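
The compatibility rules for all four lock types can be sketched as a single check. This is a minimal, self-contained model and not the code in this PR: `LockKind`, `Span`, and `HeldLock` are hypothetical names, intervals are plain numeric ranges, and revoked locks are assumed to have been filtered out of `overlapping` already.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustrative model only: LockKind, Span and HeldLock are hypothetical names, not Druid classes.
final class LockCompatibilitySketch
{
  enum LockKind { EXCLUSIVE, SHARED, APPEND, REPLACE }

  // Half-open interval [start, end) in arbitrary units, for example days.
  static final class Span
  {
    final long start;
    final long end;

    Span(long start, long end)
    {
      this.start = start;
      this.end = end;
    }

    boolean contains(Span other)
    {
      return start <= other.start && other.end <= end;
    }
  }

  static final class HeldLock
  {
    final LockKind kind;
    final Span span;

    HeldLock(LockKind kind, Span span)
    {
      this.kind = kind;
      this.span = span;
    }
  }

  // True if the requested lock can coexist with every active lock that overlaps it.
  static boolean canCoexist(LockKind kind, Span span, List<HeldLock> overlapping)
  {
    boolean replaceSeen = false;
    for (HeldLock held : overlapping) {
      switch (kind) {
        case EXCLUSIVE:
          return false; // nothing may overlap an EXCLUSIVE lock
        case SHARED:
          if (held.kind != LockKind.SHARED) {
            return false;
          }
          break;
        case APPEND:
          if (held.kind == LockKind.APPEND) {
            break;
          }
          // The only other allowed neighbour is a single enclosing REPLACE lock.
          if (held.kind != LockKind.REPLACE || replaceSeen || !held.span.contains(span)) {
            return false;
          }
          replaceSeen = true;
          break;
        case REPLACE:
          // Only APPEND locks fully inside the REPLACE interval are allowed.
          if (held.kind != LockKind.APPEND || !span.contains(held.span)) {
            return false;
          }
          break;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    Span year = new Span(0, 365);
    Span may = new Span(120, 151);
    List<HeldLock> held = Arrays.asList(new HeldLock(LockKind.APPEND, may));
    System.out.println(canCoexist(LockKind.REPLACE, year, held)); // true
    System.out.println(canCoexist(LockKind.SHARED, may, held)); // false
    System.out.println(canCoexist(LockKind.EXCLUSIVE, year, Collections.<HeldLock>emptyList())); // true
  }
}
```

Keeping the check symmetric (an APPEND request looks for an enclosing REPLACE lock, a REPLACE request looks for enclosed APPEND locks) means the outcome should not depend on which of the two locks was requested first.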
   
   ### Algorithm
   There are now four lock types, with the compatibility criteria described above.
   - When a lock is requested, we first check whether it can coexist with all existing locks that overlap its interval. If it can, we grant the lock immediately.
   - If the request is incompatible with one or more existing locks, we check whether every incompatible lock can be revoked. If so, we revoke all of them and then grant the requested lock.
   - Otherwise, the request fails without affecting any of the existing locks.
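
The all-or-nothing flow above might look roughly like the following. It is a sketch under stated assumptions, not the `TaskLockbox` implementation: the `Lock` class and `tryGrant` method are hypothetical, compatibility is reduced to a pairwise predicate supplied by the caller, and a lock is assumed to be revocable only when its priority is strictly lower than the request's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Illustrative model only: Lock and tryGrant are hypothetical, not part of Druid.
final class LockGrantSketch
{
  static final class Lock
  {
    final String type;
    final int priority;
    boolean revoked;

    Lock(String type, int priority)
    {
      this.type = type;
      this.priority = priority;
    }
  }

  // Returns true if the request is granted. Existing locks are revoked only when
  // every incompatible lock is revocable, so a failed request changes nothing.
  static boolean tryGrant(Lock request, List<Lock> overlapping, BiPredicate<Lock, Lock> compatible)
  {
    // Step 1: collect the active locks the request cannot coexist with.
    List<Lock> incompatible = new ArrayList<>();
    for (Lock held : overlapping) {
      if (!held.revoked && !compatible.test(request, held)) {
        incompatible.add(held);
      }
    }
    // Step 2: verify that all of them are revocable before touching any.
    // Assumption: revocable means strictly lower priority than the request.
    for (Lock held : incompatible) {
      if (held.priority >= request.priority) {
        return false;
      }
    }
    // Step 3: revoke them all, then grant. An empty list is an immediate grant.
    for (Lock held : incompatible) {
      held.revoked = true;
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Toy rule for the demo: only SHARED locks coexist with each other.
    BiPredicate<Lock, Lock> sharedOnly =
        (req, held) -> req.type.equals("SHARED") && held.type.equals("SHARED");
    Lock low = new Lock("APPEND", 10);
    Lock high = new Lock("REPLACE", 90);
    Lock request = new Lock("EXCLUSIVE", 50);
    System.out.println(tryGrant(request, Arrays.asList(low, high), sharedOnly)); // false
    System.out.println(low.revoked); // false
    System.out.println(tryGrant(request, Arrays.asList(low), sharedOnly)); // true
    System.out.println(low.revoked); // true
  }
}
```

Checking revocability for the whole set before revoking anything is what keeps a failed request from leaving some locks revoked and others not.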
   
   
   
   
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `TaskLockbox`
    * `TaskLockType`
   
   <hr>
   
   
   This PR has:
   
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192131281


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }

Review Comment:
   Added a UOE



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.

Review Comment:
   fixed





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192060321


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(

Review Comment:
   Done



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);

Review Comment:
   Fixed





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192071592


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1250,419 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage);
+
+    // Revoked shared lock of higher priority -> revoked locks are compatible
+    final TaskLock sharedLock = validator.tryTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY
+    );
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY
+    );
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(sharedLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks(sharedLock, exclusiveLock, appendLock, replaceLock);
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // No overlapping exclusive lock is compatible
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Enclosing shared lock of lower priority - compatible
+    final TaskLock sharedLock0 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        LOW_PRIORITY
+    );
+
+    // Enclosed shared lock of higher priority - compatible
+    final TaskLock sharedLock1 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        LOW_PRIORITY
+    );
+
+    // Partially Overlapping shared lock of equal priority - compatible
+    final TaskLock sharedLock2 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Conflicting replace locks are incompatible
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Conflicting append locks are incompatible
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock0, sharedLock1, sharedLock2);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testSharedLockCanRevokeAllIncompatible() throws Exception
+  {
+    // Revoked Exclusive lock of higher priority -> revoked locks are compatible
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(exclusiveLock);
+
+    // Active Shared lock of same priority -> will not be affected
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-01-01/2017-02-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-07-01"),
+        LOW_PRIORITY
+    );
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-03-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(sharedLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(exclusiveLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock);
+    validator.expectRevokedLocks(exclusiveLock, replaceLock, appendLock);
+  }
+
+  @Test
+  public void testAppendLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // An exclusive lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock cannot be created for a non-enclosing interval
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-05-01/2018-01-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock can be created for an enclosing interval
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Another replace lock cannot be created
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+
+    // Any append lock can be created, provided that it lies within the interval of the previously created replace lock
+    // This should not revoke any of the existing locks even with a higher priority
+    final TaskLock appendLock0 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        HIGH_PRIORITY
+    );
+
+    // Append lock with a lower priority can be created as well
+    final TaskLock appendLock1 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, replaceLock, appendLock0, appendLock1);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testAppendLockCanRevokeAllIncompatible() throws Exception
+  {
+    // Revoked Shared lock of higher priority -> revoked locks are compatible
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked

Review Comment:
   While I now understand the meaning of the "->", you still have it here, and anyone who reads this without the extra explanation you gave me won't understand it. FWIW, with the addition of the validator, I don't think the comments really add anything anymore; all of the text from the comments is basically already in the method names.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          return false;
+      }

Review Comment:
   Hrm...  That's unfortunate. I wonder how important that really is, but ah well.
   
   In that case, at a minimum, change all of the defaults to throw a `UOE` that says that it got an unknown TaskLockType.  That way, if/when a new task lock type is created, every place that enumerates all of them in a switch like this will fail loudly instead of returning something that is potentially wrong and creating a hard-to-track bug.
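
A minimal sketch of the fail-loud default suggested above. It is not the PR's code: `LockTypeSwitchSketch`, `LockType`, and `mayCoexist` are hypothetical stand-ins, the JDK's `UnsupportedOperationException` is used in place of Druid's `UOE`, and intervals, priorities, and revocation are ignored for brevity.

```java
final class LockTypeSwitchSketch {
  // Hypothetical stand-in for TaskLockType; the four constants mirror the ones in the diff.
  enum LockType { EXCLUSIVE, SHARED, REPLACE, APPEND }

  // Simplified rule: may a new request of type `requested` coexist with one
  // active lock of type `existing`? Interval enclosure is assumed to hold.
  static boolean mayCoexist(LockType requested, LockType existing) {
    switch (requested) {
      case EXCLUSIVE:
        return false;
      case SHARED:
        return existing == LockType.SHARED;
      case REPLACE:
        return existing == LockType.APPEND;
      case APPEND:
        return existing == LockType.APPEND || existing == LockType.REPLACE;
      default:
        // Reached only if a new constant is added without a matching case.
        throw new UnsupportedOperationException("Unsupported lock type: " + requested);
    }
  }

  public static void main(String[] args) {
    System.out.println(mayCoexist(LockType.SHARED, LockType.SHARED));
    System.out.println(mayCoexist(LockType.EXCLUSIVE, LockType.APPEND));
  }
}
```

With the four constants above the `default` branch is unreachable. It fires only when a fifth constant is added without its own `case`, which is the situation the review comment wants surfaced as an exception rather than as a silent `false`.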



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] imply-cheddar commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192869692


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1257,7 +1258,7 @@ private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest r
       case EXCLUSIVE:
         return canExclusiveLockCoexist(conflictPosses);
       default:
-        return false;
+        throw new UOE("Unsupposted lock type: " + request.getType());

Review Comment:
   typo: "Unsupported", not "Unsupposted"



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1447,51 +1423,43 @@ public void testAppendLockCompatibility() throws Exception
         MEDIUM_PRIORITY
     );
 
-    // An exclusive lock cannot be created for an overlapping interval
     validator.expectLockNotGranted(
         TaskLockType.EXCLUSIVE,
         Intervals.of("2017-05-01/2017-06-01"),
         MEDIUM_PRIORITY
     );
 
-    // A shared lock cannot be created for an overlapping interval
     validator.expectLockNotGranted(
         TaskLockType.SHARED,
         Intervals.of("2016/2019"),
         MEDIUM_PRIORITY
     );
 
-    // A replace lock cannot be created for a non-enclosing interval
     validator.expectLockNotGranted(
         TaskLockType.REPLACE,
         Intervals.of("2017-05-01/2018-01-01"),
         MEDIUM_PRIORITY
     );
 
-    // A replace lock can be created for an enclosing interval
     final TaskLock replaceLock = validator.expectLockCreated(
         TaskLockType.REPLACE,
         Intervals.of("2017/2018"),
         MEDIUM_PRIORITY
     );
 
-    // Another replace lock cannot be created
     validator.expectLockNotGranted(
         TaskLockType.REPLACE,
         Intervals.of("2016/2019"),
         MEDIUM_PRIORITY
     );
 
 
-    // Any append lock can be created, provided that it lies within the interval of the previously created replace lock
-    // This should not revoke any of the existing locks even with a higher priority

Review Comment:
   This comment was actually useful, fwiw.  It explains an intention that is not immediately apparent from the code, which makes it a good example of when to write a comment.
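
The intention that the removed comment captured can be sketched in isolation. The following is a simplified model, not the PR's implementation: `AppendCoexistSketch`, `Lock`, and `canAppendCoexist` are hypothetical names, and intervals are modelled as half-open `long` ranges rather than Joda `Interval` objects. It follows the shape of `canAppendLockCoexist` in the diff quoted earlier in this thread, where priority is never consulted.

```java
import java.util.Arrays;
import java.util.List;

final class AppendCoexistSketch {
  enum LockType { EXCLUSIVE, SHARED, REPLACE, APPEND }

  // Hypothetical stand-in for a task lock over the half-open range [start, end).
  static final class Lock {
    final LockType type;
    final long start;
    final long end;
    final int priority;
    final boolean revoked;

    Lock(LockType type, long start, long end, int priority, boolean revoked) {
      this.type = type;
      this.start = start;
      this.end = end;
      this.priority = priority;
      this.revoked = revoked;
    }

    boolean encloses(long otherStart, long otherEnd) {
      return start <= otherStart && otherEnd <= end;
    }
  }

  // True if an APPEND request over [start, end) can coexist with every conflicting lock.
  static boolean canAppendCoexist(List<Lock> conflicts, long start, long end) {
    int activeReplaceLocks = 0;
    for (Lock lock : conflicts) {
      if (lock.revoked) {
        continue; // revoked locks never block a new request
      }
      switch (lock.type) {
        case APPEND:
          break;
        case REPLACE:
          // Priority is deliberately ignored here: an enclosing REPLACE lock
          // coexists with the APPEND request, so nothing needs to be revoked.
          if (++activeReplaceLocks > 1 || !lock.encloses(start, end)) {
            return false;
          }
          break;
        case EXCLUSIVE:
        case SHARED:
          return false;
        default:
          throw new UnsupportedOperationException("Unsupported lock type: " + lock.type);
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Lock> held = Arrays.asList(new Lock(LockType.REPLACE, 0, 12, 50, false));
    System.out.println(canAppendCoexist(held, 4, 5));  // request enclosed by the REPLACE lock
    System.out.println(canAppendCoexist(held, 4, 16)); // request extends past the REPLACE lock
  }
}
```

Because the check never reads `priority`, an APPEND request that outranks the enclosing REPLACE lock is still granted alongside it. That behaviour is hard to infer from a test body alone, which is why a comment stating it earns its place.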





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192131009


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java:
##########
@@ -21,6 +21,8 @@
 
 public enum TaskLockType
 {
+  REPLACE, // Meant to be used only with Replacing jobs and timechunk locking
+  APPEND, // Meant to be used only with Appending jobs and timechunk locking

Review Comment:
   Done





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192079036


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());

Review Comment:
   Just an over-arching comment about comments.  Comments in code are most often a negative addition to the code.  This is because they most commonly
   
   1) Are written from the perspective of the developer who is writing the code, and lack enough context for someone who didn't write the code to know what they mean
   2) Go stale after the first code adjustment because they get overlooked in code changes.
   
   Comments should only be added when the code itself cannot be read, that is, when there's some extra detail that would be difficult for someone to know by reading the code (there's a great example of a place where a comment is helpful in your most recent committed code, where you explain that an append lock with a higher priority than a replace lock still doesn't revoke the replace lock).
   
   So, when you find yourself writing comments in code, please always check two things
   
   1) Is the comment really needed?  Is the code really not understandable, and if it's not understandable, can it be re-arranged to be understandable?
   2) Assuming you've determined that a comment is actually necessary, write it from the perspective of explaining something to someone who is browsing the code for the very first time, because that's the only way you will include enough context for it to be meaningful to the next person.





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192132656


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1250,419 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage);
+
+    // Revoked shared lock of higher priority -> revoked locks are compatible
+    final TaskLock sharedLock = validator.tryTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY
+    );
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY
+    );
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(sharedLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock);
+    validator.expectRevokedLocks(sharedLock, exclusiveLock, appendLock, replaceLock);
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // No overlapping exclusive lock is compatible
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Enclosing shared lock of lower priority - compatible
+    final TaskLock sharedLock0 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        LOW_PRIORITY
+    );
+
+    // Enclosed shared lock of higher priority - compatible
+    final TaskLock sharedLock1 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        LOW_PRIORITY
+    );
+
+    // Partially Overlapping shared lock of equal priority - compatible
+    final TaskLock sharedLock2 = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Conficting replace locks are incompatible
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Conflicting append locks are incompatible
+    validator.expectLockNotGranted(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock0, sharedLock1, sharedLock2);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testSharedLockCanRevokeAllIncompatible() throws Exception
+  {
+    // Revoked Exclusive lock of higher priority -> revoked locks are compatible
+    final TaskLock exclusiveLock = validator.expectLockCreated(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(exclusiveLock);
+
+    // Active Shared lock of same priority -> will not be affected
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017-01-01/2017-02-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-07-01"),
+        LOW_PRIORITY
+    );
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-02-01/2017-03-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(sharedLock, replaceLock, appendLock);
+    validator.expectRevokedLocks(exclusiveLock);
+
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, sharedLock);
+    validator.expectRevokedLocks(exclusiveLock, replaceLock, appendLock);
+  }
+
+  @Test
+  public void testAppendLockCompatibility() throws Exception
+  {
+    final TaskLock theLock = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // An exclusive lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-05-01/2017-06-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    validator.expectLockNotGranted(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock cannot be created for a non-enclosing interval
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-05-01/2018-01-01"),
+        MEDIUM_PRIORITY
+    );
+
+    // A replace lock can be created for an enclosing interval
+    final TaskLock replaceLock = validator.expectLockCreated(
+        TaskLockType.REPLACE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY
+    );
+
+    // Another replace lock cannot be created
+    validator.expectLockNotGranted(
+        TaskLockType.REPLACE,
+        Intervals.of("2016/2019"),
+        MEDIUM_PRIORITY
+    );
+
+
+    // Any append lock can be created, provided that it lies within the interval of the previously created replace lock
+    // This should not revoke any of the existing locks even with a higher priority
+    final TaskLock appendLock0 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        HIGH_PRIORITY
+    );
+
+    // Append lock with a lower priority can be created as well
+    final TaskLock appendLock1 = validator.expectLockCreated(
+        TaskLockType.APPEND,
+        Intervals.of("2017-05-01/2017-06-01"),
+        LOW_PRIORITY
+    );
+
+    validator.expectActiveLocks(theLock, replaceLock, appendLock0, appendLock1);
+    validator.expectRevokedLocks();
+  }
+
+  @Test
+  public void testAppendLockCanRevokeAllIncompatible() throws Exception
+  {
+    // Revoked Shared lock of higher priority -> revoked locks are compatible
+    final TaskLock sharedLock = validator.expectLockCreated(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY
+    );
+    validator.revokeLock(sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked

Review Comment:
   The comments have been cleaned up





[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1191080978


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,192 @@
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses, request);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses, request);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloes this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private static boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @param sharedLock shared lock request
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private static boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest sharedLock)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @param exclusiveLock exclusive lock request
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private static boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest exclusiveLock)

Review Comment:
   ## Useless parameter
   
   The parameter 'exclusiveLock' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4945)
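
One way to address this finding is to drop the parameter, since the check depends only on whether the conflicting locks are revoked. A minimal sketch, assuming a simplified and hypothetical `Lock` class in place of `TaskLockPosse`/`TaskLock` (the class and field names here are illustrative, not Druid's):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for TaskLockPosse/TaskLock; not Druid's classes.
final class ExclusiveCoexistSketch
{
  static final class Lock
  {
    final boolean revoked;

    Lock(boolean revoked)
    {
      this.revoked = revoked;
    }
  }

  // An EXCLUSIVE request can coexist only if every conflicting lock is already revoked,
  // so the request itself is not needed as a parameter.
  static boolean canExclusiveLockCoexist(List<Lock> conflicts)
  {
    for (Lock lock : conflicts) {
      if (!lock.revoked) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    // No conflicts at all, then one revoked and one active conflict.
    System.out.println(canExclusiveLockCoexist(Collections.<Lock>emptyList()));
    System.out.println(canExclusiveLockCoexist(Arrays.asList(new Lock(true), new Lock(false))));
  }
}
```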



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,192 @@
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the conditions are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses, request);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses, request);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private static boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @param sharedLock shared lock request
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private static boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest sharedLock)

Review Comment:
   ## Useless parameter
   
   The parameter 'sharedLock' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4946)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192131752


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the conditions are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revocable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          return false;
+      }

Review Comment:
   done
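
The per-type rules in the hunk above can be read as a single predicate over a request and an existing active lock. The following is a sketch only: `Type`, `Lock`, `encloses`, and `isIncompatible` are hypothetical names, intervals are modelled as half-open `long` ranges rather than Joda `Interval`s, and the existing lock is assumed to be active and overlapping.

```java
// Hypothetical model of the incompatibility rules; not Druid's TaskLockbox.
final class LockCompatibilitySketch
{
  enum Type { EXCLUSIVE, SHARED, REPLACE, APPEND }

  static final class Lock
  {
    final Type type;
    final long start; // inclusive
    final long end;   // exclusive

    Lock(Type type, long start, long end)
    {
      this.type = type;
      this.start = start;
      this.end = end;
    }

    // True if this lock's interval fully contains the other lock's interval.
    boolean encloses(Lock other)
    {
      return start <= other.start && other.end <= end;
    }
  }

  // True if the active, overlapping `existing` lock would have to be revoked
  // before `request` could be granted.
  static boolean isIncompatible(Lock request, Lock existing)
  {
    switch (request.type) {
      case EXCLUSIVE:
        return true;
      case SHARED:
        return existing.type != Type.SHARED;
      case REPLACE:
        return !(existing.type == Type.APPEND && request.encloses(existing));
      case APPEND:
        return !(existing.type == Type.APPEND
                 || (existing.type == Type.REPLACE && existing.encloses(request)));
      default:
        return true;
    }
  }

  public static void main(String[] args)
  {
    Lock replace = new Lock(Type.REPLACE, 0, 10);
    Lock enclosedAppend = new Lock(Type.APPEND, 2, 5);
    Lock straddlingAppend = new Lock(Type.APPEND, 5, 15);
    System.out.println(isIncompatible(replace, enclosedAppend));
    System.out.println(isIncompatible(replace, straddlingAppend));
  }
}
```

Keeping the rule in one predicate makes the asymmetry visible: a REPLACE request must enclose an existing APPEND lock, while an APPEND request must be enclosed by an existing REPLACE lock.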





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192054608


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the conditions are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revocable. If yes, revoke all of them.

Review Comment:
   I meant "if every" by "if each"
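
The "if every ... then revoke all" reading amounts to an all-or-nothing, two-pass check. A rough sketch under stated assumptions: `Lock` and `revokeAllIfPossible` are hypothetical names, and the second pass is inferred from the Javadoc wording ("If yes, revoke all of them") rather than taken from the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of all-or-nothing revocation; not Druid's TaskLockbox.
final class RevokeIfPossibleSketch
{
  static final class Lock
  {
    final int priority;
    boolean revoked;

    Lock(int priority)
    {
      this.priority = priority;
    }
  }

  // Pass 1 collects the active locks and gives up if any has equal or higher priority.
  // Pass 2 revokes them, and runs only when every one of them is revocable.
  static boolean revokeAllIfPossible(List<Lock> incompatible, int requestPriority)
  {
    final List<Lock> toRevoke = new ArrayList<>();
    for (Lock lock : incompatible) {
      if (lock.revoked) {
        continue;
      }
      if (lock.priority >= requestPriority) {
        return false; // nothing has been revoked at this point
      }
      toRevoke.add(lock);
    }
    for (Lock lock : toRevoke) {
      lock.revoked = true;
    }
    return true;
  }

  public static void main(String[] args)
  {
    Lock low = new Lock(10);
    Lock high = new Lock(100);
    System.out.println(revokeAllIfPossible(Arrays.asList(low, high), 50));
    System.out.println(low.revoked);
  }
}
```

Collecting first and revoking afterwards means a single non-revocable lock leaves all the other locks untouched.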





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1191858557


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());

Review Comment:
   Apologies, I should have elaborated further in the comment.
   The compatibility tests first create a lock of a specific type, then request other locks and check whether each one is granted.
   The revocation tests first create a few locks that may be active, and check that they are revoked at the very end, after the lock of the chosen type is granted.
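
The two test shapes can be illustrated with a toy model. This is a sketch only: `ToyLockbox` and `tryExclusiveLock` are hypothetical, support only exclusive locks, use plain `long` bounds instead of `Interval`s, and are not the `TaskLockbox` or `insertTaskLock` helper used in `TaskLockboxTest`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy lockbox supporting only exclusive locks; not Druid's TaskLockbox.
final class LockTestPatternSketch
{
  static final class Lock
  {
    final long start;
    final long end;
    final int priority;
    boolean revoked;

    Lock(long start, long end, int priority)
    {
      this.start = start;
      this.end = end;
      this.priority = priority;
    }

    boolean overlaps(long s, long e)
    {
      return start < e && s < end;
    }
  }

  static final class ToyLockbox
  {
    private final List<Lock> locks = new ArrayList<>();

    // Returns the granted lock, or null if an active overlapping lock has equal or higher priority.
    Lock tryExclusiveLock(long start, long end, int priority)
    {
      final List<Lock> toRevoke = new ArrayList<>();
      for (Lock lock : locks) {
        if (lock.revoked || !lock.overlaps(start, end)) {
          continue;
        }
        if (lock.priority >= priority) {
          return null;
        }
        toRevoke.add(lock);
      }
      for (Lock lock : toRevoke) {
        lock.revoked = true;
      }
      final Lock granted = new Lock(start, end, priority);
      locks.add(granted);
      return granted;
    }
  }

  // Compatibility shape: hold one lock first, then check that a later request is refused.
  static boolean compatibilityPattern()
  {
    final ToyLockbox box = new ToyLockbox();
    final Lock held = box.tryExclusiveLock(2017, 2018, 50);
    final Lock refused = box.tryExclusiveLock(2017, 2018, 50);
    return held != null && !held.revoked && refused == null;
  }

  // Revocation shape: create an active lock first, then grant the chosen lock
  // and check that the earlier lock ends up revoked.
  static boolean revocationPattern()
  {
    final ToyLockbox box = new ToyLockbox();
    final Lock low = box.tryExclusiveLock(2017, 2018, 10);
    final boolean activeBefore = low != null && !low.revoked;
    final Lock high = box.tryExclusiveLock(2016, 2019, 50);
    return activeBefore && high != null && low.revoked;
  }

  public static void main(String[] args)
  {
    System.out.println(compatibilityPattern());
    System.out.println(revocationPattern());
  }
}
```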





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192117721


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1097,6 +1099,27 @@ public void remove(final Task task)
     }
   }
 
+  public Set<TaskLock> getAllExclusiveLocksForDatasource(final String datasource)

Review Comment:
   Removed





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192058635


##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = insertTaskLock(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(appendLock);
+    Assert.assertFalse(appendLock.isRevoked());
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock.revokedCopy(),
+        exclusiveLock.revokedCopy(),
+        appendLock.revokedCopy(),
+        replaceLock.revokedCopy()
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+
+    final Set<TaskLock> expectedActiveLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedActiveLocks, getAllActiveLocks(tasks));
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // No overlapping exclusive lock is compatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Enclosing shared lock of lower priority - compatible
+    final TaskLock sharedLock0 = insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            LOW_PRIORITY,
+            tasks
+    );
+    Assert.assertNotNull(sharedLock0);
+    Assert.assertFalse(sharedLock0.isRevoked());
+
+    // Enclosed shared lock of higher priority - compatible
+    final TaskLock sharedLock1 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock1);
+    Assert.assertFalse(sharedLock1.isRevoked());
+
+    // Partially Overlapping shared lock of equal priority - compatible
+    final TaskLock sharedLock2 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock2);
+    Assert.assertFalse(sharedLock2.isRevoked());
+
+    // Conflicting replace locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Conflicting append locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock0,
+        sharedLock1,
+        sharedLock2
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }

Review Comment:
   Done. Thanks



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1403,6 +1987,57 @@ public void testConflictsWithOverlappingSharedLocks() throws Exception
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    // Tasks to be failed have a group id with the substring "FailingLockAcquisition"
+    // Please refer to NullLockPosseTaskLockbox
+    final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+    taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+    taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+    taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(taskWithFailingLockAcquisition0);
+    testLockbox.add(taskWithFailingLockAcquisition1);
+    testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+    testLockbox.tryLock(taskWithFailingLockAcquisition0,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithFailingLockAcquisition0,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithSuccessfulLockAcquisition,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+    // The tasks must be marked for failure
+    TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+    Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1),
+                        result.getTasksToFail());
+  }

Review Comment:
   Done





[GitHub] [druid] imply-cheddar commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "imply-cheddar (via GitHub)" <gi...@apache.org>.
imply-cheddar commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1191752099


##########
indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java:
##########
@@ -21,6 +21,8 @@
 
 public enum TaskLockType
 {
+  REPLACE, // Meant to be used only with Replacing jobs and timechunk locking
+  APPEND, // Meant to be used only with Appending jobs and timechunk locking

Review Comment:
   Probably makes sense to just add javadoc explanations for each of the lock types at this point.  3 of them have comments to try to explain them, so just make it javadoc and perhaps add more words.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }

Review Comment:
   Enumerations can have methods. Given that you've basically created an independent method for each of the enumeration constants, you might as well add an abstract method to `TaskLockType` and call that.  Let the JVM do the switching for you.
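
A minimal sketch of the suggestion above, not the PR's actual code: each enum constant overrides an abstract method, so the caller no longer needs a `switch`. The names `LockTypeDispatchSketch`, `LockType`, `HeldLock`, `isCompatibleWith` and `canCoexist` are hypothetical, and the sketch deliberately leaves out the interval-enclosure and "at most one REPLACE" checks that the diff performs.

```java
import java.util.Arrays;
import java.util.List;

public class LockTypeDispatchSketch
{
  /** Hypothetical, simplified lock types: each constant answers the compatibility question itself. */
  enum LockType
  {
    EXCLUSIVE {
      @Override
      boolean isCompatibleWith(LockType existing)
      {
        return false; // nothing may overlap an active EXCLUSIVE request
      }
    },
    SHARED {
      @Override
      boolean isCompatibleWith(LockType existing)
      {
        return existing == SHARED;
      }
    },
    REPLACE {
      @Override
      boolean isCompatibleWith(LockType existing)
      {
        return existing == APPEND; // interval enclosure check omitted in this sketch
      }
    },
    APPEND {
      @Override
      boolean isCompatibleWith(LockType existing)
      {
        return existing == APPEND || existing == REPLACE; // "at most one REPLACE" check omitted
      }
    };

    /** Each constant supplies its own rule; virtual dispatch replaces the switch. */
    abstract boolean isCompatibleWith(LockType existing);

    /** Revoked locks are skipped, mirroring the loops in the diff. */
    boolean canCoexist(List<HeldLock> conflicts)
    {
      for (HeldLock held : conflicts) {
        if (!held.revoked && !isCompatibleWith(held.type)) {
          return false;
        }
      }
      return true;
    }
  }

  /** Hypothetical stand-in for an existing lock: only a type and a revoked flag. */
  static final class HeldLock
  {
    final LockType type;
    final boolean revoked;

    HeldLock(LockType type, boolean revoked)
    {
      this.type = type;
      this.revoked = revoked;
    }
  }

  public static void main(String[] args)
  {
    List<HeldLock> conflicts = Arrays.asList(
        new HeldLock(LockType.APPEND, false),
        new HeldLock(LockType.EXCLUSIVE, true)
    );
    System.out.println("REPLACE can coexist: " + LockType.REPLACE.canCoexist(conflicts));
    System.out.println("SHARED can coexist: " + LockType.SHARED.canCoexist(conflicts));
  }
}
```

With this shape, a caller would write something like `request.getType().canCoexist(conflicts)`, and adding a fifth lock type forces the compiler to ask for its rule.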



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.

Review Comment:
   As I get to these too, I think that this definition/explanation deserves to be on the javadoc for the lock type rather than hidden here on this one method.
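
As a rough illustration of what per-constant Javadoc could look like, the sketch below restates the four rules quoted in the diff on a hypothetical `LockType` enum. The wording is paraphrased from the method comment under review; it is not the text that was eventually committed, and the class name `LockTypeJavadocSketch` is made up for the example.

```java
public class LockTypeJavadocSketch
{
  /** Hypothetical copy of the lock types, used only to show where the explanations could live. */
  enum LockType
  {
    /**
     * Incompatible with every other conflicting (overlapping, active) lock.
     */
    EXCLUSIVE,

    /**
     * Can coexist with other SHARED locks; incompatible with conflicting locks of every other type.
     */
    SHARED,

    /**
     * Meant for replacing jobs with time chunk locking. Incompatible with every conflicting lock
     * except APPEND locks whose intervals are enclosed within this lock's interval.
     */
    REPLACE,

    /**
     * Meant for appending jobs with time chunk locking. Incompatible with every EXCLUSIVE and
     * SHARED lock; can coexist with other APPEND locks and with at most one REPLACE lock whose
     * interval encloses this lock's interval.
     */
    APPEND
  }

  public static void main(String[] args)
  {
    // Print the constants so the sketch has something observable to run.
    for (LockType type : LockType.values()) {
      System.out.println(type);
    }
  }
}
```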



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(

Review Comment:
   Instead of `Assert.assertNull`, I think the intent of the test would be easier to decipher if you created a helper method like
   
   ```
   assertLockNotGranted()
   ```
   
   It can do a null check inside of itself, but when reading the test, it becomes clear what you are trying to test for without the reader having to figure out how to interpret `assertNull`.
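
One possible shape for such a helper, sketched in plain Java so that it runs without JUnit. In `TaskLockboxTest` it would more likely delegate to `Assert.assertNull` and take a `TaskLock`. The class name `LockAssertionsSketch` and the `Object` parameter are assumptions for the sake of a self-contained example; only the method name `assertLockNotGranted` comes from the comment above.

```java
public class LockAssertionsSketch
{
  /**
   * Fails when a lock was granted. A null result from the (hypothetical) insert call
   * is read as "the lock request was rejected", which is what the test wants to express.
   */
  static void assertLockNotGranted(Object lock)
  {
    if (lock != null) {
      throw new AssertionError("Expected the lock request to be rejected, but got: " + lock);
    }
  }

  public static void main(String[] args)
  {
    // A rejected request is modelled as null here, so this call passes silently.
    assertLockNotGranted(null);

    // A granted request is modelled as any non-null object, so this call must fail.
    try {
      assertLockNotGranted("granted-lock");
      System.out.println("unexpected: no failure reported");
    }
    catch (AssertionError e) {
      System.out.println("failure reported: " + e.getMessage());
    }
  }
}
```

At the call site the test would then read `assertLockNotGranted(insertTaskLock(...))`, which states the expectation directly.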



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());

Review Comment:
   This also seems like it should be fine, the shared lock was revoked?
   
   Same mis-reading here.  If you are taking the time to have comments, please always make sure that they are in sync.  An incorrect comment is *significantly* worse than no comment at all as it creates confusion and encourages people to assume they know what the code is doing without actually reading it.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1403,6 +1987,57 @@ public void testConflictsWithOverlappingSharedLocks() throws Exception
     );
   }
 
+  @Test
+  public void testFailedToReacquireTaskLock() throws Exception
+  {
+    // Tasks to be failed have a group id with the substring "FailingLockAcquisition"
+    // Please refer to NullLockPosseTaskLockbox
+    final Task taskWithFailingLockAcquisition0 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithFailingLockAcquisition1 = NoopTask.withGroupId("FailingLockAcquisition");
+    final Task taskWithSuccessfulLockAcquisition = NoopTask.create();
+    taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId()));
+    taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId()));
+    taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId()));
+
+    TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator);
+    testLockbox.add(taskWithFailingLockAcquisition0);
+    testLockbox.add(taskWithFailingLockAcquisition1);
+    testLockbox.add(taskWithSuccessfulLockAcquisition);
+
+    testLockbox.tryLock(taskWithFailingLockAcquisition0,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithFailingLockAcquisition0,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    testLockbox.tryLock(taskWithSuccessfulLockAcquisition,
+                        new TimeChunkLockRequest(TaskLockType.EXCLUSIVE,
+                                                 taskWithSuccessfulLockAcquisition,
+                                                 Intervals.of("2017-07-01/2017-08-01"),
+                                                 null
+                        )
+    );
+
+    Assert.assertEquals(3, taskStorage.getActiveTasks().size());
+
+    // The tasks must be marked for failure
+    TaskLockboxSyncResult result = testLockbox.syncFromStorage();
+    Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1),
+                        result.getTasksToFail());
+  }

Review Comment:
   Did this test change or just move?  If it just moved, why move it?  Please move it back



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1097,6 +1099,27 @@ public void remove(final Task task)
     }
   }
 
+  public Set<TaskLock> getAllExclusiveLocksForDatasource(final String datasource)

Review Comment:
   I don't see this method being used?  Why was it created?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.

Review Comment:
   > If yes, revoke all of them.
   
   So, if one lock is revokable, then revoke all locks regardless of whether they are revokable or not?  Or, what is that trying to say?
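
One possible reading of that Javadoc sentence, consistent with the `possesToRevoke` list in the method body quoted elsewhere in this thread, is all-or-nothing: the locks are revoked only when every incompatible active lock is revocable, and a single non-revocable lock means nothing is revoked. The sketch below is only an illustration of that reading. `HeldLock` and `AllOrNothingRevocation` are hypothetical stand-ins, not Druid classes, and lock-type compatibility is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a held lock; not Druid's TaskLock or TaskLockPosse.
final class HeldLock
{
  final String id;
  final int priority;
  boolean revoked;

  HeldLock(String id, int priority)
  {
    this.id = id;
    this.priority = priority;
  }
}

final class AllOrNothingRevocation
{
  /**
   * Revokes every active lock in {@code conflicts} only if each of them has a strictly
   * lower priority than the request. If any one of them cannot be revoked, none are.
   */
  static boolean revokeAllIfPossible(List<HeldLock> conflicts, int requestPriority)
  {
    final List<HeldLock> toRevoke = new ArrayList<>();
    for (HeldLock lock : conflicts) {
      if (lock.revoked) {
        continue; // already revoked, so it cannot block the request
      }
      if (lock.priority >= requestPriority) {
        return false; // bail out before any lock has been touched
      }
      toRevoke.add(lock);
    }
    for (HeldLock lock : toRevoke) {
      lock.revoked = true;
    }
    return true;
  }
}
```

Under this reading, "If yes" refers to the whole set being revocable, not to any individual lock.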



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type

Review Comment:
   "SHARED locks are only compatible with other shared locks, incompatible with everything else"
   
   Might be clearer.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          return false;
+      }

Review Comment:
   I think this could be pretty nicely done as an abstract method on `TaskLockType`.  Perhaps
   
   ```
   List<TaskLockPosse> determineLocksToRevoke(List<TaskLockPosse>, LockRequest request)
   ```
   
   ?
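
A rough sketch of how that suggestion might look, under several assumptions: `Span`, `Held`, and `LockKind` are hypothetical stand-ins for Joda-Time's `Interval`, `TaskLockPosse`, and `TaskLockType`; the per-type rules are copied from the `switch` above; and returning an empty `Optional` when some incompatible lock cannot be revoked is a choice made for this sketch, not something proposed in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for Joda-Time's Interval, using plain longs.
final class Span
{
  final long start;
  final long end;

  Span(long start, long end)
  {
    this.start = start;
    this.end = end;
  }

  boolean contains(Span other)
  {
    return start <= other.start && other.end <= end;
  }
}

// Hypothetical stand-in for a TaskLockPosse: only the fields the rules read.
final class Held
{
  final LockKind kind;
  final Span span;
  final int priority;
  final boolean revoked;

  Held(LockKind kind, Span span, int priority, boolean revoked)
  {
    this.kind = kind;
    this.span = span;
    this.priority = priority;
    this.revoked = revoked;
  }
}

// Hypothetical stand-in for TaskLockType, with the rule attached to each constant.
enum LockKind
{
  EXCLUSIVE {
    @Override
    boolean isCompatibleWith(Span requested, Held held)
    {
      return false;
    }
  },
  SHARED {
    @Override
    boolean isCompatibleWith(Span requested, Held held)
    {
      return held.kind == SHARED;
    }
  },
  REPLACE {
    @Override
    boolean isCompatibleWith(Span requested, Held held)
    {
      return held.kind == APPEND && requested.contains(held.span);
    }
  },
  APPEND {
    @Override
    boolean isCompatibleWith(Span requested, Held held)
    {
      return held.kind == APPEND || (held.kind == REPLACE && held.span.contains(requested));
    }
  };

  abstract boolean isCompatibleWith(Span requested, Held held);

  /** Locks to revoke, or empty if some incompatible active lock has equal or higher priority. */
  Optional<List<Held>> determineLocksToRevoke(List<Held> conflicts, Span requested, int priority)
  {
    final List<Held> toRevoke = new ArrayList<>();
    for (Held held : conflicts) {
      if (held.revoked || isCompatibleWith(requested, held)) {
        continue;
      }
      if (held.priority >= priority) {
        return Optional.empty();
      }
      toRevoke.add(held);
    }
    return Optional.of(toRevoke);
  }
}
```

With the rule on the enum, the caller no longer needs its own `switch`, and an unknown type cannot silently fall through to a `default` branch.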



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());
+
+    // Active replace lock of lower priority -> will be revoked
+    final TaskLock replaceLock = insertTaskLock(
+        TaskLockType.REPLACE,
+        Intervals.of("2017-07-01/2018-01-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(replaceLock);
+    Assert.assertFalse(replaceLock.isRevoked());
+
+    // Active append lock of lower priority -> will be revoked
+    final TaskLock appendLock = insertTaskLock(
+        TaskLockType.APPEND,
+        Intervals.of("2017-09-01/2017-10-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(appendLock);
+    Assert.assertFalse(appendLock.isRevoked());
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock.revokedCopy(),
+        exclusiveLock.revokedCopy(),
+        appendLock.revokedCopy(),
+        replaceLock.revokedCopy()
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+
+    final Set<TaskLock> expectedActiveLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedActiveLocks, getAllActiveLocks(tasks));
+  }
+
+  @Test
+  public void testSharedLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // No overlapping exclusive lock is compatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Enclosing shared lock of lower priority - compatible
+    final TaskLock sharedLock0 = insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            LOW_PRIORITY,
+            tasks
+    );
+    Assert.assertNotNull(sharedLock0);
+    Assert.assertFalse(sharedLock0.isRevoked());
+
+    // Enclosed shared lock of higher priority - compatible
+    final TaskLock sharedLock1 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-06-01/2017-07-01"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock1);
+    Assert.assertFalse(sharedLock1.isRevoked());
+
+    // Partially Overlapping shared lock of equal priority - compatible
+    final TaskLock sharedLock2 = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2017-05-01/2018-05-01"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock2);
+    Assert.assertFalse(sharedLock2.isRevoked());
+
+    // Conflicting replace locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // Conflicting append locks are incompatible
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock,
+        sharedLock0,
+        sharedLock1,
+        sharedLock2
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }

Review Comment:
   These tests all follow a very similar pattern.  Along with my earlier comment about making it simpler to decipher what the tests are validating, I think you can simplify things quite a bit.  You can make a `TaskLockboxValidator` class that each test would instantiate.  That class can maintain the list of tasks (which you call `tasks`).  You can add methods on it like
   
   ```
   TaskLock expectLockCreated(Type, Interval, priority);
   
   void expectLockNotGranted(Type, Interval, priority);
   
   void expectLockIsNowRevoked(TaskLock);
   
   void validateExpectedLocks(List<TaskLock>);
   ```
   
   And then if you adjust these tests to be written in terms of that, I think that it will be both easier to understand what the tests are doing *and* simpler to write the tests.
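
A minimal sketch of such a helper, assuming a hypothetical `LockGranter` interface in place of the real `TaskLockbox` and the test's `insertTaskLock` helper, and plain strings in place of `TaskLockType`, `Interval`, and `TaskLock`. A real version would presumably wrap the lockbox and the task list directly and use JUnit's `Assert` rather than throwing `AssertionError` by hand.

```java
import java.util.Set;

// Hypothetical, minimal view of what the tests need from the lockbox.
interface LockGranter
{
  /** Returns an id for the granted lock, or null if the request was refused. */
  String tryLock(String type, String interval, int priority);

  /** Ids of all locks that are currently held and not revoked. */
  Set<String> activeLocks();
}

// Hypothetical helper along the lines of the suggested TaskLockboxValidator.
final class LockboxValidator
{
  private final LockGranter lockbox;

  LockboxValidator(LockGranter lockbox)
  {
    this.lockbox = lockbox;
  }

  String expectLockCreated(String type, String interval, int priority)
  {
    final String lock = lockbox.tryLock(type, interval, priority);
    if (lock == null) {
      throw new AssertionError("expected " + type + " lock on " + interval + " to be granted");
    }
    if (!lockbox.activeLocks().contains(lock)) {
      throw new AssertionError("expected " + lock + " to be active right after creation");
    }
    return lock;
  }

  void expectLockNotGranted(String type, String interval, int priority)
  {
    if (lockbox.tryLock(type, interval, priority) != null) {
      throw new AssertionError("expected " + type + " lock on " + interval + " to be refused");
    }
  }

  void expectLockIsNowRevoked(String lock)
  {
    if (lockbox.activeLocks().contains(lock)) {
      throw new AssertionError("expected " + lock + " to be revoked");
    }
  }

  void validateExpectedActiveLocks(Set<String> expected)
  {
    if (!expected.equals(lockbox.activeLocks())) {
      throw new AssertionError("expected active locks " + expected + " but found " + lockbox.activeLocks());
    }
  }
}
```

Each test then reads as a sequence of expectations, for example `expectLockCreated("EXCLUSIVE", "2017/2018", 50)` followed by `expectLockNotGranted("SHARED", "2016/2019", 50)`, which states the intent without the paired `assertNotNull` and `assertFalse` calls.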



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());

Review Comment:
   Oooooh, I see, it's asserting that the lock was granted and that `isRevoked()` is false...  I totally read it wrong, partially because your comment made me think that you were testing that it was going to be revoked.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);

Review Comment:
   "Not incompatible" means compatible, right?



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java:
##########
@@ -1272,6 +1276,627 @@ public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
     );
   }
 
+  @Test
+  public void testExclusiveLockCompatibility() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    final TaskLock theLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017/2018"),
+        MEDIUM_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(theLock);
+    Assert.assertFalse(theLock.isRevoked());
+
+    // Another exclusive lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.EXCLUSIVE,
+            Intervals.of("2017-05-01/2017-06-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A shared lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.SHARED,
+            Intervals.of("2016/2019"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // A replace lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.REPLACE,
+            Intervals.of("2017/2018"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    // An append lock cannot be created for an overlapping interval
+    Assert.assertNull(
+        insertTaskLock(
+            TaskLockType.APPEND,
+            Intervals.of("2017-05-01/2018-05-01"),
+            MEDIUM_PRIORITY,
+            tasks
+        )
+    );
+
+    final Set<TaskLock> expectedLocks = ImmutableSet.of(
+        theLock
+    );
+    Assert.assertEquals(expectedLocks, getAllLocks(tasks));
+  }
+
+  @Test
+  public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+  {
+    final List<Task> tasks = new ArrayList<>();
+
+    // Revoked shared lock of higher priority -> revoked locks are not incompatible
+    final TaskLock sharedLock = insertTaskLock(
+        TaskLockType.SHARED,
+        Intervals.of("2016/2019"),
+        HIGH_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(sharedLock);
+    lockbox.revokeLock(tasks.get(0).getId(), sharedLock);
+
+    // Active Exclusive lock of lower priority -> will be revoked
+    final TaskLock exclusiveLock = insertTaskLock(
+        TaskLockType.EXCLUSIVE,
+        Intervals.of("2017-01-01/2017-02-01"),
+        LOW_PRIORITY,
+        tasks
+    );
+    Assert.assertNotNull(exclusiveLock);
+    Assert.assertFalse(exclusiveLock.isRevoked());

Review Comment:
   Why will this be revoked? The shared lock that it would conflict with has already been revoked, in which case this should be fine, no?
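
For readers following the thread, here is a minimal sketch of the priority rule that the EXCLUSIVE branch of `revokeAllIncompatibleActiveLocksIfPossible` in this PR appears to apply. Already revoked locks are skipped, and an active conflicting lock is revoked only when its priority is strictly lower than that of the request. Under that reading, the test comment would presumably refer to a later, higher-priority request revoking this lock, not to the revoked shared lock. `SketchLock`, `ExclusiveRevocationSketch`, and `locksToRevoke` are hypothetical names for illustration, not Druid classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a task lock; not Druid's TaskLock.
final class SketchLock
{
  final String name;
  final int priority;
  final boolean revoked;

  SketchLock(String name, int priority, boolean revoked)
  {
    this.name = name;
    this.priority = priority;
    this.revoked = revoked;
  }
}

final class ExclusiveRevocationSketch
{
  // Returns the locks an EXCLUSIVE request of the given priority would revoke,
  // or Optional.empty() if some active conflicting lock has equal or higher priority.
  static Optional<List<SketchLock>> locksToRevoke(List<SketchLock> conflicts, int requestPriority)
  {
    final List<SketchLock> toRevoke = new ArrayList<>();
    for (SketchLock lock : conflicts) {
      if (lock.revoked) {
        // Revoked locks are ignored, whatever their priority.
        continue;
      }
      if (lock.priority >= requestPriority) {
        return Optional.empty();
      }
      toRevoke.add(lock);
    }
    return Optional.of(toRevoke);
  }
}
```

In this sketch, given a revoked shared lock of priority 10 and an active exclusive lock of priority 1, a request of priority 5 would revoke only the exclusive lock.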



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192055445


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+   *   Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+   * @param conflictPosses conflicting lock posses
+   * @param request lock request
+   * @return true iff every incompatible lock is revocable.
+   */
+  private boolean revokeAllIncompatibleActiveLocksIfPossible(
+      List<TaskLockPosse> conflictPosses,
+      LockRequest request
+  )
+  {
+    final int priority = request.getPriority();
+    final TaskLockType type = request.getType();
+    final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      switch (type) {
+        case EXCLUSIVE:
+          if (posse.getTaskLock().getPriority() >= priority) {
+            return false;
+          }
+          possesToRevoke.add(posse);
+          break;
+        case SHARED:
+          if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case REPLACE:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        case APPEND:
+          if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+                || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+                    && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+            if (posse.getTaskLock().getPriority() >= priority) {
+              return false;
+            }
+            possesToRevoke.add(posse);
+          }
+          break;
+        default:
+          return false;
+      }

Review Comment:
   TaskLockPosse is not available in TaskLockType
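
For reference, the incompatibility rules listed in the Javadoc above depend only on the two lock types and on interval enclosure. A minimal sketch of them as a standalone predicate follows. It uses hypothetical simplified types (`LockKind`, `Span`, `IncompatibilitySketch`), with intervals reduced to half-open `long` ranges. It is not Druid's `TaskLockType`, `TaskLockPosse`, or Joda `Interval`, and it leaves out priority and revocation state.

```java
// Hypothetical lock kinds mirroring the four types discussed in this PR.
enum LockKind { EXCLUSIVE, SHARED, REPLACE, APPEND }

// Hypothetical half-open interval [start, end).
final class Span
{
  final long start;
  final long end;

  Span(long start, long end)
  {
    this.start = start;
    this.end = end;
  }

  // True if this span fully encloses the other one.
  boolean contains(Span other)
  {
    return start <= other.start && other.end <= end;
  }
}

final class IncompatibilitySketch
{
  // True if an active existing lock is incompatible with the requested lock.
  static boolean isIncompatible(LockKind requested, Span requestedSpan, LockKind existing, Span existingSpan)
  {
    switch (requested) {
      case EXCLUSIVE:
        return true;
      case SHARED:
        return existing != LockKind.SHARED;
      case REPLACE:
        // Only APPEND locks enclosed by the REPLACE interval are compatible.
        return !(existing == LockKind.APPEND && requestedSpan.contains(existingSpan));
      case APPEND:
        // Other APPEND locks, or a REPLACE lock enclosing the request, are compatible.
        return !(existing == LockKind.APPEND
                 || (existing == LockKind.REPLACE && existingSpan.contains(requestedSpan)));
      default:
        // Conservative assumption for this sketch: unknown kinds are incompatible.
        return true;
    }
  }
}
```

In the PR itself, the equivalent checks also skip revoked locks and compare priorities before revoking anything.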





[GitHub] [druid] AmatyaAvadhanula commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1192054957


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,190 @@ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> ge
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+  {
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+
+  /**
+   * Verify if each incompatible active lock is revokable. If yes, revoke all of them.
+   * - EXCLUSIVE locks are incompatible with every other conflicting lock
+   * - SHARED locks are incompatible with conflicting locks of every other type
+   * - REPLACE locks are incompatible with every conflicting lock which is not (APPEND and enclosed) within its interval
+   * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.

Review Comment:
   Done
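
As an illustration of the APPEND coexistence check quoted above, here is a minimal sketch. Revoked locks are ignored, any active EXCLUSIVE or SHARED lock blocks the request, and at most one active REPLACE lock may be present, which must enclose the requested interval. `Kind`, `Range`, `Held`, and `AppendCoexistSketch` are hypothetical simplifications for illustration, not Druid classes, and intervals are reduced to half-open `long` ranges.

```java
import java.util.List;

// Hypothetical lock kinds mirroring the four types discussed in this PR.
enum Kind { EXCLUSIVE, SHARED, REPLACE, APPEND }

// Hypothetical half-open interval [start, end).
final class Range
{
  final long start;
  final long end;

  Range(long start, long end)
  {
    this.start = start;
    this.end = end;
  }

  boolean contains(Range other)
  {
    return start <= other.start && other.end <= end;
  }
}

// Hypothetical stand-in for an already held conflicting lock.
final class Held
{
  final Kind kind;
  final Range range;
  final boolean revoked;

  Held(Kind kind, Range range, boolean revoked)
  {
    this.kind = kind;
    this.range = range;
    this.revoked = revoked;
  }
}

final class AppendCoexistSketch
{
  // True if an APPEND request over `requested` can coexist with all conflicts without revocation.
  static boolean canAppendCoexist(List<Held> conflicts, Range requested)
  {
    boolean seenReplace = false;
    for (Held held : conflicts) {
      if (held.revoked) {
        continue;
      }
      if (held.kind == Kind.EXCLUSIVE || held.kind == Kind.SHARED) {
        return false;
      }
      if (held.kind == Kind.REPLACE) {
        // A second active REPLACE lock, or one that does not enclose the request, blocks it.
        if (seenReplace || !held.range.contains(requested)) {
          return false;
        }
        seenReplace = true;
      }
    }
    return true;
  }
}
```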





[GitHub] [druid] cheddar merged pull request #14258: Add new lock types: APPEND and REPLACE

Posted by "cheddar (via GitHub)" <gi...@apache.org>.
cheddar merged PR #14258:
URL: https://github.com/apache/druid/pull/14258

