Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/30 05:15:40 UTC

[GitHub] [druid] jon-wei commented on a change in pull request #11025: Add an option for ingestion task to drop (mark unused) segments that are of the interval in the ingestionSpec

jon-wei commented on a change in pull request #11025:
URL: https://github.com/apache/druid/pull/11025#discussion_r603768694



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -301,11 +317,12 @@ public boolean isAudited()
   public String toString()
   {
     return "SegmentTransactionalInsertAction{" +
-           "segmentsToBeOverwritten=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
-           ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
+           "segmentsToBeOverwritten=" + segmentsToBeOverwritten +

Review comment:
      Should this preserve the `SegmentUtils.commaSeparatedIdentifiers(...)` call and apply it to `segmentsToBeDropped` as well?
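
      For reference, a minimal sketch of the suggested fix (the field order and any trailing fields are assumptions; `SegmentUtils.commaSeparatedIdentifiers` is the same helper used in the removed lines above):

  ```java
  @Override
  public String toString()
  {
    return "SegmentTransactionalInsertAction{" +
           "segmentsToBeOverwritten=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten) +
           ", segmentsToBeDropped=" + SegmentUtils.commaSeparatedIdentifiers(segmentsToBeDropped) +
           ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) +
           // ... any remaining fields as in the original method ...
           '}';
  }
  ```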

##########
File path: docs/ingestion/native-batch.md
##########
@@ -719,6 +723,7 @@ that range if there's some stray data with unexpected timestamps.
 |type|The task type, this should always be "index".|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|

Review comment:
       ```suggestion
   |dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
   ```

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
##########
@@ -63,13 +62,16 @@
   private final DataSourceMetadata endMetadata;
   @Nullable
   private final String dataSource;
+  @Nullable
+  private final Set<DataSegment> segmentsToBeDropped;

Review comment:
       Can you add some javadocs to this class that explain the difference between `segmentsToBeOverwritten` and `segmentsToBeDropped`?
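
      For instance, the javadocs might read along these lines (a sketch; the semantics are inferred from the `dropExisting` docs in this PR and should be confirmed by the author):

  ```java
  /**
   * segmentsToBeOverwritten: existing segments that the newly published
   * segments replace; they overlap the new segments and are atomically
   * overshadowed when the insert transaction commits.
   *
   * segmentsToBeDropped: existing segments to be marked unused in the same
   * transaction; with dropExisting, segments fully contained by the ingestion
   * interval are dropped even if no new segment directly overshadows them.
   */
  @Nullable
  private final Set<DataSegment> segmentsToBeDropped;
  ```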

##########
File path: docs/ingestion/native-batch.md
##########
@@ -89,6 +89,8 @@ You may want to consider the below things:
   data in segments where it actively adds data: if there are segments in your `granularitySpec`'s intervals that have
   no data written by this task, they will be left alone. If any existing segments partially overlap with the
   `granularitySpec`'s intervals, the portion of those segments outside the new segments' intervals will still be visible.
+  You can set the `dropExisting` flag in the `ioConfig` to true if you want the ingestion task to drop all existing data 

Review comment:
       Suggest adding an example where the user would want to enable it, such as the YEAR/MONTH example in the PR description
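
      For example (a hypothetical spec fragment; other required fields are elided): a datasource originally ingested with `segmentGranularity` YEAR is re-ingested at MONTH granularity. Without `dropExisting`, the months of the old YEAR segment that receive no new data stay visible; with `dropExisting: true`, the YEAR segment is dropped because it is fully contained by the interval.

  ```json
  {
    "spec": {
      "dataSchema": {
        "granularitySpec": {
          "segmentGranularity": "MONTH",
          "intervals": ["2020-01-01/2021-01-01"]
        }
      },
      "ioConfig": {
        "type": "index_parallel",
        "appendToExisting": false,
        "dropExisting": true
      }
    }
  }
  ```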

##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|

Review comment:
       ```suggestion
   |dropExisting|If set to true (and `appendToExisting` is set to false and `interval` is specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contained by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be dropped (marked unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be dropped even if `dropExisting` is set to `true`.|false|no|
   ```

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
##########
@@ -490,6 +491,27 @@ public static boolean isGuaranteedRollup(IndexIOConfig ioConfig, IndexTuningConf
     }
   }
 
+  public static Set<DataSegment> getUsedSegmentsWithinInterval(
+      TaskToolbox toolbox,
+      String dataSource,
+      List<Interval> intervals
+  ) throws IOException
+  {
+    Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
+    List<Interval> condensedIntervals = JodaUtils.condenseIntervals(intervals);
+    if (!intervals.isEmpty()) {
+      Collection<DataSegment> usedSegment = toolbox.getTaskActionClient().submit(new RetrieveUsedSegmentsAction(dataSource, null, condensedIntervals, Segments.ONLY_VISIBLE));
+      for (DataSegment segment : usedSegment) {
+        for (Interval interval : condensedIntervals) {
+          if (interval.contains(segment.getInterval())) {
+            segmentsFoundForDrop.add(segment);

Review comment:
       Could add a `break` here once a segment is found, to avoid needlessly checking the remaining intervals; see the sketch below.
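
      A sketch of the loop with that early exit (the holder class and helper name are hypothetical; the loop body and variable names come from the diff above):

  ```java
  import java.util.Collection;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  import org.apache.druid.timeline.DataSegment;
  import org.joda.time.Interval;

  final class SegmentDropUtils
  {
    static Set<DataSegment> segmentsFullyContained(
        Collection<DataSegment> usedSegments,
        List<Interval> condensedIntervals
    )
    {
      Set<DataSegment> segmentsFoundForDrop = new HashSet<>();
      for (DataSegment segment : usedSegments) {
        for (Interval interval : condensedIntervals) {
          if (interval.contains(segment.getInterval())) {
            segmentsFoundForDrop.add(segment);
            // Stop scanning the remaining intervals once this segment matches.
            break;
          }
        }
      }
      return segmentsFoundForDrop;
    }
  }
  ```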

##########
File path: server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
##########
@@ -108,7 +110,7 @@ public IndexerSQLMetadataStorageCoordinator(
     this.connector = connector;
   }
 
-  enum DataSourceMetadataUpdateResult
+  enum DataStoreMetadataUpdateResult

Review comment:
       What's the reasoning behind the rename?

##########
File path: docs/ingestion/native-batch.md
##########
@@ -193,6 +195,7 @@ that range if there's some stray data with unexpected timestamps.
 |type|The task type, this should always be `index_parallel`.|none|yes|
 |inputFormat|[`inputFormat`](./data-formats.md#input-format) to specify how to parse input data.|none|yes|
 |appendToExisting|Creates segments as additional shards of the latest version, effectively appending to the segment set instead of replacing it. This means that you can append new segments to any datasource regardless of its original partitioning scheme. You must use the `dynamic` partitioning type for the appended segments. If you specify a different partitioning type, the task fails with an error.|false|no|
+|dropExisting|If set to true (and `appendToExisting` set to false and `interval` specified in `granularitySpec`), then the ingestion task would drop (mark unused) all existing segments that are fully contain by the `interval` in the `granularitySpec` when the task publishes new segments (no segments would be drop (mark unused) if the ingestion fails). Note that if either `appendToExisting` is `true` or `interval` is not specified in `granularitySpec` then no segments would be drop even if `dropExisting` is set to `true`.|false|no|

Review comment:
       Suggest noting somewhere that compaction tasks will have this enabled
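
      For example, the compaction docs could carry a fragment like this (hypothetical; whether `dropExisting` lives in the compaction `ioConfig` and what its default is are assumptions to verify against this PR):

  ```json
  {
    "type": "compact",
    "dataSource": "wikipedia",
    "ioConfig": {
      "type": "compact",
      "inputSpec": { "type": "interval", "interval": "2020-01-01/2021-01-01" },
      "dropExisting": true
    }
  }
  ```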



