You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/05/07 03:35:20 UTC

[GitHub] [druid] jihoonson commented on a change in pull request #11189: Fix idempotence of segment allocation and task report apis in native batch ingestion

jihoonson commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r627901501



##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       Added javadocs.

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       Updated the doc per suggestion.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       Good idea. I changed the default of the default context to an empty map, and added `useLineageBasedSegmentAllocation` here.

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments are [%s]",
+              segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       Thanks for reminding me of that. Changed to use `log.errorSegments` and to not create a too large string for exception.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org