You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org> on 2023/05/11 12:13:55 UTC

[GitHub] [druid] github-code-scanning[bot] commented on a diff in pull request #14258: Add new lock types: APPEND and REPLACE

github-code-scanning[bot] commented on code in PR #14258:
URL: https://github.com/apache/druid/pull/14258#discussion_r1191080978


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,192 @@
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses, request);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses, request);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with revoked locks and with any number of APPEND locks whose intervals it encloses
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private static boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @param sharedLock shared lock request
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private static boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest sharedLock)
   {
-    final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+   * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+   * @param conflictPosses conflicting lock posses
+   * @param exclusiveLock exclusive lock request
+   * @return true iff the exclusive lock can coexist with all its conflicting locks
+   */
+  private static boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest exclusiveLock)

Review Comment:
   ## Useless parameter
   
   The parameter 'exclusiveLock' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4945)



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java:
##########
@@ -1239,43 +1262,192 @@
   }
 
   /**
-   * Check if all lockPosses are either shared
-   * OR of lower priority
-   * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
-   * @param lockPosses conflicting task lock posses to be checked
-   * @param priority priority of the lock to be acquired
-   * @return true if the condititons are met
+   * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+   * @param conflictPosses conflict lock posses
+   * @param request lock request
+   * @return true iff the lock can coexist with all its conflicting locks
    */
-  private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
   {
-    return lockPosses.stream()
-                     .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
-                     .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
-                                                || taskLockPosse.getTaskLock().isRevoked());
+    switch (request.getType()) {
+      case APPEND:
+        return canAppendLockCoexist(conflictPosses, request);
+      case REPLACE:
+        return canReplaceLockCoexist(conflictPosses, request);
+      case SHARED:
+        return canSharedLockCoexist(conflictPosses, request);
+      case EXCLUSIVE:
+        return canExclusiveLockCoexist(conflictPosses, request);
+      default:
+        return false;
+    }
   }
 
   /**
-   * Revokes all non-shared locks with priorities lower than the provided priority
-   * @param lockPosses conflicting task lock posses which may be revoked
-   * @param priority priority of the lock to be acquired
+   * Check if an APPEND lock can coexist with a given set of conflicting posses.
+   * An APPEND lock can coexist with any number of other APPEND locks
+   *    OR with at most one REPLACE lock over an interval which encloses this request.
+   * @param conflictPosses conflicting lock posses
+   * @param appendRequest append lock request
+   * @return true iff append lock can coexist with all its conflicting locks
    */
-  private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+  private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
   {
-    lockPosses.stream()
-              .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
-              .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
-              .forEach(this::revokeLock);
+    TaskLock replaceLock = null;
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        if (replaceLock != null) {
+          return false;
+        }
+        replaceLock = posse.getTaskLock();
+        if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+          return false;
+        }
+      }
+    }
+    return true;
   }
 
-  private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+  /**
+   * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+   * A REPLACE lock can coexist with revoked locks and with any number of APPEND locks whose intervals it encloses
+   * @param conflictPosses conflicting lock posses
+   * @param replaceLock replace lock request
+   * @return true iff replace lock can coexist with all its conflicting locks
+   */
+  private static boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
   {
-    return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+    for (TaskLockPosse posse : conflictPosses) {
+      if (posse.getTaskLock().isRevoked()) {
+        continue;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+          || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+          || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+        return false;
+      }
+      if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+          && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+        return false;
+      }
+    }
+    return true;
   }
 
-  private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+  /**
+   * Check if a SHARED lock can coexist with a given set of conflicting posses.
+   * A SHARED lock can coexist with any number of other active SHARED locks
+   * @param conflictPosses conflicting lock posses
+   * @param sharedLock shared lock request
+   * @return true iff shared lock can coexist with all its conflicting locks
+   */
+  private static boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest sharedLock)

Review Comment:
   ## Useless parameter
   
   The parameter 'sharedLock' is never used.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/4946)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org