You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/01/10 03:38:34 UTC
[incubator-druid] branch master updated: Fix TaskLockbox when there
are multiple intervals of the same start but different end (#6822)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 934c83b Fix TaskLockbox when there are multiple intervals of the same start but different end (#6822)
934c83b is described below
commit 934c83bca6df5db913c130af039d8893a9fcb16a
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Jan 9 19:38:27 2019 -0800
Fix TaskLockbox when there are multiple intervals of the same start but different end (#6822)
* Fix TaskLockbox when there are multiple intervals of the same start but different end
* fix build
* fix npe
---
.../druid/indexing/overlord/TaskLockbox.java | 139 +++++++++++++--------
.../druid/indexing/overlord/TaskLockboxTest.java | 43 +++++++
2 files changed, 131 insertions(+), 51 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 96451b7..626f4e3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@@ -51,6 +52,7 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
+import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -66,11 +68,14 @@ import java.util.stream.StreamSupport;
*/
public class TaskLockbox
{
- // Datasource -> Interval -> list of (Tasks + TaskLock)
+ // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock)
// Multiple shared locks can be acquired for the same dataSource and interval.
// Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when
// they acquire the same locks again.
- private final Map<String, NavigableMap<Interval, List<TaskLockPosse>>> running = new HashMap<>();
+ // Also, the key of the second inner map is the start time to find all intervals properly starting with the same
+ // startTime.
+ private final Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> running = new HashMap<>();
+
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
@@ -326,7 +331,14 @@ public class TaskLockbox
final TaskLockType lockType
)
{
- return createOrFindLockPosse(task, interval, null, lockType);
+ giant.lock();
+
+ try {
+ return createOrFindLockPosse(task, interval, null, lockType);
+ }
+ finally {
+ giant.unlock();
+ }
}
/**
@@ -584,7 +596,8 @@ public class TaskLockbox
final TaskLockPosse posseToUse = new TaskLockPosse(
new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
);
- running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
+ running.computeIfAbsent(dataSource, k -> new TreeMap<>())
+ .computeIfAbsent(interval.getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
.computeIfAbsent(interval, k -> new ArrayList<>())
.add(posseToUse);
@@ -612,7 +625,7 @@ public class TaskLockbox
CriticalAction<T> action
) throws Exception
{
- giant.lockInterruptibly();
+ giant.lock();
try {
return action.perform(isTaskLocksValid(task, intervals));
@@ -624,13 +637,19 @@ public class TaskLockbox
private boolean isTaskLocksValid(Task task, List<Interval> intervals)
{
- return intervals
- .stream()
- .allMatch(interval -> {
- final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
- // Tasks cannot enter the critical section with a shared lock
- return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
- });
+ giant.lock();
+ try {
+ return intervals
+ .stream()
+ .allMatch(interval -> {
+ final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
+ // Tasks cannot enter the critical section with a shared lock
+ return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
+ });
+ }
+ finally {
+ giant.unlock();
+ }
}
private void revokeLock(TaskLockPosse lockPosse)
@@ -676,7 +695,7 @@ public class TaskLockbox
final TaskLock revokedLock = lock.revokedCopy();
taskStorage.replaceLock(taskId, lock, revokedLock);
- final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval());
+ final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval());
final TaskLockPosse foundPosse = possesHolder.stream()
.filter(posse -> posse.getTaskLock().equals(lock))
.findFirst()
@@ -733,13 +752,19 @@ public class TaskLockbox
try {
final String dataSource = task.getDataSource();
- final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+ final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null || dsRunning.isEmpty()) {
return;
}
- final List<TaskLockPosse> possesHolder = dsRunning.get(interval);
+ final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
+
+ if (intervalToPosses == null || intervalToPosses.isEmpty()) {
+ return;
+ }
+
+ final List<TaskLockPosse> possesHolder = intervalToPosses.get(interval);
if (possesHolder == null || possesHolder.isEmpty()) {
return;
}
@@ -760,8 +785,12 @@ public class TaskLockbox
possesHolder.remove(taskLockPosse);
}
- if (possesHolder.size() == 0) {
- dsRunning.remove(interval);
+ if (possesHolder.isEmpty()) {
+ intervalToPosses.remove(interval);
+ }
+
+ if (intervalToPosses.isEmpty()) {
+ dsRunning.remove(interval.getStart());
}
if (running.get(dataSource).size() == 0) {
@@ -797,6 +826,18 @@ public class TaskLockbox
}
}
+ public void add(Task task)
+ {
+ giant.lock();
+ try {
+ log.info("Adding task[%s] to activeTasks", task.getId());
+ activeTasks.add(task.getId());
+ }
+ finally {
+ giant.unlock();
+ }
+ }
+
/**
* Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
*
@@ -832,11 +873,12 @@ public class TaskLockbox
try {
// Scan through all locks for this datasource
- final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+ final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null) {
return ImmutableList.of();
} else {
return dsRunning.values().stream()
+ .flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList());
@@ -870,29 +912,28 @@ public class TaskLockbox
giant.lock();
try {
- final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(dataSource);
+ final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(dataSource);
if (dsRunning == null) {
// No locks at all
return Collections.emptyList();
} else {
// Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
- final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
- final Iterable<Interval> searchIntervals = Iterables.concat(
+ final NavigableSet<DateTime> dsLockbox = dsRunning.navigableKeySet();
+ final Iterable<DateTime> searchStartTimes = Iterables.concat(
// Single interval that starts at or before ours
- Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), DateTimes.MAX))),
+ Collections.singletonList(dsLockbox.floor(interval.getStart())),
// All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
- dsLockbox.subSet(
- new Interval(interval.getStart(), DateTimes.MAX),
- false,
- new Interval(interval.getEnd(), interval.getEnd()),
- false
- )
+ dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false)
);
- return StreamSupport.stream(searchIntervals.spliterator(), false)
- .filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval))
- .flatMap(searchInterval -> dsRunning.get(searchInterval).stream())
+ return StreamSupport.stream(searchStartTimes.spliterator(), false)
+ .filter(java.util.Objects::nonNull)
+ .map(dsRunning::get)
+ .filter(java.util.Objects::nonNull)
+ .flatMap(sortedMap -> sortedMap.entrySet().stream())
+ .filter(entry -> entry.getKey().overlaps(interval))
+ .flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toList());
}
}
@@ -901,12 +942,24 @@ public class TaskLockbox
}
}
- public void add(Task task)
+ @VisibleForTesting
+ TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
giant.lock();
+
try {
- log.info("Adding task[%s] to activeTasks", task.getId());
- activeTasks.add(task.getId());
+ final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
+ .stream()
+ .filter(lockPosse -> lockPosse.containsTask(task))
+ .collect(Collectors.toList());
+
+ if (filteredPosses.isEmpty()) {
+ throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
+ } else if (filteredPosses.size() > 1) {
+ throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
+ } else {
+ return filteredPosses.get(0);
+ }
}
finally {
giant.unlock();
@@ -936,22 +989,6 @@ public class TaskLockbox
return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
}
- private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
- {
- final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
- .stream()
- .filter(lockPosse -> lockPosse.containsTask(task))
- .collect(Collectors.toList());
-
- if (filteredPosses.isEmpty()) {
- throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
- } else if (filteredPosses.size() > 1) {
- throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
- } else {
- return filteredPosses.get(0);
- }
- }
-
@VisibleForTesting
Set<String> getActiveTasks()
{
@@ -959,7 +996,7 @@ public class TaskLockbox
}
@VisibleForTesting
- Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
+ Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> getAllLocks()
{
return running;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index d598b1f..5604255 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -624,6 +625,48 @@ public class TaskLockboxTest
Assert.assertTrue(lockbox.getAllLocks().isEmpty());
}
+ @Test
+ public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
+ {
+ final Task lowPriorityTask = NoopTask.create(0);
+ final Task highPriorityTask = NoopTask.create(10);
+
+ taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+ taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
+ lockbox.add(lowPriorityTask);
+ lockbox.add(highPriorityTask);
+
+ Assert.assertTrue(
+ lockbox.tryLock(
+ TaskLockType.EXCLUSIVE,
+ lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+ ).isOk()
+ );
+
+ Assert.assertTrue(
+ lockbox.tryLock(
+ TaskLockType.EXCLUSIVE,
+ highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+ ).isOk()
+ );
+
+ final TaskLockPosse highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+ highPriorityTask,
+ Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+ );
+
+ Assert.assertTrue(highLockPosse.containsTask(highPriorityTask));
+ Assert.assertFalse(highLockPosse.getTaskLock().isRevoked());
+
+ final TaskLockPosse lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+ lowPriorityTask,
+ Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+ );
+
+ Assert.assertTrue(lowLockPosse.containsTask(lowPriorityTask));
+ Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked());
+ }
+
private Set<TaskLock> getAllLocks(List<Task> tasks)
{
return tasks.stream()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org