You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 18:22:45 UTC

[GitHub] gianm closed pull request #5967: [Backport] Fix mismatch in revoked task locks between memory and metastore after sync from storage

gianm closed pull request #5967: [Backport] Fix mismatch in revoked task locks between memory and metastore after sync from storage
URL: https://github.com/apache/incubator-druid/pull/5967
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
index 011aaa5d167..4025b1c5b32 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
@@ -258,15 +258,15 @@ public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
     Preconditions.checkNotNull(newLock, "newLock");
 
     log.info(
-        "Replacing lock on interval[%s] version[%s] for task: %s",
-        oldLock.getInterval(),
-        oldLock.getVersion(),
+        "Replacing an existing lock[%s] with a new lock[%s] for task: %s",
+        oldLock,
+        newLock,
         taskid
     );
 
     final Long oldLockId = handler.getLockId(taskid, oldLock);
     if (oldLockId == null) {
-      throw new ISE("Cannot find lock[%s]", oldLock);
+      throw new ISE("Cannot find an existing lock[%s]", oldLock);
     }
 
     handler.replaceLock(taskid, oldLockId, newLock);
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
index 5a6c2a29773..286296d1c4c 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
@@ -31,14 +31,15 @@
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.inject.Inject;
-import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskLockType;
 import io.druid.indexing.common.task.Task;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.guava.Comparators;
+import io.druid.java.util.emitter.EmittingLogger;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -133,12 +134,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
           continue;
         }
 
-        final TaskLockPosse taskLockPosse = createOrFindLockPosse(
-            task,
-            savedTaskLock.getInterval(),
-            savedTaskLock.getVersion(),
-            savedTaskLock.getType()
-        );
+        final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
         if (taskLockPosse != null) {
           taskLockPosse.addTask(task);
 
@@ -147,9 +143,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
           if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
             taskLockCount++;
             log.info(
-                "Reacquired lock on interval[%s] version[%s] for task: %s",
-                savedTaskLock.getInterval(),
-                savedTaskLock.getVersion(),
+                "Reacquired lock[%s] for task: %s",
+                taskLock,
                 task.getId()
             );
           } else {
@@ -340,7 +335,7 @@ private TaskLockPosse createOrFindLockPosse(
    *
    * @return a lock posse or null if any posse is found and a new poss cannot be created
    *
-   * @see #createNewTaskLockPosse(TaskLockType, String, String, Interval, String, int)
+   * @see #createNewTaskLockPosse
    */
   @Nullable
   private TaskLockPosse createOrFindLockPosse(
@@ -353,8 +348,78 @@ private TaskLockPosse createOrFindLockPosse(
     giant.lock();
 
     try {
-      final String dataSource = task.getDataSource();
-      final int priority = task.getPriority();
+      return createOrFindLockPosse(
+          lockType,
+          task.getId(),
+          task.getGroupId(),
+          task.getDataSource(),
+          interval,
+          preferredVersion,
+          task.getPriority(),
+          false
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  @Nullable
+  private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock)
+  {
+    giant.lock();
+
+    try {
+      Preconditions.checkArgument(
+          task.getGroupId().equals(taskLock.getGroupId()),
+          "lock groupId[%s] is different from task groupId[%s]",
+          taskLock.getGroupId(),
+          task.getGroupId()
+      );
+      Preconditions.checkArgument(
+          task.getDataSource().equals(taskLock.getDataSource()),
+          "lock dataSource[%s] is different from task dataSource[%s]",
+          taskLock.getDataSource(),
+          task.getDataSource()
+      );
+      Preconditions.checkArgument(
+          task.getPriority() == taskLock.getPriority(),
+          "lock priority[%s] is different from task priority[%s]",
+          taskLock.getPriority(),
+          task.getPriority()
+      );
+
+      return createOrFindLockPosse(
+          taskLock.getType(),
+          task.getId(),
+          taskLock.getGroupId(),
+          taskLock.getDataSource(),
+          taskLock.getInterval(),
+          taskLock.getVersion(),
+          taskLock.getPriority(),
+          taskLock.isRevoked()
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  @Nullable
+  private TaskLockPosse createOrFindLockPosse(
+      TaskLockType lockType,
+      String taskId,
+      String groupId,
+      String dataSource,
+      Interval interval,
+      @Nullable String preferredVersion,
+      int priority,
+      boolean revoked
+  )
+  {
+    giant.lock();
+
+    try {
       final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(dataSource, interval);
 
       if (foundPosses.size() > 0) {
@@ -362,7 +427,7 @@ private TaskLockPosse createOrFindLockPosse(
         // If they can't be reused, check lock priority and revoke existing locks if possible.
         final List<TaskLockPosse> filteredPosses = foundPosses
             .stream()
-            .filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, task, interval))
+            .filter(posse -> matchGroupIdAndContainInterval(posse.taskLock, groupId, interval))
             .collect(Collectors.toList());
 
         if (filteredPosses.size() == 0) {
@@ -372,11 +437,12 @@ private TaskLockPosse createOrFindLockPosse(
             // Any number of shared locks can be acquired for the same dataSource and interval.
             return createNewTaskLockPosse(
                 lockType,
-                task.getGroupId(),
+                groupId,
                 dataSource,
                 interval,
                 preferredVersion,
-                priority
+                priority,
+                revoked
             );
           } else {
             if (isAllRevocable(foundPosses, priority)) {
@@ -385,14 +451,40 @@ private TaskLockPosse createOrFindLockPosse(
 
               return createNewTaskLockPosse(
                   lockType,
-                  task.getGroupId(),
+                  groupId,
                   dataSource,
                   interval,
                   preferredVersion,
-                  priority
+                  priority,
+                  revoked
               );
             } else {
-              log.info("Cannot create a new taskLockPosse because some locks of same or higher priorities exist");
+              final String messagePrefix;
+              if (preferredVersion == null) {
+                messagePrefix = StringUtils.format(
+                    "Cannot create a new taskLockPosse for task[%s], interval[%s], priority[%d], revoked[%s]",
+                    taskId,
+                    interval,
+                    priority,
+                    revoked
+                );
+              } else {
+                messagePrefix = StringUtils.format(
+                    "Cannot create a new taskLockPosse for task[%s], interval[%s],"
+                    + " preferredVersion[%s], priority[%d], revoked[%s]",
+                    taskId,
+                    interval,
+                    preferredVersion,
+                    priority,
+                    revoked
+                );
+              }
+
+              log.info(
+                  "%s because existing locks[%s] have same or higher priorities",
+                  messagePrefix,
+                  foundPosses
+              );
               return null;
             }
           }
@@ -404,7 +496,7 @@ private TaskLockPosse createOrFindLockPosse(
           } else {
             throw new ISE(
                 "Task[%s] already acquired a lock for interval[%s] but different type[%s]",
-                task.getId(),
+                taskId,
                 interval,
                 foundPosse.getTaskLock().getType()
             );
@@ -413,7 +505,7 @@ private TaskLockPosse createOrFindLockPosse(
           // case 3) we found multiple lock posses for the given task
           throw new ISE(
               "Task group[%s] has multiple locks for the same interval[%s]?",
-              task.getGroupId(),
+              groupId,
               interval
           );
         }
@@ -422,11 +514,12 @@ private TaskLockPosse createOrFindLockPosse(
         // Let's make a new one.
         return createNewTaskLockPosse(
             lockType,
-            task.getGroupId(),
+            groupId,
             dataSource,
             interval,
             preferredVersion,
-            priority
+            priority,
+            revoked
         );
       }
     }
@@ -447,6 +540,7 @@ private TaskLockPosse createOrFindLockPosse(
    * @param interval         interval to be locked
    * @param preferredVersion preferred version string
    * @param priority         lock priority
+   * @param revoked          indicate the lock is revoked
    *
    * @return a new {@link TaskLockPosse}
    */
@@ -456,7 +550,8 @@ private TaskLockPosse createNewTaskLockPosse(
       String dataSource,
       Interval interval,
       @Nullable String preferredVersion,
-      int priority
+      int priority,
+      boolean revoked
   )
   {
     giant.lock();
@@ -479,7 +574,7 @@ private TaskLockPosse createNewTaskLockPosse(
       }
 
       final TaskLockPosse posseToUse = new TaskLockPosse(
-          new TaskLock(lockType, groupId, dataSource, interval, version, priority)
+          new TaskLock(lockType, groupId, dataSource, interval, version, priority, revoked)
       );
       running.computeIfAbsent(dataSource, k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
              .computeIfAbsent(interval, k -> new ArrayList<>())
@@ -810,10 +905,10 @@ public void add(Task task)
     }
   }
 
-  private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, Task task, Interval interval)
+  private static boolean matchGroupIdAndContainInterval(TaskLock existingLock, String taskGroupId, Interval interval)
   {
     return existingLock.getInterval().contains(interval) &&
-           existingLock.getGroupId().equals(task.getGroupId());
+           existingLock.getGroupId().equals(taskGroupId);
   }
 
   private static boolean isAllSharedLocks(List<TaskLockPosse> lockPosses)
@@ -856,7 +951,7 @@ private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval
   }
 
   @VisibleForTesting
-  public Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
+  Map<String, NavigableMap<Interval, List<TaskLockPosse>>> getAllLocks()
   {
     return running;
   }
@@ -914,7 +1009,7 @@ boolean isTasksEmpty()
 
     void forEachTask(Consumer<String> action)
     {
-      Preconditions.checkNotNull(action);
+      Preconditions.checkNotNull(action, "action");
       taskIds.forEach(action);
     }
 
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 33dd21076e2..0aa90d6401e 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -21,8 +21,6 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterables;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskLockType;
 import io.druid.indexing.common.TaskStatus;
@@ -33,6 +31,8 @@
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.service.ServiceEmitter;
 import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import io.druid.metadata.EntryExistsException;
 import io.druid.metadata.TestDerbyConnector;
@@ -45,9 +45,11 @@
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -259,6 +261,48 @@ public void testSyncFromStorage() throws EntryExistsException
     Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
   }
 
+  @Test
+  public void testRevokedLockSyncFromStorage() throws EntryExistsException
+  {
+    final TaskLockbox originalBox = new TaskLockbox(taskStorage);
+
+    final Task task1 = NoopTask.create("task1", 10);
+    taskStorage.insert(task1, TaskStatus.running(task1.getId()));
+    originalBox.add(task1);
+    Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task1, Intervals.of("2017/2018")).isOk());
+
+    // task2 revokes task1
+    final Task task2 = NoopTask.create("task2", 100);
+    taskStorage.insert(task2, TaskStatus.running(task2.getId()));
+    originalBox.add(task2);
+    Assert.assertTrue(originalBox.tryLock(TaskLockType.EXCLUSIVE, task2, Intervals.of("2017/2018")).isOk());
+
+    final Map<String, List<TaskLock>> beforeLocksInStorage = taskStorage
+        .getActiveTasks()
+        .stream()
+        .collect(Collectors.toMap(Task::getId, task -> taskStorage.getLocks(task.getId())));
+
+    final List<TaskLock> task1Locks = beforeLocksInStorage.get("task1");
+    Assert.assertEquals(1, task1Locks.size());
+    Assert.assertTrue(task1Locks.get(0).isRevoked());
+    // task2 holds the newly acquired lock, which must not be revoked
+    final List<TaskLock> task2Locks = beforeLocksInStorage.get("task2");
+    Assert.assertEquals(1, task2Locks.size());
+    Assert.assertFalse(task2Locks.get(0).isRevoked());
+
+    final TaskLockbox newBox = new TaskLockbox(taskStorage);
+    newBox.syncFromStorage();
+    // Syncing a fresh lockbox from storage must leave the persisted locks unchanged
+    final Set<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
+                                                          .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
+                                                          .collect(Collectors.toSet());
+
+    Assert.assertEquals(
+        beforeLocksInStorage.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
+        afterLocksInStorage
+    );
+  }
+
   @Test
   public void testDoInCriticalSectionWithSharedLock() throws Exception
   {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org