Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/04 06:24:45 UTC

[GitHub] [druid] abhishekagarwal87 commented on a change in pull request #10676: Allow client to configure batch ingestion task to wait to complete until segments are confirmed to be available by other

abhishekagarwal87 commented on a change in pull request #10676:
URL: https://github.com/apache/druid/pull/10676#discussion_r551135600



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -576,6 +582,73 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(TaskToolbox toolbox, ExecutorService exec, List<DataSegment> segmentsToWaitFor, long waitTimeout)
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+      return false;
+    }
+    log.info("Waiting for segments to be loaded by the cluster...");
+
+    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+
+    notifier.start();
+    for (DataSegment s : segmentsToWaitFor) {
+      notifier.registerSegmentHandoffCallback(
+          new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
+          exec,
+          () -> {
+            log.info(
+                "Confirmed availability for [%s]. Removing from list of segments to wait for",
+                s.getId()
+            );
+            synchronized (availabilityCondition) {
+              segmentsToWaitFor.remove(s);

Review comment:
       Looks good to me @capistrant. Thanks.
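
The quoted diff is cut off mid-callback, so the waiting half of the method is not shown. Below is a minimal, self-contained Java sketch of the wait-with-timeout pattern the diff appears to build toward. Each handoff callback removes its segment from a pending set under a shared lock and calls notifyAll(). The task thread waits on that same lock until the set is empty or the deadline passes. HandoffNotifier, registerCallback and SegmentAvailabilityWaiter are hypothetical stand-ins, not Druid's SegmentHandoffNotifier API, and segments are identified by plain String IDs instead of SegmentDescriptor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class SegmentAvailabilityWaiter
{
  // Hypothetical stand-in for a handoff notifier: runs `callback` on `exec`
  // once the segment identified by `segmentId` is confirmed available.
  public interface HandoffNotifier
  {
    void registerCallback(String segmentId, ExecutorService exec, Runnable callback);
  }

  private final Object availabilityCondition = new Object();

  public boolean waitForSegmentAvailability(
      HandoffNotifier notifier,
      ExecutorService exec,
      List<String> segmentsToWaitFor,
      long waitTimeoutMillis
  )
  {
    if (segmentsToWaitFor.isEmpty()) {
      return false; // mirrors the early return in the quoted diff
    }
    // Private copy: callbacks never mutate the caller's list.
    final Set<String> pending = new HashSet<>(segmentsToWaitFor);
    for (String id : segmentsToWaitFor) {
      notifier.registerCallback(id, exec, () -> {
        synchronized (availabilityCondition) {
          pending.remove(id);
          availabilityCondition.notifyAll(); // wake the waiting task thread
        }
      });
    }

    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(waitTimeoutMillis);
    synchronized (availabilityCondition) {
      // Loop guards against spurious wakeups and callbacks that fired early.
      while (!pending.isEmpty()) {
        long remainingNanos = deadline - System.nanoTime();
        if (remainingNanos <= 0) {
          return false; // timed out with segments still unconfirmed
        }
        try {
          // At least 1 ms: wait(0) would block indefinitely.
          availabilityCondition.wait(Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos)));
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      return true;
    }
  }
}
```

One choice differs from the diff: the sketch removes IDs from a private copy rather than from the caller's segmentsToWaitFor list, so a callback firing after a timeout cannot modify a collection the caller may still be using.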



