Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/25 21:23:08 UTC

[GitHub] [druid] suneet-s commented on a change in pull request #10676: Allow client to configure batch ingestion task to wait to complete until segments are confirmed to be available by other

suneet-s commented on a change in pull request #10676:
URL: https://github.com/apache/druid/pull/10676#discussion_r601822317



##########
File path: server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java
##########
@@ -40,6 +40,7 @@
   int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
   int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
   boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false;
+  int DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS = 0;

Review comment:
       nit: This is a `long` elsewhere. I don't think this matters, though.
   
   ```suggestion
     long DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS = 0;
   ```
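As a quick illustration of why the mismatch is cosmetic: passing an `int` constant where a `long` is expected is an implicit widening conversion, so both declarations produce the same value. The class, constant, and method names below are hypothetical stand-ins, not Druid's.

```java
// Hypothetical stand-ins for the two declarations discussed above.
class TimeoutDefaultWidening
{
  static final int DEFAULT_AS_INT = 0;    // as written in the PR
  static final long DEFAULT_AS_LONG = 0L; // as suggested in the review

  // Returns the configured timeout, or the default when none is configured.
  // An int argument for the long parameter is widened automatically.
  static long resolveTimeoutMillis(Long configured, long defaultMillis)
  {
    return configured != null ? configured : defaultMillis;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTimeoutMillis(null, DEFAULT_AS_INT));    // prints 0
    System.out.println(resolveTimeoutMillis(null, DEFAULT_AS_LONG));   // prints 0
    System.out.println(resolveTimeoutMillis(30_000L, DEFAULT_AS_INT)); // prints 30000
  }
}
```

The `long` form only keeps the types consistent with the field it feeds.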

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/IngestionStatsAndErrorsTaskReportData.java
##########
@@ -41,17 +41,22 @@
   @Nullable
   private String errorMsg;
 
+  @JsonProperty
+  private boolean segmentAvailabilityConfirmed;
+
   public IngestionStatsAndErrorsTaskReportData(
       @JsonProperty("ingestionState") IngestionState ingestionState,
       @JsonProperty("unparseableEvents") Map<String, Object> unparseableEvents,
       @JsonProperty("rowStats") Map<String, Object> rowStats,
-      @JsonProperty("errorMsg") @Nullable String errorMsg
+      @JsonProperty("errorMsg") @Nullable String errorMsg,
+      @JsonProperty("segmentAvailabilityConfirmed") boolean segmentAvailabilityConfirmed

Review comment:
       Naive question: Should this be
   ```suggestion
         @JsonProperty("segmentAvailabilityConfirmed") @Nullable Boolean segmentAvailabilityConfirmed
   ```
   
   Seeing JSON properties automatically makes me think about version mismatches, but I don't know exactly how this is used, so I'm just asking in the hope that you can save me some time digging :)
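A minimal sketch of the trade-off, assuming Jackson's default behavior of filling a missing primitive creator property with `false`. Under that assumption, a primitive `boolean` makes a report from a version that predates the field indistinguishable from one that explicitly says `false`. A nullable `Boolean` keeps the distinction. The class below is a hypothetical, trimmed-down stand-in with no Jackson annotations, so it compiles with the JDK alone.

```java
// Hypothetical stand-in for the report data class; not Druid code.
final class ReportDataSketch
{
  // null means "not reported", e.g. by a version that predates the field.
  private final Boolean segmentAvailabilityConfirmed;

  ReportDataSketch(Boolean segmentAvailabilityConfirmed)
  {
    this.segmentAvailabilityConfirmed = segmentAvailabilityConfirmed;
  }

  // True only when availability was explicitly confirmed; null maps to false.
  boolean isSegmentAvailabilityConfirmed()
  {
    return Boolean.TRUE.equals(segmentAvailabilityConfirmed);
  }

  // Lets callers tell "not reported" apart from "reported as false".
  boolean isSegmentAvailabilityReported()
  {
    return segmentAvailabilityConfirmed != null;
  }
}
```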

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -566,6 +574,65 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");

Review comment:
       nit: The question mark in this message makes it seem like this is an unexpected state. Should this be logged at warn level instead?
   
   Honestly, without the question mark at the end, I would have thought this was a reasonable info-level message.

##########
File path: website/.spelling
##########
@@ -970,6 +970,7 @@ InputSplit
 JobHistory
 a.example.com
 assumeGrouped
+awaitSegmentAvailabilityTimeoutMillis

Review comment:
       nit: did you mean to add this twice?
   
   ```suggestion
   ```

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -566,6 +574,65 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+      return true;
+    } else if (waitTimeout <= 0) {

Review comment:
       This shouldn't log a warning, since the default is 0. Maybe you want a strict less-than check instead; a negative value should technically never get here, since there is a check in the constructor.
   
   ```suggestion
       } else if (waitTimeout < 0) {
   ```
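A sketch of one possible shape for these guards, with hypothetical names (not Druid code). It separates the default of 0, treated as "waiting disabled" and returned without a warning, from a negative value. The constructor check should already reject a negative value, which makes it the only case worth a warn-level log. The enum only models which branch is taken.

```java
// Hypothetical model of the early-return guards in waitForSegmentAvailability.
final class AvailabilityGuardSketch
{
  enum Decision
  {
    NOTHING_TO_WAIT_FOR, // no segments: report availability as confirmed
    WAITING_DISABLED,    // timeout == 0, the default: return quietly
    INVALID_TIMEOUT,     // timeout < 0: unexpected, worth a warn-level log
    WAIT                 // register callbacks and wait up to the timeout
  }

  static Decision decide(int segmentCount, long waitTimeoutMillis)
  {
    if (segmentCount == 0) {
      return Decision.NOTHING_TO_WAIT_FOR;
    } else if (waitTimeoutMillis == 0) {
      return Decision.WAITING_DISABLED;
    } else if (waitTimeoutMillis < 0) {
      return Decision.INVALID_TIMEOUT;
    }
    return Decision.WAIT;
  }
}
```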

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java
##########
@@ -267,6 +272,7 @@ public void testBestEffortRollupWithHashedPartitionsSpec()
         true,
         10,
         100,
+        null,
         null

Review comment:
       Do we need an `IndexTaskSerdeTest` where `awaitSegmentAvailabilityTimeoutMillis` is not null?

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
##########
@@ -1447,6 +1452,7 @@ private void assertIngestionSchema(
             null,
             null,
             null,
+            null,
             null

Review comment:
       Do we need a `CompactionTaskTest` where `awaitSegmentAvailabilityTimeoutMillis` is non-null?

##########
File path: integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
##########
@@ -376,6 +376,11 @@ private void loadData(String indexTask) throws Exception
   {
     String taskSpec = getResourceAsString(indexTask);
     taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName);
+    taskSpec = StringUtils.replace(
+        taskSpec,
+        "%%SEGMENT_AVAIL_TIMEOUT_MILLIS%%",
+        jsonMapper.writeValueAsString("0")

Review comment:
       Is it worth testing what happens when this is non-zero?
   
   Do we also want to check that `IngestionStatsAndErrorsTaskReportData` reports `segmentAvailabilityConfirmed` as false now?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -566,6 +574,65 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+      return true;
+    } else if (waitTimeout <= 0) {
+      log.warn("Asked to wait for availability for <= 0 milliseconds?! Requested waitTimeout: [%s]", waitTimeout);
+      return false;
+    }
+    log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
+
+    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+    ExecutorService exec = Execs.directExecutor();
+    CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+
+    notifier.start();
+    for (DataSegment s : segmentsToWaitFor) {
+      notifier.registerSegmentHandoffCallback(

Review comment:
       Another naive question: Is it possible for the segments to be handed off before the callback was registered?
   
   I haven't dug deep into this part of Druid yet, so I'm just asking whether it's something you've considered.
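A minimal sketch of why ordering might not matter, assuming the notifier works by polling: on each poll, it compares the registered segments against what the cluster currently serves. That is an assumption, not verified against Druid's implementation. An event-driven notifier that only reacts to "segment loaded" events is where this race would bite, because an event fired before registration would be missed. All names below are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical poll-based notifier, not Druid's implementation.
final class PollingHandoffNotifierSketch
{
  // Snapshot of segment IDs the cluster serves right now (assumed source).
  private final Supplier<Set<String>> loadedSegmentIds;
  private final Map<String, Runnable> callbacks = new ConcurrentHashMap<>();

  PollingHandoffNotifierSketch(Supplier<Set<String>> loadedSegmentIds)
  {
    this.loadedSegmentIds = loadedSegmentIds;
  }

  void register(String segmentId, Runnable callback)
  {
    callbacks.put(segmentId, callback);
  }

  // Meant to run periodically. It fires and unregisters callbacks for every
  // registered segment that is loaded now, regardless of when it loaded.
  void poll()
  {
    Set<String> loaded = loadedSegmentIds.get();
    for (String segmentId : callbacks.keySet()) {
      if (loaded.contains(segmentId)) {
        Runnable callback = callbacks.remove(segmentId);
        if (callback != null) {
          callback.run();
        }
      }
    }
  }
}
```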

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -566,6 +574,65 @@ static Granularity findGranularityFromSegments(List<DataSegment> segments)
     }
   }
 
+  /**
+   * Wait for segments to become available on the cluster. If waitTimeout is reached, give up on waiting. This is a
+   * QoS method that can be used to make Batch Ingest tasks wait to finish until their ingested data is available on
+   * the cluster. Doing so gives an end user assurance that a Successful task status means their data is available
+   * for querying.
+   *
+   * @param toolbox {@link TaskToolbox} object for assisting with task work.
+   * @param segmentsToWaitFor {@link List} of segments to wait for availability.
+   * @param waitTimeout Millis to wait before giving up
+   * @return True if all segments became available, otherwise False.
+   */
+  protected boolean waitForSegmentAvailability(
+      TaskToolbox toolbox,
+      List<DataSegment> segmentsToWaitFor,
+      long waitTimeout
+  )
+  {
+    if (segmentsToWaitFor.isEmpty()) {
+      log.info("Asked to wait for segments to be available, but I wasn't provided with any segments!?");
+      return true;
+    } else if (waitTimeout <= 0) {
+      log.warn("Asked to wait for availability for <= 0 milliseconds?! Requested waitTimeout: [%s]", waitTimeout);
+      return false;
+    }
+    log.info("Waiting for [%d] segments to be loaded by the cluster...", segmentsToWaitFor.size());
+
+    SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
+                                             .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource());
+    ExecutorService exec = Execs.directExecutor();
+    CountDownLatch doneSignal = new CountDownLatch(segmentsToWaitFor.size());
+
+    notifier.start();
+    for (DataSegment s : segmentsToWaitFor) {
+      notifier.registerSegmentHandoffCallback(
+          new SegmentDescriptor(s.getInterval(), s.getVersion(), s.getShardSpec().getPartitionNum()),
+          exec,
+          () -> {
+            log.debug(
+                "Confirmed availability for [%s]. Removing from list of segments to wait for",
+                s.getId()
+            );
+            doneSignal.countDown();
+          }
+      );
+    }
+
+    try {
+      return doneSignal.await(waitTimeout, TimeUnit.MILLISECONDS);
+    }
+    catch (InterruptedException e) {
+      log.warn("Interrupted while waiting for segment availability; Unable to confirm availability!");
+      Thread.currentThread().interrupt();
+      return false;
+    }
+    finally {
+      notifier.close();

Review comment:
       Instead of closing the notifier in the finally block, I think a try-with-resources pattern would be safer. I don't think anything in the for loop that registers the callbacks can fail, but if something does, the SegmentHandoffNotifier won't clean up after itself.
   
   ```java
   try (SegmentHandoffNotifier notifier = toolbox.getSegmentHandoffNotifierFactory()
                                                 .createSegmentHandoffNotifier(segmentsToWaitFor.get(0).getDataSource())) {
       // register handoffs and wait for signal
   }
   ```
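A self-contained sketch of the try-with-resources shape. It uses a hypothetical `HandoffNotifier` interface rather than Druid's `SegmentHandoffNotifier`. Try-with-resources requires an `AutoCloseable` resource, and this diff does not show whether the real type implements it. The point is that `close()` runs even if `start()` or a callback registration throws.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical notifier shape; close() is narrowed to throw no checked exception.
interface HandoffNotifier extends AutoCloseable
{
  void start();

  void registerCallback(String segmentId, Executor exec, Runnable callback);

  @Override
  void close();
}

final class AvailabilityWaitSketch
{
  static boolean waitForAvailability(
      Supplier<HandoffNotifier> notifierFactory,
      List<String> segmentIds,
      long timeoutMillis
  )
  {
    if (segmentIds.isEmpty()) {
      return true;
    }
    CountDownLatch doneSignal = new CountDownLatch(segmentIds.size());
    Executor direct = Runnable::run; // run callbacks on the notifying thread
    // The notifier is closed on every exit path, including exceptions thrown
    // by start() or registerCallback(), not only after the wait completes.
    try (HandoffNotifier notifier = notifierFactory.get()) {
      notifier.start();
      for (String segmentId : segmentIds) {
        notifier.registerCallback(segmentId, direct, doneSignal::countDown);
      }
      return doneSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
      return false;
    }
  }
}
```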

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
##########
@@ -1066,7 +1075,8 @@ private synchronized void persistSequences() throws IOException
                 ingestionState,
                 getTaskCompletionUnparseableEvents(),
                 getTaskCompletionRowStats(),
-                errorMsg
+                errorMsg,
+                errorMsg == null

Review comment:
       I think we should add a comment here that says something to the effect of "For streaming ingestion, segments are considered to be available immediately if the task is successful. If the task failed, the segments are not available"
   
   At least that's what I think this is trying to say.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
