You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2019/04/01 21:36:00 UTC

[GitHub] [incubator-druid] jihoonson commented on a change in pull request #7048: Make IngestSegmentFirehoseFactory splittable for parallel ingestion

jihoonson commented on a change in pull request #7048: Make IngestSegmentFirehoseFactory splittable for parallel ingestion
URL: https://github.com/apache/incubator-druid/pull/7048#discussion_r271060090
 
 

 ##########
 File path: indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
 ##########
 @@ -251,6 +285,179 @@ private long jitter(long input)
     return retval < 0 ? 0 : retval;
   }
 
+  /**
+   * Resolves the timeline of segments this factory should read. Without a configured interval the timeline is
+   * assembled from the explicitly listed segment IDs; otherwise it is looked up for the configured interval.
+   */
+  private List<TimelineObjectHolder<String, DataSegment>> getTimeline()
+  {
+    return interval == null ? getTimelineForSegmentIds() : getTimelineForInterval();
+  }
+
+  /**
+   * Fetches from the coordinator the used segments of {@code dataSource} for the configured {@code interval},
+   * retrying failed calls according to a policy from {@code retryPolicyFactory}, and returns the timeline lookup
+   * for that interval.
+   *
+   * <p>Once the retry policy yields no further delay, the last failure is rethrown as-is.
+   *
+   * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted while waiting
+   *                          between retries; the thread's interrupt status is restored before throwing
+   */
+  private List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval()
+  {
+    Preconditions.checkNotNull(interval);
+
+    // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
+    // as TaskActionClient.
+    final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
+    List<DataSegment> usedSegments;
+    while (true) {
+      try {
+        usedSegments =
+            coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
+        break;
+      }
+      catch (Throwable e) {
+        log.warn(e, "Exception getting database segments");
+        final Duration delay = retryPolicy.getAndIncrementRetryDelay();
+        if (delay == null) {
+          // Retries exhausted: surface the last failure to the caller unchanged.
+          throw e;
+        } else {
+          final long sleepTime = jitter(delay.getMillis());
+          log.info("Will try again in [%s].", new Duration(sleepTime).toString());
+          try {
+            Thread.sleep(sleepTime);
+          }
+          catch (InterruptedException e2) {
+            // Restore the interrupt flag so callers further up the stack can still observe the interruption,
+            // and keep the failure that triggered this retry attached for diagnostics.
+            Thread.currentThread().interrupt();
+            e2.addSuppressed(e);
+            throw new RuntimeException(e2);
+          }
+        }
+      }
+    }
+
+    return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
+  }
+
+  /**
+   * Builds a timeline from the explicitly configured {@code segmentIds}. Each segment is fetched from the
+   * coordinator and registered under every window listed in its {@link WindowedSegmentId}; segments registered
+   * under exactly the same window are grouped into a single holder.
+   *
+   * @return the timeline holders, in the order defined by {@code Comparators.intervalsByStartThenEnd()}
+   * @throws ISE if segments registered under the same window have different versions
+   * @throws IAE if two distinct windows found to be adjacent in that order overlap
+   */
+  private List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds()
+  {
+    final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> timeline = new TreeMap<>(
+        Comparators.intervalsByStartThenEnd()
+    );
+    for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) {
+      final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment(
+          dataSource,
+          windowedSegmentId.getSegmentId()
+      );
+      // Named "window" rather than "interval" so it does not shadow the factory's interval field.
+      for (Interval window : windowedSegmentId.getIntervals()) {
+        final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(window);
+        if (existingHolder != null) {
+          if (!existingHolder.getVersion().equals(segment.getVersion())) {
+            throw new ISE("Timeline segments with the same interval should have the same version: " +
+                          "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment);
+          }
+          existingHolder.getObject().add(segment.getShardSpec().createChunk(segment));
+        } else {
+          timeline.put(window, new TimelineObjectHolder<>(
+              window,
+              segment.getInterval(),
+              segment.getVersion(),
+              new PartitionHolder<DataSegment>(segment.getShardSpec().createChunk(segment))
+          ));
+        }
+      }
+    }
+
+    // Validate that none of the given windows overlaps (except for when multiple segments share exactly the
+    // same interval). Each window is compared with its predecessor in the map's sort order.
+    Interval previousWindow = null;
+    for (Interval window : timeline.keySet()) {
+      if (previousWindow != null && window.overlaps(previousWindow)) {
+        throw new IAE(
+            "Distinct intervals in input segments may not overlap: [%s] vs [%s]",
+            previousWindow,
+            window
+        );
+      }
+      previousWindow = window;
+    }
+
+    return new ArrayList<>(timeline.values());
+  }
+
+  private void initializeSplitsIfNeeded()
+  {
+    if (splits != null) {
+      return;
+    }
+
+    // isSplittable() ensures this is only called when we have an interval.
+    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval();
+
+    // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing
+    // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their
+    // data can combine with each other anyway.
+
+    List<InputSplit<List<WindowedSegmentId>>> newSplits = new ArrayList<>();
+    List<WindowedSegmentId> currentSplit = new ArrayList<>();
+    Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>();
+    long bytesInCurrentSplit = 0;
+    for (TimelineObjectHolder<String, DataSegment> timelineHolder : timelineSegments) {
+      for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+        final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment);
+        if (existingWindowedSegmentId != null) {
+          // We've already seen this segment in the timeline, so just add this interval to it. It has already
+          // been placed into a split.
+          existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval());
+        } else {
+          // It's the first time we've seen this segment, so create a new WindowedSegmentId.
+          List<Interval> intervals = new ArrayList<>();
+          // Use the interval that contributes to the timeline, not the entire segment's true interval.
+          intervals.add(timelineHolder.getInterval());
+          final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals);
+          windowedSegmentIds.put(segment, newWindowedSegmentId);
+
+          // Now figure out if it goes in the current split or not.
+          final long segmentBytes = segment.getSize();
+          if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) {
+            // This segment won't fit in the current non-empty split, so this split is done.
+            newSplits.add(new InputSplit<>(currentSplit));
+            currentSplit = new ArrayList<>();
+            bytesInCurrentSplit = 0;
+          }
+          if (segmentBytes > maxInputSegmentBytesPerTask) {
 
 Review comment:
   Ok, it makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org