Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/04/20 06:50:34 UTC

[GitHub] [incubator-druid] justinborromeo commented on a change in pull request #7514: Introduce segment limit for indexing tasks

URL: https://github.com/apache/incubator-druid/pull/7514#discussion_r277126265
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 ##########
 @@ -973,6 +979,38 @@ private TaskStatus generateAndPublishSegments(
             // (in append mode) or may be created on our own authority (in overwrite mode).
             sequenceName = getId();
           }
+          // Check if the upcoming row will result in the creation of a new segment.  If so and the new number of segments
+          // is greater than max total segments, push the segments.
+          if (ingestionSchema.getDataSchema().getGranularitySpec() instanceof UniformGranularitySpec) {
+            Interval targetInterval = ingestionSchema.getDataSchema()
+                                                     .getGranularitySpec()
+                                                     .getSegmentGranularity()
+                                                     .bucket(inputRow.getTimestamp());
+            if (!appenderator.getUnpublishedSegments()
+                             .stream()
+                             .map(SegmentIdWithShardSpec::getInterval)
+                             .collect(Collectors.toSet())
+                             .contains(targetInterval)
+                && !isGuaranteedRollup
+                && appenderator.getUnpublishedSegments().size() >= maxTotalSegments) {
+              final SegmentsAndMetadata pushed = driver.pushAllAndClear(pushTimeout);
+              log.info("Pushed segments[%s]", pushed.getSegments());
+            }
+          } else {
+            boolean rowInNewSegment = true;
+            for (SegmentIdWithShardSpec segment : appenderator.getUnpublishedSegments()) {
 
 Review comment:
   This can be done in O(log n) instead of O(n) if the list of segments is sorted by time. If reviewers think this approach to enforcing max total segments is cool, I can optimize this part (it'll take a bit more work).
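To illustrate the O(n) vs. O(log n) point in the comment, here is a minimal sketch. It is not Druid code: `SegmentIntervalIndex` and its methods are hypothetical, and intervals are plain epoch-millisecond half-open ranges rather than Joda `Interval`s. It assumes intervals never partially overlap (as with a uniform segment granularity), although several segments may share one interval. `containsLinear` scans every interval, like the per-segment loop the comment is attached to. `containsSorted` keeps intervals in a `java.util.TreeMap` keyed by start time, so `floorEntry` finds the only candidate interval in O(log n). `shouldPushBeforeRow` mirrors the condition in the `UniformGranularitySpec` branch, with plain parameters standing in for `isGuaranteedRollup` and `maxTotalSegments`.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch, not Druid's Appenderator API: an index of the time intervals
 * covered by unpublished segments, stored as half-open [startMillis, endMillis) ranges.
 * Assumes intervals never partially overlap (as with uniform segment granularity),
 * although several segments (partitions) may share the same interval.
 */
public class SegmentIntervalIndex {
  // Interval start -> interval end, kept sorted by start time.
  private final TreeMap<Long, Long> startToEnd = new TreeMap<>();
  // Number of segments, which can exceed the number of distinct intervals.
  private int segmentCount = 0;

  public void addSegment(long startMillis, long endMillis) {
    startToEnd.put(startMillis, endMillis);
    segmentCount++;
  }

  /** Resets the index, e.g. after all segments have been pushed and cleared. */
  public void clear() {
    startToEnd.clear();
    segmentCount = 0;
  }

  public int segmentCount() {
    return segmentCount;
  }

  /** O(n): check every interval, like looping over all unpublished segments. */
  public boolean containsLinear(long timestampMillis) {
    for (Map.Entry<Long, Long> e : startToEnd.entrySet()) {
      if (e.getKey() <= timestampMillis && timestampMillis < e.getValue()) {
        return true;
      }
    }
    return false;
  }

  /** O(log n): only the interval with the greatest start <= timestamp can contain it. */
  public boolean containsSorted(long timestampMillis) {
    Map.Entry<Long, Long> floor = startToEnd.floorEntry(timestampMillis);
    return floor != null && timestampMillis < floor.getValue();
  }

  /**
   * Mirrors the condition in the diff: push first if the row would open a new
   * interval and the segment limit has already been reached.
   */
  public boolean shouldPushBeforeRow(long timestampMillis, boolean isGuaranteedRollup, int maxTotalSegments) {
    return !isGuaranteedRollup
        && !containsSorted(timestampMillis)
        && segmentCount >= maxTotalSegments;
  }

  public static void main(String[] args) {
    long hour = 3_600_000L;
    SegmentIntervalIndex index = new SegmentIntervalIndex();
    index.addSegment(0L, hour);
    index.addSegment(2 * hour, 3 * hour);
    index.addSegment(2 * hour, 3 * hour); // second partition of the same hour
    System.out.println(index.containsSorted(hour / 2));                    // true
    System.out.println(index.containsSorted(hour + 1));                    // false: no segment covers hour 1
    System.out.println(index.shouldPushBeforeRow(hour + 1, false, 3));     // true
    System.out.println(index.shouldPushBeforeRow(2 * hour + 5, false, 3)); // false: interval already open
  }
}
```

The trade-off is that the sorted index has to be kept in sync as segments are added and pushed, and the single `floorEntry` lookup only works under the non-overlapping assumption above.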

