You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/08/04 02:23:41 UTC

[incubator-druid] branch 0.12.2 updated: [Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. (#6106)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 7cc4fa6  [Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. (#6106)
7cc4fa6 is described below

commit 7cc4fa6355eef876846d358c40c5cce07fcfabfa
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Aug 3 19:23:38 2018 -0700

    [Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. (#6106)
---
 .../io/druid/indexing/kafka/KafkaIndexTask.java    |   4 +-
 .../java/io/druid/indexing/common/TaskLock.java    |  27 +++-
 .../druid/indexing/common/task/CompactionTask.java |   4 +-
 .../indexing/common/task/HadoopIndexTask.java      |   4 +-
 .../io/druid/indexing/common/task/IndexTask.java   |   4 +-
 .../druid/indexing/common/task/MergeTaskBase.java  |   4 +-
 .../io/druid/indexing/common/task/NoopTask.java    |   4 +-
 .../indexing/common/task/RealtimeIndexTask.java    |   4 +-
 .../java/io/druid/indexing/common/task/Task.java   |   8 --
 .../io/druid/indexing/overlord/TaskLockbox.java    |  35 +++--
 .../indexing/overlord/http/OverlordResource.java   |   6 -
 .../druid/indexing/overlord/TaskLockboxTest.java   | 144 +++++++++++++++++++++
 .../druid/indexing/overlord/http/OverlordTest.java |   8 --
 13 files changed, 203 insertions(+), 53 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 76468b6..5f039c0 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -299,9 +299,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
index 5c7ca14..273335e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
@@ -37,9 +37,22 @@ public class TaskLock
   private final String dataSource;
   private final Interval interval;
   private final String version;
-  private final int priority;
+  private final Integer priority;
   private final boolean revoked;
 
+  public static TaskLock withPriority(TaskLock lock, int priority)
+  {
+    return new TaskLock(
+        lock.type,
+        lock.getGroupId(),
+        lock.getDataSource(),
+        lock.getInterval(),
+        lock.getVersion(),
+        priority,
+        lock.isRevoked()
+    );
+  }
+
   @JsonCreator
   public TaskLock(
       @JsonProperty("type") @Nullable TaskLockType type,            // nullable for backward compatibility
@@ -47,7 +60,7 @@ public class TaskLock
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
       @JsonProperty("version") String version,
-      @JsonProperty("priority") int priority,
+      @JsonProperty("priority") @Nullable Integer priority,
       @JsonProperty("revoked") boolean revoked
   )
   {
@@ -116,11 +129,17 @@ public class TaskLock
   }
 
   @JsonProperty
-  public int getPriority()
+  @Nullable
+  public Integer getPriority()
   {
     return priority;
   }
 
+  public int getNonNullPriority()
+  {
+    return Preconditions.checkNotNull(priority, "priority");
+  }
+
   @JsonProperty
   public boolean isRevoked()
   {
@@ -139,7 +158,7 @@ public class TaskLock
              this.dataSource.equals(that.dataSource) &&
              this.interval.equals(that.interval) &&
              this.version.equals(that.version) &&
-             this.priority == that.priority &&
+             Objects.equal(this.priority, that.priority) &&
              this.revoked == that.revoked;
     }
   }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index ad77601..5e94285 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -160,9 +160,9 @@ public class CompactionTask extends AbstractTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
   }
 
   @VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 6a08a82..4386b5b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -121,9 +121,9 @@ public class HadoopIndexTask extends HadoopTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6cbc278..b9ec270 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -170,9 +170,9 @@ public class IndexTask extends AbstractTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 3a9fc97..acf1be0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index 6c6358d..e202774 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -152,9 +152,9 @@ public class NoopTask extends AbstractTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
   }
 
   public static NoopTask create()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 053df11..b3a12f7 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -162,9 +162,9 @@ public class RealtimeIndexTask extends AbstractTask
   }
 
   @Override
-  public int getDefaultPriority()
+  public int getPriority()
   {
-    return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+    return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index c69b6b5..1c7a134 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -98,14 +98,6 @@ public interface Task
   }
 
   /**
-   * Returns the default task priority. It can vary depending on the task type.
-   */
-  default int getDefaultPriority()
-  {
-    return Tasks.DEFAULT_TASK_PRIORITY;
-  }
-
-  /**
    * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
    * require.
    *
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
index 286296d..8488257 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
@@ -130,17 +130,23 @@ public class TaskLockbox
         final TaskLock savedTaskLock = taskAndLock.rhs;
         if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
           // "Impossible", but you never know what crazy stuff can be restored from storage.
-          log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
+          log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId());
           continue;
         }
 
-        final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
+        // Create a new taskLock if it doesn't have a proper priority,
+        // so that every taskLock in memory has the priority.
+        final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null
+                                      ? TaskLock.withPriority(savedTaskLock, task.getPriority())
+                                      : savedTaskLock;
+
+        final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority);
         if (taskLockPosse != null) {
           taskLockPosse.addTask(task);
 
           final TaskLock taskLock = taskLockPosse.getTaskLock();
 
-          if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
+          if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
             taskLockCount++;
             log.info(
                 "Reacquired lock[%s] for task: %s",
@@ -151,8 +157,8 @@ public class TaskLockbox
             taskLockCount++;
             log.info(
                 "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
-                savedTaskLock.getInterval(),
-                savedTaskLock.getVersion(),
+                savedTaskLockWithPriority.getInterval(),
+                savedTaskLockWithPriority.getVersion(),
                 taskLock.getVersion(),
                 task.getId()
             );
@@ -160,8 +166,8 @@ public class TaskLockbox
         } else {
           throw new ISE(
               "Could not reacquire lock on interval[%s] version[%s] for task: %s",
-              savedTaskLock.getInterval(),
-              savedTaskLock.getVersion(),
+              savedTaskLockWithPriority.getInterval(),
+              savedTaskLockWithPriority.getVersion(),
               task.getId()
           );
         }
@@ -382,11 +388,14 @@ public class TaskLockbox
           taskLock.getDataSource(),
           task.getDataSource()
       );
+      final int taskPriority = task.getPriority();
+      final int lockPriority = taskLock.getNonNullPriority();
+
       Preconditions.checkArgument(
-          task.getPriority() == taskLock.getPriority(),
+          lockPriority == taskPriority,
           "lock priority[%s] is different from task priority[%s]",
-          taskLock.getPriority(),
-          task.getPriority()
+          lockPriority,
+          taskPriority
       );
 
       return createOrFindLockPosse(
@@ -396,7 +405,7 @@ public class TaskLockbox
           taskLock.getDataSource(),
           taskLock.getInterval(),
           taskLock.getVersion(),
-          taskLock.getPriority(),
+          taskPriority,
           taskLock.isRevoked()
       );
     }
@@ -925,7 +934,7 @@ public class TaskLockbox
   private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
   {
     final TaskLock existingLock = lockPosse.getTaskLock();
-    return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority;
+    return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
   }
 
   private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
@@ -986,7 +995,7 @@ public class TaskLockbox
     boolean addTask(Task task)
     {
       Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
-      Preconditions.checkArgument(taskLock.getPriority() == task.getPriority());
+      Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority());
       return taskIds.add(task.getId());
     }
 
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index aeb3141..8e4be31 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -41,7 +41,6 @@ import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.indexing.common.actions.TaskActionHolder;
 import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskQueue;
@@ -165,11 +164,6 @@ public class OverlordResource
           public Response apply(TaskQueue taskQueue)
           {
             try {
-              // Set default priority if needed
-              final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
-              if (priority == null) {
-                task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
-              }
               taskQueue.add(task);
               return Response.ok(ImmutableMap.of("task", task.getId())).build();
             }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 0aa90d6..ed16b9b 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -19,6 +19,9 @@
 
 package io.druid.indexing.overlord;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Iterables;
 import io.druid.indexing.common.TaskLock;
@@ -262,6 +265,84 @@ public class TaskLockboxTest
   }
 
   @Test
+  public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException
+  {
+    final Task task = NoopTask.create();
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    taskStorage.addLock(
+        task.getId(),
+        new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1")
+    );
+
+    final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
+                                                           .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+                                                           .collect(Collectors.toList());
+
+    final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+    lockbox.syncFromStorage();
+
+    final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
+                                                          .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+                                                          .collect(Collectors.toList());
+
+    Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+  }
+
+  @Test
+  public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException
+  {
+    final Task task = NoopTask.create();
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    taskStorage.addLock(
+        task.getId(),
+        new TaskLock(
+            TaskLockType.EXCLUSIVE,
+            task.getGroupId(),
+            task.getDataSource(),
+            Intervals.of("2017/2018"),
+            "v1",
+            task.getPriority()
+        )
+    );
+
+    final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
+                                                           .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+                                                           .collect(Collectors.toList());
+
+    final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+    lockbox.syncFromStorage();
+
+    final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
+                                                          .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+                                                          .collect(Collectors.toList());
+
+    Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+  }
+
+  @Test
+  public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException
+  {
+    final Task task = NoopTask.create();
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    taskStorage.addLock(
+        task.getId(),
+        new TaskLock(
+            TaskLockType.EXCLUSIVE,
+            task.getGroupId(),
+            task.getDataSource(),
+            Intervals.of("2017/2018"),
+            "v1",
+            10
+        )
+    );
+
+    final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("lock priority[10] is different from task priority[50]");
+    lockbox.syncFromStorage();
+  }
+
+  @Test
   public void testRevokedLockSyncFromStorage() throws EntryExistsException
   {
     final TaskLockbox originalBox = new TaskLockbox(taskStorage);
@@ -504,4 +585,67 @@ public class TaskLockboxTest
                 .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
                 .collect(Collectors.toSet());
   }
+
+  private static class TaskLockWithoutPriority extends TaskLock
+  {
+    @JsonCreator
+    TaskLockWithoutPriority(
+        String groupId,
+        String dataSource,
+        Interval interval,
+        String version
+    )
+    {
+      super(null, groupId, dataSource, interval, version, 0, false);
+    }
+
+    @Override
+    @JsonProperty
+    public TaskLockType getType()
+    {
+      return super.getType();
+    }
+
+    @Override
+    @JsonProperty
+    public String getGroupId()
+    {
+      return super.getGroupId();
+    }
+
+    @Override
+    @JsonProperty
+    public String getDataSource()
+    {
+      return super.getDataSource();
+    }
+
+    @Override
+    @JsonProperty
+    public Interval getInterval()
+    {
+      return super.getInterval();
+    }
+
+    @Override
+    @JsonProperty
+    public String getVersion()
+    {
+      return super.getVersion();
+    }
+
+    @JsonIgnore
+    @Override
+    public Integer getPriority()
+    {
+      return super.getPriority();
+    }
+
+    @JsonIgnore
+    @Override
+    public boolean isRevoked()
+    {
+      return super.isRevoked();
+    }
+  }
 }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index f389351..8bfdc3c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,7 +37,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
 import io.druid.indexing.overlord.TaskLockbox;
@@ -81,7 +80,6 @@ import javax.ws.rs.core.Response;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -232,12 +230,6 @@ public class OverlordTest
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
 
-    final Map<String, Object> context = task_0.getContext();
-    Assert.assertEquals(1, context.size());
-    final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
-    Assert.assertNotNull(priority);
-    Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
-
     // Duplicate task - should fail
     response = overlordResource.taskPost(task_0, req);
     Assert.assertEquals(400, response.getStatus());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org