You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/01/10 03:38:29 UTC

[GitHub] jon-wei closed pull request #6822: Fix TaskLockbox when there are multiple intervals of the same start but different end

jon-wei closed pull request #6822: Fix TaskLockbox when there are multiple intervals of the same start but different end
URL: https://github.com/apache/incubator-druid/pull/6822
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 96451b7c570..626f4e33344 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -38,6 +38,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -51,6 +52,7 @@
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -66,11 +68,14 @@
  */
 public class TaskLockbox
 {
-  // Datasource -> Interval -> list of (Tasks + TaskLock)
+  // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock)
   // Multiple shared locks can be acquired for the same dataSource and interval.
   // Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when
   // they acquire the same locks again.
-  private final Map<String, NavigableMap<Interval, List<TaskLockPosse>>> running = new HashMap<>();
+  // Also, the key of the second inner map is the start time to find all intervals properly starting with the same
+  // startTime.
+  private final Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> running = new HashMap<>();
+
   private final TaskStorage taskStorage;
   private final ReentrantLock giant = new ReentrantLock(true);
   private final Condition lockReleaseCondition = giant.newCondition();
@@ -326,7 +331,14 @@ private TaskLockPosse createOrFindLockPosse(
       final TaskLockType lockType
   )
   {
-    return createOrFindLockPosse(task, interval, null, lockType);
+    giant.lock();
+
+    try {
+      return createOrFindLockPosse(task, interval, null, lockType);
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   /**
@@ -584,7 +596,8 @@ private TaskLockPosse createNewTaskLockPosse(
       final TaskLockPosse posseToUse = new TaskLockPosse(
           new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
       );
-      running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
+      running.computeIfAbsent(dataSource, k -> new TreeMap<>())
+             .computeIfAbsent(interval.getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
              .computeIfAbsent(interval, k -> new ArrayList<>())
              .add(posseToUse);
 
@@ -612,7 +625,7 @@ private TaskLockPosse createNewTaskLockPosse(
       CriticalAction<T> action
   ) throws Exception
   {
-    giant.lockInterruptibly();
+    giant.lock();
 
     try {
       return action.perform(isTaskLocksValid(task, intervals));
@@ -624,13 +637,19 @@ private TaskLockPosse createNewTaskLockPosse(
 
   private boolean isTaskLocksValid(Task task, List<Interval> intervals)
   {
-    return intervals
-        .stream()
-        .allMatch(interval -> {
-          final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
-          // Tasks cannot enter the critical section with a shared lock
-          return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
-        });
+    giant.lock();
+    try {
+      return intervals
+          .stream()
+          .allMatch(interval -> {
+            final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
+            // Tasks cannot enter the critical section with a shared lock
+            return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
+          });
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   private void revokeLock(TaskLockPosse lockPosse)
@@ -676,7 +695,7 @@ private void revokeLock(String taskId, TaskLock lock)
         final TaskLock revokedLock = lock.revokedCopy();
         taskStorage.replaceLock(taskId, lock, revokedLock);
 
-        final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval());
+        final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval());
         final TaskLockPosse foundPosse = possesHolder.stream()
                                                      .filter(posse -> posse.getTaskLock().equals(lock))
                                                      .findFirst()
@@ -733,13 +752,19 @@ public void unlock(final Task task, final Interval interval)
 
     try {
       final String dataSource = task.getDataSource();
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
 
       if (dsRunning == null || dsRunning.isEmpty()) {
         return;
       }
 
-      final List<TaskLockPosse> possesHolder = dsRunning.get(interval);
+      final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
+
+      if (intervalToPosses == null || intervalToPosses.isEmpty()) {
+        return;
+      }
+
+      final List<TaskLockPosse> possesHolder = intervalToPosses.get(interval);
       if (possesHolder == null || possesHolder.isEmpty()) {
         return;
       }
@@ -760,8 +785,12 @@ public void unlock(final Task task, final Interval interval)
           possesHolder.remove(taskLockPosse);
         }
 
-        if (possesHolder.size() == 0) {
-          dsRunning.remove(interval);
+        if (possesHolder.isEmpty()) {
+          intervalToPosses.remove(interval);
+        }
+
+        if (intervalToPosses.isEmpty()) {
+          dsRunning.remove(interval.getStart());
         }
 
         if (running.get(dataSource).size() == 0) {
@@ -797,6 +826,18 @@ public void unlock(final Task task, final Interval interval)
     }
   }
 
+  public void add(Task task)
+  {
+    giant.lock();
+    try {
+      log.info("Adding task[%s] to activeTasks", task.getId());
+      activeTasks.add(task.getId());
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
   /**
    * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
    *
@@ -832,11 +873,12 @@ public void remove(final Task task)
 
     try {
       // Scan through all locks for this datasource
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
       if (dsRunning == null) {
         return ImmutableList.of();
       } else {
         return dsRunning.values().stream()
+                        .flatMap(map -> map.values().stream())
                         .flatMap(Collection::stream)
                         .filter(taskLockPosse -> taskLockPosse.containsTask(task))
                         .collect(Collectors.toList());
@@ -870,29 +912,28 @@ public void remove(final Task task)
     giant.lock();
 
     try {
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(dataSource);
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(dataSource);
       if (dsRunning == null) {
         // No locks at all
         return Collections.emptyList();
       } else {
         // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
-        final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
-        final Iterable<Interval> searchIntervals = Iterables.concat(
+        final NavigableSet<DateTime> dsLockbox = dsRunning.navigableKeySet();
+        final Iterable<DateTime> searchStartTimes = Iterables.concat(
             // Single interval that starts at or before ours
-            Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), DateTimes.MAX))),
+            Collections.singletonList(dsLockbox.floor(interval.getStart())),
 
             // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
-            dsLockbox.subSet(
-                new Interval(interval.getStart(), DateTimes.MAX),
-                false,
-                new Interval(interval.getEnd(), interval.getEnd()),
-                false
-            )
+            dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false)
         );
 
-        return StreamSupport.stream(searchIntervals.spliterator(), false)
-                            .filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval))
-                            .flatMap(searchInterval -> dsRunning.get(searchInterval).stream())
+        return StreamSupport.stream(searchStartTimes.spliterator(), false)
+                            .filter(java.util.Objects::nonNull)
+                            .map(dsRunning::get)
+                            .filter(java.util.Objects::nonNull)
+                            .flatMap(sortedMap -> sortedMap.entrySet().stream())
+                            .filter(entry -> entry.getKey().overlaps(interval))
+                            .flatMap(entry -> entry.getValue().stream())
                             .collect(Collectors.toList());
       }
     }
@@ -901,12 +942,24 @@ public void remove(final Task task)
     }
   }
 
-  public void add(Task task)
+  @VisibleForTesting
+  TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
   {
     giant.lock();
+
     try {
-      log.info("Adding task[%s] to activeTasks", task.getId());
-      activeTasks.add(task.getId());
+      final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
+          .stream()
+          .filter(lockPosse -> lockPosse.containsTask(task))
+          .collect(Collectors.toList());
+
+      if (filteredPosses.isEmpty()) {
+        throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
+      } else if (filteredPosses.size() > 1) {
+        throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
+      } else {
+        return filteredPosses.get(0);
+      }
     }
     finally {
       giant.unlock();
@@ -936,22 +989,6 @@ private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
     return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
   }
 
-  private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
-  {
-    final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
-        .stream()
-        .filter(lockPosse -> lockPosse.containsTask(task))
-        .collect(Collectors.toList());
-
-    if (filteredPosses.isEmpty()) {
-      throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
-    } else if (filteredPosses.size() > 1) {
-      throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
-    } else {
-      return filteredPosses.get(0);
-    }
-  }
-
   @VisibleForTesting
   Set<String> getActiveTasks()
   {
@@ -959,7 +996,7 @@ private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval
   }
 
   @VisibleForTesting
-  Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
+  Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> getAllLocks()
   {
     return running;
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index d598b1f897b..56042551251 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -35,6 +35,7 @@
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -624,6 +625,48 @@ public void testUnlock() throws EntryExistsException
     Assert.assertTrue(lockbox.getAllLocks().isEmpty());
   }
 
+  @Test
+  public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
+  {
+    final Task lowPriorityTask = NoopTask.create(0);
+    final Task highPriorityTask = NoopTask.create(10);
+
+    taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+    taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
+    lockbox.add(lowPriorityTask);
+    lockbox.add(highPriorityTask);
+
+    Assert.assertTrue(
+        lockbox.tryLock(
+            TaskLockType.EXCLUSIVE,
+            lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+        ).isOk()
+    );
+
+    Assert.assertTrue(
+        lockbox.tryLock(
+            TaskLockType.EXCLUSIVE,
+            highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+        ).isOk()
+    );
+
+    final TaskLockPosse highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+        highPriorityTask,
+        Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+    );
+
+    Assert.assertTrue(highLockPosse.containsTask(highPriorityTask));
+    Assert.assertFalse(highLockPosse.getTaskLock().isRevoked());
+
+    final TaskLockPosse lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+        lowPriorityTask,
+        Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+    );
+
+    Assert.assertTrue(lowLockPosse.containsTask(lowPriorityTask));
+    Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked());
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org