You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2018/08/04 02:23:41 UTC
[incubator-druid] branch 0.12.2 updated: [Backport] Fix
IllegalArgumentException in TaskLockbox.syncFromStorage() when updating
from 0.12.x to 0.12.2. (#6106)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push:
new 7cc4fa6 [Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. (#6106)
7cc4fa6 is described below
commit 7cc4fa6355eef876846d358c40c5cce07fcfabfa
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Aug 3 19:23:38 2018 -0700
[Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. (#6106)
---
.../io/druid/indexing/kafka/KafkaIndexTask.java | 4 +-
.../java/io/druid/indexing/common/TaskLock.java | 27 +++-
.../druid/indexing/common/task/CompactionTask.java | 4 +-
.../indexing/common/task/HadoopIndexTask.java | 4 +-
.../io/druid/indexing/common/task/IndexTask.java | 4 +-
.../druid/indexing/common/task/MergeTaskBase.java | 4 +-
.../io/druid/indexing/common/task/NoopTask.java | 4 +-
.../indexing/common/task/RealtimeIndexTask.java | 4 +-
.../java/io/druid/indexing/common/task/Task.java | 8 --
.../io/druid/indexing/overlord/TaskLockbox.java | 35 +++--
.../indexing/overlord/http/OverlordResource.java | 6 -
.../druid/indexing/overlord/TaskLockboxTest.java | 144 +++++++++++++++++++++
.../druid/indexing/overlord/http/OverlordTest.java | 8 --
13 files changed, 203 insertions(+), 53 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 76468b6..5f039c0 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -299,9 +299,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
index 5c7ca14..273335e 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
@@ -37,9 +37,22 @@ public class TaskLock
private final String dataSource;
private final Interval interval;
private final String version;
- private final int priority;
+ private final Integer priority;
private final boolean revoked;
+ public static TaskLock withPriority(TaskLock lock, int priority)
+ {
+ return new TaskLock(
+ lock.type,
+ lock.getGroupId(),
+ lock.getDataSource(),
+ lock.getInterval(),
+ lock.getVersion(),
+ priority,
+ lock.isRevoked()
+ );
+ }
+
@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility
@@ -47,7 +60,7 @@ public class TaskLock
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
- @JsonProperty("priority") int priority,
+ @JsonProperty("priority") @Nullable Integer priority,
@JsonProperty("revoked") boolean revoked
)
{
@@ -116,11 +129,17 @@ public class TaskLock
}
@JsonProperty
- public int getPriority()
+ @Nullable
+ public Integer getPriority()
{
return priority;
}
+ public int getNonNullPriority()
+ {
+ return Preconditions.checkNotNull(priority, "priority");
+ }
+
@JsonProperty
public boolean isRevoked()
{
@@ -139,7 +158,7 @@ public class TaskLock
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
- this.priority == that.priority &&
+ Objects.equal(this.priority, that.priority) &&
this.revoked == that.revoked;
}
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index ad77601..5e94285 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -160,9 +160,9 @@ public class CompactionTask extends AbstractTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@VisibleForTesting
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 6a08a82..4386b5b 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -121,9 +121,9 @@ public class HadoopIndexTask extends HadoopTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 6cbc278..b9ec270 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -170,9 +170,9 @@ public class IndexTask extends AbstractTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 3a9fc97..acf1be0 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -133,9 +133,9 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
index 6c6358d..e202774 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java
@@ -152,9 +152,9 @@ public class NoopTask extends AbstractTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
public static NoopTask create()
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 053df11..b3a12f7 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -162,9 +162,9 @@ public class RealtimeIndexTask extends AbstractTask
}
@Override
- public int getDefaultPriority()
+ public int getPriority()
{
- return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
+ return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}
@Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index c69b6b5..1c7a134 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -98,14 +98,6 @@ public interface Task
}
/**
- * Returns the default task priority. It can vary depending on the task type.
- */
- default int getDefaultPriority()
- {
- return Tasks.DEFAULT_TASK_PRIORITY;
- }
-
- /**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.
*
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
index 286296d..8488257 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
@@ -130,17 +130,23 @@ public class TaskLockbox
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
- log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
+ log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId());
continue;
}
- final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
+ // Create a new taskLock if it doesn't have a proper priority,
+ // so that every taskLock in memory has the priority.
+ final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null
+ ? TaskLock.withPriority(savedTaskLock, task.getPriority())
+ : savedTaskLock;
+
+ final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority);
if (taskLockPosse != null) {
taskLockPosse.addTask(task);
final TaskLock taskLock = taskLockPosse.getTaskLock();
- if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
+ if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
@@ -151,8 +157,8 @@ public class TaskLockbox
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
- savedTaskLock.getInterval(),
- savedTaskLock.getVersion(),
+ savedTaskLockWithPriority.getInterval(),
+ savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
task.getId()
);
@@ -160,8 +166,8 @@ public class TaskLockbox
} else {
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
- savedTaskLock.getInterval(),
- savedTaskLock.getVersion(),
+ savedTaskLockWithPriority.getInterval(),
+ savedTaskLockWithPriority.getVersion(),
task.getId()
);
}
@@ -382,11 +388,14 @@ public class TaskLockbox
taskLock.getDataSource(),
task.getDataSource()
);
+ final int taskPriority = task.getPriority();
+ final int lockPriority = taskLock.getNonNullPriority();
+
Preconditions.checkArgument(
- task.getPriority() == taskLock.getPriority(),
+ lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
- taskLock.getPriority(),
- task.getPriority()
+ lockPriority,
+ taskPriority
);
return createOrFindLockPosse(
@@ -396,7 +405,7 @@ public class TaskLockbox
taskLock.getDataSource(),
taskLock.getInterval(),
taskLock.getVersion(),
- taskLock.getPriority(),
+ taskPriority,
taskLock.isRevoked()
);
}
@@ -925,7 +934,7 @@ public class TaskLockbox
private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
{
final TaskLock existingLock = lockPosse.getTaskLock();
- return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority;
+ return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
}
private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
@@ -986,7 +995,7 @@ public class TaskLockbox
boolean addTask(Task task)
{
Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
- Preconditions.checkArgument(taskLock.getPriority() == task.getPriority());
+ Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority());
return taskIds.add(task.getId());
}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
index aeb3141..8e4be31 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java
@@ -41,7 +41,6 @@ import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
@@ -165,11 +164,6 @@ public class OverlordResource
public Response apply(TaskQueue taskQueue)
{
try {
- // Set default priority if needed
- final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
- if (priority == null) {
- task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
- }
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 0aa90d6..ed16b9b 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -19,6 +19,9 @@
package io.druid.indexing.overlord;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import io.druid.indexing.common.TaskLock;
@@ -262,6 +265,84 @@ public class TaskLockboxTest
}
@Test
+ public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1")
+ );
+
+ final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
+ .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+ .collect(Collectors.toList());
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ lockbox.syncFromStorage();
+
+ final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
+ .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+ }
+
+ @Test
+ public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLock(
+ TaskLockType.EXCLUSIVE,
+ task.getGroupId(),
+ task.getDataSource(),
+ Intervals.of("2017/2018"),
+ "v1",
+ task.getPriority()
+ )
+ );
+
+ final List<TaskLock> beforeLocksInStorage = taskStorage.getActiveTasks().stream()
+ .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+ .collect(Collectors.toList());
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ lockbox.syncFromStorage();
+
+ final List<TaskLock> afterLocksInStorage = taskStorage.getActiveTasks().stream()
+ .flatMap(t -> taskStorage.getLocks(t.getId()).stream())
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
+ }
+
+ @Test
+ public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException
+ {
+ final Task task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
+ taskStorage.addLock(
+ task.getId(),
+ new TaskLock(
+ TaskLockType.EXCLUSIVE,
+ task.getGroupId(),
+ task.getDataSource(),
+ Intervals.of("2017/2018"),
+ "v1",
+ 10
+ )
+ );
+
+ final TaskLockbox lockbox = new TaskLockbox(taskStorage);
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage("lock priority[10] is different from task priority[50]");
+ lockbox.syncFromStorage();
+ }
+
+ @Test
public void testRevokedLockSyncFromStorage() throws EntryExistsException
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage);
@@ -504,4 +585,67 @@ public class TaskLockboxTest
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toSet());
}
+
+ private static class TaskLockWithoutPriority extends TaskLock
+ {
+ @JsonCreator
+ TaskLockWithoutPriority(
+ String groupId,
+ String dataSource,
+ Interval interval,
+ String version
+ )
+ {
+ super(null, groupId, dataSource, interval, version, 0, false);
+ }
+
+ @Override
+ @JsonProperty
+ public TaskLockType getType()
+ {
+ return super.getType();
+ }
+
+ @Override
+ @JsonProperty
+ public String getGroupId()
+ {
+ return super.getGroupId();
+ }
+
+ @Override
+ @JsonProperty
+ public String getDataSource()
+ {
+ return super.getDataSource();
+ }
+
+ @Override
+ @JsonProperty
+ public Interval getInterval()
+ {
+ return super.getInterval();
+ }
+
+ @Override
+ @JsonProperty
+ public String getVersion()
+ {
+ return super.getVersion();
+ }
+
+ @JsonIgnore
+ @Override
+ public Integer getPriority()
+ {
+ return super.getPriority();
+ }
+
+ @JsonIgnore
+ @Override
+ public boolean isRevoked()
+ {
+ return super.isRevoked();
+ }
+ }
}
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
index f389351..8bfdc3c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java
@@ -37,7 +37,6 @@ import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
-import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox;
@@ -81,7 +80,6 @@ import javax.ws.rs.core.Response;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -232,12 +230,6 @@ public class OverlordTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());
- final Map<String, Object> context = task_0.getContext();
- Assert.assertEquals(1, context.size());
- final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
- Assert.assertNotNull(priority);
- Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());
-
// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org