You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/01/10 03:38:34 UTC

[incubator-druid] branch master updated: Fix TaskLockbox when there are multiple intervals of the same start but different end (#6822)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 934c83b  Fix TaskLockbox when there are multiple intervals of the same start but different end (#6822)
934c83b is described below

commit 934c83bca6df5db913c130af039d8893a9fcb16a
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Jan 9 19:38:27 2019 -0800

    Fix TaskLockbox when there are multiple intervals of the same start but different end (#6822)
    
    * Fix TaskLockbox when there are multiple intervals of the same start but different end
    
    * fix build
    
    * fix npe
---
 .../druid/indexing/overlord/TaskLockbox.java       | 139 +++++++++++++--------
 .../druid/indexing/overlord/TaskLockboxTest.java   |  43 +++++++
 2 files changed, 131 insertions(+), 51 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 96451b7..626f4e3 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -38,6 +38,7 @@ import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -51,6 +52,7 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -66,11 +68,14 @@ import java.util.stream.StreamSupport;
  */
 public class TaskLockbox
 {
-  // Datasource -> Interval -> list of (Tasks + TaskLock)
+  // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock)
   // Multiple shared locks can be acquired for the same dataSource and interval.
   // Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when
   // they acquire the same locks again.
-  private final Map<String, NavigableMap<Interval, List<TaskLockPosse>>> running = new HashMap<>();
+  // The second-level map is keyed by interval start time so that all locked intervals sharing the same start time
+  // (but possibly having different end times) can be found together.
+  private final Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> running = new HashMap<>();
+
   private final TaskStorage taskStorage;
   private final ReentrantLock giant = new ReentrantLock(true);
   private final Condition lockReleaseCondition = giant.newCondition();
@@ -326,7 +331,14 @@ public class TaskLockbox
       final TaskLockType lockType
   )
   {
-    return createOrFindLockPosse(task, interval, null, lockType);
+    giant.lock();
+
+    try {
+      return createOrFindLockPosse(task, interval, null, lockType);
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   /**
@@ -584,7 +596,8 @@ public class TaskLockbox
       final TaskLockPosse posseToUse = new TaskLockPosse(
           new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
       );
-      running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
+      running.computeIfAbsent(dataSource, k -> new TreeMap<>())
+             .computeIfAbsent(interval.getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
              .computeIfAbsent(interval, k -> new ArrayList<>())
              .add(posseToUse);
 
@@ -612,7 +625,7 @@ public class TaskLockbox
       CriticalAction<T> action
   ) throws Exception
   {
-    giant.lockInterruptibly();
+    giant.lock();
 
     try {
       return action.perform(isTaskLocksValid(task, intervals));
@@ -624,13 +637,19 @@ public class TaskLockbox
 
   private boolean isTaskLocksValid(Task task, List<Interval> intervals)
   {
-    return intervals
-        .stream()
-        .allMatch(interval -> {
-          final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
-          // Tasks cannot enter the critical section with a shared lock
-          return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
-        });
+    giant.lock();
+    try {
+      return intervals
+          .stream()
+          .allMatch(interval -> {
+            final TaskLock lock = getOnlyTaskLockPosseContainingInterval(task, interval).getTaskLock();
+            // Tasks cannot enter the critical section with a shared lock
+            return !lock.isRevoked() && lock.getType() != TaskLockType.SHARED;
+          });
+    }
+    finally {
+      giant.unlock();
+    }
   }
 
   private void revokeLock(TaskLockPosse lockPosse)
@@ -676,7 +695,7 @@ public class TaskLockbox
         final TaskLock revokedLock = lock.revokedCopy();
         taskStorage.replaceLock(taskId, lock, revokedLock);
 
-        final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval());
+        final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval());
         final TaskLockPosse foundPosse = possesHolder.stream()
                                                      .filter(posse -> posse.getTaskLock().equals(lock))
                                                      .findFirst()
@@ -733,13 +752,19 @@ public class TaskLockbox
 
     try {
       final String dataSource = task.getDataSource();
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
 
       if (dsRunning == null || dsRunning.isEmpty()) {
         return;
       }
 
-      final List<TaskLockPosse> possesHolder = dsRunning.get(interval);
+      final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
+
+      if (intervalToPosses == null || intervalToPosses.isEmpty()) {
+        return;
+      }
+
+      final List<TaskLockPosse> possesHolder = intervalToPosses.get(interval);
       if (possesHolder == null || possesHolder.isEmpty()) {
         return;
       }
@@ -760,8 +785,12 @@ public class TaskLockbox
           possesHolder.remove(taskLockPosse);
         }
 
-        if (possesHolder.size() == 0) {
-          dsRunning.remove(interval);
+        if (possesHolder.isEmpty()) {
+          intervalToPosses.remove(interval);
+        }
+
+        if (intervalToPosses.isEmpty()) {
+          dsRunning.remove(interval.getStart());
         }
 
         if (running.get(dataSource).size() == 0) {
@@ -797,6 +826,18 @@ public class TaskLockbox
     }
   }
 
+  public void add(Task task)
+  {
+    giant.lock();
+    try {
+      log.info("Adding task[%s] to activeTasks", task.getId());
+      activeTasks.add(task.getId());
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
   /**
    * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
    *
@@ -832,11 +873,12 @@ public class TaskLockbox
 
     try {
       // Scan through all locks for this datasource
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(task.getDataSource());
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
       if (dsRunning == null) {
         return ImmutableList.of();
       } else {
         return dsRunning.values().stream()
+                        .flatMap(map -> map.values().stream())
                         .flatMap(Collection::stream)
                         .filter(taskLockPosse -> taskLockPosse.containsTask(task))
                         .collect(Collectors.toList());
@@ -870,29 +912,28 @@ public class TaskLockbox
     giant.lock();
 
     try {
-      final NavigableMap<Interval, List<TaskLockPosse>> dsRunning = running.get(dataSource);
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(dataSource);
       if (dsRunning == null) {
         // No locks at all
         return Collections.emptyList();
       } else {
         // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
-        final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
-        final Iterable<Interval> searchIntervals = Iterables.concat(
+        final NavigableSet<DateTime> dsLockbox = dsRunning.navigableKeySet();
+        final Iterable<DateTime> searchStartTimes = Iterables.concat(
             // Single interval that starts at or before ours
-            Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), DateTimes.MAX))),
+            Collections.singletonList(dsLockbox.floor(interval.getStart())),
 
             // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
-            dsLockbox.subSet(
-                new Interval(interval.getStart(), DateTimes.MAX),
-                false,
-                new Interval(interval.getEnd(), interval.getEnd()),
-                false
-            )
+            dsLockbox.subSet(interval.getStart(), false, interval.getEnd(), false)
         );
 
-        return StreamSupport.stream(searchIntervals.spliterator(), false)
-                            .filter(searchInterval -> searchInterval != null && searchInterval.overlaps(interval))
-                            .flatMap(searchInterval -> dsRunning.get(searchInterval).stream())
+        return StreamSupport.stream(searchStartTimes.spliterator(), false)
+                            .filter(java.util.Objects::nonNull)
+                            .map(dsRunning::get)
+                            .filter(java.util.Objects::nonNull)
+                            .flatMap(sortedMap -> sortedMap.entrySet().stream())
+                            .filter(entry -> entry.getKey().overlaps(interval))
+                            .flatMap(entry -> entry.getValue().stream())
                             .collect(Collectors.toList());
       }
     }
@@ -901,12 +942,24 @@ public class TaskLockbox
     }
   }
 
-  public void add(Task task)
+  @VisibleForTesting
+  TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
   {
     giant.lock();
+
     try {
-      log.info("Adding task[%s] to activeTasks", task.getId());
-      activeTasks.add(task.getId());
+      final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
+          .stream()
+          .filter(lockPosse -> lockPosse.containsTask(task))
+          .collect(Collectors.toList());
+
+      if (filteredPosses.isEmpty()) {
+        throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
+      } else if (filteredPosses.size() > 1) {
+        throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
+      } else {
+        return filteredPosses.get(0);
+      }
     }
     finally {
       giant.unlock();
@@ -936,22 +989,6 @@ public class TaskLockbox
     return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
   }
 
-  private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
-  {
-    final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
-        .stream()
-        .filter(lockPosse -> lockPosse.containsTask(task))
-        .collect(Collectors.toList());
-
-    if (filteredPosses.isEmpty()) {
-      throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
-    } else if (filteredPosses.size() > 1) {
-      throw new ISE("There are multiple lockPosses for task[%s] and interval[%s]?", task.getId(), interval);
-    } else {
-      return filteredPosses.get(0);
-    }
-  }
-
   @VisibleForTesting
   Set<String> getActiveTasks()
   {
@@ -959,7 +996,7 @@ public class TaskLockbox
   }
 
   @VisibleForTesting
-  Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
+  Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> getAllLocks()
   {
     return running;
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index d598b1f..5604255 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig;
 import org.apache.druid.indexing.common.task.AbstractTask;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -624,6 +625,48 @@ public class TaskLockboxTest
     Assert.assertTrue(lockbox.getAllLocks().isEmpty());
   }
 
+  @Test
+  public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() throws EntryExistsException
+  {
+    final Task lowPriorityTask = NoopTask.create(0);
+    final Task highPriorityTask = NoopTask.create(10);
+
+    taskStorage.insert(lowPriorityTask, TaskStatus.running(lowPriorityTask.getId()));
+    taskStorage.insert(highPriorityTask, TaskStatus.running(highPriorityTask.getId()));
+    lockbox.add(lowPriorityTask);
+    lockbox.add(highPriorityTask);
+
+    Assert.assertTrue(
+        lockbox.tryLock(
+            TaskLockType.EXCLUSIVE,
+            lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+        ).isOk()
+    );
+
+    Assert.assertTrue(
+        lockbox.tryLock(
+            TaskLockType.EXCLUSIVE,
+            highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+        ).isOk()
+    );
+
+    final TaskLockPosse highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+        highPriorityTask,
+        Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
+    );
+
+    Assert.assertTrue(highLockPosse.containsTask(highPriorityTask));
+    Assert.assertFalse(highLockPosse.getTaskLock().isRevoked());
+
+    final TaskLockPosse lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval(
+        lowPriorityTask,
+        Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
+    );
+
+    Assert.assertTrue(lowLockPosse.containsTask(lowPriorityTask));
+    Assert.assertTrue(lowLockPosse.getTaskLock().isRevoked());
+  }
+
   private Set<TaskLock> getAllLocks(List<Task> tasks)
   {
     return tasks.stream()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org