You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/11/14 18:30:26 UTC

[GitHub] gianm closed pull request #1679: Priority based task locking

gianm closed pull request #1679: Priority based task locking
URL: https://github.com/apache/incubator-druid/pull/1679
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
index 9087d4d0f5e..254a0c45e7a 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageActionHandler.java
@@ -102,6 +102,14 @@ public void insert(
    */
   public boolean addLock(String entryId, LockType lock);
 
+  /**
+   * Sets the lock with {@param lockId} to {@param lock}
+   *
+   * @param lockId lock id
+   * @return true if lock was set
+   * */
+  public boolean setLock(long lockId, LockType lock);
+
   /**
    * Remove the lock with the given lock id.
    *
diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md
index ead17089335..784fba434dd 100644
--- a/docs/content/ingestion/tasks.md
+++ b/docs/content/ingestion/tasks.md
@@ -89,9 +89,34 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
 |property|description|required?|
 |--------|-----------|---------|
 |type|The task type, this should always be "index".|yes|
-|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
+|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, date-time stamp. |no|
 |spec|The ingestion spec. See below for more details. |yes|
 
+#### Task Priority
+
+This is applicable only when this feature is enabled by setting `druid.indexer.taskLockboxVersion` to `v2`.
+Task priority is used for acquiring a lock on an interval for a datasource.
+Tasks with higher priority can preempt lower-priority tasks for the same datasource and interval if ran concurrently.
+Priority order for different task types - Realtime Index Task > Hadoop/Index Task > Merge/Append Task > Other Tasks.
+Tasks of same priority cannot preempt one another.
+   - Default lock priorities for task
+       - Realtime Index Task - 75
+       - Hadoop/Index Task - 50
+       - Merge/Append Task - 25
+       - Other Tasks - 0
+
+Higher the number, higher the priority. Default priority can be overridden by setting context in task json like this -
+
+```
+"context" {
+    "lockPriority" : "80"
+}
+```
+
+For example, if a Hadoop Index task is running and a Realtime Index task starts that wants to publish a segment for the
+same (or overlapping) interval for the same datasource, then it will override the task locks of the Hadoop Index task.
+Consequently, the Hadoop Index task will fail before publishing the segment.
+
 #### DataSchema
 
 This field is required.
@@ -270,8 +295,13 @@ These tasks start, sleep for a time and are used only for testing. The available
 
 Locking
 -------
-
-Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task. 
-Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release 
-locks early if they desire. Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created. 
+Once an overlord node accepts a task, a priority based lock is created for the data source and interval specified in the task,
+where priority is based on the task type. Tasks do not need to explicitly release locks, they are released upon task completion.
+Tasks may potentially release locks early if they desire or their lock can be overridden by a high priority task.
+Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created.
 Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
+Before committing the work (publishing segments), tasks upgrade their lock, failing to do so will result in task failure.
+The task will be able to upgrade if no other higher priority task came along and revoked its lock.
+Upgraded lock indicates that this lock cannot be overridden and other tasks have to wait for lock release.
+Note - In case Priority based locking is disabled (i.e. `druid.indexer.taskLockboxVersion` is not set to `v2`) then all
+tasks will have the default priority of 0 and lock is created only for the data source and interval specified in the task.
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 3639bebd62a..1549f1f517f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -52,7 +52,9 @@
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.common.task.AbstractTask;
 import io.druid.indexing.common.task.TaskResource;
 import io.druid.query.DruidMetrics;
@@ -231,6 +233,12 @@ public KafkaIOConfig getIOConfig()
     return ioConfig;
   }
 
+  @Override
+  public int getLockPriority()
+  {
+    return getLockPriority(REALTIME_TASK_PRIORITY);
+  }
+
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
@@ -483,6 +491,12 @@ public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata)
             throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
           }
 
+          // Upgrade TaskLocks for all segments
+          for (DataSegment segment: segments) {
+            toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(segment.getInterval(),
+                                                                                TaskLockCriticalState.UPGRADE));
+          }
+
           final SegmentTransactionalInsertAction action;
 
           if (ioConfig.isUseTransaction()) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4f7f57131f8..3e088f66b05 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -72,6 +72,8 @@
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import io.druid.indexing.overlord.MetadataTaskStorage;
 import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
+import io.druid.indexing.overlord.TaskLockboxV2;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.test.TestDataSegmentAnnouncer;
 import io.druid.indexing.test.TestDataSegmentKiller;
@@ -158,8 +160,11 @@
   private TaskStorage taskStorage;
   private TaskLockbox taskLockbox;
   private File directory;
+  private String taskLockboxVersion;
 
   private final List<Task> runningTasks = Lists.newArrayList();
+  private static final String TASKLOCKBOX_V1 = "v1";
+  private static final String TASKLOCKBOX_V2 = "v2";
 
   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
   private static final ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -203,15 +208,16 @@
     );
   }
 
-  @Parameterized.Parameters(name = "buildV9Directly = {0}")
+  @Parameterized.Parameters(name = "buildV9Directly = {0}, taskLockBoxVersion={1}")
   public static Iterable<Object[]> constructorFeeder()
   {
-    return ImmutableList.of(new Object[]{true}, new Object[]{false});
+    return ImmutableList.of(new Object[]{true, TASKLOCKBOX_V1}, new Object[]{false, TASKLOCKBOX_V2});
   }
 
-  public KafkaIndexTaskTest(boolean buildV9Directly)
+  public KafkaIndexTaskTest(boolean buildV9Directly, String taskLockboxVersion)
   {
     this.buildV9Directly = buildV9Directly;
+    this.taskLockboxVersion = taskLockboxVersion;
   }
 
   @Rule
@@ -1350,7 +1356,11 @@ private void makeToolboxFactory() throws IOException
         derby.metadataTablesConfigSupplier().get(),
         derbyConnector
     );
-    taskLockbox = new TaskLockbox(taskStorage);
+    if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      taskLockbox = new TaskLockboxV2(taskStorage);
+    } else {
+      taskLockbox = new TaskLockboxV1(taskStorage);
+    }
     final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
         taskLockbox,
         metadataStorageCoordinator,
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
index ccbf721bb6f..24900bcdbbe 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java
@@ -26,26 +26,45 @@
 
 /**
  * Represents a lock held by some task. Immutable.
+ *
  */
 public class TaskLock
 {
+  /**
+   * Represents the groupdId for the lock, tasks having same groupdId can share TaskLock
+   * */
   private final String groupId;
   private final String dataSource;
   private final Interval interval;
+  /**
+   * This version will be used to publish the segments
+   * */
   private final String version;
+  /**
+   * Priority used for acquiring the lock, value depends on the task type
+   * */
+  private final int priority;
+  /**
+   * If false this lock can be revoked by a higher priority TaskLock otherwise not
+   * */
+  private final boolean upgraded;
 
   @JsonCreator
   public TaskLock(
       @JsonProperty("groupId") String groupId,
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("interval") Interval interval,
-      @JsonProperty("version") String version
+      @JsonProperty("version") String version,
+      @JsonProperty("priority") int priority,
+      @JsonProperty("upgraded") boolean upgraded
   )
   {
     this.groupId = groupId;
     this.dataSource = dataSource;
     this.interval = interval;
     this.version = version;
+    this.priority = priority;
+    this.upgraded = upgraded;
   }
 
   @JsonProperty
@@ -72,6 +91,29 @@ public String getVersion()
     return version;
   }
 
+  @JsonProperty
+  public int getPriority()
+  {
+    return priority;
+  }
+
+  @JsonProperty
+  public boolean isUpgraded()
+  {
+    return upgraded;
+  }
+
+  public TaskLock withUpgraded(boolean upgraded) {
+    return new TaskLock(
+        getGroupId(),
+        getDataSource(),
+        getInterval(),
+        getVersion(),
+        getPriority(),
+        upgraded
+    );
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -82,14 +124,16 @@ public boolean equals(Object o)
       return Objects.equal(this.groupId, x.groupId) &&
              Objects.equal(this.dataSource, x.dataSource) &&
              Objects.equal(this.interval, x.interval) &&
-             Objects.equal(this.version, x.version);
+             Objects.equal(this.version, x.version) &&
+             Objects.equal(this.priority, x.priority) &&
+             Objects.equal(this.upgraded, x.upgraded); // added priority and upgraded to equals check
     }
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hashCode(groupId, dataSource, interval, version);
+    return Objects.hashCode(groupId, dataSource, interval, version, priority, upgraded);
   }
 
   @Override
@@ -100,6 +144,8 @@ public String toString()
                   .add("dataSource", dataSource)
                   .add("interval", interval)
                   .add("version", version)
+                  .add("priority", priority)
+                  .add("upgraded", upgraded)
                   .toString();
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
index 3bbad6dd1dd..0f45f4bcd49 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java
@@ -122,7 +122,7 @@ public RemoteTaskActionClient(
           // Want to retry, so throw an IOException.
           throw new IOException(
               String.format(
-                  "Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
+                  "Scary HTTP status returned: %s. Check your overlord [%s] logs for exceptions.",
                   response.getStatus(),
                   server.getHost()
               )
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/SetLockCriticalStateAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/SetLockCriticalStateAction.java
new file mode 100644
index 00000000000..c11bd16af8a
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/SetLockCriticalStateAction.java
@@ -0,0 +1,116 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexing.common.actions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import io.druid.indexing.common.task.Task;
+import org.joda.time.Interval;
+
+import java.io.IOException;
+
+public class SetLockCriticalStateAction implements TaskAction<Boolean> {
+
+  @JsonIgnore
+  private final Interval interval;
+  @JsonIgnore
+  private final TaskLockCriticalState taskLockCriticalState;
+
+  @JsonCreator
+  public SetLockCriticalStateAction(
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("taskLockCriticalState") TaskLockCriticalState taskLockCriticalState
+  )
+  {
+    this.taskLockCriticalState = taskLockCriticalState;
+    this.interval = interval;
+  }
+
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @JsonProperty
+  public TaskLockCriticalState getTaskLockCriticalState()
+  {
+    return taskLockCriticalState;
+  }
+
+  @Override
+  public TypeReference<Boolean> getReturnTypeReference()
+  {
+    return new TypeReference<Boolean>()
+    {
+    };
+  }
+
+  @Override
+  public Boolean perform(
+      Task task, TaskActionToolbox toolbox
+  ) throws IOException
+  {
+    return toolbox.getTaskLockbox().setTaskLockCriticalState(task, interval, taskLockCriticalState);
+  }
+
+  @Override
+  public boolean isAudited()
+  {
+    return false;
+  }
+
+  @Override
+  public String toString(){
+    return "SetLockCriticalStateAction{ " +
+           "Interval = " + interval + ", TaskLockCriticalState = " + taskLockCriticalState +
+           " }";
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SetLockCriticalStateAction that = (SetLockCriticalStateAction) o;
+
+    if (!getInterval().equals(that.getInterval())) {
+      return false;
+    }
+    return getTaskLockCriticalState() == that.getTaskLockCriticalState();
+
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = getInterval().hashCode();
+    result = 31 * result + getTaskLockCriticalState().hashCode();
+    return result;
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
index 26dc41f1d8a..ac243d22278 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java
@@ -38,7 +38,8 @@
     @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
     @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
     @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
-    @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
+    @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class),
+    @JsonSubTypes.Type(name = "lockCriticalState", value = SetLockCriticalStateAction.class)
 })
 public interface TaskAction<RetType>
 {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
index 2634bf37f13..757f68f3862 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java
@@ -19,7 +19,6 @@
 
 package io.druid.indexing.common.actions;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
@@ -73,7 +72,7 @@ public void verifyTaskLocks(
   )
   {
     if (!taskLockCoversSegments(task, segments)) {
-      throw new ISE("Segments not covered by locks for task: %s", task.getId());
+      throw new ISE("Segments not covered by upgraded locks for task: %s", task.getId());
     }
   }
 
@@ -83,6 +82,7 @@ public boolean taskLockCoversSegments(
   )
   {
     // Verify that each of these segments falls under some lock
+    // and each segment is covered by at least a lock which is in upgraded state
 
     // NOTE: It is possible for our lock to be revoked (if the task has failed and given up its locks) after we check
     // NOTE: it and before we perform the segment insert, but, that should be OK since the worst that happens is we
@@ -102,8 +102,19 @@ public boolean apply(TaskLock taskLock)
             }
           }
       );
+      final boolean isUpgraded = Iterables.all(
+          taskLocks, new Predicate<TaskLock>()
+          {
+            @Override
+            public boolean apply(TaskLock input)
+            {
+              return !(input.getInterval().contains(segment.getInterval())
+                       && input.getDataSource().equals(segment.getDataSource())) || input.isUpgraded();
+            }
+          }
+      );
 
-      if (!ok) {
+      if (!ok || !isUpgraded) {
         return false;
       }
     }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskLockCriticalState.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskLockCriticalState.java
new file mode 100644
index 00000000000..66d8cfd06ea
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskLockCriticalState.java
@@ -0,0 +1,38 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexing.common.actions;
+
+public enum TaskLockCriticalState
+{
+  UPGRADE(true),
+  DOWNGRADE(false);
+
+  private boolean expectedState;
+
+  TaskLockCriticalState(boolean expectedState) {
+    this.expectedState = expectedState;
+  }
+
+  public boolean getExpectedState()
+  {
+    return expectedState;
+  }
+
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index de01f0099a2..cac460c187d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -175,7 +175,7 @@ public void finishJob()
 
         try {
           // User should have persisted everything by now.
-          Preconditions.checkState(!theSink.swappable(), "All data must be persisted before fininshing the job!");
+          Preconditions.checkState(!theSink.swappable(), "All data must be persisted before finishing the job!");
 
           if (spilled.size() == 0) {
             throw new IllegalStateException("Nothing indexed?");
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
index 67bf7227d3e..88bc7455e3d 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java
@@ -39,6 +39,13 @@
 public abstract class AbstractTask implements Task
 {
   private static final Joiner ID_JOINER = Joiner.on("_");
+  public static final int REALTIME_TASK_PRIORITY = 75;
+  public static final int INDEX_TASK_PRIORITY = 50;
+  public static final int MERGE_TASK_PRIORITY = 25;
+  public static final int DEFAULT_TASK_PRIORITY = 0;
+  // Number of segments to operate on at a time in critical section before downgrading and upgrading the lock again
+  // applicable for Archive, Restore, Move and Kill Task
+  protected static final int DEFAULT_BATCH_SIZE = 20;
 
   @JsonIgnore
   private final String id;
@@ -52,6 +59,7 @@
   @JsonIgnore
   private final String dataSource;
 
+  @JsonIgnore
   private final Map<String, Object> context;
 
   protected AbstractTask(String id, String dataSource, Map<String, Object> context)
@@ -104,6 +112,27 @@ public String getGroupId()
     return groupId;
   }
 
+  /**
+   * {@inheritDoc}
+   * */
+  @JsonIgnore
+  @Override
+  public int getLockPriority() {
+    return getLockPriority(DEFAULT_TASK_PRIORITY);
+  }
+
+  protected int getLockPriority(int defaultPriority) {
+    return getContextValue("lockPriority") == null ? defaultPriority:
+           Integer.parseInt(getContextValue("lockPriority").toString());
+  }
+
+  @JsonIgnore
+  public int getBatchSize() {
+    return getContextValue("batchSize") == null ? DEFAULT_BATCH_SIZE :
+           (Integer.parseInt(getContextValue("batchSize").toString()) > 0 ?
+            Integer.parseInt(getContextValue("batchSize").toString()) : DEFAULT_BATCH_SIZE);
+  }
+
   @JsonProperty("resource")
   @Override
   public TaskResource getTaskResource()
@@ -156,6 +185,7 @@ public String toString()
                   .add("id", id)
                   .add("type", getType())
                   .add("dataSource", dataSource)
+                  .add("lockPriority", getLockPriority())
                   .toString();
   }
 
@@ -176,6 +206,11 @@ public TaskStatus success()
     return TaskStatus.success(getId());
   }
 
+  public TaskStatus failure()
+  {
+    return TaskStatus.failure(getId());
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
index 8a47e0b31fb..9d68de65111 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ArchiveTask.java
@@ -27,13 +27,16 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentListUnusedAction;
 import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 public class ArchiveTask extends AbstractFixedIntervalTask
 {
@@ -63,8 +66,13 @@ public String getType()
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    // Confirm we have a lock (will throw if there isn't exactly one element)
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    final TaskLock myLock;
+    // Confirm we have a lock and it has not been revoked by a higher priority task
+    try {
+      myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    } catch (NoSuchElementException e) {
+      throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that would have revoked this lock ?");
+    }
 
     if (!myLock.getDataSource().equals(getDataSource())) {
       throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -93,12 +101,40 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
       log.info("OK to archive segment: %s", unusedSegment.getIdentifier());
     }
 
-    // Move segments
+
+    // Archiving segments can take time which seems to be a lot of work to do in critical section
+    // Thus, after each batch downgrade the lock to ensure liveliness of the system
+    // so that higher priority tasks are not blocked for a long time
+    // Note - If we are unable to upgrade the lock again then the task will fail,
+    // there is no point in retrying here as upgrade should fail only when the taskLock is removed by a higher priority task
+    int counter = 0;
     for (DataSegment segment : unusedSegments) {
+      if (counter % getBatchSize() == 0) {
+        // SetLockCriticalStateAction is idempotent
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.DOWNGRADE))) {
+          throw new ISE(
+              "Lock downgrade failed for interval [%s] !! Successfully archived [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+
+        // Try to upgrade the lock again - we will be successful if no other higher priority task needs to lock
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+          throw new ISE(
+              "Lock upgrade failed for interval [%s] !! Successfully archived [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+      }
+      // Move segment
       final DataSegment archivedSegment = toolbox.getDataSegmentArchiver().archive(segment);
       toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
+      counter++;
     }
-
     return TaskStatus.success(getId());
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
index 526b4c23679..574d67b83fa 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/ConvertSegmentTask.java
@@ -27,13 +27,16 @@
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.segment.IndexIO;
 import io.druid.segment.IndexSpec;
 import io.druid.segment.loading.SegmentLoadingException;
@@ -59,7 +62,6 @@
   private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID;
 
   private static final Logger log = new Logger(ConvertSegmentTask.class);
-
   @JsonIgnore
   private final DataSegment segment;
   private final IndexSpec indexSpec;
@@ -350,7 +352,7 @@ public String getType()
     public TaskStatus run(TaskToolbox toolbox) throws Exception
     {
       log.info("Subs are good!  Italian BMT and Meatball are probably my favorite.");
-      convertSegment(toolbox, segment, indexSpec, force, validate);
+      convertSegment(toolbox, segment, indexSpec, force, validate, getInterval());
       return success();
     }
   }
@@ -360,7 +362,8 @@ private static void convertSegment(
       final DataSegment segment,
       IndexSpec indexSpec,
       boolean force,
-      boolean validate
+      boolean validate,
+      Interval interval // needed to upgrade/downgrade lock, cannot use segment interval as it might be subset of interval for ConvertSegmentTask
   )
       throws SegmentLoadingException, IOException
   {
@@ -392,9 +395,17 @@ private static void convertSegment(
       DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
       updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
 
-      actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
+      if (toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(interval, TaskLockCriticalState.UPGRADE))) {
+        actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
+        log.info("Conversion successful for segment[%s], updated segment - [%s]", segment, updatedSegment);
+      } else {
+        throw new ISE("Lock upgrade failed for interval [%s] !!", interval);
+      }
+      if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(interval, TaskLockCriticalState.DOWNGRADE))) {
+        throw new ISE("Lock downgrade failed for interval [%s] !!", interval);
+      }
     } else {
-      log.info("Conversion failed.");
+      log.error("Conversion failed for [%s]", segment);
     }
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
index 41886d9a227..7be590a9e40 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java
@@ -29,6 +29,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import io.druid.common.utils.JodaUtils;
 import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
@@ -41,8 +42,10 @@
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.LockTryAcquireAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.hadoop.OverlordActionBasedUsedSegmentLister;
 import io.druid.timeline.DataSegment;
 import org.joda.time.DateTime;
@@ -50,6 +53,8 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.SortedSet;
 
 public class HadoopIndexTask extends HadoopTask
@@ -129,10 +134,8 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
     Optional<SortedSet<Interval>> intervals = spec.getDataSchema().getGranularitySpec().bucketIntervals();
     if (intervals.isPresent()) {
-      Interval interval = JodaUtils.umbrellaInterval(
-          JodaUtils.condenseIntervals(
-              intervals.get()
-          )
+      Interval interval = getUmbrellaInterval(
+          intervals.get()
       );
       return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
     } else {
@@ -165,6 +168,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
     final ClassLoader loader = buildClassLoader(toolbox);
     boolean determineIntervals = !spec.getDataSchema().getGranularitySpec().bucketIntervals().isPresent();
+    Interval interval;
 
     spec = HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed(
         spec,
@@ -190,16 +194,22 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     // We should have a lock from before we started running only if interval was specified
     final String version;
     if (determineIntervals) {
-      Interval interval = JodaUtils.umbrellaInterval(
-          JodaUtils.condenseIntervals(
-              indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
-          )
+      interval = getUmbrellaInterval(
+          indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
       );
       TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
       version = lock.getVersion();
     } else {
-      Iterable<TaskLock> locks = getTaskLocks(toolbox);
-      final TaskLock myLock = Iterables.getOnlyElement(locks);
+      interval = getUmbrellaInterval(
+          spec.getDataSchema().getGranularitySpec().bucketIntervals().get()
+      );
+      final TaskLock myLock;
+      // Confirm we have a lock and it has not been revoked by a higher priority task
+      try {
+        myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+      } catch (NoSuchElementException e) {
+        throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+      }
       version = myLock.getVersion();
     }
 
@@ -221,14 +231,23 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
           {
           }
       );
-
-      toolbox.publishSegments(publishedSegments);
-      return TaskStatus.success(getId());
+      if (toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(interval, TaskLockCriticalState.UPGRADE))) {
+        toolbox.publishSegments(publishedSegments);
+        return TaskStatus.success(getId());
+      } else {
+        throw new ISE("Lock upgrade failed for interval [%s] !!", interval);
+      }
     } else {
       return TaskStatus.failure(getId());
     }
   }
 
+  private Interval getUmbrellaInterval(Set<Interval> intervals) {
+    return JodaUtils.umbrellaInterval(
+        JodaUtils.condenseIntervals(intervals)
+    );
+  }
+
   public static class HadoopIndexGeneratorInnerProcessing
   {
     public static String runTask(String[] args) throws Exception
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java
index 00d8a63194e..ca72211f72f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java
@@ -70,6 +70,11 @@ protected HadoopTask(
     this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
   }
 
+  @Override
+  public int getLockPriority() {
+    return getLockPriority(INDEX_TASK_PRIORITY);
+  }
+
   public List<String> getHadoopDependencyCoordinates()
   {
     return hadoopDependencyCoordinates == null ? null : ImmutableList.copyOf(hadoopDependencyCoordinates);
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 4918af9e402..e6dae0ece69 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -46,6 +46,8 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.common.index.YeOldePlumberSchool;
 import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
 import io.druid.segment.IndexSpec;
@@ -71,6 +73,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -176,6 +179,11 @@ public IndexTask(
     this.jsonMapper = jsonMapper;
   }
 
+  @Override
+  public int getLockPriority() {
+    return getLockPriority(INDEX_TASK_PRIORITY);
+  }
+
   @Override
   public String getType()
   {
@@ -194,7 +202,14 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
     final int targetPartitionSize = ingestionSchema.getTuningConfig().getTargetPartitionSize();
 
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    final TaskLock myLock;
+    // Confirm we have a lock and it has not been revoked by a higher priority task
+    try {
+      myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    } catch (NoSuchElementException e) {
+      throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+    }
+
     final Set<DataSegment> segments = Sets.newHashSet();
 
     final Set<Interval> validIntervals = Sets.intersection(granularitySpec.bucketIntervals().get(), getDataIntervals());
@@ -228,8 +243,12 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
         segments.add(segment);
       }
     }
-    toolbox.publishSegments(segments);
-    return TaskStatus.success(getId());
+    if (toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+      toolbox.publishSegments(segments);
+      return TaskStatus.success(getId());
+    } else {
+      throw new ISE("Lock upgrade failed for interval [%s] !!", getInterval());
+    }
   }
 
   private SortedSet<Interval> getDataIntervals() throws IOException
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
index 9e3b197acf1..0c7b8475e20 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/KillTask.java
@@ -28,13 +28,16 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentListUnusedAction;
 import io.druid.indexing.common.actions.SegmentNukeAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 /**
  */
@@ -67,8 +70,13 @@ public String getType()
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
+    final TaskLock myLock;
     // Confirm we have a lock (will throw if there isn't exactly one element)
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    try {
+      myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    } catch (NoSuchElementException e) {
+      throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+    }
 
     if (!myLock.getDataSource().equals(getDataSource())) {
       throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -97,12 +105,33 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
       log.info("OK to kill segment: %s", unusedSegment.getIdentifier());
     }
 
-    // Kill segments
+    int counter = 0;
     for (DataSegment segment : unusedSegments) {
+      if (counter % getBatchSize() == 0) {
+        // SetLockCriticalStateAction is idempotent
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.DOWNGRADE))) {
+          throw new ISE(
+              "Lock downgrade failed for interval [%s] !! Successfully killed [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+
+        // Try to upgrade the lock again - we will be successful if no other higher priority task needs to lock
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+          throw new ISE(
+              "Lock upgrade failed for interval [%s] !! Successfully killed [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+      }
       toolbox.getDataSegmentKiller().kill(segment);
       toolbox.getTaskActionClient().submit(new SegmentNukeAction(ImmutableSet.of(segment)));
+      counter++;
     }
-
     return TaskStatus.success(getId());
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
index 2a17202b626..7c44aadce17 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java
@@ -41,8 +41,10 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentListUsedAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.segment.IndexIO;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
@@ -53,6 +55,7 @@
 import java.io.File;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 
 /**
@@ -119,10 +122,22 @@ public boolean apply(@Nullable DataSegment segment)
     this.segments = segments;
   }
 
+  @Override
+  public int getLockPriority() {
+    return getLockPriority(MERGE_TASK_PRIORITY);
+  }
+
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    final TaskLock myLock;
+    // Confirm we have a lock and it has not been revoked by a higher priority task
+    try {
+      myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    } catch (NoSuchElementException e) {
+      throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+    }
+
     final ServiceEmitter emitter = toolbox.getEmitter();
     final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
     final DataSegment mergedSegment = computeMergedSegment(getDataSource(), myLock.getVersion(), segments);
@@ -171,9 +186,12 @@ public String apply(DataSegment input)
       emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
       emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
 
-      toolbox.publishSegments(ImmutableList.of(uploadedSegment));
-
-      return TaskStatus.success(getId());
+      if (toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+        toolbox.publishSegments(ImmutableList.of(uploadedSegment));
+        return TaskStatus.success(getId());
+      } else {
+        throw new ISE("Lock upgrade failed for interval [%s] !!", getInterval());
+      }
     }
     catch (Exception e) {
       log.makeAlert(e, "Exception merging[%s]", mergedSegment.getDataSource())
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
index 08bfcdc7e8f..b1d18ffa39f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MoveTask.java
@@ -28,13 +28,16 @@
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentListUnusedAction;
 import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 public class MoveTask extends AbstractFixedIntervalTask
 {
@@ -71,8 +74,13 @@ public String getType()
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    // Confirm we have a lock (will throw if there isn't exactly one element)
-    final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    final TaskLock myLock;
+    // Confirm we have a lock and it has not been revoked by a higher priority task
+    try {
+      myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+    } catch (NoSuchElementException e) {
+      throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+    }
 
     if(!myLock.getDataSource().equals(getDataSource())) {
       throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -101,10 +109,35 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
       log.info("OK to move segment: %s", unusedSegment.getIdentifier());
     }
 
-    // Move segments
+    int counter = 0;
     for (DataSegment segment : unusedSegments) {
+      if (counter % getBatchSize() == 0) {
+        /** First time we enter the loop the taskLock will be downgraded before being upgraded at all but it should be ok
+         * as SetLockCriticalStateAction is idempotent
+        */
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.DOWNGRADE))) {
+          throw new ISE(
+              "Lock downgrade failed for interval [%s] !! Successfully moved [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+
+        // Try to upgrade the lock again - we will be successful if no other higher priority task needs to lock
+        if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+          throw new ISE(
+              "Lock upgrade failed for interval [%s] !! Successfully moved [%s] segments out of [%s] before failing",
+              getInterval(),
+              counter,
+              unusedSegments.size()
+          );
+        }
+      }
+      // Move segments
       final DataSegment movedSegment = toolbox.getDataSegmentMover().move(segment, targetLoadSpec);
       toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(movedSegment)));
+      counter++;
     }
 
     return TaskStatus.success(getId());
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index 4b2a93ef031..9a5c4d25d54 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Ints;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.CloseQuietly;
 import com.metamx.emitter.EmittingLogger;
 import io.druid.data.input.Committer;
@@ -37,8 +38,10 @@
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockReleaseAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.query.DruidMetrics;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.query.FinalizeResultsQueryRunner;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
@@ -149,6 +152,11 @@ public RealtimeIndexTask(
     this.spec = fireDepartment;
   }
 
+  @Override
+  public int getLockPriority() {
+    return getLockPriority(REALTIME_TASK_PRIORITY);
+  }
+
   @Override
   public String getType()
   {
@@ -514,7 +522,14 @@ public TaskActionSegmentPublisher(Task task, TaskToolbox taskToolbox)
     @Override
     public void publishSegment(DataSegment segment) throws IOException
     {
-      taskToolbox.publishSegments(ImmutableList.of(segment));
+      if (taskToolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(segment.getInterval(), TaskLockCriticalState.UPGRADE))) {
+        taskToolbox.publishSegments(ImmutableList.of(segment));
+      } else {
+        throw new ISE(
+            "WTF?! Lock upgrade failed for interval [%s] ! Is there a higher priority task running ?",
+            segment.getInterval()
+        );
+      }
     }
   }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java
index 3eeefac83b7..d0aff1055fb 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RestoreTask.java
@@ -22,19 +22,21 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
 import io.druid.indexing.common.TaskLock;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentListUnusedAction;
 import io.druid.indexing.common.actions.SegmentMetadataUpdateAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
 public class RestoreTask extends AbstractFixedIntervalTask
 {
@@ -64,8 +66,13 @@ public String getType()
   @Override
     public TaskStatus run(TaskToolbox toolbox) throws Exception
     {
-      // Confirm we have a lock (will throw if there isn't exactly one element)
-      final TaskLock myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+      final TaskLock myLock;
+      // Confirm we have a lock and it has not been revoked by a higher priority task
+      try {
+        myLock = Iterables.getOnlyElement(getTaskLocks(toolbox));
+      } catch (NoSuchElementException e) {
+        throw new ISE("No valid lock found, dying now !! Is there a higher priority task running that may have revoked this lock ?");
+      }
 
       if (!myLock.getDataSource().equals(getDataSource())) {
         throw new ISE("WTF?! Lock dataSource[%s] != task dataSource[%s]", myLock.getDataSource(), getDataSource());
@@ -94,20 +101,33 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
         log.info("OK to restore segment: %s", unusedSegment.getIdentifier());
       }
 
-      List<DataSegment> restoredSegments = Lists.newLinkedList();
-
-      // Move segments
+      int counter = 0;
       for (DataSegment segment : unusedSegments) {
-        restoredSegments.add(toolbox.getDataSegmentArchiver().restore(segment));
+        if (counter % getBatchSize() == 0) {
+          // SetLockCriticalStateAction is idempotent
+          if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.DOWNGRADE))) {
+            throw new ISE(
+                "Lock downgrade failed for interval [%s] !! Successfully restored [%s] segments out of [%s] before failing",
+                getInterval(),
+                counter,
+                unusedSegments.size()
+            );
+          }
+
+          // Try to upgrade the lock again - we will be successful if no other higher priority task needs to lock
+          if (!toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+            throw new ISE(
+                "Lock upgrade failed for interval [%s] !! Successfully archived [%s] segments out of [%s] before failing",
+                getInterval(),
+                counter,
+                unusedSegments.size()
+            );
+          }
+        }
+        DataSegment restoredSegment = toolbox.getDataSegmentArchiver().restore(segment);
+        toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(restoredSegment)));
+        counter++;
       }
-
-      // Update metadata for moved segments
-      toolbox.getTaskActionClient().submit(
-          new SegmentMetadataUpdateAction(
-              ImmutableSet.copyOf(restoredSegments)
-          )
-      );
-
       return TaskStatus.success(getId());
     }
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index e8cf245e4e0..2782f29b747 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -69,6 +69,26 @@
    */
   public String getId();
 
+  /**
+   * Returns lock priority of this task. Lock priority is used for acquiring lock on an interval for a datasource.
+   * Task with higher lock priority override the task with lower lock priority for overlapping interval for a datasource if ran concurrently.
+   *
+   * Tasks with no lock priority set will have the respective default priorities as per the task type
+   *  - Default lock priorities for task
+   *    - Realtime Index Task - 75
+   *    - Hadoop/Index Task - 50
+   *    - Merge/Append Task - 25
+   *    - Other Tasks - 0
+   * Higher the number, higher the priority. Default priority can be overridden by setting context in task json like this -
+   *
+   *  "context" {
+   *    "lockPriority" : "80"
+   *  }
+   *
+   * @return task lock priority
+   * */
+  public int getLockPriority();
+
   /**
    * Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
    * a common convention is to set group ID equal to task ID.
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
index d6b15c9d879..27676fcfedd 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -93,7 +93,6 @@ public void insert(Task task, TaskStatus status) throws EntryExistsException
   public Optional<Task> getTask(String taskid)
   {
     giant.lock();
-
     try {
       Preconditions.checkNotNull(taskid, "taskid");
       if(tasks.containsKey(taskid)) {
@@ -199,6 +198,36 @@ public void addLock(final String taskid, final TaskLock taskLock)
     }
   }
 
+  @Override
+  public void setLock(String taskid, TaskLock taskLockToSet)
+  {
+    giant.lock();
+    try {
+      Preconditions.checkNotNull(taskid, "taskId");
+      Preconditions.checkNotNull(taskLockToSet, "taskLock");
+      TaskLock taskLockToRemove = taskLockToSet.withUpgraded(!taskLockToSet.isUpgraded());
+      // First try to remove the taskLockToRemove from taskLocks otherwise we may miss the removal
+      // in case taskLockToSet is already present (it may happen if overlord reacquires the locks)
+
+      if (!taskLocks.remove(taskid, taskLockToRemove)) {
+        log.warn(
+            "No TaskLock [%s] found for task: [%s] to be removed",
+            taskLockToRemove,
+            taskid
+        );
+      }
+      if (taskLocks.get(taskid).contains(taskLockToSet)) {
+        log.warn("TaskLock [%s] for task [%s] already set", taskLockToSet, taskid);
+      } else {
+        taskLocks.put(taskid, taskLockToSet);
+        log.info("TaskLock for Task [%s] successfully set to [%s]", taskid, taskLockToSet);
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
   @Override
   public void removeLock(final String taskid, final TaskLock taskLock)
   {
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
index 54ed75e37e3..39ce0dd208f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/MetadataTaskStorage.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
+import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
@@ -245,6 +246,66 @@ public void addLock(final String taskid, final TaskLock taskLock)
     handler.addLock(taskid, taskLock);
   }
 
+  @Override
+  public void setLock(String taskid, TaskLock taskLockToSet)
+  {
+    Preconditions.checkNotNull(taskid, "taskid");
+    Preconditions.checkNotNull(taskLockToSet, "taskLock");
+
+    int numSet = 0;
+    boolean alreadySet = false;
+
+    final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
+    log.info("Found [%s] locks for task [%s] in TaskStorage",
+             taskLocks.size(),
+             taskid
+    );
+
+    TaskLock taskLockToFind = taskLockToSet.withUpgraded(!taskLockToSet.isUpgraded());
+    // Change all the taskLocks equivalent to taskLockToFind to taskLockToSet
+    for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
+      final long id = taskLockWithId.getKey();
+      final TaskLock taskLock = taskLockWithId.getValue();
+
+      if (taskLockToFind.equals(taskLock)) {
+        handler.setLock(id, taskLockToSet);
+        log.debug(
+            "TaskLock with id:[%s] for task:[%s] set to [%s]",
+            id,
+            taskid,
+            taskLockToSet
+        );
+        numSet++;
+      } else if (taskLock.equals(taskLockToSet)) {
+        alreadySet = true;
+      }
+    }
+    if (numSet > 0) {
+      log.info(
+          "[%s] out of [%s] locks found for task:[%s] set to [%s]",
+          numSet,
+          taskLocks.size(),
+          taskid,
+          taskLockToSet
+      );
+    } else if (numSet == 0 && alreadySet) {
+      log.warn(
+          "No Locks changed for task:[%s] already set to [%s]",
+          taskLockToSet.getInterval(),
+          taskid,
+          taskLockToSet
+      );
+    } else {
+      throw new ISE(
+          "WTF ! No locks found for interval [%s] with version [%s] for task: [%s] to set to [%s]",
+          taskLockToSet.getInterval(),
+          taskLockToSet.getVersion(),
+          taskid,
+          taskLockToSet
+      );
+    }
+  }
+
   @Override
   public void removeLock(String taskid, TaskLock taskLockToRemove)
   {
@@ -253,6 +314,8 @@ public void removeLock(String taskid, TaskLock taskLockToRemove)
 
     final Map<Long, TaskLock> taskLocks = getLocksWithIds(taskid);
 
+    boolean removed = false;
+
     for (final Map.Entry<Long, TaskLock> taskLockWithId : taskLocks.entrySet()) {
       final long id = taskLockWithId.getKey();
       final TaskLock taskLock = taskLockWithId.getValue();
@@ -260,8 +323,12 @@ public void removeLock(String taskid, TaskLock taskLockToRemove)
       if (taskLock.equals(taskLockToRemove)) {
         log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
         handler.removeLock(id);
+        removed = true;
       }
     }
+    if (!removed) {
+      log.error("Did not find any TaskLock [%s] to remove", taskLockToRemove);
+    }
   }
 
   @Override
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
index 64f133d90af..3a505e0b9ee 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java
@@ -19,546 +19,81 @@
 
 package io.druid.indexing.overlord;
 
-import com.google.common.base.Function;
-import com.google.common.base.Objects;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.Sets;
-import com.google.inject.Inject;
-import com.metamx.common.ISE;
-import com.metamx.common.Pair;
-import com.metamx.common.guava.Comparators;
-import com.metamx.common.guava.FunctionalIterable;
-import com.metamx.emitter.EmittingLogger;
-import io.druid.common.utils.JodaUtils;
 import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.common.task.Task;
-import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
-import javax.annotation.Nullable;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Remembers which activeTasks have locked which intervals. Tasks are permitted to lock an interval if no other task
  * outside their group has locked an overlapping interval for the same datasource. When a task locks an interval,
  * it is assigned a version string that it can use to publish segments.
  */
-public class TaskLockbox
+public interface TaskLockbox
 {
-  // Datasource -> Interval -> Tasks + TaskLock
-  private final Map<String, NavigableMap<Interval, TaskLockPosse>> running = Maps.newHashMap();
-  private final TaskStorage taskStorage;
-  private final ReentrantLock giant = new ReentrantLock();
-  private final Condition lockReleaseCondition = giant.newCondition();
-
-  private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
-
-  // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
-  // this set should be accessed under the giant lock.
-  private final Set<String> activeTasks = Sets.newHashSet();
-
-  @Inject
-  public TaskLockbox(
-      TaskStorage taskStorage
-  )
-  {
-    this.taskStorage = taskStorage;
-  }
-
   /**
    * Wipe out our current in-memory state and resync it from our bundled {@link io.druid.indexing.overlord.TaskStorage}.
    */
-  public void syncFromStorage()
-  {
-    giant.lock();
-
-    try {
-      // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
-      final Set<String> storedActiveTasks = Sets.newHashSet();
-      final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
-      for (final Task task : taskStorage.getActiveTasks()) {
-        storedActiveTasks.add(task.getId());
-        for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
-          storedLocks.add(Pair.of(task, taskLock));
-        }
-      }
-      // Sort locks by version, so we add them back in the order they were acquired.
-      final Ordering<Pair<Task, TaskLock>> byVersionOrdering = new Ordering<Pair<Task, TaskLock>>()
-      {
-        @Override
-        public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
-        {
-          // The second compare shouldn't be necessary, but, whatever.
-          return ComparisonChain.start()
-                                .compare(left.rhs.getVersion(), right.rhs.getVersion())
-                                .compare(left.lhs.getId(), right.lhs.getId())
-                                .result();
-        }
-      };
-      running.clear();
-      activeTasks.clear();
-      activeTasks.addAll(storedActiveTasks);
-      // Bookkeeping for a log message at the end
-      int taskLockCount = 0;
-      for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
-        final Task task = taskAndLock.lhs;
-        final TaskLock savedTaskLock = taskAndLock.rhs;
-        if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
-          // "Impossible", but you never know what crazy stuff can be restored from storage.
-          log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
-          continue;
-        }
-        final Optional<TaskLock> acquiredTaskLock = tryLock(
-            task,
-            savedTaskLock.getInterval(),
-            Optional.of(savedTaskLock.getVersion())
-        );
-        if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
-          taskLockCount ++;
-          log.info(
-              "Reacquired lock on interval[%s] version[%s] for task: %s",
-              savedTaskLock.getInterval(),
-              savedTaskLock.getVersion(),
-              task.getId()
-          );
-        } else if (acquiredTaskLock.isPresent()) {
-          taskLockCount ++;
-          log.info(
-              "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
-              savedTaskLock.getInterval(),
-              savedTaskLock.getVersion(),
-              acquiredTaskLock.get().getVersion(),
-              task.getId()
-          );
-        } else {
-          log.info(
-              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
-              savedTaskLock.getInterval(),
-              savedTaskLock.getVersion(),
-              task.getId()
-          );
-        }
-      }
-      log.info(
-          "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
-          taskLockCount,
-          activeTasks.size(),
-          storedLocks.size() - taskLockCount
-      );
-    } finally {
-      giant.unlock();
-    }
-  }
-
+  void syncFromStorage();
   /**
    * Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
    * cannot be acquired.
    *
-   * @param task task to acquire lock for
+   * @param task     task to acquire lock for
    * @param interval interval to lock
+   *
    * @return acquired TaskLock
    *
    * @throws java.lang.InterruptedException if the lock cannot be acquired
    */
-  public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
-  {
-    giant.lock();
-    try {
-      Optional<TaskLock> taskLock;
-      while (!(taskLock = tryLock(task, interval)).isPresent()) {
-        lockReleaseCondition.await();
-      }
-
-      return taskLock.get();
-    } finally {
-      giant.unlock();
-    }
-  }
-
+  TaskLock lock(final Task task, final Interval interval) throws InterruptedException;
   /**
    * Attempt to lock a task, without removing it from the queue. Equivalent to the long form of {@code tryLock}
    * with no preferred version.
    *
-   * @param task             task that wants a lock
-   * @param interval         interval to lock
+   * @param task     task that wants a lock
+   * @param interval interval to lock
    *
    * @return lock version if lock was acquired, absent otherwise
-   * @throws IllegalStateException if the task is not a valid active task
-   */
-  public Optional<TaskLock> tryLock(final Task task, final Interval interval)
-  {
-    return tryLock(task, interval, Optional.<String>absent());
-  }
-
-  /**
-   * Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
-   * This method will attempt to assign version strings that obey the invariant that every version string is
-   * lexicographically greater than any other version string previously assigned to the same interval. This invariant
-   * is only mostly guaranteed, however; we assume clock monotonicity and we assume that callers specifying
-   * {@code preferredVersion} are doing the right thing.
-   *
-   * @param task             task that wants a lock
-   * @param interval         interval to lock
-   * @param preferredVersion use this version string if one has not yet been assigned
    *
-   * @return lock version if lock was acquired, absent otherwise
    * @throws IllegalStateException if the task is not a valid active task
    */
-  private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
-  {
-    giant.lock();
-
-    try {
-      if(!activeTasks.contains(task.getId())){
-        throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
-      }
-      Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
-      final String dataSource = task.getDataSource();
-      final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
-      final TaskLockPosse posseToUse;
-
-      if (foundPosses.size() > 1) {
-
-        // Too many existing locks.
-        return Optional.absent();
-
-      } else if (foundPosses.size() == 1) {
-
-        // One existing lock -- check if we can add to it.
-
-        final TaskLockPosse foundPosse = Iterables.getOnlyElement(foundPosses);
-        if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) {
-          posseToUse = foundPosse;
-        } else {
-          return Optional.absent();
-        }
-
-      } else {
-
-        // No existing locks. We can make a new one.
-        if (!running.containsKey(dataSource)) {
-          running.put(dataSource, new TreeMap<Interval, TaskLockPosse>(Comparators.intervalsByStartThenEnd()));
-        }
-
-        // Create new TaskLock and assign it a version.
-        // Assumption: We'll choose a version that is greater than any previously-chosen version for our interval. (This
-        // may not always be true, unfortunately. See below.)
-
-        final String version;
-
-        if (preferredVersion.isPresent()) {
-          // We have a preferred version. We'll trust our caller to not break our ordering assumptions and just use it.
-          version = preferredVersion.get();
-        } else {
-          // We are running under an interval lock right now, so just using the current time works as long as we can trust
-          // our clock to be monotonic and have enough resolution since the last time we created a TaskLock for the same
-          // interval. This may not always be true; to assure it we would need to use some method of timekeeping other
-          // than the wall clock.
-          version = new DateTime().toString();
-        }
-
-        posseToUse = new TaskLockPosse(new TaskLock(task.getGroupId(), dataSource, interval, version));
-        running.get(dataSource)
-               .put(interval, posseToUse);
-
-        log.info("Created new TaskLockPosse: %s", posseToUse);
-      }
-
-      // Add to existing TaskLockPosse, if necessary
-      if (posseToUse.getTaskIds().add(task.getId())) {
-        log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
-
-        // Update task storage facility. If it fails, revoke the lock.
-        try {
-          taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
-          return Optional.of(posseToUse.getTaskLock());
-        } catch(Exception e) {
-          log.makeAlert("Failed to persist lock in storage")
-             .addData("task", task.getId())
-             .addData("dataSource", posseToUse.getTaskLock().getDataSource())
-             .addData("interval", posseToUse.getTaskLock().getInterval())
-             .addData("version", posseToUse.getTaskLock().getVersion())
-             .emit();
-          unlock(task, interval);
-          return Optional.absent();
-        }
-      } else {
-        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
-        return Optional.of(posseToUse.getTaskLock());
-      }
-    }
-    finally {
-      giant.unlock();
-    }
-
-  }
-
+  Optional<TaskLock> tryLock(final Task task, final Interval interval);
   /**
    * Return the currently-active locks for some task.
    *
    * @param task task for which to locate locks
+   *
    * @return currently-active locks for the given task
    */
-  public List<TaskLock> findLocksForTask(final Task task)
-  {
-    giant.lock();
-
-    try {
-      return Lists.transform(
-          findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
-          {
-            @Override
-            public TaskLock apply(TaskLockPosse taskLockPosse)
-            {
-              return taskLockPosse.getTaskLock();
-            }
-          }
-      );
-    } finally {
-      giant.unlock();
-    }
-  }
-
+  List<TaskLock> findLocksForTask(final Task task);
   /**
    * Release lock held for a task on a particular interval. Does nothing if the task does not currently
    * hold the mentioned lock.
    *
-   * @param task task to unlock
+   * @param task     task to unlock
    * @param interval interval to unlock
    */
-  public void unlock(final Task task, final Interval interval)
-  {
-    giant.lock();
-
-    try {
-      final String dataSource = task.getDataSource();
-      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
-
-      // So we can alert if activeTasks try to release stuff they don't have
-      boolean removed = false;
-
-      if(dsRunning != null) {
-        final TaskLockPosse taskLockPosse = dsRunning.get(interval);
-        if(taskLockPosse != null) {
-          final TaskLock taskLock = taskLockPosse.getTaskLock();
-
-          // Remove task from live list
-          log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock.getGroupId());
-          removed = taskLockPosse.getTaskIds().remove(task.getId());
-
-          if (taskLockPosse.getTaskIds().isEmpty()) {
-            log.info("TaskLock is now empty: %s", taskLock);
-            running.get(dataSource).remove(taskLock.getInterval());
-          }
-
-          if (running.get(dataSource).size() == 0) {
-            running.remove(dataSource);
-          }
-
-          // Wake up blocking-lock waiters
-          lockReleaseCondition.signalAll();
-
-          // Remove lock from storage. If it cannot be removed, just ignore the failure.
-          try {
-            taskStorage.removeLock(task.getId(), taskLock);
-          } catch(Exception e) {
-            log.makeAlert(e, "Failed to clean up lock from storage")
-               .addData("task", task.getId())
-               .addData("dataSource", taskLock.getDataSource())
-               .addData("interval", taskLock.getInterval())
-               .addData("version", taskLock.getVersion())
-               .emit();
-          }
-        }
-      }
-
-      if(!removed) {
-        log.makeAlert("Lock release without acquire")
-           .addData("task", task.getId())
-           .addData("interval", interval)
-           .emit();
-      }
-    } finally {
-      giant.unlock();
-    }
-  }
-
+  void unlock(final Task task, final Interval interval);
   /**
    * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task.
    *
    * @param task task to unlock
    */
-  public void remove(final Task task)
-  {
-    giant.lock();
-    try {
-      try {
-        log.info("Removing task[%s] from activeTasks", task.getId());
-        for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
-          unlock(task, taskLockPosse.getTaskLock().getInterval());
-        }
-      }
-      finally {
-        activeTasks.remove(task.getId());
-      }
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
+  void remove(final Task task);
+  void add(Task task);
   /**
-   * Return the currently-active lock posses for some task.
+   * Sets the TaskLock state specified by <code>taskLockCriticalState</code> for <code>task</> with <code>interval</code>
+   * Only applicable when performing priority based task locking
    *
-   * @param task task for which to locate locks
-   */
-  private List<TaskLockPosse> findLockPossesForTask(final Task task)
-  {
-    giant.lock();
-
-    try {
-      final Iterable<TaskLockPosse> searchSpace;
-
-      // Scan through all locks for this datasource
-      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
-      if(dsRunning == null) {
-        searchSpace = ImmutableList.of();
-      } else {
-        searchSpace = dsRunning.values();
-      }
-
-      return ImmutableList.copyOf(
-          Iterables.filter(
-              searchSpace, new Predicate<TaskLockPosse>()
-          {
-            @Override
-            public boolean apply(TaskLockPosse taskLock)
-            {
-              return taskLock.getTaskIds().contains(task.getId());
-            }
-          }
-          )
-      );
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  /**
-   * Return all locks that overlap some search interval.
+   * @param task                  task corresponding to the lock
+   * @param interval              interval for the lock
+   * @param taskLockCriticalState upgrade or downgrade the lock depending on this parameter
+   *
+   * @return true if the TaskLock was set, false otherwise
    */
-  private List<TaskLockPosse> findLockPossesForInterval(final String dataSource, final Interval interval)
-  {
-    giant.lock();
-
-    try {
-      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
-      if (dsRunning == null) {
-        // No locks at all
-        return Collections.emptyList();
-      } else {
-        // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
-        final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
-        final Iterable<Interval> searchIntervals = Iterables.concat(
-            // Single interval that starts at or before ours
-            Collections.singletonList(dsLockbox.floor(new Interval(interval.getStart(), new DateTime(JodaUtils.MAX_INSTANT)))),
-
-            // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
-            dsLockbox.subSet(
-                new Interval(interval.getStart(), new DateTime(JodaUtils.MAX_INSTANT)),
-                false,
-                new Interval(interval.getEnd(), interval.getEnd()),
-                false
-            )
-        );
-
-        return Lists.newArrayList(
-            FunctionalIterable
-                .create(searchIntervals)
-                .filter(
-                    new Predicate<Interval>()
-                    {
-                      @Override
-                      public boolean apply(@Nullable Interval searchInterval)
-                      {
-                        return searchInterval != null && searchInterval.overlaps(interval);
-                      }
-                    }
-                )
-                .transform(
-                    new Function<Interval, TaskLockPosse>()
-                    {
-                      @Override
-                      public TaskLockPosse apply(Interval interval)
-                      {
-                        return dsRunning.get(interval);
-                      }
-                    }
-                )
-        );
-      }
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public void add(Task task)
-  {
-    giant.lock();
-    try {
-      log.info("Adding task[%s] to activeTasks", task.getId());
-      activeTasks.add(task.getId());
-    } finally {
-      giant.unlock();
-    }
-  }
-
-  private static class TaskLockPosse
-  {
-    final private TaskLock taskLock;
-    final private Set<String> taskIds;
-
-    public TaskLockPosse(TaskLock taskLock)
-    {
-      this.taskLock = taskLock;
-      taskIds = Sets.newHashSet();
-    }
-
-    public TaskLock getTaskLock()
-    {
-      return taskLock;
-    }
-
-    public Set<String> getTaskIds()
-    {
-      return taskIds;
-    }
-
-    @Override
-    public String toString()
-    {
-      return Objects.toStringHelper(this)
-                    .add("taskLock", taskLock)
-                    .add("taskIds", taskIds)
-                    .toString();
-    }
-  }
+  boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState);
 }
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV1.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV1.java
new file mode 100644
index 00000000000..0a59f1dcaa5
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV1.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.metamx.common.ISE;
+import com.metamx.common.Pair;
+import com.metamx.common.guava.Comparators;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.common.utils.JodaUtils;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
+import io.druid.indexing.common.task.Task;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TaskLockboxV1 implements TaskLockbox
+{
+  // Datasource -> Interval -> Tasks + TaskLock
+  private final Map<String, NavigableMap<Interval, TaskLockPosse>> running = Maps.newHashMap();
+  private final TaskStorage taskStorage;
+  private final ReentrantLock giant = new ReentrantLock();
+  private final Condition lockReleaseCondition = giant.newCondition();
+
+  private static final EmittingLogger log = new EmittingLogger(TaskLockboxV1.class);
+
+  // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
+  // this set should be accessed under the giant lock.
+  private final Set<String> activeTasks = Sets.newHashSet();
+
+  @Inject
+  public TaskLockboxV1(
+      TaskStorage taskStorage
+  )
+  {
+    this.taskStorage = taskStorage;
+  }
+
+  public void syncFromStorage()
+  {
+    giant.lock();
+
+    try {
+      // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
+      final Set<String> storedActiveTasks = Sets.newHashSet();
+      final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
+      for (final Task task : taskStorage.getActiveTasks()) {
+        storedActiveTasks.add(task.getId());
+        for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
+          storedLocks.add(Pair.of(task, taskLock));
+        }
+      }
+      // Sort locks by version, so we add them back in the order they were acquired.
+      final Ordering<Pair<Task, TaskLock>> byVersionOrdering = new Ordering<Pair<Task, TaskLock>>()
+      {
+        @Override
+        public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
+        {
+          // The second compare shouldn't be necessary, but, whatever.
+          return ComparisonChain.start()
+                                .compare(left.rhs.getVersion(), right.rhs.getVersion())
+                                .compare(left.lhs.getId(), right.lhs.getId())
+                                .result();
+        }
+      };
+      running.clear();
+      activeTasks.clear();
+      activeTasks.addAll(storedActiveTasks);
+      // Bookkeeping for a log message at the end
+      int taskLockCount = 0;
+      for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
+        final Task task = taskAndLock.lhs;
+        final TaskLock savedTaskLock = taskAndLock.rhs;
+        if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
+          // "Impossible", but you never know what crazy stuff can be restored from storage.
+          log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
+          continue;
+        }
+        final Optional<TaskLock> acquiredTaskLock = tryLock(
+            task,
+            savedTaskLock.getInterval(),
+            Optional.of(savedTaskLock.getVersion())
+        );
+        if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
+          taskLockCount++;
+          log.info(
+              "Reacquired lock on interval[%s] version[%s] for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              task.getId()
+          );
+        } else if (acquiredTaskLock.isPresent()) {
+          taskLockCount++;
+          log.info(
+              "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              acquiredTaskLock.get().getVersion(),
+              task.getId()
+          );
+        } else {
+          log.info(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              task.getId()
+          );
+        }
+      }
+      log.info(
+          "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
+          taskLockCount,
+          activeTasks.size(),
+          storedLocks.size() - taskLockCount
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
+  {
+    giant.lock();
+    try {
+      Optional<TaskLock> taskLock;
+      while (!(taskLock = tryLock(task, interval)).isPresent()) {
+        lockReleaseCondition.await();
+      }
+
+      return taskLock.get();
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public Optional<TaskLock> tryLock(final Task task, final Interval interval)
+  {
+    return tryLock(task, interval, Optional.<String>absent());
+  }
+
+  /**
+   * Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
+   * This method will attempt to assign version strings that obey the invariant that every version string is
+   * lexicographically greater than any other version string previously assigned to the same interval. This invariant
+   * is only mostly guaranteed, however; we assume clock monotonicity and we assume that callers specifying
+   * {@code preferredVersion} are doing the right thing.
+   *
+   * @param task             task that wants a lock
+   * @param interval         interval to lock
+   * @param preferredVersion use this version string if one has not yet been assigned
+   *
+   * @return lock version if lock was acquired, absent otherwise
+   *
+   * @throws IllegalStateException if the task is not a valid active task
+   */
+  private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
+  {
+    giant.lock();
+
+    try {
+      if (!activeTasks.contains(task.getId())) {
+        throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
+      }
+      Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
+      final String dataSource = task.getDataSource();
+      final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
+      final TaskLockPosse posseToUse;
+
+      if (foundPosses.size() > 1) {
+
+        // Too many existing locks.
+        return Optional.absent();
+
+      } else if (foundPosses.size() == 1) {
+
+        // One existing lock -- check if we can add to it.
+
+        final TaskLockPosse foundPosse = Iterables.getOnlyElement(foundPosses);
+        if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock()
+                                                                                   .getGroupId()
+                                                                                   .equals(task.getGroupId())) {
+          posseToUse = foundPosse;
+        } else {
+          return Optional.absent();
+        }
+
+      } else {
+
+        // No existing locks. We can make a new one.
+        if (!running.containsKey(dataSource)) {
+          running.put(dataSource, new TreeMap<Interval, TaskLockPosse>(Comparators.intervalsByStartThenEnd()));
+        }
+
+        // Create new TaskLock and assign it a version.
+        // Assumption: We'll choose a version that is greater than any previously-chosen version for our interval. (This
+        // may not always be true, unfortunately. See below.)
+
+        final String version;
+
+        if (preferredVersion.isPresent()) {
+          // We have a preferred version. We'll trust our caller to not break our ordering assumptions and just use it.
+          version = preferredVersion.get();
+        } else {
+          // We are running under an interval lock right now, so just using the current time works as long as we can trust
+          // our clock to be monotonic and have enough resolution since the last time we created a TaskLock for the same
+          // interval. This may not always be true; to assure it we would need to use some method of timekeeping other
+          // than the wall clock.
+          version = new DateTime().toString();
+        }
+
+        posseToUse = new TaskLockPosse(new TaskLock(
+            task.getGroupId(),
+            dataSource,
+            interval,
+            version,
+            0, //TaskLockboxV1 does not do priority locking, therefore use 0 as priority for all tasks
+            true //TaskLockboxV1 does not do priority locking, therefore TaskLocks will always be in upgraded state
+        ));
+        running.get(dataSource)
+               .put(interval, posseToUse);
+
+        log.info("Created new TaskLockPosse: %s", posseToUse);
+      }
+
+      // Add to existing TaskLockPosse, if necessary
+      if (posseToUse.getTaskIds().add(task.getId())) {
+        log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
+
+        // Update task storage facility. If it fails, revoke the lock.
+        try {
+          taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
+          return Optional.of(posseToUse.getTaskLock());
+        }
+        catch (Exception e) {
+          log.makeAlert("Failed to persist lock in storage")
+             .addData("task", task.getId())
+             .addData("dataSource", posseToUse.getTaskLock().getDataSource())
+             .addData("interval", posseToUse.getTaskLock().getInterval())
+             .addData("version", posseToUse.getTaskLock().getVersion())
+             .emit();
+          unlock(task, interval);
+          return Optional.absent();
+        }
+      } else {
+        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
+        return Optional.of(posseToUse.getTaskLock());
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+
+  }
+
+  public List<TaskLock> findLocksForTask(final Task task)
+  {
+    giant.lock();
+
+    try {
+      return Lists.transform(
+          findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
+          {
+            @Override
+            public TaskLock apply(TaskLockPosse taskLockPosse)
+            {
+              return taskLockPosse.getTaskLock();
+            }
+          }
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void unlock(final Task task, final Interval interval)
+  {
+    giant.lock();
+
+    try {
+      final String dataSource = task.getDataSource();
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
+
+      // So we can alert if activeTasks try to release stuff they don't have
+      boolean removed = false;
+
+      if (dsRunning != null) {
+        final TaskLockPosse taskLockPosse = dsRunning.get(interval);
+        if (taskLockPosse != null) {
+          final TaskLock taskLock = taskLockPosse.getTaskLock();
+
+          // Remove task from live list
+          log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock.getGroupId());
+          removed = taskLockPosse.getTaskIds().remove(task.getId());
+
+          if (taskLockPosse.getTaskIds().isEmpty()) {
+            log.info("TaskLock is now empty: %s", taskLock);
+            running.get(dataSource).remove(taskLock.getInterval());
+          }
+
+          if (running.get(dataSource).size() == 0) {
+            running.remove(dataSource);
+          }
+
+          // Wake up blocking-lock waiters
+          lockReleaseCondition.signalAll();
+
+          // Remove lock from storage. If it cannot be removed, just ignore the failure.
+          try {
+            taskStorage.removeLock(task.getId(), taskLock);
+          }
+          catch (Exception e) {
+            log.makeAlert(e, "Failed to clean up lock from storage")
+               .addData("task", task.getId())
+               .addData("dataSource", taskLock.getDataSource())
+               .addData("interval", taskLock.getInterval())
+               .addData("version", taskLock.getVersion())
+               .emit();
+          }
+        }
+      }
+
+      if (!removed) {
+        log.makeAlert("Lock release without acquire")
+           .addData("task", task.getId())
+           .addData("interval", interval)
+           .emit();
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void remove(final Task task)
+  {
+    giant.lock();
+    try {
+      try {
+        log.info("Removing task[%s] from activeTasks", task.getId());
+        for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
+          unlock(task, taskLockPosse.getTaskLock().getInterval());
+        }
+      }
+      finally {
+        activeTasks.remove(task.getId());
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  /**
+   * Return the currently-active lock posses for some task.
+   *
+   * @param task task for which to locate locks
+   */
+  private List<TaskLockPosse> findLockPossesForTask(final Task task)
+  {
+    giant.lock();
+
+    try {
+      final Iterable<TaskLockPosse> searchSpace;
+
+      // Scan through all locks for this datasource
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
+      if (dsRunning == null) {
+        searchSpace = ImmutableList.of();
+      } else {
+        searchSpace = dsRunning.values();
+      }
+
+      return ImmutableList.copyOf(
+          Iterables.filter(
+              searchSpace, new Predicate<TaskLockPosse>()
+              {
+                @Override
+                public boolean apply(TaskLockPosse taskLock)
+                {
+                  return taskLock.getTaskIds().contains(task.getId());
+                }
+              }
+          )
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  /**
+   * Return all locks that overlap some search interval.
+   */
+  private List<TaskLockPosse> findLockPossesForInterval(final String dataSource, final Interval interval)
+  {
+    giant.lock();
+
+    try {
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
+      if (dsRunning == null) {
+        // No locks at all
+        return Collections.emptyList();
+      } else {
+        // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
+        final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
+        final Iterable<Interval> searchIntervals = Iterables.concat(
+            // Single interval that starts at or before ours
+            Collections.singletonList(dsLockbox.floor(new Interval(
+                interval.getStart(),
+                new DateTime(JodaUtils.MAX_INSTANT)
+            ))),
+
+            // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
+            dsLockbox.subSet(
+                new Interval(interval.getStart(), new DateTime(JodaUtils.MAX_INSTANT)),
+                false,
+                new Interval(interval.getEnd(), interval.getEnd()),
+                false
+            )
+        );
+
+        return Lists.newArrayList(
+            FunctionalIterable
+                .create(searchIntervals)
+                .filter(
+                    new Predicate<Interval>()
+                    {
+                      @Override
+                      public boolean apply(@Nullable Interval searchInterval)
+                      {
+                        return searchInterval != null && searchInterval.overlaps(interval);
+                      }
+                    }
+                )
+                .transform(
+                    new Function<Interval, TaskLockPosse>()
+                    {
+                      @Override
+                      public TaskLockPosse apply(Interval interval)
+                      {
+                        return dsRunning.get(interval);
+                      }
+                    }
+                )
+        );
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void add(Task task)
+  {
+    giant.lock();
+    try {
+      log.info("Adding task[%s] to activeTasks", task.getId());
+      activeTasks.add(task.getId());
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  @Override
+  public boolean setTaskLockCriticalState(
+      Task task, Interval interval, TaskLockCriticalState taskLockCriticalState
+  )
+  {
+    // Always return true as TaskLockboxV1 does not do priority based locking
+    return true;
+  }
+
+  private static class TaskLockPosse
+  {
+    final private TaskLock taskLock;
+    final private Set<String> taskIds;
+
+    public TaskLockPosse(TaskLock taskLock)
+    {
+      this.taskLock = taskLock;
+      taskIds = Sets.newHashSet();
+    }
+
+    public TaskLock getTaskLock()
+    {
+      return taskLock;
+    }
+
+    public Set<String> getTaskIds()
+    {
+      return taskIds;
+    }
+
+    @Override
+    public String toString()
+    {
+      return Objects.toStringHelper(this)
+                    .add("taskLock", taskLock)
+                    .add("taskIds", taskIds)
+                    .toString();
+    }
+  }
+}
\ No newline at end of file
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV2.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV2.java
new file mode 100644
index 00000000000..051e7c56a10
--- /dev/null
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockboxV2.java
@@ -0,0 +1,745 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.overlord;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.inject.Inject;
+import com.metamx.common.ISE;
+import com.metamx.common.Pair;
+import com.metamx.common.guava.Comparators;
+import com.metamx.common.guava.FunctionalIterable;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.common.utils.JodaUtils;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
+import io.druid.indexing.common.task.Task;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TaskLockboxV2 implements TaskLockbox
+{
+  // Datasource -> Interval -> Tasks + TaskLock
+  private final Map<String, NavigableMap<Interval, TaskLockPosse>> running = Maps.newHashMap();
+  private final TaskStorage taskStorage;
+  private final ReentrantLock giant = new ReentrantLock();
+  private final Condition lockReleaseCondition = giant.newCondition();
+
+  private static final EmittingLogger log = new EmittingLogger(TaskLockboxV2.class);
+
+  // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks.
+  // this set should be accessed under the giant lock.
+  private final Set<String> activeTasks = Sets.newHashSet();
+
+  // Should be accessed under the giant lock
+  private final Set<Task> tasksWaitingForLock = Sets.newHashSet();
+
+  @Inject
+  public TaskLockboxV2(
+      TaskStorage taskStorage
+  )
+  {
+    this.taskStorage = taskStorage;
+  }
+
+  public void syncFromStorage()
+  {
+    giant.lock();
+
+    try {
+      // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
+      final Set<String> storedActiveTasks = Sets.newHashSet();
+      final List<Pair<Task, TaskLock>> storedLocks = Lists.newArrayList();
+      for (final Task task : taskStorage.getActiveTasks()) {
+        storedActiveTasks.add(task.getId());
+        for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
+          storedLocks.add(Pair.of(task, taskLock));
+        }
+      }
+      // Sort locks by version, so we add them back in the order they were acquired.
+      final Ordering<Pair<Task, TaskLock>> byVersionOrdering = new Ordering<Pair<Task, TaskLock>>()
+      {
+        @Override
+        public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
+        {
+          // The second compare shouldn't be necessary, but, whatever.
+          return ComparisonChain.start()
+                                .compare(left.rhs.getVersion(), right.rhs.getVersion())
+                                .compare(left.lhs.getId(), right.lhs.getId())
+                                .result();
+        }
+      };
+      running.clear();
+      activeTasks.clear();
+      activeTasks.addAll(storedActiveTasks);
+      // Bookkeeping for a log message at the end
+      int taskLockCount = 0;
+      for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
+        final Task task = taskAndLock.lhs;
+        final TaskLock savedTaskLock = taskAndLock.rhs;
+        if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
+          // "Impossible", but you never know what crazy stuff can be restored from storage.
+          log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
+          continue;
+        }
+        final Optional<TaskLock> acquiredTaskLock = tryLock(
+            task,
+            savedTaskLock.getInterval(),
+            Optional.of(savedTaskLock.getVersion())
+        );
+        boolean didAcquireLock = false;
+        if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
+          taskLockCount++;
+          didAcquireLock = true;
+          log.info(
+              "Reacquired lock on interval[%s] version[%s] for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              task.getId()
+          );
+        } else if (acquiredTaskLock.isPresent()) {
+          taskLockCount++;
+          didAcquireLock = true;
+          log.info(
+              "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              acquiredTaskLock.get().getVersion(),
+              task.getId()
+          );
+        } else {
+          log.info(
+              "Could not reacquire lock on interval[%s] version[%s] for task: %s",
+              savedTaskLock.getInterval(),
+              savedTaskLock.getVersion(),
+              task.getId()
+          );
+        }
+
+        // If the lock needs to be not upgraded, try to upgrade it
+        if (didAcquireLock && savedTaskLock.isUpgraded()) {
+          log.info(
+              "The lock on interval [%s] needs to be upgraded! trying to upgrade it for task [%s]",
+              savedTaskLock.getInterval(),
+              task.getId()
+          );
+          if (setTaskLockCriticalState(task, savedTaskLock.getInterval(), TaskLockCriticalState.UPGRADE)) {
+            log.info(
+                "Upgraded lock on interval [%s] for task: [%s]",
+                savedTaskLock.getInterval(),
+                task.getId()
+            );
+
+          } else {
+            log.error(
+                "WTF?! Could not upgrade lock on interval [%s] for task: [%s]",
+                savedTaskLock.getInterval(),
+                task.getId()
+            );
+          }
+        }
+      }
+
+      log.info(
+          "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
+          taskLockCount,
+          activeTasks.size(),
+          storedLocks.size() - taskLockCount
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
+  {
+    giant.lock();
+    try {
+      Optional<TaskLock> taskLock;
+      while (!(taskLock = tryLock(task, interval)).isPresent()) {
+        tasksWaitingForLock.add(task);
+        log.info("Task [%s] added to list of tasks waiting for a lock release", task.getId());
+        lockReleaseCondition.await();
+      }
+      return taskLock.get();
+    }
+    finally {
+      tasksWaitingForLock.remove(task);
+      giant.unlock();
+    }
+  }
+
+  public Optional<TaskLock> tryLock(final Task task, final Interval interval)
+  {
+    return tryLock(task, interval, Optional.<String>absent());
+  }
+
+  /**
+   * Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task.
+   * This method will attempt to assign version strings that obey the invariant that every version string is
+   * lexicographically greater than any other version string previously assigned to the same interval. This invariant
+   * is only mostly guaranteed, however; we assume clock monotonicity and we assume that callers specifying
+   * {@code preferredVersion} are doing the right thing.
+   *
+   * @param task             task that wants a lock
+   * @param interval         interval to lock
+   * @param preferredVersion use this version string if one has not yet been assigned
+   *
+   * @return lock version if lock was acquired, absent otherwise
+   *
+   * @throws IllegalStateException if the task is not a valid active task
+   */
+  private Optional<TaskLock> tryLock(final Task task, final Interval interval, final Optional<String> preferredVersion)
+  {
+    giant.lock();
+    final int priority = task.getLockPriority();
+    try {
+      if (!activeTasks.contains(task.getId())) {
+        throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
+      }
+      Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
+      final String dataSource = task.getDataSource();
+      final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
+      final TaskLockPosse posseToUse;
+
+      if (foundPosses.size() > 0 && taskLocksAreRevocable(foundPosses, priority)) {
+        for (TaskLockPosse taskLockPosse : foundPosses) {
+          for (String taskId : taskLockPosse.getTaskIds()) {
+            revokeTaskLock(taskId, taskLockPosse.getTaskLock());
+          }
+        }
+
+        // Create new TaskLock and assign it a version.
+        // Assumption: We'll choose a version that is greater than any previously-chosen version for our interval.
+        // (This may not always be true, unfortunately. See the comment in getVersion() method.)
+
+        posseToUse = createNewTaskLockPosse(
+            task.getGroupId(),
+            dataSource,
+            interval,
+            getVersion(preferredVersion),
+            task.getLockPriority()
+        );
+      } else if (foundPosses.size() > 1) {
+        // Too many locks
+        return Optional.absent();
+
+      } else if (foundPosses.size() == 1) {
+
+        // One existing lock -- check if we can add to it.
+        final TaskLockPosse foundPosse = Iterables.getOnlyElement(foundPosses);
+        if (foundPosse.getTaskLock().getInterval().contains(interval)
+            && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) {
+          posseToUse = foundPosse;
+        } else {
+          return Optional.absent();
+        }
+
+      } else {
+
+        // No existing locks. We can make a new one.
+        posseToUse = createNewTaskLockPosse(
+            task.getGroupId(),
+            dataSource,
+            interval,
+            getVersion(preferredVersion),
+            task.getLockPriority()
+        );
+      }
+
+      // Add to existing TaskLockPosse, if necessary
+      if (posseToUse.getTaskIds().add(task.getId())) {
+        log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
+
+        // Update task storage facility. If it fails, revoke the lock.
+        try {
+          taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
+          return Optional.of(posseToUse.getTaskLock());
+        }
+        catch (Exception e) {
+          log.makeAlert("Failed to persist lock in storage")
+             .addData("task", task.getId())
+             .addData("dataSource", posseToUse.getTaskLock().getDataSource())
+             .addData("interval", posseToUse.getTaskLock().getInterval())
+             .addData("version", posseToUse.getTaskLock().getVersion())
+             .emit();
+          unlock(task, interval);
+          return Optional.absent();
+        }
+      } else {
+        log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
+        return Optional.of(posseToUse.getTaskLock());
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+
+  }
+
+  /**
+   * @param taskLockPosses TaskLockPosses to check for conflicting TaskLocks
+   * @param lockPriority   Lock priority to do the check against
+   *
+   * @return true if all existing TaskLocks can br revoked i.e. the lock priority is greater than all existing
+   * TaskLocks priority and there are no upgraded TaskLocks otherwise false
+   */
+  private boolean taskLocksAreRevocable(final List<TaskLockPosse> taskLockPosses, int lockPriority)
+  {
+    for (TaskLockPosse taskLockPosse : taskLockPosses) {
+      if (lockPriority <= taskLockPosse.getTaskLock().getPriority() || taskLockPosse.getTaskLock().isUpgraded()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Unlocks the given TaskLock
+   */
+  private void revokeTaskLock(String taskId, TaskLock taskLock)
+  {
+    Task task = taskStorage.getTask(taskId).get();
+    log.info(
+        "Revoking task lock [%s] for task [%s]",
+        taskLock,
+        task.getId()
+    );
+    unlock(task, taskLock.getInterval());
+  }
+
+  /*
+   * Given GroupId, DataSource, Interval, Version and Priority create a new TaskLockPosse.Add the TaskLockPosse to the
+   * in-memory data structure keeping track of all the TaskLockPosse sorted by interval for each datasource
+   * */
+  private TaskLockPosse createNewTaskLockPosse(
+      String groupId,
+      String dataSource,
+      Interval interval,
+      String version,
+      Integer priority
+  )
+  {
+    giant.lock();
+    try {
+      if (!running.containsKey(dataSource)) {
+        running.put(dataSource, new TreeMap<Interval, TaskLockPosse>(Comparators.intervalsByStartThenEnd()));
+      }
+      TaskLockPosse taskLockPosse = new TaskLockPosse(
+          new TaskLock(
+              groupId,
+              dataSource,
+              interval,
+              version,
+              priority,
+              false
+          )
+      );
+      running.get(dataSource).put(interval, taskLockPosse);
+      log.info("Created new TaskLockPosse: %s", taskLockPosse);
+      return taskLockPosse;
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public boolean setTaskLockCriticalState(Task task, Interval interval, TaskLockCriticalState taskLockCriticalState)
+  {
+    giant.lock();
+    final String taskId = task.getId();
+    try {
+      // If no higher priority task has arrived then there should still be a valid lock present inside TaskLockbox
+      List<TaskLockPosse> taskLockPosses = findLockPossesForTask(task);
+      List<TaskLockPosse> taskLockPossesToSetList = new ArrayList<>();
+      for (TaskLockPosse taskLockPosse : taskLockPosses) {
+        // Second check is not really required as a task operates on a single datasource but anyways
+        if (taskLockPosse.getTaskLock().getInterval().contains(interval) &&
+            taskLockPosse.getTaskLock().getDataSource().equals(task.getDataSource())) {
+          taskLockPossesToSetList.add(taskLockPosse);
+        }
+      }
+      // There should only be one TaskLockPosse corresponding to a task and an interval
+      TaskLockPosse taskLockPosseToSet = Iterables.getOnlyElement(taskLockPossesToSetList);
+
+      final TaskLock newTaskLock = taskLockPosseToSet.getTaskLock()
+                                                     .withUpgraded(taskLockCriticalState.getExpectedState());
+      // Change the state of lock in the TaskStorage
+      // Side-effect - this will cause the TaskLockPosse to have the changed TaskLock which is shared by all replicated tasks
+      log.info("Trying to [%s] TaskLock [%s] for Task [%s]",
+               taskLockCriticalState, taskLockPosseToSet.getTaskLock(), taskId
+      );
+
+      // If this is an upgrade request, check if any higher priority task is waiting to acquire lock
+      // If yes then fail the upgrade request else continue
+      if (newTaskLock.isUpgraded()) {
+        Optional<Task> higherPriorityTask = Iterators.tryFind(
+            tasksWaitingForLock.iterator(),
+            new Predicate<Task>()
+            {
+              @Override
+              public boolean apply(@Nullable Task input)
+              {
+                return input != null && input.getLockPriority() > newTaskLock.getPriority();
+              }
+            }
+        );
+        if (higherPriorityTask.isPresent()) {
+          log.warn("Cannot Upgrade ! a higher priority task [%s] found", higherPriorityTask.get().getId());
+          return false;
+        }
+      }
+
+      // This call to setLock is good to have even if the TaskLock is in the expected state
+      // to keep multiple entries of same TaskLock (may be created because of overlord restart) in sync
+      // when their state is Upgraded or Downgraded
+      taskStorage.setLock(taskId, newTaskLock);
+
+      if (taskLockPosseToSet.getTaskLock().isUpgraded() == taskLockCriticalState.getExpectedState()) {
+        log.warn("TaskLock for task [%s] already in [%s] state, this may happen when running replicated tasks.",
+                 taskId, taskLockCriticalState
+        );
+      }
+
+      // TaskStorage didn't threw any exceptions
+      // Setting the taskLock of TaskLockPosse to newTaskLock
+      taskLockPosseToSet.setTaskLock(newTaskLock);
+
+      if (taskLockCriticalState.equals(TaskLockCriticalState.DOWNGRADE)) {
+        lockReleaseCondition.signalAll();
+      }
+      return true;
+    }
+    catch (NoSuchElementException e) {
+      // No locks found
+      log.error(
+          "[%s] Failed! no locks found for task [%s] and interval [%s], "
+          + "Is there any higher priority task running that may have revoked the lock ?",
+          taskLockCriticalState,
+          taskId,
+          interval
+      );
+      return false;
+    }
+    catch (Exception e) {
+      log.makeAlert(String.format("Failed to [%s] lock in storage", taskLockCriticalState))
+         .addData("task", taskId)
+         .addData("dataSource", task.getDataSource())
+         .addData("interval", interval)
+         .addData("exception", e.getMessage())
+         .emit();
+      return false;
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public List<TaskLock> findLocksForTask(final Task task)
+  {
+    giant.lock();
+
+    try {
+      return Lists.transform(
+          findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
+          {
+            @Override
+            public TaskLock apply(TaskLockPosse taskLockPosse)
+            {
+              return taskLockPosse.getTaskLock();
+            }
+          }
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void unlock(final Task task, final Interval interval)
+  {
+    giant.lock();
+
+    try {
+      final String dataSource = task.getDataSource();
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
+
+      // So we can alert if activeTasks try to release stuff they don't have
+      boolean removed = false;
+
+      if (dsRunning != null) {
+        final TaskLockPosse taskLockPosse = dsRunning.get(interval);
+        if (taskLockPosse != null) {
+          final TaskLock taskLock = taskLockPosse.getTaskLock();
+
+          // Remove task from live list
+          log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock.getGroupId());
+          removed = taskLockPosse.getTaskIds().remove(task.getId());
+
+          if (taskLockPosse.getTaskIds().isEmpty()) {
+            log.info("TaskLock is now empty: %s", taskLock);
+            running.get(dataSource).remove(taskLock.getInterval());
+          }
+
+          if (running.get(dataSource).size() == 0) {
+            running.remove(dataSource);
+          }
+
+          // Wake up blocking-lock waiters
+          lockReleaseCondition.signalAll();
+
+          // Remove lock from storage. If it cannot be removed, just ignore the failure.
+          try {
+            taskStorage.removeLock(task.getId(), taskLock);
+          }
+          catch (Exception e) {
+            log.makeAlert(e, "Failed to clean up lock from storage")
+               .addData("task", task.getId())
+               .addData("dataSource", taskLock.getDataSource())
+               .addData("interval", taskLock.getInterval())
+               .addData("version", taskLock.getVersion())
+               .emit();
+          }
+        }
+      }
+
+      if (!removed) {
+        log.makeAlert("Lock release without acquire")
+           .addData("task", task.getId())
+           .addData("interval", interval)
+           .emit();
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void remove(final Task task)
+  {
+    giant.lock();
+    try {
+      try {
+        log.info("Removing task[%s] from activeTasks", task.getId());
+        for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
+          unlock(task, taskLockPosse.getTaskLock().getInterval());
+        }
+      }
+      finally {
+        activeTasks.remove(task.getId());
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  /**
+   * Return the currently-active lock posses for some task.
+   *
+   * @param task task for which to locate locks
+   */
+  private List<TaskLockPosse> findLockPossesForTask(final Task task)
+  {
+    giant.lock();
+
+    try {
+      final Iterable<TaskLockPosse> searchSpace;
+
+      // Scan through all locks for this datasource
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(task.getDataSource());
+      if (dsRunning == null) {
+        searchSpace = ImmutableList.of();
+      } else {
+        searchSpace = dsRunning.values();
+      }
+
+      return ImmutableList.copyOf(
+          Iterables.filter(
+              searchSpace, new Predicate<TaskLockPosse>()
+              {
+                @Override
+                public boolean apply(TaskLockPosse taskLock)
+                {
+                  return taskLock.getTaskIds().contains(task.getId());
+                }
+              }
+          )
+      );
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  /**
+   * Return all locks that overlap some search interval.
+   */
+  private List<TaskLockPosse> findLockPossesForInterval(final String dataSource, final Interval interval)
+  {
+    giant.lock();
+
+    try {
+      final NavigableMap<Interval, TaskLockPosse> dsRunning = running.get(dataSource);
+      if (dsRunning == null) {
+        // No locks at all
+        return Collections.emptyList();
+      } else {
+        // Tasks are indexed by locked interval, which are sorted by interval start. Intervals are non-overlapping, so:
+        final NavigableSet<Interval> dsLockbox = dsRunning.navigableKeySet();
+        final Iterable<Interval> searchIntervals = Iterables.concat(
+            // Single interval that starts at or before ours
+            Collections.singletonList(dsLockbox.floor(new Interval(
+                interval.getStart(),
+                new DateTime(JodaUtils.MAX_INSTANT)
+            ))),
+
+            // All intervals that start somewhere between our start instant (exclusive) and end instant (exclusive)
+            dsLockbox.subSet(
+                new Interval(interval.getStart(), new DateTime(JodaUtils.MAX_INSTANT)),
+                false,
+                new Interval(interval.getEnd(), interval.getEnd()),
+                false
+            )
+        );
+
+        return Lists.newArrayList(
+            FunctionalIterable
+                .create(searchIntervals)
+                .filter(
+                    new Predicate<Interval>()
+                    {
+                      @Override
+                      public boolean apply(@Nullable Interval searchInterval)
+                      {
+                        return searchInterval != null && searchInterval.overlaps(interval);
+                      }
+                    }
+                )
+                .transform(
+                    new Function<Interval, TaskLockPosse>()
+                    {
+                      @Override
+                      public TaskLockPosse apply(Interval interval)
+                      {
+                        return dsRunning.get(interval);
+                      }
+                    }
+                )
+        );
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  public void add(Task task)
+  {
+    giant.lock();
+    try {
+      log.info("Adding task[%s] to activeTasks", task.getId());
+      activeTasks.add(task.getId());
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  private String getVersion(Optional<String> preferredVersion)
+  {
+
+    final String version;
+
+    if (preferredVersion.isPresent()) {
+      // We have a preferred version. We'll trust our caller to not break our ordering assumptions and just use it.
+      version = preferredVersion.get();
+    } else {
+      // We are running under an interval lock right now, so just using the current time works as long as we can trust
+      // our clock to be monotonic and have enough resolution since the last time we created a TaskLock for the same
+      // interval. This may not always be true; to assure it we would need to use some method of timekeeping other
+      // than the wall clock.
+      version = new DateTime().toString();
+    }
+
+    return version;
+  }
+
+  private static class TaskLockPosse
+  {
+    private volatile TaskLock taskLock;
+    final private Set<String> taskIds;
+
+    public TaskLockPosse(TaskLock taskLock)
+    {
+      this.taskLock = taskLock;
+      taskIds = Sets.newHashSet();
+    }
+
+    public TaskLock getTaskLock()
+    {
+      return taskLock;
+    }
+
+    public void setTaskLock(TaskLock taskLock)
+    {
+      this.taskLock = taskLock;
+    }
+
+    public Set<String> getTaskIds()
+    {
+      return taskIds;
+    }
+
+    @Override
+    public String toString()
+    {
+      return Objects.toStringHelper(this)
+                    .add("taskLock", taskLock)
+                    .add("taskIds", taskIds)
+                    .toString();
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
index dc5986092e0..cdd069f18f8 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskStorage.java
@@ -54,6 +54,14 @@
    */
   public void addLock(String taskid, TaskLock taskLock);
 
+  /**
+   * Sets the lock state in the storage facility to {@param taskLock} for task with id {@param taskid}
+   * @param taskid task ID
+   * @param taskLock taskLock to set for the give {@param taskid}
+   * @throws com.metamx.common.ISE if there is no TaskLock with isUpgraded field set to opposite of what is in {@param taskLock}
+   */
+  public void setLock(String taskid, TaskLock taskLock);
+
   /**
    * Removes lock state from the storage facility. It is harmless to keep old locks in the storage facility, but
    * this method can help reclaim wasted space.
diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
index d6e6e20f05f..dcca7357534 100644
--- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
+++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java
@@ -19,6 +19,7 @@
 
 package io.druid.indexing.overlord;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -85,6 +86,7 @@
   private final TaskLocation location;
 
   private volatile boolean stopping = false;
+  private final int numThreads;
 
   @Inject
   public ThreadPoolTaskRunner(
@@ -93,9 +95,23 @@ public ThreadPoolTaskRunner(
       ServiceEmitter emitter,
       @Self DruidNode node
   )
+  {
+    this(toolboxFactory, taskConfig, emitter, 1, node);
+  }
+
+  // This constructor is created so that it is easy to create multi-threaded ThreadPoolTaskRunner for testing purposes
+  @VisibleForTesting
+  public ThreadPoolTaskRunner(
+      TaskToolboxFactory toolboxFactory,
+      TaskConfig taskConfig,
+      ServiceEmitter emitter,
+      int numThreads,
+      DruidNode node
+  )
   {
     this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
     this.taskConfig = taskConfig;
+    this.numThreads = numThreads;
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
     this.location = TaskLocation.create(node.getHost(), node.getPort());
   }
@@ -139,12 +155,18 @@ public void unregisterListener(String listenerId)
     }
   }
 
-  private static ListeningExecutorService buildExecutorService(int priority)
+  private ListeningExecutorService buildExecutorService(int priority)
   {
     return MoreExecutors.listeningDecorator(
+        numThreads == 1 ?
         Execs.singleThreaded(
             "task-runner-%d-priority-" + priority,
             TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
+        ) :
+        Execs.multiThreaded(
+            numThreads,
+            "task-runner-%d-priority-" + priority,
+            TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
         )
     );
   }
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskActionSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskActionSerdeTest.java
new file mode 100644
index 00000000000..f590277d108
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskActionSerdeTest.java
@@ -0,0 +1,56 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexing.common;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
+import io.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TaskActionSerdeTest
+{
+  private ObjectMapper jsonMapper;
+  @Before
+  public void setUp(){
+    jsonMapper = new DefaultObjectMapper();
+  }
+
+  @Test
+  public void testSetLockCriticalStateActionDowngrade() throws IOException{
+    SetLockCriticalStateAction setLockCriticalStateAction = new SetLockCriticalStateAction(new Interval("2000/2020"), TaskLockCriticalState.DOWNGRADE);
+    SetLockCriticalStateAction testSetLockCriticalStateAction = jsonMapper.readValue(jsonMapper.writeValueAsString(
+        setLockCriticalStateAction), SetLockCriticalStateAction.class);
+    Assert.assertEquals(setLockCriticalStateAction, testSetLockCriticalStateAction);
+  }
+
+  @Test
+  public void testSetLockCriticalStateActionUpgrade() throws IOException{
+    SetLockCriticalStateAction setLockCriticalStateAction = new SetLockCriticalStateAction(new Interval("2000/2020"), TaskLockCriticalState.UPGRADE);
+    SetLockCriticalStateAction testSetLockCriticalStateAction = jsonMapper.readValue(jsonMapper.writeValueAsString(
+        setLockCriticalStateAction), SetLockCriticalStateAction.class);
+    Assert.assertEquals(setLockCriticalStateAction, testSetLockCriticalStateAction);
+  }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskLockSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskLockSerdeTest.java
new file mode 100644
index 00000000000..2db97fea9fc
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskLockSerdeTest.java
@@ -0,0 +1,60 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexing.common;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.jackson.DefaultObjectMapper;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TaskLockSerdeTest
+{
+
+  private ObjectMapper jsonMapper;
+  @Before
+  public void setUp(){
+    jsonMapper = new DefaultObjectMapper();
+  }
+
+  @Test
+  public void testUpgradedTaskLockSerde() throws IOException{
+    String taskLockString = "{\"groupId\":\"group1\",\"dataSource\":\"DS\","
+                            + "\"interval\":\"2015-07-01T00:00:00.000Z/2015-07-03T00:00:00.000Z\","
+                            + "\"version\":\"2015-08-31T16:38:41.661Z\",\"priority\":0, \"upgraded\": true}";
+    TaskLock taskLock = new TaskLock("group1", "DS", new Interval("2015-07-01T00:00:00.000Z/2015-07-03T00:00:00.000Z"), "2015-08-31T16:38:41.661Z", 0, true);
+    TaskLock taskLockSerde = jsonMapper.readValue(taskLockString, TaskLock.class);
+    Assert.assertEquals(taskLock, taskLockSerde);
+  }
+
+  @Test
+  public void testBasicTaskLockSerde() throws IOException{
+    String taskLockString = "{\"groupId\":\"group1\",\"dataSource\":\"DS\","
+                            + "\"interval\":\"2015-07-01T00:00:00.000Z/2015-07-03T00:00:00.000Z\","
+                            + "\"version\":\"2015-08-31T16:38:41.661Z\", \"priority\":34}";
+    TaskLock taskLock = new TaskLock("group1", "DS", new Interval("2015-07-01T00:00:00.000Z/2015-07-03T00:00:00.000Z"), "2015-08-31T16:38:41.661Z", 34, false);
+    TaskLock taskLockSerde = jsonMapper.readValue(taskLockString, TaskLock.class);
+    Assert.assertEquals(taskLock, taskLockSerde);
+  }
+}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java
index 6f3034a4793..35dc0003288 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/RemoteTaskActionClientTest.java
@@ -19,11 +19,13 @@
 
 package io.druid.indexing.common.actions;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.metamx.http.client.HttpClient;
+import com.metamx.http.client.Request;
+import com.metamx.http.client.response.StatusResponseHandler;
+import com.metamx.http.client.response.StatusResponseHolder;
 import io.druid.client.selector.Server;
 import io.druid.curator.discovery.ServerDiscoverySelector;
 import io.druid.indexing.common.RetryPolicyConfig;
@@ -32,13 +34,6 @@
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
 import io.druid.jackson.DefaultObjectMapper;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.easymock.EasyMock;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
@@ -46,13 +41,16 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.util.concurrent.Futures;
-import com.metamx.http.client.HttpClient;
-import com.metamx.http.client.Request;
-import com.metamx.http.client.response.StatusResponseHandler;
-import com.metamx.http.client.response.StatusResponseHolder;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
 
 public class RemoteTaskActionClientTest
 {
@@ -99,7 +97,8 @@ public String getAddress()
 
     long now = System.currentTimeMillis();
 
-    result = Arrays.asList(new TaskLock("groupId", "dataSource", new Interval(now - 30 * 1000, now), "version"));
+    result = Arrays.asList(new TaskLock("groupId", "dataSource", new Interval(now - 30 * 1000, now), "version",
+                                        0, false));
   }
 
   @Test
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java
index f189356d0df..8075ce36f55 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java
@@ -91,6 +91,7 @@ public void testSimple() throws Exception
     final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
     actionTestKit.getTaskLockbox().add(task);
     actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
+    actionTestKit.getTaskLockbox().setTaskLockCriticalState(task, new Interval(INTERVAL), TaskLockCriticalState.UPGRADE);
     action.perform(task, actionTestKit.getTaskActionToolbox());
 
     Assert.assertEquals(
@@ -109,9 +110,9 @@ public void testFailBadVersion() throws Exception
     final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
     actionTestKit.getTaskLockbox().add(task);
     actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
-
+    actionTestKit.getTaskLockbox().setTaskLockCriticalState(task, new Interval(INTERVAL), TaskLockCriticalState.UPGRADE);
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));
+    thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by upgraded locks for task"));
     final Set<DataSegment> segments = action.perform(task, actionTestKit.getTaskActionToolbox());
     Assert.assertEquals(ImmutableSet.of(SEGMENT3), segments);
   }
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index c52daae392b..d87d321c465 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -90,6 +90,7 @@ public void testTransactional() throws Exception
     final Task task = new NoopTask(null, 0, 0, null, null, null);
     actionTestKit.getTaskLockbox().add(task);
     actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
+    actionTestKit.getTaskLockbox().setTaskLockCriticalState(task, new Interval(INTERVAL), TaskLockCriticalState.UPGRADE);
 
     SegmentPublishResult result1 = new SegmentTransactionalInsertAction(
         ImmutableSet.of(SEGMENT1),
@@ -131,6 +132,7 @@ public void testFailTransactional() throws Exception
     final Task task = new NoopTask(null, 0, 0, null, null, null);
     actionTestKit.getTaskLockbox().add(task);
     actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
+    actionTestKit.getTaskLockbox().setTaskLockCriticalState(task, new Interval(INTERVAL), TaskLockCriticalState.UPGRADE);
 
     SegmentPublishResult result = new SegmentTransactionalInsertAction(
         ImmutableSet.of(SEGMENT1),
@@ -151,9 +153,10 @@ public void testFailBadVersion() throws Exception
     final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3));
     actionTestKit.getTaskLockbox().add(task);
     actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
+    actionTestKit.getTaskLockbox().setTaskLockCriticalState(task, new Interval(INTERVAL), TaskLockCriticalState.UPGRADE);
 
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));
+    thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by upgraded locks for task"));
     SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox());
     Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT3), true), result);
   }
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java
index 0178ecfaef5..257cd053133 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java
@@ -25,6 +25,7 @@
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import io.druid.metadata.MetadataStorageConnectorConfig;
@@ -78,7 +79,7 @@ public TaskActionToolbox getTaskActionToolbox()
   public void before()
   {
     taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
-    taskLockbox = new TaskLockbox(taskStorage);
+    taskLockbox = new TaskLockboxV1(taskStorage);
     testDerbyConnector = new TestDerbyConnector(
         Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
         Suppliers.ofInstance(metadataStorageTablesConfig)
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 2bf321d8570..6bb74814e13 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -32,6 +32,7 @@
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.TestUtils;
 import io.druid.indexing.common.actions.LockListAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.actions.TaskActionClient;
 import io.druid.query.aggregation.AggregatorFactory;
@@ -228,21 +229,23 @@ public void testWithArbitraryGranularity() throws Exception
 
     indexTask.run(
         new TaskToolbox(
-            null, null, new TaskActionClient()
-        {
-          @Override
-          public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
-          {
-            if (taskAction instanceof LockListAction) {
-              return (RetType) Arrays.asList(
-                  new TaskLock(
-                      "", "", null, new DateTime().toString()
-                  )
-              );
-            }
-            return null;
-          }
-        }, null, new DataSegmentPusher()
+            null, indexTask, new TaskActionClient()
+            {
+              @Override
+              public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
+              {
+                if (taskAction instanceof LockListAction) {
+                  return (RetType) Arrays.asList(
+                      new TaskLock(
+                          "", "", null, new DateTime().toString(), indexTask.getLockPriority(), false
+                      )
+                  );
+                } else if (taskAction instanceof SetLockCriticalStateAction) {
+                  return (RetType) new Boolean(true);
+                }
+                return null;
+              }
+            }, null, new DataSegmentPusher()
         {
           @Deprecated
           @Override
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index f71327daaee..1f473dacbbc 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -64,6 +64,8 @@
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
+import io.druid.indexing.overlord.TaskLockboxV2;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.test.TestDataSegmentAnnouncer;
 import io.druid.indexing.test.TestDataSegmentKiller;
@@ -236,19 +238,24 @@ public Firehose connect(InputRowParser parser) throws IOException, ParseExceptio
   private DateTime now;
   private ListeningExecutorService taskExec;
   private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
+  private final String taskLockboxVersion;
 
-  @Parameterized.Parameters(name = "buildV9Directly = {0}")
+  private static final String TASKLOCKBOX_V1 = "v1";
+  private static final String TASKLOCKBOX_V2 = "v2";
+
+  @Parameterized.Parameters(name = "buildV9Directly = {0}, taskLockBoxVersion={1}")
   public static Collection<?> constructorFeeder() throws IOException
   {
     return ImmutableList.of(
-        new Object[]{true},
-        new Object[]{false}
+        new Object[]{true, TASKLOCKBOX_V1},
+        new Object[]{false, TASKLOCKBOX_V2}
     );
   }
 
-  public RealtimeIndexTaskTest(boolean buildV9Directly)
+  public RealtimeIndexTaskTest(boolean buildV9Directly, String taskLockboxVersion)
   {
     this.buildV9Directly = buildV9Directly;
+    this.taskLockboxVersion = taskLockboxVersion;
   }
 
   @Before
@@ -949,7 +956,12 @@ private TaskToolbox makeToolbox(
   )
   {
     final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
-    final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
+    final TaskLockbox taskLockbox;
+    if(taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      taskLockbox = new TaskLockboxV2(taskStorage);
+    } else {
+      taskLockbox = new TaskLockboxV1(taskStorage);
+    }
     try {
       taskStorage.insert(task, TaskStatus.running(task.getId()));
     }
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index ab06fa4dcf5..a9c66e70ead 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -111,6 +111,7 @@ public void testIndexTaskSerde() throws Exception
     Assert.assertEquals(task.getInterval(), task2.getInterval());
     Assert.assertTrue(task.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
     Assert.assertTrue(task2.getIngestionSchema().getIOConfig().getFirehoseFactory() instanceof LocalFirehoseFactory);
+    Assert.assertEquals(AbstractTask.INDEX_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -209,6 +210,7 @@ public void testMergeTaskSerde() throws Exception
     Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval());
     Assert.assertEquals(segments, task3.getSegments());
     Assert.assertEquals(aggregators, task3.getAggregators());
+    Assert.assertEquals(AbstractTask.MERGE_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -241,6 +243,7 @@ public void testKillTaskSerde() throws Exception
 
     Assert.assertEquals("foo", task3.getDataSource());
     Assert.assertEquals(new Interval("2010-01-01/P1D"), task3.getInterval());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -267,6 +270,7 @@ public void testVersionConverterTaskSerde() throws Exception
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getInterval(), task2.getInterval());
     Assert.assertEquals(task.getSegment(), task.getSegment());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -293,6 +297,7 @@ public void testVersionConverterSubTaskSerde() throws Exception
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getSegment(), task2.getSegment());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -376,6 +381,7 @@ public Plumber findPlumber(
         task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
         task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
     );
+    Assert.assertEquals(AbstractTask.REALTIME_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -427,6 +433,7 @@ public void testAppendTaskSerde() throws Exception
     Assert.assertEquals(new Interval("2010-01-01/P2D"), task3.getInterval());
     Assert.assertEquals(task3.getSegments(), segments);
     Assert.assertEquals(task.getAggregators(), task2.getAggregators());
+    Assert.assertEquals(AbstractTask.MERGE_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -451,6 +458,7 @@ public void testArchiveTaskSerde() throws Exception
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getInterval(), task2.getInterval());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -475,6 +483,7 @@ public void testRestoreTaskSerde() throws Exception
     Assert.assertEquals(task.getGroupId(), task2.getGroupId());
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getInterval(), task2.getInterval());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -500,6 +509,7 @@ public void testSegmentConvetSerdeReflection() throws IOException
     final String json = jsonMapper.writeValueAsString(task);
     final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class);
     Assert.assertEquals(json, jsonMapper.writeValueAsString(taskFromJson));
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, taskFromJson.getLockPriority());
   }
 
   @Test
@@ -546,6 +556,7 @@ public void testSegmentConvertSerde() throws IOException
     );
     Assert.assertEquals(false, convertSegmentTask.isForce());
     Assert.assertEquals(segment, convertSegmentTask.getSegment());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, convertSegmentTask.getLockPriority());
   }
 
   @Test
@@ -574,6 +585,7 @@ public void testMoveTaskSerde() throws Exception
     Assert.assertEquals(task.getDataSource(), task2.getDataSource());
     Assert.assertEquals(task.getInterval(), task2.getInterval());
     Assert.assertEquals(task.getTargetLoadSpec(), task2.getTargetLoadSpec());
+    Assert.assertEquals(AbstractTask.DEFAULT_TASK_PRIORITY, task2.getLockPriority());
   }
 
   @Test
@@ -613,5 +625,6 @@ public void testHadoopIndexTaskSerde() throws Exception
     );
     Assert.assertEquals("blah", task.getClasspathPrefix());
     Assert.assertEquals("blah", task2.getClasspathPrefix());
+    Assert.assertEquals(AbstractTask.INDEX_TASK_PRIORITY, task2.getLockPriority());
   }
 }
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 5fd3776a922..818cbfd2cf8 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -57,6 +57,7 @@
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
 import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
@@ -157,7 +158,7 @@
     }
     INDEX_MERGER.persist(index, persistDir, indexSpec);
 
-    final TaskLockbox tl = new TaskLockbox(ts);
+    final TaskLockbox tl = new TaskLockboxV1(ts);
     final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
     {
       final private Set<DataSegment> published = Sets.newHashSet();
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java
index 3761ad05110..214744dd248 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java
@@ -27,8 +27,10 @@
 import io.druid.indexing.common.actions.LockAcquireAction;
 import io.druid.indexing.common.actions.LockListAction;
 import io.druid.indexing.common.actions.LockReleaseAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.SegmentInsertAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.common.task.AbstractTask;
 import io.druid.indexing.common.task.TaskResource;
 import io.druid.timeline.DataSegment;
@@ -87,6 +89,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     Assert.assertEquals("lock2 interval", interval2, lock2.getInterval());
     Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2);
 
+    // Upgrade the lock to exclusive lock for first interval
+    toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(interval1, TaskLockCriticalState.UPGRADE));
+
     // Push first segment
     toolbox.getTaskActionClient()
            .submit(
@@ -108,6 +113,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     // (Confirm lock sanity)
     Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3);
 
+    // Upgrade lock for second interval
+    toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(interval2, TaskLockCriticalState.UPGRADE));
+
     // Push second segment
     toolbox.getTaskActionClient()
            .submit(
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index d85e586d8af..25ff6f081e3 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -34,6 +34,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Granularity;
 import com.metamx.common.ISE;
@@ -44,6 +47,7 @@
 import com.metamx.metrics.Monitor;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.cache.MapCache;
+import io.druid.concurrent.Execs;
 import io.druid.data.input.Firehose;
 import io.druid.data.input.FirehoseFactory;
 import io.druid.data.input.InputRow;
@@ -59,8 +63,10 @@
 import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
 import io.druid.indexing.common.actions.LockListAction;
 import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
 import io.druid.indexing.common.actions.TaskActionClientFactory;
 import io.druid.indexing.common.actions.TaskActionToolbox;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.AbstractFixedIntervalTask;
@@ -126,6 +132,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
@@ -148,16 +155,24 @@
 
   private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
   private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage";
+  private static final String TASKLOCKBOX_V1 = "v1";
+  private static final String TASKLOCKBOX_V2 = "v2";
 
-  @Parameterized.Parameters(name = "taskStorageType={0}")
+  @Parameterized.Parameters(name = "taskStorageType={0}, taskLockBoxVersion={1}")
   public static Collection<String[]> constructFeed()
   {
-    return Arrays.asList(new String[][]{{HEAP_TASK_STORAGE}, {METADATA_TASK_STORAGE}});
+    return Arrays.asList(new String[][]{
+        {HEAP_TASK_STORAGE, TASKLOCKBOX_V1},
+        {METADATA_TASK_STORAGE, TASKLOCKBOX_V1},
+        {HEAP_TASK_STORAGE, TASKLOCKBOX_V2},
+        {METADATA_TASK_STORAGE, TASKLOCKBOX_V2}
+    });
   }
 
-  public TaskLifecycleTest(String taskStorageType)
+  public TaskLifecycleTest(String taskStorageType, String taskLockboxVersion)
   {
     this.taskStorageType = taskStorageType;
+    this.taskLockboxVersion = taskLockboxVersion;
   }
 
   public final
@@ -190,8 +205,6 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2)
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
 
-  private final String taskStorageType;
-
   private ObjectMapper mapper;
   private TaskStorageQueryAdapter tsqa = null;
   private TaskStorage taskStorage = null;
@@ -209,12 +222,13 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2)
   private TaskConfig taskConfig;
   private DataSegmentPusher dataSegmentPusher;
 
+  private final String taskStorageType;
+  private final String taskLockboxVersion;
   private int pushedSegments;
   private int announcedSinks;
   private SegmentHandoffNotifierFactory handoffNotifierFactory;
   private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
-
-  private static CountDownLatch publishCountDown;
+  private CountDownLatch publishCountDown;
 
   private static ServiceEmitter newMockEmitter()
   {
@@ -339,6 +353,7 @@ public void setUp() throws Exception
     // initialize variables
     announcedSinks = 0;
     pushedSegments = 0;
+    publishCountDown = new CountDownLatch(1);
     indexSpec = new IndexSpec();
     emitter = newMockEmitter();
     EmittingLogger.registerEmitter(emitter);
@@ -357,9 +372,9 @@ public void setUp() throws Exception
 
     mdc = setUpMetadataStorageCoordinator();
 
-    tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);
+    tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc, taskLockboxVersion);
 
-    taskRunner = setUpThreadPoolTaskRunner(tb);
+    taskRunner = setUpThreadPoolTaskRunner(tb, 1);
 
     taskQueue = setUpTaskQueue(taskStorage, taskRunner);
   }
@@ -385,7 +400,8 @@ private TaskStorage setUpTaskStorage()
         TestDerbyConnector testDerbyConnector = derbyConnectorRule.getConnector();
         mapper.registerSubtypes(
             new NamedType(MockExceptionalFirehoseFactory.class, "mockExcepFirehoseFactory"),
-            new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory")
+            new NamedType(MockFirehoseFactory.class, "mockFirehoseFactory"),
+            new NamedType(TestIndexTask.class, "test_index")
         );
         testDerbyConnector.createTaskTables();
         testDerbyConnector.createSegmentTable();
@@ -497,7 +513,8 @@ private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator()
   private TaskToolboxFactory setUpTaskToolboxFactory(
       DataSegmentPusher dataSegmentPusher,
       SegmentHandoffNotifierFactory handoffNotifierFactory,
-      TestIndexerMetadataStorageCoordinator mdc
+      TestIndexerMetadataStorageCoordinator mdc,
+      String taskLockboxVersion
   ) throws IOException
   {
     Preconditions.checkNotNull(queryRunnerFactoryConglomerate);
@@ -505,7 +522,11 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
     Preconditions.checkNotNull(taskStorage);
     Preconditions.checkNotNull(emitter);
 
-    taskLockbox = new TaskLockbox(taskStorage);
+    if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      taskLockbox = new TaskLockboxV2(taskStorage);
+    } else {
+      taskLockbox = new TaskLockboxV1(taskStorage);
+    }
     tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter));
     File tmpDir = temporaryFolder.newFolder();
     taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null);
@@ -597,17 +618,12 @@ public boolean isAnnounced(DataSegment segment)
     );
   }
 
-  private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb)
+  private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb, int numThreads)
   {
     Preconditions.checkNotNull(taskConfig);
     Preconditions.checkNotNull(emitter);
 
-    return new ThreadPoolTaskRunner(
-        tb,
-        taskConfig,
-        emitter,
-        new DruidNode("dummy", "dummy", 10000)
-    );
+    return new ThreadPoolTaskRunner(tb, taskConfig, emitter, numThreads, new DruidNode("dummy", "dummy", 10000));
   }
 
   private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception
@@ -870,6 +886,8 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
                                                .interval(new Interval("2012-01-01/P1D"))
                                                .version(myLock.getVersion())
                                                .build();
+        // Upgrade the lock to exclusive lock
+        toolbox.getTaskActionClient().submit(new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE));
 
         toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment)));
         return TaskStatus.success(getId());
@@ -1021,9 +1039,9 @@ public DataSegment push(File file, DataSegment dataSegment) throws IOException
       }
     };
 
-    tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);
+    tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc, taskLockboxVersion);
 
-    taskRunner = setUpThreadPoolTaskRunner(tb);
+    taskRunner = setUpThreadPoolTaskRunner(tb, 1);
 
     taskQueue = setUpTaskQueue(taskStorage, taskRunner);
 
@@ -1199,4 +1217,384 @@ private RealtimeIndexTask newRealtimeIndexTask()
         null
     );
   }
+
+  @Test (timeout=4000L)
+  public void testLockOverride() throws Exception
+  {
+    // TaskLockboxV1 does not do priority locking thus no overriding of locks
+    if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      publishCountDown = new CountDownLatch(1);
+      taskRunner = setUpThreadPoolTaskRunner(tb, 2);
+      taskQueue = setUpTaskQueue(taskStorage, taskRunner);
+
+      final CountDownLatch waitForLockAcquisition = new CountDownLatch(1);
+      final CountDownLatch waitForRealtimeTaskCompletion = new CountDownLatch(1);
+      final CountDownLatch waitForIndexTaskCompletion = new CountDownLatch(1);
+
+      final TestIndexTask indexTask = new TestIndexTask(
+          null,
+          null,
+          new IndexTask.IndexIngestionSpec(
+              new DataSchema(
+                  "test_ds",
+                  null,
+                  new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
+                  new UniformGranularitySpec(
+                      Granularity.DAY,
+                      null,
+                      ImmutableList.of(new Interval(new DateTime().toString("YYYY-MM-dd") + "/P1D"))
+                  ),
+                  mapper
+              ),
+              new IndexTask.IndexIOConfig(new MockFirehoseFactory(true)),
+              new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, false)
+          ),
+          mapper
+      );
+      indexTask.setLatches(waitForLockAcquisition, waitForRealtimeTaskCompletion, null, null);
+
+      Futures.addCallback(
+          runTaskWithListenableFuture(indexTask),
+          new FutureCallback<TaskStatus>()
+          {
+            @Override
+            public void onSuccess(TaskStatus result)
+            {
+              Assert.assertEquals(TaskStatus.Status.FAILED, result.getStatusCode());
+              waitForIndexTaskCompletion.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              Throwables.propagate(t);
+            }
+          }
+      );
+
+      // Wait for Index task to acquire lock on the interval
+      // Realtime task will revoke this lock
+      waitForLockAcquisition.await();
+
+      final Task realtimeIndexTask = newRealtimeIndexTask();
+      Futures.addCallback(
+          runTaskWithListenableFuture(realtimeIndexTask),
+          new FutureCallback<TaskStatus>()
+          {
+            @Override
+            public void onSuccess(TaskStatus result)
+            {
+              Assert.assertEquals(TaskStatus.Status.SUCCESS, result.getStatusCode());
+              waitForRealtimeTaskCompletion.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              Throwables.propagate(t);
+            }
+          }
+      );
+      // wait for realtime to announce segment
+      publishCountDown.await();
+
+      // Realtime Task has published the segment, simulate loading of segment to a historical node by running handoffCallbacks
+      // so that task finishes with SUCCESS status
+      runHandOffCallbacks();
+
+      waitForRealtimeTaskCompletion.await();
+      waitForIndexTaskCompletion.await();
+    }
+  }
+
+  @Test (timeout=4000L)
+  public void testLockOverrideDuringUpgrade() throws Exception
+  {
+    // TaskLockboxV1 does not do priority locking thus no overriding of locks
+    if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      // Two segments will be published here - one by index task and other realtime index task
+      publishCountDown = new CountDownLatch(2);
+      taskRunner = setUpThreadPoolTaskRunner(tb, 2);
+      taskQueue = setUpTaskQueue(taskStorage, taskRunner);
+
+      final CountDownLatch waitForIndexTaskCompletion = new CountDownLatch(1);
+      final CountDownLatch initialLockUpgradeCountDownLatch = new CountDownLatch(1);
+      final CountDownLatch runFinishAwaitLatch = new CountDownLatch(1);
+      final CountDownLatch waitForRealtimeTaskCompletion = new CountDownLatch(1);
+
+      final TestIndexTask indexTask = new TestIndexTask(
+          null,
+          null,
+          new IndexTask.IndexIngestionSpec(
+              new DataSchema(
+                  "test_ds",
+                  null,
+                  new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
+                  new UniformGranularitySpec(
+                      Granularity.DAY,
+                      null,
+                      ImmutableList.of(new Interval(new DateTime().toString("YYYY-MM-dd") + "/P1D"))
+                  ),
+                  mapper
+              ),
+              new IndexTask.IndexIOConfig(new MockFirehoseFactory(true)),
+              new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, false)
+          ),
+          mapper
+      );
+      indexTask.setLatches(null, null, runFinishAwaitLatch, initialLockUpgradeCountDownLatch);
+      indexTask.setAlternateTaskLockState(true);
+
+      Futures.addCallback(
+          runTaskWithListenableFuture(indexTask),
+          new FutureCallback<TaskStatus>()
+          {
+            @Override
+            public void onSuccess(TaskStatus result)
+            {
+              Assert.assertEquals(TaskStatus.Status.FAILED, result.getStatusCode());
+              // This message is set in the returned TaskStatus's id field of TestIndexTask so that it can be asserted here
+              Assert.assertEquals(indexTask.getId(), result.getId());
+              waitForIndexTaskCompletion.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              Throwables.propagate(t);
+            }
+          }
+      );
+
+      // Wait for Index task to acquire upgraded lock on the interval
+      while (initialLockUpgradeCountDownLatch.getCount() > 0) {
+        Thread.sleep(100);
+      }
+
+      final Task realtimeIndexTask = newRealtimeIndexTask();
+      Futures.addCallback(
+          runTaskWithListenableFuture(realtimeIndexTask),
+          new FutureCallback<TaskStatus>()
+          {
+            @Override
+            public void onSuccess(TaskStatus result)
+            {
+              Assert.assertEquals(TaskStatus.Status.SUCCESS, result.getStatusCode());
+              waitForRealtimeTaskCompletion.countDown();
+              runFinishAwaitLatch.countDown();
+            }
+
+            @Override
+            public void onFailure(Throwable t)
+            {
+              Throwables.propagate(t);
+            }
+          }
+      );
+      publishCountDown.await();
+      // Realtime Task has published the segment, simulate loading of segment to a historical node by running handoffCallbacks
+      // so that task finishes with SUCCESS status
+      runHandOffCallbacks();
+
+
+      waitForIndexTaskCompletion.await();
+      waitForRealtimeTaskCompletion.await();
+    }
+  }
+
+  @Test (timeout=4000L)
+  public void testReacquireLockIndexTask() throws Exception
+  {
+    taskRunner = setUpThreadPoolTaskRunner(tb, 2);
+    taskQueue = setUpTaskQueue(taskStorage, taskRunner);
+
+    final CountDownLatch lockAcquisitionCountDownLatch = new CountDownLatch(1);
+    final CountDownLatch runStartAwaitLatch = new CountDownLatch(1);
+    final CountDownLatch runFinishAwaitLatch = new CountDownLatch(1);
+    final CountDownLatch taskCompletionLatch = new CountDownLatch(1);
+
+    final TestIndexTask indexTask = new TestIndexTask(
+        null,
+        null,
+        new IndexTask.IndexIngestionSpec(
+            new DataSchema(
+                "test_ds",
+                null,
+                new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
+                new UniformGranularitySpec(
+                    Granularity.DAY,
+                    null,
+                    ImmutableList.of(new Interval(new DateTime().toString("YYYY-MM-dd") + "/P1D"))
+                ),
+                mapper
+            ),
+            new IndexTask.IndexIOConfig(new MockFirehoseFactory(true)),
+            new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, false)
+        ),
+        mapper
+    );
+    indexTask.setLatches(lockAcquisitionCountDownLatch, runStartAwaitLatch, runFinishAwaitLatch, null);
+
+    Futures.addCallback(
+        runTaskWithListenableFuture(indexTask),
+        new FutureCallback<TaskStatus>()
+        {
+          @Override
+          public void onSuccess(TaskStatus result)
+          {
+            Assert.assertEquals(TaskStatus.Status.SUCCESS, result.getStatusCode());
+            taskCompletionLatch.countDown();
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            Throwables.propagate(t);
+          }
+        }
+    );
+
+    // Wait for the task to acquire the lock
+    lockAcquisitionCountDownLatch.await();
+
+    // Simulate overlord restart, this will cause lock to be reacquired
+    taskLockbox.syncFromStorage();
+
+    // Let the task run
+    runStartAwaitLatch.countDown();
+
+    // There will two locks in TaskStorage as overlord reacquired the lock once
+    assertTaskLocks(2, indexTask);
+
+    // Let the task finish
+    runFinishAwaitLatch.countDown();
+
+    // wait for task status callback
+    taskCompletionLatch.await();
+  }
+
+  @Test (timeout=4000L)
+  public void testReacquireUpgradedLock() throws Exception
+  {
+    taskRunner = setUpThreadPoolTaskRunner(tb, 2);
+    taskQueue = setUpTaskQueue(taskStorage, taskRunner);
+
+    final CountDownLatch lockAcquisitionCountDownLatch = new CountDownLatch(1);
+    final CountDownLatch runStartAwaitLatch = new CountDownLatch(1);
+    final CountDownLatch runFinishAwaitLatch = new CountDownLatch(1);
+    final CountDownLatch taskCompletionLatch = new CountDownLatch(1);
+
+    final TestIndexTask indexTask = new TestIndexTask(
+        null,
+        null,
+        new IndexTask.IndexIngestionSpec(
+            new DataSchema(
+                "test_ds",
+                null,
+                new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")},
+                new UniformGranularitySpec(
+                    Granularity.DAY,
+                    null,
+                    ImmutableList.of(new Interval(new DateTime().toString("YYYY-MM-dd") + "/P1D"))
+                ),
+                mapper
+            ),
+            new IndexTask.IndexIOConfig(new MockFirehoseFactory(true)),
+            new IndexTask.IndexTuningConfig(10000, 10, -1, indexSpec, false)
+        ),
+        mapper
+    );
+    indexTask.setLatches(lockAcquisitionCountDownLatch, runStartAwaitLatch, runFinishAwaitLatch, null);
+
+    Futures.addCallback(
+        runTaskWithListenableFuture(indexTask),
+        new FutureCallback<TaskStatus>()
+        {
+          @Override
+          public void onSuccess(TaskStatus result)
+          {
+            Assert.assertEquals(TaskStatus.Status.SUCCESS, result.getStatusCode());
+            taskCompletionLatch.countDown();
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            Throwables.propagate(t);
+          }
+        }
+    );
+
+    // Wait for the task to acquire the lock
+    lockAcquisitionCountDownLatch.await();
+
+    // Simulate overlord restart, this will cause lock to be reacquired
+    // consequently there will be two lock entries in MetadataStorage because of the way syncFromStorage works
+    taskLockbox.syncFromStorage();
+
+    // Let the task run
+    runStartAwaitLatch.countDown();
+
+    // wait for task to upgrade the lock and push segment
+    while (pushedSegments != 1) {
+      Thread.sleep(50);
+    }
+
+    // Simulate overlord restart, this will cause lock to be reacquired
+    taskLockbox.syncFromStorage();
+
+    // Let the task finish
+
+    runFinishAwaitLatch.countDown();
+    // There will three locks in case of using MetadataTaskStorage as overlord restarted twice
+    // After the first restart there will be two basic locks in MetadataStorage
+    // as it will reacquire and re-upgrade the existing single lock
+    // Next time overlord runs it will create one more basic lock in MetadataStorage
+    // and upgrade that newly create lock
+    assertTaskLocks(3, indexTask);
+    taskCompletionLatch.await();
+  }
+
+  private void assertTaskLocks(int numLockEntriesInMetadata, Task task) {
+
+    // There should only be one lock in the in memory snapshot of TaskLockbox irrespective of the TaskStorage type
+    TaskLock indexTaskLock = Iterables.getOnlyElement(taskLockbox.findLocksForTask(task));
+
+    // Get the Locks from TaskStorage
+    List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());
+
+    if(taskStorageType.equals(METADATA_TASK_STORAGE)) {
+      Assert.assertEquals(numLockEntriesInMetadata, taskLocks.size());
+      for (int i = 0; i < numLockEntriesInMetadata; i++) {
+        Assert.assertEquals(indexTaskLock, taskLocks.get(i));
+      }
+    } else {
+      // Heap TaskStorage always have single TaskLock entry even in case of restarts
+      Assert.assertEquals(1, taskLocks.size());
+      Assert.assertEquals(indexTaskLock, taskLocks.get(0));
+    }
+  }
+
+  private ListenableFuture<TaskStatus> runTaskWithListenableFuture(final Task task) {
+    return MoreExecutors.listeningDecorator(Execs.singleThreaded("tasklifecycle_test_%d")).submit(
+        new Callable<TaskStatus>()
+        {
+          @Override
+          public TaskStatus call() throws Exception
+          {
+            return runTask(task);
+          }
+        }
+    );
+  }
+
+  private void runHandOffCallbacks() {
+    Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = handOffCallbacks.entrySet()
+                                                                                           .iterator();
+    while (itr.hasNext()) {
+      Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
+      entry.getValue().lhs.execute(entry.getValue().rhs);
+      itr.remove();
+    }
+  }
 }
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
index 1751462066e..5070512b239 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
@@ -22,27 +22,81 @@
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.config.TaskStorageConfig;
 import io.druid.indexing.common.task.NoopTask;
 import io.druid.indexing.common.task.Task;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.EntryExistsException;
+import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
+import io.druid.metadata.TestDerbyConnector;
+import junit.framework.Assert;
 import org.joda.time.Interval;
-import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-import java.util.List;
+import java.util.Arrays;
+import java.util.Collection;
 
+@RunWith(Parameterized.class)
 public class TaskLockboxTest
 {
   private TaskStorage taskStorage;
-
   private TaskLockbox lockbox;
+  private String taskStorageType;
+  private String taskLockboxVersion;
 
-  @Before
-  public void setUp()
+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
+  private static final String HEAP_TASK_STORAGE = "HeapMemoryTaskStorage";
+  private static final String METADATA_TASK_STORAGE = "MetadataTaskStorage";
+  private static final String TASKLOCKBOX_V1 = "v1";
+  private static final String TASKLOCKBOX_V2 = "v2";
+
+  @Parameterized.Parameters(name = "taskStorageType={0}, taskLockboxVersion={1}")
+  public static Collection<String[]> constructFeed()
+  {
+    return Arrays.asList(new String[][]{
+        {HEAP_TASK_STORAGE, TASKLOCKBOX_V1},
+        {METADATA_TASK_STORAGE, TASKLOCKBOX_V1},
+        {HEAP_TASK_STORAGE, TASKLOCKBOX_V2},
+        {METADATA_TASK_STORAGE, TASKLOCKBOX_V2}
+    });
+  }
+
+  public TaskLockboxTest(String taskStorageType, String taskLockboxVersion)
   {
-    taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-    lockbox = new TaskLockbox(taskStorage);
+    this.taskStorageType = taskStorageType;
+    this.taskLockboxVersion = taskLockboxVersion;
+  }
+
+  @Before
+  public void setUp(){
+    if (taskStorageType.equals(HEAP_TASK_STORAGE)) {
+      taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+    } else {
+      TestDerbyConnector testDerbyConnector = derbyConnectorRule.getConnector();
+      testDerbyConnector.createTaskTables();
+      testDerbyConnector.createSegmentTable();
+      taskStorage = new MetadataTaskStorage(
+          testDerbyConnector,
+          new TaskStorageConfig(null),
+          new SQLMetadataStorageActionHandlerFactory(
+              testDerbyConnector,
+              derbyConnectorRule.metadataTablesConfigSupplier().get(),
+              new DefaultObjectMapper()
+          )
+      );
+    }
+    if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
+      lockbox = new TaskLockboxV2(taskStorage);
+    } else {
+      lockbox = new TaskLockboxV1(taskStorage);
+    }
   }
 
   @Test
@@ -69,14 +123,19 @@ public void testLockAfterTaskComplete() throws InterruptedException
   }
 
   @Test
-  public void testTryLock() throws InterruptedException
+  public void testTryLock() throws InterruptedException, EntryExistsException
   {
     Task task = NoopTask.create();
+    // add task to TaskStorage as well otherwise the task will be considered a zombie task as there will be
+    // an active lock without associate entry in TaskStorage. Thus, unit test will fail
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
     lockbox.add(task);
     Assert.assertTrue(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")).isPresent());
 
     // try to take lock for task 2 for overlapping interval
     Task task2 = NoopTask.create();
+    // add task to TaskStorage as well otherwise the task will be considered a zombie task and unit test will fail
+    taskStorage.insert(task2, TaskStatus.running(task2.getId()));
     lockbox.add(task2);
     Assert.assertFalse(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
 
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TestIndexTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TestIndexTask.java
new file mode 100644
index 00000000000..89ae70aa1d0
--- /dev/null
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TestIndexTask.java
@@ -0,0 +1,123 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.indexing.overlord;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SetLockCriticalStateAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.actions.TaskLockCriticalState;
+import io.druid.indexing.common.task.IndexTask;
+import io.druid.indexing.common.task.TaskResource;
+
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Task that simulates IndexTask with some instrumentation, used for unit testing.
+ * */
+@JsonTypeName("test_index")
+public class TestIndexTask extends IndexTask
+{
+  private CountDownLatch lockAcquisitionCountDownLatch;
+  private CountDownLatch runStartAwaitLatch;
+  private CountDownLatch runFinishAwaitLatch;
+  private CountDownLatch initialLockUpgradeCountDownLatch;
+  private boolean alternateTaskLockState;
+
+  @JsonCreator
+  public TestIndexTask(
+      @JsonProperty("id") String id,
+      @JsonProperty("resource") TaskResource taskResource,
+      @JsonProperty("spec") IndexIngestionSpec ingestionSchema,
+      @JacksonInject ObjectMapper jsonMapper
+  ){
+    super(id, taskResource, ingestionSchema, jsonMapper, null);
+  }
+  @Override
+  public boolean isReady(TaskActionClient taskActionClient) throws Exception {
+    boolean retVal = super.isReady(taskActionClient);
+    if(lockAcquisitionCountDownLatch != null) {
+      lockAcquisitionCountDownLatch.countDown();
+    }
+    return retVal;
+  }
+
+  @Override
+  public TaskStatus run(TaskToolbox toolbox) throws Exception
+  {
+    if(runStartAwaitLatch != null) {
+      runStartAwaitLatch.await();
+    }
+    TaskStatus retVal = super.run(toolbox);
+
+    // Lock was upgraded in super.run (above statement)
+    if(initialLockUpgradeCountDownLatch != null) {
+      initialLockUpgradeCountDownLatch.countDown();
+    }
+
+    if (alternateTaskLockState && runFinishAwaitLatch != null) {
+      while (runFinishAwaitLatch.getCount() > 0) {
+        if (!toolbox.getTaskActionClient().submit(
+            new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.DOWNGRADE))) {
+          // set custom message instead of task id inside task status so that
+          // the cause of failure can be ascertained
+          return TaskStatus.failure("We should not fail here");
+        }
+        if (!toolbox.getTaskActionClient().submit(
+            new SetLockCriticalStateAction(getInterval(), TaskLockCriticalState.UPGRADE))) {
+          return TaskStatus.failure(getId());
+        }
+        Thread.sleep(10);
+      }
+    }
+
+    if(runFinishAwaitLatch != null) {
+      runFinishAwaitLatch.await();
+    }
+
+    return retVal;
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return "test_index";
+  }
+
+  public void setLatches(CountDownLatch lockAcquisitionCountDownLatch,
+                         CountDownLatch runAwaitLatch,
+                         CountDownLatch runFinishAwaitLatch,
+                         CountDownLatch initialLockUpgradeCountDownLatch) {
+    this.lockAcquisitionCountDownLatch = lockAcquisitionCountDownLatch;
+    this.runStartAwaitLatch = runAwaitLatch;
+    this.runFinishAwaitLatch = runFinishAwaitLatch;
+    this.initialLockUpgradeCountDownLatch = initialLockUpgradeCountDownLatch;
+  }
+
+  public void setAlternateTaskLockState(boolean alternateTaskLockState) {
+    this.alternateTaskLockState = alternateTaskLockState;
+  }
+}
diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
index 1681a3f69b2..e6e424b40cf 100644
--- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -322,6 +322,28 @@ public Boolean withHandle(Handle handle) throws Exception
     );
   }
 
+  public boolean setLock(final long lockId, final LockType lock)
+  {
+    return connector.retryWithHandle(
+      new HandleCallback<Boolean>()
+      {
+        @Override
+        public Boolean withHandle(final Handle handle) throws Exception
+        {
+          return handle.createStatement(
+              String.format(
+                  "UPDATE %1$s SET lock_payload = :payload WHERE id = :id",
+                  lockTable
+              )
+          )
+                       .bind("payload", jsonMapper.writeValueAsBytes(lock))
+                       .bind("id", lockId)
+                       .execute() == 1;
+        }
+      }
+    );
+  }
+
   public void removeLock(final long lockId)
   {
     connector.retryWithHandle(
diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java
index 3aea375a774..ee6bda2f0c8 100644
--- a/services/src/main/java/io/druid/cli/CliOverlord.java
+++ b/services/src/main/java/io/druid/cli/CliOverlord.java
@@ -56,11 +56,12 @@
 import io.druid.indexing.overlord.MetadataTaskStorage;
 import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
 import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
+import io.druid.indexing.overlord.TaskLockboxV2;
 import io.druid.indexing.overlord.TaskMaster;
 import io.druid.indexing.overlord.TaskRunnerFactory;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.TaskStorageQueryAdapter;
-import io.druid.indexing.overlord.WorkerTaskRunner;
 import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementConfig;
 import io.druid.indexing.overlord.autoscaling.PendingTaskBasedWorkerResourceManagementStrategy;
 import io.druid.indexing.overlord.autoscaling.ResourceManagementSchedulerConfig;
@@ -141,7 +142,6 @@ public void configure(Binder binder)
 
             binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
             binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
-            binder.bind(TaskLockbox.class).in(LazySingleton.class);
             binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
 
             binder.bind(ChatHandlerProvider.class).toProvider(Providers.<ChatHandlerProvider>of(null));
@@ -149,6 +149,7 @@ public void configure(Binder binder)
             configureTaskStorage(binder);
             configureAutoscale(binder);
             configureRunners(binder);
+            bindTaskLockboxs(binder);
 
             binder.bind(AuditManager.class)
                   .toProvider(AuditManagerProvider.class)
@@ -232,6 +233,23 @@ private void configureAutoscale(Binder binder)
             biddy.addBinding("pendingTaskBased").to(PendingTaskBasedWorkerResourceManagementStrategy.class);
 
           }
+
+          private void bindTaskLockboxs(Binder binder)
+          {
+            PolyBind.createChoice(
+                binder, "druid.indexer.taskLockboxVersion", Key.get(TaskLockbox.class), Key.get(TaskLockboxV1.class)
+            );
+            final MapBinder<String, TaskLockbox> storageBinder = PolyBind.optionBinder(
+                binder,
+                Key.get(TaskLockbox.class)
+            );
+
+            storageBinder.addBinding("v1").to(TaskLockboxV1.class);
+            binder.bind(TaskLockboxV1.class).in(LazySingleton.class);
+
+            storageBinder.addBinding("v2").to(TaskLockboxV2.class).in(ManageLifecycle.class);
+            binder.bind(TaskLockboxV2.class).in(LazySingleton.class);
+          }
         },
         new IndexingServiceFirehoseModule(),
         new IndexingServiceTaskLogsModule()
diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java
index 7d421bdd583..88bafdb97ee 100644
--- a/services/src/main/java/io/druid/cli/CliPeon.java
+++ b/services/src/main/java/io/druid/cli/CliPeon.java
@@ -61,6 +61,9 @@
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskLockboxV1;
+import io.druid.indexing.overlord.TaskLockboxV2;
 import io.druid.indexing.overlord.TaskRunner;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.ThreadPoolTaskRunner;
@@ -150,6 +153,8 @@ public void configure(Binder binder)
 
             binder.bind(NoopChatHandlerProvider.class).in(LazySingleton.class);
 
+            bindTaskLockboxs(binder);
+
             binder.bind(TaskToolboxFactory.class).in(LazySingleton.class);
 
             JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
@@ -258,6 +263,23 @@ public String getTaskIDFromTask(final Task task)
           {
             return task.getId();
           }
+
+          private void bindTaskLockboxs(Binder binder)
+          {
+            PolyBind.createChoice(
+                binder, "druid.indexer.taskLockboxVersion", Key.get(TaskLockbox.class), Key.get(TaskLockboxV1.class)
+            );
+            final MapBinder<String, TaskLockbox> storageBinder = PolyBind.optionBinder(
+                binder,
+                Key.get(TaskLockbox.class)
+            );
+
+            storageBinder.addBinding("v1").to(TaskLockboxV1.class);
+            binder.bind(TaskLockboxV1.class).in(LazySingleton.class);
+
+            storageBinder.addBinding("v2").to(TaskLockboxV2.class).in(ManageLifecycle.class);
+            binder.bind(TaskLockboxV2.class).in(LazySingleton.class);
+          }
         },
         new IndexingServiceFirehoseModule(),
         new ChatHandlerServerModule(properties),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org