Posted to commits@druid.apache.org by ji...@apache.org on 2019/08/03 03:30:13 UTC

[incubator-druid] branch master updated: Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking (#8173)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee828f  Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking (#8173)
1ee828f is described below

commit 1ee828ff498638ace7d12d0d3d5023b08506271d
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Aug 2 20:30:05 2019 -0700

    Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking (#8173)
    
    * Add a cluster-wide configuration to force timeChunk lock and add a doc for segment locking
    
    * add more test
    
    * javadoc for missingIntervalsInOverwriteMode
    
    * Fix test
    
    * Address comments
    
    * avoid spotbugs
---
 docs/content/configuration/index.md                |   1 +
 docs/content/ingestion/locking-and-priority.md     |  86 ++++++++++++---
 .../druid/indexing/common/task/NoopTask.java       |  10 +-
 .../apache/druid/indexing/common/task/Task.java    |   6 ++
 .../task/batch/parallel/ParallelIndexSubTask.java  |  45 +++++---
 .../parallel/ParallelIndexSupervisorTask.java      |  53 ++++++----
 .../apache/druid/indexing/overlord/TaskMaster.java |   3 +
 .../apache/druid/indexing/overlord/TaskQueue.java  |  10 +-
 .../indexing/overlord/config/TaskLockConfig.java   |  36 +++++++
 .../druid/indexing/overlord/TaskLifecycleTest.java |   5 +-
 .../indexing/overlord/TaskLockConfigTest.java      | 117 +++++++++++++++++++++
 .../druid/indexing/overlord/http/OverlordTest.java |   2 +
 .../java/org/apache/druid/cli/CliOverlord.java     |   2 +
 13 files changed, 323 insertions(+), 53 deletions(-)

diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index 073b512..0e9e663 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -905,6 +905,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
 |`druid.indexer.runner.type`|Choices "local" or "remote". Indicates whether tasks should be run locally or in a distributed environment. Experimental task runner "httpRemote" is also available which is same as "remote" but uses HTTP to interact with Middle Manaters instead of Zookeeper.|local|
 |`druid.indexer.storage.type`|Choices are "local" or "metadata". Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. Storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|local|
 |`druid.indexer.storage.recentlyFinishedThreshold`|A duration of time to store task results.|PT24H|
+|`druid.indexer.tasklock.forceTimeChunkLock`|If set to `true`, all tasks are forced to use time chunk lock. If set to `false`, each task automatically chooses a lock type to use. This configuration can be overridden by setting `forceTimeChunkLock` in the [task context](../ingestion/locking-and-priority.html#task-context). See [Task Locking & Priority](../ingestion/locking-and-priority.html) for more details about locking in tasks.|true|
 |`druid.indexer.queue.maxSize`|Maximum number of active tasks at one time.|Integer.MAX_VALUE|
 |`druid.indexer.queue.startDelay`|Sleep this long before starting Overlord queue management. This can be useful to give a cluster time to re-orient itself after e.g. a widespread network issue.|PT1M|
 |`druid.indexer.queue.restartDelay`|Sleep this long when Overlord queue management throws an exception before trying again.|PT30S|
diff --git a/docs/content/ingestion/locking-and-priority.md b/docs/content/ingestion/locking-and-priority.md
index e9bbbeb..f0ba3e1 100644
--- a/docs/content/ingestion/locking-and-priority.md
+++ b/docs/content/ingestion/locking-and-priority.md
@@ -24,30 +24,85 @@ title: "Task Locking & Priority"
 
 # Task Locking & Priority
 
+This document explains the task locking system in Druid. Druid's locking system
+and versioning system are tightly coupled with each other to guarantee the correctness of ingested data.
+
+## Overshadow Relation between Segments
+
+You can run a task to overwrite existing data. The segments created by an overwriting task _overshadow_ existing segments.
+Note that the overshadow relation holds only for the same time chunk and the same data source.
+Overshadowed segments are excluded from query processing so that stale data is filtered out.
+
+Each segment has a _major_ version and a _minor_ version. The major version is
+represented as a timestamp in the format of [`"yyyy-MM-dd'T'HH:mm:ss"`](https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html)
+while the minor version is an integer number. These major and minor versions
+are used to determine the overshadow relation between segments as seen below.
+
+A segment `s1` overshadows another segment `s2` if either of the following holds:
+
+- `s1` has a higher major version than `s2`.
+- `s1` has the same major version as `s2` and a higher minor version.
+
+Here are some examples.
+
+- A segment of the major version of `2019-01-01T00:00:00.000Z` and the minor version of `0` overshadows
+ another of the major version of `2018-01-01T00:00:00.000Z` and the minor version of `1`.
+- A segment of the major version of `2019-01-01T00:00:00.000Z` and the minor version of `1` overshadows
+ another of the major version of `2019-01-01T00:00:00.000Z` and the minor version of `0`.
+
 ## Locking
 
-Once an Overlord process accepts a task, the task acquires locks for the data source and intervals specified in the task.
+If you are running two or more [Druid tasks](./tasks.html) which generate segments for the same data source and the same time chunk,
+the generated segments could potentially overshadow each other, which could lead to incorrect query results.
+
+To avoid this problem, tasks will attempt to get locks prior to creating any segment in Druid.
+There are two types of locks, i.e., _time chunk lock_ and _segment lock_.
+
+When the time chunk lock is used, a task locks the entire time chunk of a data source where generated segments will be written.
+For example, suppose we have a task ingesting data into the time chunk of `2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z` of the `wikipedia` data source.
+With the time chunk locking, this task will lock the entire time chunk of `2019-01-01T00:00:00.000Z/2019-01-02T00:00:00.000Z` of the `wikipedia` data source
+before it creates any segments. As long as it holds the lock, any other tasks will be unable to create segments for the same time chunk of the same data source.
+The segments created with the time chunk locking have a _higher_ major version than existing segments. Their minor version is always `0`.
+
+When the segment lock is used, a task locks individual segments instead of the entire time chunk.
+As a result, two or more tasks can create segments for the same time chunk of the same data source simultaneously
+if they are reading different segments.
+For example, a Kafka indexing task and a compaction task can always write segments into the same time chunk of the same data source simultaneously.
+This is because a Kafka indexing task always appends new segments, while a compaction task always overwrites existing segments.
+The segments created with the segment locking have the _same_ major version and a _higher_ minor version.
 
-There are two lock types, i.e., _shared lock_ and _exclusive lock_.
+To enable segment locking, you may need to set `forceTimeChunkLock` to `false` in the [task context](#task-context).
+Once `forceTimeChunkLock` is set to `false`, the task will automatically choose a proper lock type to use.
+Please note that segment lock is not always available. The most common use case where time chunk lock is enforced is
+when an overwriting task changes the segment granularity.
+Also, segment locking is supported only by native indexing tasks and Kafka/Kinesis indexing tasks.
+The Hadoop indexing tasks and realtime indexing tasks (with [Tranquility](./stream-push.html)) don't support it yet.
 
-- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other.
-- A task needs to acquire an exclusive lock before it writes segments for an interval. An exclusive lock is also preemptable except while the task is publishing segments.
+`forceTimeChunkLock` in the task context applies only to the individual task.
+To disable it for all tasks, set `druid.indexer.tasklock.forceTimeChunkLock` to `false` in the [overlord configuration](../configuration/index.html#overlord-operations).
 
-Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't compete for acquiring locks because they usually targ [...]
+Lock requests can conflict with each other if two or more tasks try to get locks for overlapping time chunks of the same data source.
+Note that a lock conflict can happen between different lock types.
 
-A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that exclusive locks are still preemptable. That is, they also be able to be preempted by higher priority locks unless they are _publishing segments_ in a critical section. Once publishing segments is finished, those locks become preemptable again.
+The behavior on lock conflicts depends on the [task priority](#task-lock-priority).
+If all tasks of conflicting lock requests have the same priority, then the task that requested the lock first will get it.
+Other tasks will wait for that task to release the lock.
 
-Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release 
-locks early if they desire. Task ids are unique by naming them using UUIDs or the timestamp in which the task was created. 
-Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
+If a task of a lower priority requests a lock later than another task of a higher priority,
+this task will also wait for the task of a higher priority to release the lock.
+If a task of a higher priority requests a lock later than another task of a lower priority,
+then this task will _preempt_ the other task of a lower priority. The lock
+of the lower-priority task will be revoked and the higher-priority task will acquire a new lock.
 
-## Priority
+This lock preemption can happen at any time while a task is running except
+when it is _publishing segments_ in a critical section. Its locks become preemptable again once publishing segments is finished.
 
-Apache Druid (incubating)'s indexing tasks use locks for atomic data ingestion. Each lock is acquired for the combination of a dataSource and an interval. Once a task acquires a lock, it can write data for the dataSource and the interval of the acquired lock unless the lock is released or preempted. Please see [the below Locking section](#locking)
+Note that locks are shared by the tasks of the same groupId.
+For example, Kafka indexing tasks of the same supervisor have the same groupId and share all locks with each other.
 
-Each task has a priority which is used for lock acquisition. The locks of higher-priority tasks can preempt the locks of lower-priority tasks if they try to acquire for the same dataSource and interval. If some locks of a task are preempted, the behavior of the preempted task depends on the task implementation. Usually, most tasks finish as failed if they are preempted.
+## Task Lock Priority
 
-Tasks can have different default priorities depening on their types. Here are a list of default priorities. Higher the number, higher the priority.
+Each task type has a different default lock priority. The table below shows the default priorities of different task types. The higher the number, the higher the priority.
 
 |task type|default priority|
 |---------|----------------|
@@ -56,7 +111,7 @@ Tasks can have different default priorities depening on their types. Here are a
 |Merge/Append/Compaction task|25|
 |Other tasks|0|
 
-You can override the task priority by setting your priority in the task context like below.
+You can override the task priority by setting your priority in the task context as below.
 
 ```json
 "context" : {
@@ -66,11 +121,12 @@ You can override the task priority by setting your priority in the task context
 
 ## Task Context
 
-The task context is used for various task configuration parameters. The following parameters apply to all task types.
+The task context is used for various individual task configurations. The following parameters apply to all task types.
 
 |property|default|description|
 |--------|-------|-----------|
 |taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|forceTimeChunkLock|true|Force the task to always use time chunk lock. If set to `false`, each task automatically chooses a lock type to use. If this is set, it overrides the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.html#overlord-operations). See [Locking](#locking) for more details.|
 |priority|Different based on task types. See [Priority](#priority).|Task priority|
 
 <div class="note caution">
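
The overshadow rule described in the locking-and-priority.md changes above can be sketched in Java as follows. This is only an illustration under stated assumptions, not Druid code: the `OvershadowSketch` and `Version` classes and the `overshadows` method are hypothetical names, and the major version is parsed with `java.time.Instant` instead of Joda-Time to keep the example dependency-free.

```java
import java.time.Instant;

final class OvershadowSketch
{
  // Hypothetical value type for illustration; this is not Druid's segment class.
  static final class Version
  {
    final Instant major; // major version, a timestamp
    final int minor;     // minor version, an integer

    Version(String major, int minor)
    {
      this.major = Instant.parse(major);
      this.minor = minor;
    }
  }

  // s1 overshadows s2 if it has a higher major version,
  // or the same major version and a higher minor version.
  static boolean overshadows(Version s1, Version s2)
  {
    final int cmp = s1.major.compareTo(s2.major);
    return cmp > 0 || (cmp == 0 && s1.minor > s2.minor);
  }

  public static void main(String[] args)
  {
    final Version v2018Minor1 = new Version("2018-01-01T00:00:00.000Z", 1);
    final Version v2019Minor0 = new Version("2019-01-01T00:00:00.000Z", 0);
    final Version v2019Minor1 = new Version("2019-01-01T00:00:00.000Z", 1);

    System.out.println(overshadows(v2019Minor0, v2018Minor1)); // true: higher major version
    System.out.println(overshadows(v2019Minor1, v2019Minor0)); // true: same major, higher minor
    System.out.println(overshadows(v2019Minor0, v2019Minor1)); // false
  }
}
```

The two `true` cases correspond to the two examples given in the documentation; as the docs note, the relation is only meaningful between segments of the same time chunk and data source, which this sketch does not model.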
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index b400c3b..1bbe4ee 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.indexer.TaskStatus;
@@ -34,6 +33,7 @@ import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
@@ -179,12 +179,16 @@ public class NoopTask extends AbstractTask
   @VisibleForTesting
   public static NoopTask create(int priority)
   {
-    return new NoopTask(null, null, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
+    final Map<String, Object> context = new HashMap<>();
+    context.put(Tasks.PRIORITY_KEY, priority);
+    return new NoopTask(null, null, null, 0, 0, null, null, context);
   }
 
   @VisibleForTesting
   public static NoopTask create(String id, int priority)
   {
-    return new NoopTask(id, null, null, 0, 0, null, null, ImmutableMap.of(Tasks.PRIORITY_KEY, priority));
+    final Map<String, Object> context = new HashMap<>();
+    context.put(Tasks.PRIORITY_KEY, priority);
+    return new NoopTask(id, null, null, 0, 0, null, null, context);
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index c32f27d..45aabd8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -187,6 +187,12 @@ public interface Task
     return getContext();
   }
 
+  default Map<String, Object> addToContextIfAbsent(String key, Object val)
+  {
+    getContext().putIfAbsent(key, val);
+    return getContext();
+  }
+
   Map<String, Object> getContext();
 
   default <ContextValueType> ContextValueType getContextValue(String key)
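
How the cluster-wide default and a per-task setting combine through `putIfAbsent` can be sketched as below. This is a minimal standalone illustration, assuming the context key is the string `forceTimeChunkLock` as the docs in this commit describe; `ForceTimeChunkLockSketch` and `applyClusterDefault` are hypothetical names, and `clusterDefault` stands in for the value of `druid.indexer.tasklock.forceTimeChunkLock`.

```java
import java.util.HashMap;
import java.util.Map;

final class ForceTimeChunkLockSketch
{
  // Context key as documented in this commit (assumed to match Tasks.FORCE_TIME_CHUNK_LOCK_KEY).
  static final String FORCE_TIME_CHUNK_LOCK = "forceTimeChunkLock";

  // Hypothetical helper: fills in the cluster-wide default only when the
  // task context does not already carry an explicit value.
  static Map<String, Object> applyClusterDefault(Map<String, Object> context, boolean clusterDefault)
  {
    context.putIfAbsent(FORCE_TIME_CHUNK_LOCK, clusterDefault);
    return context;
  }

  public static void main(String[] args)
  {
    // The task explicitly opts out of time chunk locking: its value is kept.
    final Map<String, Object> explicit = new HashMap<>();
    explicit.put(FORCE_TIME_CHUNK_LOCK, false);
    System.out.println(applyClusterDefault(explicit, true).get(FORCE_TIME_CHUNK_LOCK)); // false

    // The task says nothing: the cluster-wide default is applied.
    final Map<String, Object> unset = new HashMap<>();
    System.out.println(applyClusterDefault(unset, true).get(FORCE_TIME_CHUNK_LOCK)); // true
  }
}
```

Because `putIfAbsent` mutates the map, the context has to be a mutable `Map`. That is presumably why `NoopTask.create` in this commit builds a `HashMap` instead of an `ImmutableMap`, although the commit message does not say so.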
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 644b1c1..fb902cb 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -98,7 +98,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
 {
   public static final String TYPE = "index_sub";
 
-  private static final Logger log = new Logger(ParallelIndexSubTask.class);
+  private static final Logger LOG = new Logger(ParallelIndexSubTask.class);
 
   private final int numAttempts;
   private final ParallelIndexIngestionSpec ingestionSchema;
@@ -111,6 +111,20 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   private Thread runThread;
   private boolean stopped = false;
 
+  /**
+   * If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
+   * In this mode, sub tasks request new locks whenever they see a new row which is not covered by existing locks.
+   * If this task is overwriting existing segments, then we should know in advance whether this task is changing
+   * segment granularity to know what type of lock we should use. However, if intervals are missing, we can't know
+   * the segment granularity of existing segments until the task reads all data because we don't know what segments
+   * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
+   * are missing and force the use of timeChunk lock.
+   *
+   * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
+   * in the task logs.
+   */
+  private final boolean missingIntervalsInOverwriteMode;
+
   @JsonCreator
   public ParallelIndexSubTask(
       // id shouldn't be null except when this task is created by ParallelIndexSupervisorTask
@@ -144,6 +158,14 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
     this.indexingServiceClient = indexingServiceClient;
     this.taskClientFactory = taskClientFactory;
     this.appenderatorsManager = appenderatorsManager;
+    this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
+                                           && !ingestionSchema.getDataSchema()
+                                                              .getGranularitySpec()
+                                                              .bucketIntervals()
+                                                              .isPresent();
+    if (missingIntervalsInOverwriteMode) {
+      addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+    }
   }
 
   @Override
@@ -161,17 +183,6 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws IOException
   {
-    if (!ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent()
-        && !ingestionSchema.getIOConfig().isAppendToExisting()) {
-      // If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
-      // In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
-      // If this task is overwriting existing segments, then we should know this task is changing segment granularity
-      // in advance to know what types of lock we should use. However, if intervals are missing, we can't know
-      // the segment granularity of existing segments until the task reads all data because we don't know what segments
-      // are going to be overwritten. As a result, we assume that segment granularity will be changed if intervals are
-      // missing force to use timeChunk lock.
-      addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
-    }
     return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
   }
 
@@ -204,6 +215,12 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
       }
     }
 
+    if (missingIntervalsInOverwriteMode) {
+      LOG.warn(
+          "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+          + "Forced to use timeChunk lock."
+      );
+    }
     final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();
 
     final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
@@ -459,7 +476,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
               // which makes the size of segments smaller.
               final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
               pushedSegments.addAll(pushed.getSegments());
-              log.info("Pushed segments[%s]", pushed.getSegments());
+              LOG.info("Pushed segments[%s]", pushed.getSegments());
             }
           } else {
             throw new ISE("Failed to add a row with timestamp[%s]", inputRow.getTimestamp());
@@ -478,7 +495,7 @@ public class ParallelIndexSubTask extends AbstractBatchIndexTask
 
       final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
       pushedSegments.addAll(pushed.getSegments());
-      log.info("Pushed segments[%s]", pushed.getSegments());
+      LOG.info("Pushed segments[%s]", pushed.getSegments());
 
       return pushedSegments;
     }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 535b046..c8cd3d9 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -99,7 +99,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
 {
   public static final String TYPE = "index_parallel";
 
-  private static final Logger log = new Logger(ParallelIndexSupervisorTask.class);
+  private static final Logger LOG = new Logger(ParallelIndexSupervisorTask.class);
 
   private final ParallelIndexIngestionSpec ingestionSchema;
   private final FiniteFirehoseFactory<?, ?> baseFirehoseFactory;
@@ -109,6 +109,20 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   private final RowIngestionMetersFactory rowIngestionMetersFactory;
   private final AppenderatorsManager appenderatorsManager;
 
+  /**
+   * If intervals are missing in the granularitySpec, the parallel index task runs in "dynamic locking mode".
+   * In this mode, sub tasks request new locks whenever they see a new row which is not covered by existing locks.
+   * If this task is overwriting existing segments, then we should know in advance whether this task is changing
+   * segment granularity to know what type of lock we should use. However, if intervals are missing, we can't know
+   * the segment granularity of existing segments until the task reads all data because we don't know what segments
+   * are going to be overwritten. As a result, we assume that segment granularity is going to be changed if intervals
+   * are missing and force the use of timeChunk lock.
+   *
+   * This variable is initialized in the constructor and used in {@link #run} to log that timeChunk lock was enforced
+   * in the task logs.
+   */
+  private final boolean missingIntervalsInOverwriteMode;
+
   private final ConcurrentHashMap<Interval, AtomicInteger> partitionNumCountersPerInterval = new ConcurrentHashMap<>();
 
   private volatile ParallelIndexTaskRunner runner;
@@ -153,16 +167,24 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     this.authorizerMapper = authorizerMapper;
     this.rowIngestionMetersFactory = rowIngestionMetersFactory;
     this.appenderatorsManager = appenderatorsManager;
+    this.missingIntervalsInOverwriteMode = !ingestionSchema.getIOConfig().isAppendToExisting()
+                                           && !ingestionSchema.getDataSchema()
+                                                              .getGranularitySpec()
+                                                              .bucketIntervals()
+                                                              .isPresent();
+    if (missingIntervalsInOverwriteMode) {
+      addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+    }
 
     if (ingestionSchema.getTuningConfig().getMaxSavedParseExceptions()
         != TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS) {
-      log.warn("maxSavedParseExceptions is not supported yet");
+      LOG.warn("maxSavedParseExceptions is not supported yet");
     }
     if (ingestionSchema.getTuningConfig().getMaxParseExceptions() != TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS) {
-      log.warn("maxParseExceptions is not supported yet");
+      LOG.warn("maxParseExceptions is not supported yet");
     }
     if (ingestionSchema.getTuningConfig().isLogParseExceptions() != TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS) {
-      log.warn("logParseExceptions is not supported yet");
+      LOG.warn("logParseExceptions is not supported yet");
     }
   }
 
@@ -225,17 +247,6 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @Override
   public boolean isReady(TaskActionClient taskActionClient) throws Exception
   {
-    if (!ingestionSchema.getDataSchema().getGranularitySpec().bucketIntervals().isPresent()
-        && !ingestionSchema.getIOConfig().isAppendToExisting()) {
-      // If intervals are missing in the granularitySpec, parallel index task runs in "dynamic locking mode".
-      // In this mode, sub tasks ask new locks whenever they see a new row which is not covered by existing locks.
-      // If this task is overwriting existing segments, then we should know this task is changing segment granularity
-      // in advance to know what types of lock we should use. However, if intervals are missing, we can't know
-      // the segment granularity of existing segments until the task reads all data because we don't know what segments
-      // are going to be overwritten. As a result, we assume that segment granularity will be changed if intervals are
-      // missing force to use timeChunk lock.
-      addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
-    }
     return determineLockGranularityAndTryLock(taskActionClient, ingestionSchema.getDataSchema().getGranularitySpec());
   }
 
@@ -291,7 +302,13 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
   @Override
   public TaskStatus run(TaskToolbox toolbox) throws Exception
   {
-    log.info(
+    if (missingIntervalsInOverwriteMode) {
+      LOG.warn(
+          "Intervals are missing in granularitySpec while this task is potentially overwriting existing segments. "
+          + "Forced to use timeChunk lock."
+      );
+    }
+    LOG.info(
         "Found chat handler of class[%s]",
         Preconditions.checkNotNull(chatHandlerProvider, "chatHandlerProvider").getClass().getName()
     );
@@ -302,12 +319,12 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
         return runParallel(toolbox);
       } else {
         if (!baseFirehoseFactory.isSplittable()) {
-          log.warn(
+          LOG.warn(
               "firehoseFactory[%s] is not splittable. Running sequentially.",
               baseFirehoseFactory.getClass().getSimpleName()
           );
         } else if (ingestionSchema.getTuningConfig().getMaxNumSubTasks() == 1) {
-          log.warn(
+          LOG.warn(
               "maxNumSubTasks is 1. Running sequentially. "
               + "Please set maxNumSubTasks to something higher than 1 if you want to run in parallel ingestion mode."
           );
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 5347fa6..0145439 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@@ -74,6 +75,7 @@ public class TaskMaster implements TaskCountStatsProvider
 
   @Inject
   public TaskMaster(
+      final TaskLockConfig taskLockConfig,
       final TaskQueueConfig taskQueueConfig,
       final TaskLockbox taskLockbox,
       final TaskStorage taskStorage,
@@ -110,6 +112,7 @@ public class TaskMaster implements TaskCountStatsProvider
           taskLockbox.syncFromStorage();
           taskRunner = runnerFactory.build();
           taskQueue = new TaskQueue(
+              taskLockConfig,
               taskQueueConfig,
               taskStorage,
               taskRunner,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index ee3a56d..332356b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -30,12 +30,13 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
 import org.apache.druid.indexing.common.task.IndexTaskUtils;
 import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
@@ -80,6 +81,7 @@ public class TaskQueue
   private final List<Task> tasks = new ArrayList<>();
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new HashMap<>();
 
+  private final TaskLockConfig lockConfig;
   private final TaskQueueConfig config;
   private final TaskStorage taskStorage;
   private final TaskRunner taskRunner;
@@ -109,8 +111,8 @@ public class TaskQueue
   private Map<String, Long> prevTotalSuccessfulTaskCount = new HashMap<>();
   private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
 
-  @Inject
   public TaskQueue(
+      TaskLockConfig lockConfig,
       TaskQueueConfig config,
       TaskStorage taskStorage,
       TaskRunner taskRunner,
@@ -119,6 +121,7 @@ public class TaskQueue
       ServiceEmitter emitter
   )
   {
+    this.lockConfig = Preconditions.checkNotNull(lockConfig, "lockConfig");
     this.config = Preconditions.checkNotNull(config, "config");
     this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");
     this.taskRunner = Preconditions.checkNotNull(taskRunner, "taskRunner");
@@ -343,6 +346,9 @@ public class TaskQueue
       throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId()));
     }
 
+    // Set forceTimeChunkLock before adding the task spec to taskStorage, so that the stored task spec is always consistent.
+    task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+
     giant.lock();
 
     try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
new file mode 100644
index 0000000..422657a
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Global configuration for task locks. Used by the Overlord.
+ */
+public class TaskLockConfig
+{
+  @JsonProperty
+  private boolean forceTimeChunkLock = true;
+
+  public boolean isForceTimeChunkLock()
+  {
+    return forceTimeChunkLock;
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index bfbbd95..f7dbfde 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -71,6 +71,7 @@ import org.apache.druid.indexing.common.task.RealtimeIndexTask;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
 import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
@@ -227,6 +228,7 @@ public class TaskLifecycleTest
   private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate;
   private MonitorScheduler monitorScheduler;
   private ServiceEmitter emitter;
+  private TaskLockConfig lockConfig;
   private TaskQueueConfig tqc;
   private TaskConfig taskConfig;
   private DataSegmentPusher dataSegmentPusher;
@@ -653,12 +655,13 @@ public class TaskLifecycleTest
     Preconditions.checkNotNull(tac);
     Preconditions.checkNotNull(emitter);
 
+    lockConfig = new TaskLockConfig();
     tqc = mapper.readValue(
         "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}",
         TaskQueueConfig.class
     );
 
-    return new TaskQueue(tqc, ts, tr, tac, taskLockbox, emitter);
+    return new TaskQueue(lockConfig, tqc, ts, tr, tac, taskLockbox, emitter);
   }
 
   @After
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
new file mode 100644
index 0000000..8993357
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord;
+
+import com.google.common.base.Optional;
+import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory;
+import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
+import org.apache.druid.indexing.common.config.TaskStorageConfig;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
+import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
+import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+public class TaskLockConfigTest
+{
+  private TaskStorage taskStorage;
+
+  @Before
+  public void setup()
+  {
+    taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
+  }
+
+  @Test
+  public void testDefault() throws EntryExistsException
+  {
+    final TaskQueue taskQueue = createTaskQueue(null);
+    taskQueue.start();
+    final Task task = NoopTask.create();
+    Assert.assertTrue(taskQueue.add(task));
+    taskQueue.stop();
+    final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
+    Assert.assertTrue(optionalTask.isPresent());
+    final Task fromTaskStorage = optionalTask.get();
+    Assert.assertTrue(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
+  }
+
+  @Test
+  public void testNotForceTimeChunkLock() throws EntryExistsException
+  {
+    final TaskQueue taskQueue = createTaskQueue(false);
+    taskQueue.start();
+    final Task task = NoopTask.create();
+    Assert.assertTrue(taskQueue.add(task));
+    taskQueue.stop();
+    final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
+    Assert.assertTrue(optionalTask.isPresent());
+    final Task fromTaskStorage = optionalTask.get();
+    Assert.assertFalse(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
+  }
+
+  @Test
+  public void testOverwriteDefault() throws EntryExistsException
+  {
+    final TaskQueue taskQueue = createTaskQueue(null);
+    taskQueue.start();
+    final Task task = NoopTask.create();
+    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false);
+    Assert.assertTrue(taskQueue.add(task));
+    taskQueue.stop();
+    final Optional<Task> optionalTask = taskStorage.getTask(task.getId());
+    Assert.assertTrue(optionalTask.isPresent());
+    final Task fromTaskStorage = optionalTask.get();
+    Assert.assertFalse(fromTaskStorage.getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY));
+  }
+
+  private TaskQueue createTaskQueue(@Nullable Boolean forceTimeChunkLock)
+  {
+    final TaskLockConfig lockConfig;
+    if (forceTimeChunkLock != null) {
+      lockConfig = new TaskLockConfig()
+      {
+        @Override
+        public boolean isForceTimeChunkLock()
+        {
+          return forceTimeChunkLock;
+        }
+      };
+    } else {
+      lockConfig = new TaskLockConfig();
+    }
+    final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null);
+    final TaskRunner taskRunner = EasyMock.createNiceMock(RemoteTaskRunner.class);
+    final TaskActionClientFactory actionClientFactory = EasyMock.createNiceMock(LocalTaskActionClientFactory.class);
+    final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator());
+    final ServiceEmitter emitter = new NoopServiceEmitter();
+    return new TaskQueue(lockConfig, queueConfig, taskStorage, taskRunner, actionClientFactory, lockbox, emitter);
+  }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 58f93a2..52373ea 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -54,6 +54,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.TaskStorageQueryAdapter;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.helpers.OverlordHelperManager;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
@@ -170,6 +171,7 @@ public class OverlordTest
     druidNode = new DruidNode("hey", "what", false, 1234, null, true, false);
     ServiceEmitter serviceEmitter = new NoopServiceEmitter();
     taskMaster = new TaskMaster(
+        new TaskLockConfig(),
         new TaskQueueConfig(null, new Period(1), null, new Period(10)),
         taskLockbox,
         taskStorage,
diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
index d8a6ec9..00377a3 100644
--- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java
+++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java
@@ -79,6 +79,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningSchedulerConfi
 import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningConfig;
 import org.apache.druid.indexing.overlord.autoscaling.SimpleWorkerProvisioningStrategy;
+import org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.indexing.overlord.helpers.OverlordHelper;
 import org.apache.druid.indexing.overlord.helpers.TaskLogAutoCleaner;
@@ -174,6 +175,7 @@ public class CliOverlord extends ServerRunnable
 
             JsonConfigProvider.bind(binder, "druid.coordinator.asOverlord", CoordinatorOverlordServiceConfig.class);
             JsonConfigProvider.bind(binder, "druid.indexer.queue", TaskQueueConfig.class);
+            JsonConfigProvider.bind(binder, "druid.indexer.tasklock", TaskLockConfig.class);
             JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class);
             JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class);
 
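
Taken together, the TaskQueue and CliOverlord changes above mean the Overlord writes a cluster-wide default into each task's context unless the task already carries its own value. The following is a minimal, self-contained sketch of that precedence rule, not the Druid implementation: the class ForceTimeChunkLockSketch and the method resolveContext are hypothetical, a plain Map stands in for the task context, and the key string "forceTimeChunkLock" is an assumption, since the value of Tasks.FORCE_TIME_CHUNK_LOCK_KEY is not shown in this part of the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Druid.
class ForceTimeChunkLockSketch
{
  // Assumed to match Tasks.FORCE_TIME_CHUNK_LOCK_KEY (value not shown in this diff).
  static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";

  // Mimics the "add to context if absent" step in TaskQueue.add():
  // the cluster-wide default is applied only when the task has no value of its own.
  static Map<String, Object> resolveContext(Map<String, Object> taskContext, boolean clusterDefault)
  {
    final Map<String, Object> resolved = new HashMap<>(taskContext);
    resolved.putIfAbsent(FORCE_TIME_CHUNK_LOCK_KEY, clusterDefault);
    return resolved;
  }

  public static void main(String[] args)
  {
    // No per-task setting: the cluster-wide default is used.
    final Map<String, Object> plainTask = new HashMap<>();
    System.out.println(resolveContext(plainTask, true).get(FORCE_TIME_CHUNK_LOCK_KEY));

    // Per-task setting present: it takes precedence over the cluster-wide default.
    final Map<String, Object> overridingTask = new HashMap<>();
    overridingTask.put(FORCE_TIME_CHUNK_LOCK_KEY, false);
    System.out.println(resolveContext(overridingTask, true).get(FORCE_TIME_CHUNK_LOCK_KEY));
  }
}
```

Given the "druid.indexer.tasklock" binding and the forceTimeChunkLock JSON property, the cluster-wide default would presumably be set on the Overlord as druid.indexer.tasklock.forceTimeChunkLock. A value already present in a task's context wins over that default, which is the case testOverwriteDefault covers.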

