Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/05/06 23:15:45 UTC

[GitHub] [druid] clintropolis commented on a change in pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

clintropolis commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r624978412



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
##########
@@ -398,7 +398,7 @@ private StringFullResponseHolder submitRequest(
         } else {
           try {
             final long sleepTime = delay.getMillis();
-            log.debug(
+            log.warn(

Review comment:
       :+1:

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       maybe worth elaborating on why, e.g. "...must be set to true to ensure data correctness"

##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       nit: I guess `skipSleep` is the test-only parameter? Maybe worth a javadoc.
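
       A minimal sketch of what a documented test-only flag could look like. This is not the actual `RetryUtils` code: the class name `RetrySketch`, the simplified signature, and the backoff numbers are assumptions made for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetrySketch
{
  /**
   * Calls {@code task} until it succeeds, the failure is not retryable, or {@code maxTries} is reached.
   *
   * @param skipSleep test-only flag; when true, the backoff sleep between attempts is skipped
   *                  so that unit tests do not have to wait
   */
  static <T> T retry(
      final Callable<T> task,
      final Predicate<Throwable> shouldRetry,
      final int maxTries,
      final boolean skipSleep
  ) throws Exception
  {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1");
    }
    for (int nTry = 1; ; nTry++) {
      try {
        return task.call();
      }
      catch (Exception e) {
        // Give up on the last attempt or on a non-retryable failure.
        if (nTry >= maxTries || !shouldRetry.test(e)) {
          throw e;
        }
        if (!skipSleep) {
          // Illustrative exponential backoff capped at 60 seconds (hypothetical values).
          Thread.sleep(Math.min(60_000L, 1000L << Math.min(nTry - 1, 6)));
        }
      }
    }
  }
}
```

       With the flag documented like this, it is clear that production callers should always pass `false`.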

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
##########
@@ -281,16 +281,28 @@ public static void verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(List<
         atomicUpdateGroupSize++;
       } else {
         if (curSegment.getEndRootPartitionId() != nextSegment.getStartRootPartitionId()) {
-          throw new ISE("Can't compact segments of non-consecutive rootPartition range");
+          throw new ISE(
+              "Can't compact segments of non-consecutive rootPartition range. Missing partitionIds between [%s] and [%s]",
+              curSegment.getEndRootPartitionId(),
+              nextSegment.getStartRootPartitionId()
+          );

Review comment:
       nice :+1:

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
##########
@@ -173,17 +181,43 @@
           null
       );
 
+  protected static final double DEFAULT_TRANSIENT_TASK_FAILURE_RATE = 0.3;
+  protected static final double DEFAULT_TRANSIENT_API_FAILURE_RATE = 0.3;
+
   private static final Logger LOG = new Logger(AbstractParallelIndexSupervisorTaskTest.class);
 
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  /**
+   * Transient task failure rate emulated by the taskKiller in {@link SimpleThreadingTaskRunner}.
+   * Per {@link SubTaskSpec}, there could be at most one task failure.
+   */
+  private final double transientTaskFailureRate;
+
+  /**
+   * Transient API call failure rate emulated by {@link LocalParallelIndexSupervisorTaskClient}.
+   * This will be applied to every API calls in the future.
+   */
+  private final double transientApiCallFailureRate;
+

Review comment:
       cool :+1:

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments are [%s]",
+              segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       :+1: on sanity check... is there any chance the list of segments is huge here? (maybe we should use `log.errorSegments` to log segments and just include count/interval or something?)
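
       Roughly what I have in mind, as a hedged sketch rather than a proposed patch. `CorePartitionCheckSketch`, `isCorePartitionSetComplete`, and `describeFailure` are hypothetical names, and plain integers stand in for the partition numbers of the segments in one interval. The check mirrors the one in the diff and, like it, assumes partition IDs are distinct.

```java
import java.util.List;

public class CorePartitionCheckSketch
{
  /**
   * Returns true when every partition number is below the number of segments,
   * which (for distinct IDs) means the IDs are exactly 0..size-1.
   */
  static boolean isCorePartitionSetComplete(final List<Integer> partitionNums)
  {
    final int expected = partitionNums.size();
    final long actual = partitionNums.stream().filter(p -> p < expected).count();
    return actual == expected;
  }

  /**
   * Builds a bounded-size error message: interval and count only.
   * The full segment list would go to the log instead of the exception.
   */
  static String describeFailure(final String interval, final List<Integer> partitionNums)
  {
    return String.format(
        "Cannot publish segments due to incomplete time chunk. Interval [%s], segment count [%d]",
        interval,
        partitionNums.size()
    );
  }
}
```

       That keeps the exception message small no matter how many segments are in the time chunk.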

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       I think we should also set the use-lineage config to true here if it is absent, so that custom default taskContext configs that are missing the setting do not run with false. The documentation for the default config would then no longer need to list that setting as part of the default config, since it would be implicit.
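
       A sketch of the precedence I mean, using plain maps instead of the real `Task` and config classes. `TaskContextDefaultsSketch` and `applyDefaults` are hypothetical names; only the key `useLineageBasedSegmentAllocation` comes from the docs change in this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskContextDefaultsSketch
{
  static final String USE_LINEAGE_KEY = "useLineageBasedSegmentAllocation";

  /**
   * Precedence: explicit task context, then operator-supplied defaults,
   * then a built-in default of true for the lineage setting.
   */
  static Map<String, Object> applyDefaults(
      final Map<String, Object> taskContext,
      final Map<String, Object> operatorDefaults
  )
  {
    final Map<String, Object> merged = new HashMap<>(taskContext);
    // Operator defaults only fill in keys the task did not set itself.
    operatorDefaults.forEach(merged::putIfAbsent);
    // Fallback so a custom default context that omits the key does not run with false.
    merged.putIfAbsent(USE_LINEAGE_KEY, true);
    return merged;
  }
}
```

       An explicit `false` in either the task context or the operator defaults still wins, which is what the rolling-upgrade case needs.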



