You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ch...@apache.org on 2023/05/15 05:38:39 UTC
[druid] branch master updated: Add new lock types: APPEND and REPLACE (#14258)
This is an automated email from the ASF dual-hosted git repository.
cheddar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e9913abbbf Add new lock types: APPEND and REPLACE (#14258)
e9913abbbf is described below
commit e9913abbbfed82c11d1f876f30f24a768267ab48
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Mon May 15 11:08:32 2023 +0530
Add new lock types: APPEND and REPLACE (#14258)
* Add new lock types: APPEND and REPLACE
---
.../apache/druid/indexing/common/TaskLockType.java | 29 +-
.../druid/indexing/overlord/TaskLockbox.java | 237 ++++++--
.../druid/indexing/overlord/TaskLockboxTest.java | 648 +++++++++++++++++----
3 files changed, 744 insertions(+), 170 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
index b51990cee0..43f79cf265 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskLockType.java
@@ -19,8 +19,35 @@
package org.apache.druid.indexing.common;
+/**
+ * 1) A revoked lock is compatible with every lock.
+ * 2) An appending task may use only EXCLUSIVE, SHARED or APPEND locks
+ * 3) A replacing task may use only EXCLUSIVE or REPLACE locks
+ * 4) REPLACE and APPEND locks can only be used with timechunk locking
+ */
public enum TaskLockType
{
+ /**
+ * There can be at most one active EXCLUSIVE lock for a given interval.
+ * It cannot co-exist with any other active locks with overlapping intervals.
+ */
+ EXCLUSIVE,
+ /**
+ * There can be any number of active SHARED locks for a given interval.
+ * They can coexist only with other SHARED locks, but not with active locks of other types.
+ */
SHARED,
- EXCLUSIVE // taskLocks of this type can be shared by tasks of the same groupId.
+ /**
+ * There can be at most one active REPLACE lock for a given interval.
+ * It can co-exist only with APPEND locks whose intervals are enclosed within its interval,
+ * and is incompatible with all other active locks with overlapping intervals.
+ */
+ REPLACE,
+ /**
+ * There can be any number of active APPEND locks for a given interval.
+ * They can coexist with other APPEND locks,
+ * and with at most one REPLACE lock whose interval encloses that of the APPEND lock.
+ * They are incompatible with all other active locks.
+ */
+ APPEND
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 927b09557f..f84f5f3075 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@@ -614,13 +615,21 @@ public class TaskLockbox
if (reusablePosses.size() == 0) {
// case 1) this task doesn't have any lock, but others do
- if (request.getType().equals(TaskLockType.SHARED)
- && areAllEqualOrHigherPriorityLocksSharedOrRevoked(conflictPosses, request.getPriority())) {
- // Any number of shared locks can be acquired for the same dataSource and interval
- // Exclusive locks of equal or greater priority, if present, must already be revoked
- // Exclusive locks of lower priority can be revoked
- revokeAllLowerPriorityNonSharedLocks(conflictPosses, request.getPriority());
+ if ((request.getType().equals(TaskLockType.APPEND) || request.getType().equals(TaskLockType.REPLACE))
+ && !request.getGranularity().equals(LockGranularity.TIME_CHUNK)) {
+ // APPEND and REPLACE locks are specific to time chunk locks
+ return null;
+ }
+
+ // First, check if the lock can coexist with its conflicting posses
+ if (canLockCoexist(conflictPosses, request)) {
+ return createNewTaskLockPosse(request);
+ }
+
+ // If not, revoke every incompatible active lock, provided that all of them have a lower priority than the request
+ if (revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, request)) {
return createNewTaskLockPosse(request);
+
} else {
// During a rolling update, tasks of mixed versions can be run at the same time. Old tasks would request
// timeChunkLocks while new tasks would ask segmentLocks. The below check is to allow for old and new tasks
@@ -637,19 +646,12 @@ public class TaskLockbox
// We can add a new taskLockPosse.
return createNewTaskLockPosse(request);
} else {
- if (isAllRevocable(conflictPosses, request.getPriority())) {
- // Revoke all existing locks
- conflictPosses.forEach(this::revokeLock);
-
- return createNewTaskLockPosse(request);
- } else {
- log.info(
- "Cannot create a new taskLockPosse for request[%s] because existing locks[%s] have same or higher priorities",
- request,
- conflictPosses
- );
- return null;
- }
+ log.info(
+ "Cannot create a new taskLockPosse for request[%s] because existing locks[%s] have same or higher priorities",
+ request,
+ conflictPosses
+ );
+ return null;
}
}
} else if (reusablePosses.size() == 1) {
@@ -1239,43 +1241,190 @@ public class TaskLockbox
}
/**
- * Check if all lockPosses are either shared
- * OR of lower priority
- * OR are revoked non-shared locks if their priorities are greater than or equal to the provided priority
- * @param lockPosses conflicting task lock posses to be checked
- * @param priority priority of the lock to be acquired
- * @return true if the condititons are met
+ * Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
+ * @param conflictPosses conflict lock posses
+ * @param request lock request
+ * @return true iff the lock can coexist with all its conflicting locks
*/
- private static boolean areAllEqualOrHigherPriorityLocksSharedOrRevoked(List<TaskLockPosse> lockPosses, int priority)
+ private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
{
- return lockPosses.stream()
- .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() >= priority)
- .allMatch(taskLockPosse -> taskLockPosse.getTaskLock().getType().equals(TaskLockType.SHARED)
- || taskLockPosse.getTaskLock().isRevoked());
+ switch (request.getType()) {
+ case APPEND:
+ return canAppendLockCoexist(conflictPosses, request);
+ case REPLACE:
+ return canReplaceLockCoexist(conflictPosses, request);
+ case SHARED:
+ return canSharedLockCoexist(conflictPosses);
+ case EXCLUSIVE:
+ return canExclusiveLockCoexist(conflictPosses);
+ default:
+ throw new UOE("Unsupported lock type: " + request.getType());
+ }
}
/**
- * Revokes all non-shared locks with priorities lower than the provided priority
- * @param lockPosses conflicting task lock posses which may be revoked
- * @param priority priority of the lock to be acquired
+ * Check if an APPEND lock can coexist with a given set of conflicting posses.
+ * An APPEND lock can coexist with any number of other APPEND locks
+ * OR with at most one REPLACE lock over an interval which encloses this request.
+ * @param conflictPosses conflicting lock posses
+ * @param appendRequest append lock request
+ * @return true iff append lock can coexist with all its conflicting locks
*/
- private void revokeAllLowerPriorityNonSharedLocks(List<TaskLockPosse> lockPosses, int priority)
+ private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
{
- lockPosses.stream()
- .filter(taskLockPosse -> !TaskLockType.SHARED.equals(taskLockPosse.getTaskLock().getType()))
- .filter(taskLockPosse -> taskLockPosse.getTaskLock().getNonNullPriority() < priority)
- .forEach(this::revokeLock);
+ TaskLock replaceLock = null;
+ for (TaskLockPosse posse : conflictPosses) {
+ if (posse.getTaskLock().isRevoked()) {
+ continue;
+ }
+ if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+ || posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+ return false;
+ }
+ if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+ if (replaceLock != null) {
+ return false;
+ }
+ replaceLock = posse.getTaskLock();
+ if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
+ return false;
+ }
+ }
+ }
+ return true;
}
- private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLockPriority)
+ /**
+ * Check if a REPLACE lock can coexist with a given set of conflicting posses.
+ * A REPLACE lock can coexist with revoked locks and with any number of APPEND locks whose intervals it encloses
+ * @param conflictPosses conflicting lock posses
+ * @param replaceLock replace lock request
+ * @return true iff replace lock can coexist with all its conflicting locks
+ */
+ private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
{
- return lockPosses.stream().allMatch(taskLockPosse -> isRevocable(taskLockPosse, tryLockPriority));
+ for (TaskLockPosse posse : conflictPosses) {
+ if (posse.getTaskLock().isRevoked()) {
+ continue;
+ }
+ if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+ || posse.getTaskLock().getType().equals(TaskLockType.SHARED)
+ || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+ return false;
+ }
+ if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+ && !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
+ return false;
+ }
+ }
+ return true;
}
- private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
+ /**
+ * Check if a SHARED lock can coexist with a given set of conflicting posses.
+ * A SHARED lock can coexist with any number of other active SHARED locks
+ * @param conflictPosses conflicting lock posses
+ * @return true iff shared lock can coexist with all its conflicting locks
+ */
+ private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
{
- final TaskLock existingLock = lockPosse.getTaskLock();
- return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
+ for (TaskLockPosse posse : conflictPosses) {
+ if (posse.getTaskLock().isRevoked()) {
+ continue;
+ }
+ if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
+ || posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+ || posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
+ * An EXCLUSIVE lock cannot coexist with any other overlapping active locks
+ * @param conflictPosses conflicting lock posses
+ * @return true iff the exclusive lock can coexist with all its conflicting locks
+ */
+ private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
+ {
+ for (TaskLockPosse posse : conflictPosses) {
+ if (posse.getTaskLock().isRevoked()) {
+ continue;
+ }
+ return false;
+ }
+ return true;
+ }
+
+
+ /**
+ * Verify if every incompatible active lock is revocable. If yes, revoke all of them.
+ * - EXCLUSIVE locks are incompatible with every other conflicting lock
+ * - SHARED locks are incompatible with conflicting locks of every other type
+ * - REPLACE locks are incompatible with every conflicting lock which is not an APPEND lock enclosed within its interval
+ * - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock.
+ * Conflicting REPLACE locks which don't enclose its interval are also incompatible.
+ * @param conflictPosses conflicting lock posses
+ * @param request lock request
+ * @return true iff every incompatible lock is revocable.
+ */
+ private boolean revokeAllIncompatibleActiveLocksIfPossible(
+ List<TaskLockPosse> conflictPosses,
+ LockRequest request
+ )
+ {
+ final int priority = request.getPriority();
+ final TaskLockType type = request.getType();
+ final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
+
+ for (TaskLockPosse posse : conflictPosses) {
+ if (posse.getTaskLock().isRevoked()) {
+ continue;
+ }
+ switch (type) {
+ case EXCLUSIVE:
+ if (posse.getTaskLock().getPriority() >= priority) {
+ return false;
+ }
+ possesToRevoke.add(posse);
+ break;
+ case SHARED:
+ if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
+ if (posse.getTaskLock().getPriority() >= priority) {
+ return false;
+ }
+ possesToRevoke.add(posse);
+ }
+ break;
+ case REPLACE:
+ if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+ && request.getInterval().contains(posse.getTaskLock().getInterval()))) {
+ if (posse.getTaskLock().getPriority() >= priority) {
+ return false;
+ }
+ possesToRevoke.add(posse);
+ }
+ break;
+ case APPEND:
+ if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
+ || (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
+ && posse.getTaskLock().getInterval().contains(request.getInterval())))) {
+ if (posse.getTaskLock().getPriority() >= priority) {
+ return false;
+ }
+ possesToRevoke.add(posse);
+ }
+ break;
+ default:
+ throw new UOE("Unsupported lock type: " + type);
+ }
+ }
+ for (TaskLockPosse revokablePosse : possesToRevoke) {
+ revokeLock(revokablePosse);
+ }
+ return true;
}
/**
@@ -1350,6 +1499,8 @@ public class TaskLockbox
{
if (taskLock.getType() == request.getType() && taskLock.getGranularity() == request.getGranularity()) {
switch (taskLock.getType()) {
+ case REPLACE:
+ case APPEND:
case SHARED:
if (request instanceof TimeChunkLockRequest) {
return taskLock.getInterval().contains(request.getInterval())
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index de847d59c0..15e59227c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -75,7 +75,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -93,6 +92,11 @@ public class TaskLockboxTest
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskLockbox lockbox;
+ private TaskLockboxValidator validator;
+
+ private final int HIGH_PRIORITY = 15;
+ private final int MEDIUM_PRIORITY = 10;
+ private final int LOW_PRIORITY = 5;
@Rule
public final ExpectedException exception = ExpectedException.none();
@@ -124,6 +128,7 @@ public class TaskLockboxTest
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(objectMapper, tablesConfig, derbyConnector);
lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator);
+ validator = new TaskLockboxValidator(lockbox, taskStorage);
}
private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs)
@@ -144,11 +149,13 @@ public class TaskLockboxTest
}
@Test
- public void testLock() throws InterruptedException
+ public void testLock() throws Exception
{
- Task task = NoopTask.create();
- lockbox.add(task);
- Assert.assertNotNull(acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, Intervals.of("2015-01-01/2015-01-02")));
+ validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2015-01-01/2015-01-02"),
+ MEDIUM_PRIORITY
+ );
}
@Test(expected = IllegalStateException.class)
@@ -169,81 +176,46 @@ public class TaskLockboxTest
}
@Test
- public void testTrySharedLock() throws EntryExistsException
+ public void testTrySharedLock() throws Exception
{
final Interval interval = Intervals.of("2017-01/2017-02");
- final List<Task> tasks = new ArrayList<>();
- final Set<TaskLock> activeLocks = new HashSet<>();
-
- // Add an exclusive lock entry of the highest priority
- Task exclusiveHigherPriorityRevokedLockTask = NoopTask.create(100);
- tasks.add(exclusiveHigherPriorityRevokedLockTask);
- taskStorage.insert(
- exclusiveHigherPriorityRevokedLockTask,
- TaskStatus.running(exclusiveHigherPriorityRevokedLockTask.getId())
- );
- lockbox.add(exclusiveHigherPriorityRevokedLockTask);
- final TaskLock exclusiveRevokedLock = tryTimeChunkLock(
+
+ final TaskLock exclusiveRevokedLock = validator.expectLockCreated(
TaskLockType.EXCLUSIVE,
- exclusiveHigherPriorityRevokedLockTask,
- interval
- ).getTaskLock();
-
- // Any equal or lower priority shared lock must fail
- final Task sharedLockTask = NoopTask.create(100);
- lockbox.add(sharedLockTask);
- Assert.assertFalse(tryTimeChunkLock(TaskLockType.SHARED, sharedLockTask, interval).isOk());
-
- // Revoke existing active exclusive lock
- lockbox.revokeLock(exclusiveHigherPriorityRevokedLockTask.getId(), exclusiveRevokedLock);
- Assert.assertEquals(1, getAllLocks(tasks).size());
- Assert.assertEquals(0, getAllActiveLocks(tasks).size());
- Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
- // test creating new shared locks
- for (int i = 0; i < 3; i++) {
- final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
- tasks.add(task);
- taskStorage.insert(task, TaskStatus.running(task.getId()));
- lockbox.add(task);
- final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
- Assert.assertNotNull(lock);
- activeLocks.add(lock);
- }
- Assert.assertEquals(4, getAllLocks(tasks).size());
- Assert.assertEquals(3, getAllActiveLocks(tasks).size());
- Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
- // Adding an exclusive task lock of priority 15 should revoke all existing active locks
- Task exclusiveLowerPriorityLockTask = NoopTask.create(15);
- tasks.add(exclusiveLowerPriorityLockTask);
- taskStorage.insert(exclusiveLowerPriorityLockTask, TaskStatus.running(exclusiveLowerPriorityLockTask.getId()));
- lockbox.add(exclusiveLowerPriorityLockTask);
- final TaskLock lowerPriorityExclusiveLock = tryTimeChunkLock(
+ interval,
+ HIGH_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.SHARED,
+ interval,
+ HIGH_PRIORITY
+ );
+
+ validator.revokeLock(exclusiveRevokedLock);
+ validator.expectRevokedLocks(exclusiveRevokedLock);
+
+ final TaskLock lowPrioritySharedLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ interval,
+ LOW_PRIORITY
+ );
+
+ final TaskLock mediumPriorityExclusiveLock = validator.expectLockCreated(
TaskLockType.EXCLUSIVE,
- exclusiveLowerPriorityLockTask,
- interval
- ).getTaskLock();
- activeLocks.clear();
- activeLocks.add(lowerPriorityExclusiveLock);
- Assert.assertEquals(5, getAllLocks(tasks).size());
- Assert.assertEquals(1, getAllActiveLocks(tasks).size());
- Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
-
- // Add new shared locks which revoke the active exclusive task lock
- activeLocks.clear();
- for (int i = 3; i < 5; i++) {
- final Task task = NoopTask.create(Math.max(0, (i - 1) * 10)); // the first two tasks have the same priority
- tasks.add(task);
- taskStorage.insert(task, TaskStatus.running(task.getId()));
- lockbox.add(task);
- final TaskLock lock = tryTimeChunkLock(TaskLockType.SHARED, task, interval).getTaskLock();
- Assert.assertNotNull(lock);
- activeLocks.add(lock);
- }
- Assert.assertEquals(7, getAllLocks(tasks).size());
- Assert.assertEquals(2, getAllActiveLocks(tasks).size());
- Assert.assertEquals(activeLocks, getAllActiveLocks(tasks));
+ interval,
+ MEDIUM_PRIORITY
+ );
+ validator.expectActiveLocks(mediumPriorityExclusiveLock);
+ validator.expectRevokedLocks(exclusiveRevokedLock, lowPrioritySharedLock);
+
+ final TaskLock highPrioritySharedLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ interval,
+ HIGH_PRIORITY
+ );
+ validator.expectActiveLocks(highPrioritySharedLock);
+ validator.expectRevokedLocks(exclusiveRevokedLock, lowPrioritySharedLock, mediumPriorityExclusiveLock);
}
@Test
@@ -532,8 +504,8 @@ public class TaskLockboxTest
newBox.syncFromStorage();
final Set<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
- .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
- .collect(Collectors.toSet());
+ .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+ .collect(Collectors.toSet());
Assert.assertEquals(
beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
@@ -1272,6 +1244,380 @@ public class TaskLockboxTest
);
}
+ @Test
+ public void testExclusiveLockCompatibility() throws Exception
+ {
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-05-01/2017-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.APPEND,
+ Intervals.of("2017-05-01/2018-05-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock);
+ validator.expectRevokedLocks();
+ }
+
+ @Test
+ public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
+ {
+ final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage);
+
+ final TaskLock sharedLock = validator.tryTaskLock(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ HIGH_PRIORITY
+ );
+ validator.revokeLock(sharedLock);
+
+ final TaskLock exclusiveLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-01-01/2017-02-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock replaceLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017-07-01/2018-01-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock appendLock = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-09-01/2017-10-01"),
+ LOW_PRIORITY
+ );
+
+ validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock);
+ validator.expectRevokedLocks(sharedLock);
+
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock);
+ validator.expectRevokedLocks(sharedLock, exclusiveLock, appendLock, replaceLock);
+ }
+
+ @Test
+ public void testSharedLockCompatibility() throws Exception
+ {
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-05-01/2017-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ final TaskLock sharedLock0 = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock sharedLock1 = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017-06-01/2017-07-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock sharedLock2 = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017-05-01/2018-05-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.APPEND,
+ Intervals.of("2017-05-01/2018-05-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock, sharedLock0, sharedLock1, sharedLock2);
+ validator.expectRevokedLocks();
+ }
+
+ @Test
+ public void testSharedLockCanRevokeAllIncompatible() throws Exception
+ {
+ final TaskLock exclusiveLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2016/2019"),
+ HIGH_PRIORITY
+ );
+ validator.revokeLock(exclusiveLock);
+
+ final TaskLock sharedLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017-01-01/2017-02-01"),
+ MEDIUM_PRIORITY
+ );
+
+ final TaskLock replaceLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017-07-01/2018-07-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock appendLock = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-02-01/2017-03-01"),
+ LOW_PRIORITY
+ );
+
+ validator.expectActiveLocks(sharedLock, replaceLock, appendLock);
+ validator.expectRevokedLocks(exclusiveLock);
+
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock, sharedLock);
+ validator.expectRevokedLocks(exclusiveLock, replaceLock, appendLock);
+ }
+
+ @Test
+ public void testAppendLockCompatibility() throws Exception
+ {
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-05-01/2017-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.REPLACE,
+ Intervals.of("2017-05-01/2018-01-01"),
+ MEDIUM_PRIORITY
+ );
+
+ final TaskLock replaceLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.REPLACE,
+ Intervals.of("2016/2019"),
+ MEDIUM_PRIORITY
+ );
+
+
+ // Any append lock can be created, provided that it lies within the interval of the previously created replace lock
+ // This should not revoke any of the existing locks even with a higher priority
+ final TaskLock appendLock0 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-05-01/2017-06-01"),
+ HIGH_PRIORITY
+ );
+
+ final TaskLock appendLock1 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-05-01/2017-06-01"),
+ LOW_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock, replaceLock, appendLock0, appendLock1);
+ validator.expectRevokedLocks();
+ }
+
+ @Test
+ public void testAppendLockCanRevokeAllIncompatible() throws Exception
+ {
+ final TaskLock sharedLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ HIGH_PRIORITY
+ );
+ validator.revokeLock(sharedLock);
+
+ final TaskLock exclusiveLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-01-01/2017-02-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock replaceLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017-07-01/2018-07-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock appendLock0 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-02-01/2017-03-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock appendLock1 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-02-01/2017-05-01"),
+ HIGH_PRIORITY
+ );
+
+ validator.expectActiveLocks(exclusiveLock, replaceLock, appendLock0, appendLock1);
+ validator.expectRevokedLocks(sharedLock);
+
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock, appendLock0, appendLock1);
+ validator.expectRevokedLocks(sharedLock, exclusiveLock, replaceLock);
+ }
+
+
+ @Test
+ public void testReplaceLockCompatibility() throws Exception
+ {
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-05-01/2017-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.SHARED,
+ Intervals.of("2016/2019"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ // An append lock can be created for an interval enclosed within the replace lock's.
+ // Also note that the append lock has a higher priority but doesn't revoke the replace lock as it can coexist.
+ final TaskLock appendLock = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-05-01/2017-06-01"),
+ HIGH_PRIORITY
+ );
+
+ validator.expectLockNotGranted(
+ TaskLockType.APPEND,
+ Intervals.of("2016-05-01/2017-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(theLock, appendLock);
+ validator.expectRevokedLocks();
+ }
+
+ @Test
+ public void testReplaceLockCanRevokeAllIncompatible() throws Exception
+ {
+ final TaskLock appendLock0 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2016/2019"),
+ HIGH_PRIORITY
+ );
+ validator.revokeLock(appendLock0);
+
+ final TaskLock appendLock1 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-02-01/2017-03-01"),
+ HIGH_PRIORITY
+ );
+
+ final TaskLock appendLock2 = validator.expectLockCreated(
+ TaskLockType.APPEND,
+ Intervals.of("2017-09-01/2018-03-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock exclusiveLock = validator.expectLockCreated(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2017-05-01/2017-06-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock replaceLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2016-09-01/2017-03-01"),
+ LOW_PRIORITY
+ );
+
+ final TaskLock sharedLock = validator.expectLockCreated(
+ TaskLockType.SHARED,
+ Intervals.of("2017-04-01/2017-05-01"),
+ LOW_PRIORITY
+ );
+
+ validator.expectActiveLocks(appendLock1, appendLock2, exclusiveLock, replaceLock, sharedLock);
+ validator.expectRevokedLocks(appendLock0);
+
+ final TaskLock theLock = validator.expectLockCreated(
+ TaskLockType.REPLACE,
+ Intervals.of("2017/2018"),
+ MEDIUM_PRIORITY
+ );
+
+ validator.expectActiveLocks(appendLock1, theLock);
+ validator.expectRevokedLocks(appendLock0, appendLock2, exclusiveLock, replaceLock, sharedLock);
+ }
+
@Test
public void testGetLockedIntervalsForRevokedLocks() throws Exception
{
@@ -1360,62 +1706,112 @@ public class TaskLockboxTest
@Test
public void testConflictsWithOverlappingSharedLocks() throws Exception
{
- final List<Task> tasks = new ArrayList<>();
-
- final Task conflictingTask = NoopTask.create(10);
- tasks.add(conflictingTask);
- lockbox.add(conflictingTask);
- taskStorage.insert(conflictingTask, TaskStatus.running(conflictingTask.getId()));
- TaskLock conflictingLock = tryTimeChunkLock(
+ TaskLock conflictingLock = validator.expectLockCreated(
TaskLockType.SHARED,
- conflictingTask,
- Intervals.of("2023-05-01/2023-06-01")
- ).getTaskLock();
- Assert.assertNotNull(conflictingLock);
- Assert.assertFalse(conflictingLock.isRevoked());
-
- final Task floorTask = NoopTask.create(10);
- tasks.add(floorTask);
- lockbox.add(floorTask);
- taskStorage.insert(floorTask, TaskStatus.running(floorTask.getId()));
- TaskLock floorLock = tryTimeChunkLock(
+ Intervals.of("2023-05-01/2023-06-01"),
+ MEDIUM_PRIORITY
+ );
+
+ TaskLock floorLock = validator.expectLockCreated(
TaskLockType.SHARED,
- floorTask,
- Intervals.of("2023-05-26/2023-05-27")
- ).getTaskLock();
- Assert.assertNotNull(floorLock);
- Assert.assertFalse(floorLock.isRevoked());
-
- final Task rightOverlapTask = NoopTask.create(10);
- tasks.add(rightOverlapTask);
- lockbox.add(rightOverlapTask);
- taskStorage.insert(rightOverlapTask, TaskStatus.running(rightOverlapTask.getId()));
- TaskLock rightOverlapLock = tryTimeChunkLock(
- TaskLockType.EXCLUSIVE,
- rightOverlapTask,
- Intervals.of("2023-05-28/2023-06-03")
- ).getTaskLock();
- Assert.assertNull(rightOverlapLock);
+ Intervals.of("2023-05-26/2023-05-27"),
+ MEDIUM_PRIORITY
+ );
- Assert.assertEquals(
- ImmutableSet.of(conflictingLock, floorLock),
- getAllActiveLocks(tasks)
+ validator.expectLockNotGranted(
+ TaskLockType.EXCLUSIVE,
+ Intervals.of("2023-05-28/2023-06-03"),
+ MEDIUM_PRIORITY
);
- }
- private Set<TaskLock> getAllActiveLocks(List<Task> tasks)
- {
- return tasks.stream()
- .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
- .filter(taskLock -> !taskLock.isRevoked())
- .collect(Collectors.toSet());
+ validator.expectActiveLocks(conflictingLock, floorLock);
}
- private Set<TaskLock> getAllLocks(List<Task> tasks)
+
+ private class TaskLockboxValidator
{
- return tasks.stream()
- .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
- .collect(Collectors.toSet());
+
+ private final List<Task> tasks;
+ private final TaskLockbox lockbox;
+ private final TaskStorage taskStorage;
+ private final Map<TaskLock, String> lockToTaskIdMap;
+
+ TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage)
+ {
+ lockToTaskIdMap = new HashMap<>();
+ tasks = new ArrayList<>();
+ this.lockbox = lockbox;
+ this.taskStorage = taskStorage;
+ }
+
+ public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) throws Exception
+ {
+ final TaskLock lock = tryTaskLock(type, interval, priority);
+ Assert.assertNotNull(lock);
+ Assert.assertFalse(lock.isRevoked());
+ return lock;
+ }
+
+ public void revokeLock(TaskLock lock)
+ {
+ lockbox.revokeLock(lockToTaskIdMap.get(lock), lock);
+ }
+
+ public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) throws Exception
+ {
+ final TaskLock lock = tryTaskLock(type, interval, priority);
+ Assert.assertNull(lock);
+ }
+
+ public void expectRevokedLocks(TaskLock... locks)
+ {
+ final Set<TaskLock> allLocks = getAllLocks();
+ final Set<TaskLock> activeLocks = getAllActiveLocks();
+ Assert.assertEquals(allLocks.size() - activeLocks.size(), locks.length);
+ for (TaskLock lock : locks) {
+ Assert.assertTrue(allLocks.contains(lock.revokedCopy()));
+ Assert.assertFalse(activeLocks.contains(lock));
+ }
+ }
+
+ public void expectActiveLocks(TaskLock... locks)
+ {
+ final Set<TaskLock> allLocks = getAllLocks();
+ final Set<TaskLock> activeLocks = getAllActiveLocks();
+ Assert.assertEquals(activeLocks.size(), locks.length);
+ for (TaskLock lock : locks) {
+ Assert.assertTrue(allLocks.contains(lock));
+ Assert.assertTrue(activeLocks.contains(lock));
+ }
+ }
+
+ private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) throws Exception
+ {
+ final Task task = NoopTask.create(priority);
+ tasks.add(task);
+ lockbox.add(task);
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ TaskLock lock = tryTimeChunkLock(type, task, interval).getTaskLock();
+ if (lock != null) {
+ lockToTaskIdMap.put(lock, task.getId());
+ }
+ return lock;
+ }
+
+ private Set<TaskLock> getAllActiveLocks()
+ {
+ return tasks.stream()
+ .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+ .filter(taskLock -> !taskLock.isRevoked())
+ .collect(Collectors.toSet());
+ }
+
+ private Set<TaskLock> getAllLocks()
+ {
+ return tasks.stream()
+ .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+ .collect(Collectors.toSet());
+ }
}
private static class IntervalLockWithoutPriority extends TimeChunkLock
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org