Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/05/03 07:19:28 UTC

[GitHub] [druid] jihoonson opened a new pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

jihoonson opened a new pull request #11189:
URL: https://github.com/apache/druid/pull/11189


   ### Description
   
   Most internal APIs used in Druid's ingestion should be idempotent so that transient errors can be handled by retrying. This PR makes two APIs used in native batch ingestion idempotent.
   
   The first API is the segment allocation API used in dynamic partitioning. Currently, transient network errors or task failures can cause this API to allocate non-contiguous segment partitionIds. This is a problem because the `PartitionHolder` for segments with non-contiguous partitionIds will never become complete in the broker timeline. As a result, everything appears fine: the task succeeds, segments are published to the metadata store, and historicals load and announce them, but you will never be able to query them.
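
   For illustration, here is a minimal sketch (not the actual `PartitionHolder` code; the names are hypothetical) of the contiguity requirement that is violated when partitionIds have gaps:

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   class PartitionContiguityCheckSketch
   {
     // Illustrative only: a core partition set of size N is queryable when it
     // contains exactly the partitionIds 0..N-1 with no gaps.
     static boolean isContiguous(List<Integer> partitionIds)
     {
       final Set<Integer> ids = new HashSet<>(partitionIds);
       for (int i = 0; i < partitionIds.size(); i++) {
         if (!ids.contains(i)) {
           return false; // e.g. [0, 1, 3] leaves the time chunk incomplete and unqueryable
         }
       }
       return true;
     }
   }
   ```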
   
   To fix the segment allocation API, I had to add a new API that accepts extra parameters such as `sequenceName` to guarantee idempotence. This breaks rolling upgrades that replace nodes with a newer version one at a time. To resolve this, I added a new taskContext, `useLineageBasedSegmentAllocation`, which controls which segment allocation protocol is used for dynamic partitioning. This context is true by default and must be set to false during a rolling upgrade. In-place rolling upgrades are not a consideration because batch ingestion doesn't support them.
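
   The gist of the new lineage-based protocol, as a hedged sketch with hypothetical names (the real implementation lives in the Overlord's segment allocation endpoint and `SinglePhaseParallelIndexTaskRunner`): each allocation request is keyed by its position in the segment lineage, so a retried request gets back the segment that was already allocated instead of a new partitionId.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch, not the Druid implementation.
   class LineageBasedAllocatorSketch
   {
     // Key: sequenceName plus the previous segment id, i.e. the request's position in the lineage.
     private final Map<String, String> allocated = new HashMap<>();
     private int nextPartitionId = 0;

     synchronized String allocate(String sequenceName, String previousSegmentId)
     {
       final String lineageKey = sequenceName + "/" + previousSegmentId;
       // A retry with the same lineage key returns the same segment, so transient
       // failures or task retries cannot leave gaps in the allocated partitionIds.
       return allocated.computeIfAbsent(lineageKey, k -> "segment_" + nextPartitionId++);
     }
   }
   ```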
   
   The second API is the task report API used by all native batch ingestion types. This API can handle retries triggered by transient network errors, but it cannot handle duplicate reports caused by task retries. As a result, if a task fails after sending its report, the supervisor task counts both the failed task's report and the retry task's report. Because of this bug, the parallel task can incorrectly estimate the partition column's cardinality in hash partitioning and its distribution in range partitioning.
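
   Conceptually the fix amounts to keying reports by the subtask spec rather than counting every report; a hedged sketch with illustrative names:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch: at most one report is kept per subTaskSpecId, so a report
   // from a retry task replaces, rather than adds to, the failed attempt's report.
   class SubTaskReportCollectorSketch<R>
   {
     private final Map<String, R> reportsBySpecId = new HashMap<>();

     synchronized void collect(String subTaskSpecId, R report)
     {
       reportsBySpecId.put(subTaskSpecId, report); // the last report per spec wins
     }

     synchronized int reportCount()
     {
       return reportsBySpecId.size(); // counts specs, not task attempts
     }
   }
   ```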
   
   Finally, to test the fix, I added random task failures and API call retries (emulating transient network failures) in `AbstractParallelIndexSupervisorTaskTest`. All unit tests extending this class, such as `CompactionTaskParallelRunTest`, `HashPartitionMultiPhaseParallelIndexingTest`, `SinglePhaseParallelIndexingTest`, and `RangePartitionMultiPhaseParallelIndexingTest`, now run with potential transient task failures and API call retries.
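
   The failure injection itself is simple; a minimal sketch (illustrative, not the actual test harness code) of gating each call on a configurable failure rate:

   ```java
   import java.io.IOException;
   import java.util.Random;
   import java.util.concurrent.Callable;

   // Illustrative sketch of emulating transient failures at a configurable rate,
   // similar in spirit to the rates wired into AbstractParallelIndexSupervisorTaskTest.
   class TransientFailureInjector
   {
     private final double failureRate; // e.g. 0.3 for a 30% chance per call
     private final Random random = new Random();

     TransientFailureInjector(double failureRate)
     {
       this.failureRate = failureRate;
     }

     <T> T call(Callable<T> apiCall) throws Exception
     {
       if (random.nextDouble() < failureRate) {
         throw new IOException("emulated transient network failure"); // the caller is expected to retry
       }
       return apiCall.call();
     }
   }
   ```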
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `SinglePhaseParallelIndexTaskRunner`
    * `TaskMonitor`
    * `AbstractParallelIndexSupervisorTaskTest`
   
   <hr>
   
   <!-- Check the items by putting "x" in the brackets for the done things. Not all of these items apply to every PR. Remove the items which are not done or not relevant to the PR. None of the items from the checklist below are strictly necessary, but it would be very helpful if you at least self-review the PR. -->
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] jihoonson commented on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-831479796


   I added `druid.indexer.task.useLineageBasedSegmentAllocation` for middleManagers instead of taskContext.



[GitHub] [druid] jihoonson commented on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-831122926


   I think we need a cluster-wide configuration corresponding to the new taskContext. I will add it soon. 



[GitHub] [druid] jihoonson commented on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-831565132


   I manually tested the behavior with `druid.indexer.task.useLineageBasedSegmentAllocation` during rolling upgrade.



[GitHub] [druid] jihoonson edited a comment on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson edited a comment on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-831565132


   I manually tested the behavior with `druid.indexer.task.default.context = { "useLineageBasedSegmentAllocation": true}` during rolling upgrade.



[GitHub] [druid] jihoonson commented on a change in pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r627901501



##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       Added javadocs.
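
       For readers unfamiliar with `RetryUtils`, a hedged usage sketch assuming the existing public three-argument overload `retry(f, shouldRetry, maxTries)`; `flakyCall` is a hypothetical stand-in for any remote call that can fail transiently:

       ```java
       import java.io.IOException;
       import java.util.concurrent.ThreadLocalRandom;
       import org.apache.druid.java.util.common.RetryUtils;

       class RetryUsageSketch
       {
         // Stand-in for a remote call that fails transiently about 30% of the time.
         static String flakyCall() throws IOException
         {
           if (ThreadLocalRandom.current().nextDouble() < 0.3) {
             throw new IOException("transient failure");
           }
           return "ok";
         }

         public static void main(String[] args) throws Exception
         {
           final String result = RetryUtils.retry(
               RetryUsageSketch::flakyCall,     // RetryUtils.Task<T> to attempt
               e -> e instanceof IOException,   // retry only on transient I/O errors
               5                                // maxTries
           );
           System.out.println(result);
         }
       }
       ```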

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       Updated the doc per suggestion.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       Good idea. I changed the default of the default context to an empty map, and added `useLineageBasedSegmentAllocation` here.

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments are [%s]",
+              segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       Thanks for reminding me of that. Changed to use `log.errorSegments` and to avoid creating an overly large string for the exception message.





[GitHub] [druid] jihoonson merged pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson merged pull request #11189:
URL: https://github.com/apache/druid/pull/11189


   



[GitHub] [druid] eeren-mosaic commented on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
eeren-mosaic commented on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-865851201


   Hi, I am wondering if this fix may be related to the issue observed in https://github.com/apache/druid/issues/11348 as well?



[GitHub] [druid] clintropolis commented on a change in pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r624978412



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
##########
@@ -398,7 +398,7 @@ private StringFullResponseHolder submitRequest(
         } else {
           try {
             final long sleepTime = delay.getMillis();
-            log.debug(
+            log.warn(

Review comment:
       :+1:

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       maybe worth elaborating on why, e.g. "...must be set to true to ensure data correctness"

##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       nit: is skip sleep the test parameter i guess? maybe worth javadocs

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
##########
@@ -281,16 +281,28 @@ public static void verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(List<
         atomicUpdateGroupSize++;
       } else {
         if (curSegment.getEndRootPartitionId() != nextSegment.getStartRootPartitionId()) {
-          throw new ISE("Can't compact segments of non-consecutive rootPartition range");
+          throw new ISE(
+              "Can't compact segments of non-consecutive rootPartition range. Missing partitionIds between [%s] and [%s]",
+              curSegment.getEndRootPartitionId(),
+              nextSegment.getStartRootPartitionId()
+          );

Review comment:
       nice :+1:

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
##########
@@ -173,17 +181,43 @@
           null
       );
 
+  protected static final double DEFAULT_TRANSIENT_TASK_FAILURE_RATE = 0.3;
+  protected static final double DEFAULT_TRANSIENT_API_FAILURE_RATE = 0.3;
+
   private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
 
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  /**
+   * Transient task failure rate emulated by the taskKiller in {@link SimpleThreadingTaskRunner}.
+   * Per {@link SubTaskSpec}, there could be at most one task failure.
+   */
+  private final double transientTaskFailureRate;
+
+  /**
+   * Transient API call failure rate emulated by {@link LocalParallelIndexSupervisorTaskClient}.
+   * This will be applied to every API calls in the future.
+   */
+  private final double transientApiCallFailureRate;
+

Review comment:
       cool :+1:

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments are [%s]",
+              segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       :+1: on sanity check... is there any chance the list of segments is huge here? (maybe we should use `log.errorSegments` to log segments and just include count/interval or something?)

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       I think we should also set the lineage config to true here if it is absent, so that custom taskContext configs that are missing that setting do not run with it set to false. The documentation for the default config would then no longer need to call out that config as part of the default config, since it would be implicit.





[GitHub] [druid] jihoonson edited a comment on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson edited a comment on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-831479796


   ~I added `druid.indexer.task.useLineageBasedSegmentAllocation` for middleManagers instead of taskContext.~



[GitHub] [druid] jon-wei commented on a change in pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jon-wei commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r627847812



##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       Suggest also adding a note that this applies if upgrading from a pre-0.22.0 version





[GitHub] [druid] jihoonson commented on pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

Posted by GitBox <gi...@apache.org>.
jihoonson commented on pull request #11189:
URL: https://github.com/apache/druid/pull/11189#issuecomment-834793112


   @jon-wei @clintropolis thanks for the review!

