Posted to commits@druid.apache.org by ji...@apache.org on 2019/04/02 21:59:23 UTC

[incubator-druid] branch master updated: Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e23c11  Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048)
4e23c11 is described below

commit 4e23c11345de694c997881d72ca6c668b3fb795b
Author: David Glasser <gl...@apollographql.com>
AuthorDate: Tue Apr 2 14:59:17 2019 -0700

    Make IngestSegmentFirehoseFactory splittable for parallel ingestion (#7048)
    
    * Make IngestSegmentFirehoseFactory splittable for parallel ingestion
    
    * Code review feedback
    
    - Get rid of WindowedSegment
    - Don't document 'segments' parameter or support splitting firehoses that use it
    - Require 'intervals' in WindowedSegmentId (since it won't be written by hand)
    
    * Add missing @JsonProperty
    
    * Integration test passes
    
    * Add unit test
    
    * Remove two FIXME comments from CompactionTask
    
    I'd like to leave this PR in a potentially mergeable state, but I still would
    appreciate reviewer eyes on the questions I'm removing here.
    
    * Updates from code review
---
 docs/content/ingestion/firehose.md                 |   4 +-
 docs/content/ingestion/native_tasks.md             |   2 +-
 .../druid/indexing/common/task/CompactionTask.java |   2 +
 .../firehose/IngestSegmentFirehoseFactory.java     | 293 ++++++++++++++++++---
 .../druid/indexing/firehose/WindowedSegmentId.java |  61 +++++
 .../firehose/IngestSegmentFirehoseFactoryTest.java |   2 +
 .../IngestSegmentFirehoseFactoryTimelineTest.java  |  67 ++++-
 .../druid/tests/indexer/ITParallelIndexTest.java   |  14 +-
 ...kipedia_parallel_ingest_segment_index_task.json |  63 +++++
 .../client/coordinator/CoordinatorClient.java      |  32 +++
 10 files changed, 495 insertions(+), 45 deletions(-)

diff --git a/docs/content/ingestion/firehose.md b/docs/content/ingestion/firehose.md
index b451c25..8cfdc72 100644
--- a/docs/content/ingestion/firehose.md
+++ b/docs/content/ingestion/firehose.md
@@ -87,7 +87,8 @@ The below configurations can be optionally used for tuning the firehose performa
 ### IngestSegmentFirehose
 
 This Firehose can be used to read the data from existing druid segments.
-It can be used ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
+It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
+This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
 A sample ingest firehose spec is shown below -
 
 ```json
@@ -106,6 +107,7 @@ A sample ingest firehose spec is shown below -
 |dimensions|The list of dimensions to select. If left empty, no dimensions are returned. If left null or not defined, all dimensions are returned. |no|
 |metrics|The list of metrics to select. If left empty, no metrics are returned. If left null or not defined, all metrics are selected.|no|
 |filter| See [Filters](../querying/filters.html)|no|
+|maxInputSegmentBytesPerTask|When used with the native parallel index task, the maximum number of bytes of input segments to process in a single task. If a single segment is larger than this number, it will be processed by itself in a single task (input segments are never split across tasks). Defaults to 150MB.|no|
 
 #### SqlFirehose
 
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index f61b395..adadcf3 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -45,7 +45,7 @@ task statuses. If one of them fails, it retries the failed task until the retryi
 If all worker tasks succeed, then it collects the reported list of generated segments and publishes those segments at once.
 
 To use this task, the `firehose` in `ioConfig` should be _splittable_. If it's not, this task runs sequentially. The
-current splittable fireshoses are [`LocalFirehose`](./firehose.html#localfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
+current splittable firehoses are [`LocalFirehose`](./firehose.html#localfirehose), [`IngestSegmentFirehose`](./firehose.html#ingestsegmentfirehose), [`HttpFirehose`](./firehose.html#httpfirehose)
 , [`StaticS3Firehose`](../development/extensions-core/s3.html#statics3firehose), [`StaticAzureBlobStoreFirehose`](../development/extensions-contrib/azure.html#staticazureblobstorefirehose)
 , [`StaticGoogleBlobStoreFirehose`](../development/extensions-contrib/google.html#staticgoogleblobstorefirehose), and [`StaticCloudFilesFirehose`](../development/extensions-contrib/cloudfiles.html#staticcloudfilesfirehose).
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 53c3641..7608f88 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -506,10 +506,12 @@ public class CompactionTask extends AbstractTask
         new IngestSegmentFirehoseFactory(
             dataSchema.getDataSource(),
             interval,
+            null,
             null, // no filter
             // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
             dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
             Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+            null,
             toolbox.getIndexIO(),
             coordinatorClient,
             segmentLoaderFactory,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
index 5ccd9d3..4d1e0bf 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
@@ -31,12 +31,16 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
-import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.indexing.common.RetryPolicy;
 import org.apache.druid.indexing.common.RetryPolicyFactory;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.query.filter.DimFilter;
@@ -51,40 +55,60 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.TimelineObjectHolder;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.apache.druid.timeline.partition.PartitionChunk;
+import org.apache.druid.timeline.partition.PartitionHolder;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
 
-public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowParser>
+public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
 {
   private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
+  private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024;
   private final String dataSource;
+  // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
+  // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
+  // batch ingestion.
+  @Nullable
   private final Interval interval;
+  @Nullable
+  private final List<WindowedSegmentId> segmentIds;
   private final DimFilter dimFilter;
   private final List<String> dimensions;
   private final List<String> metrics;
+  private final long maxInputSegmentBytesPerTask;
   private final IndexIO indexIO;
   private final CoordinatorClient coordinatorClient;
   private final SegmentLoaderFactory segmentLoaderFactory;
   private final RetryPolicyFactory retryPolicyFactory;
 
+  private List<InputSplit<List<WindowedSegmentId>>> splits;
+
   @JsonCreator
   public IngestSegmentFirehoseFactory(
       @JsonProperty("dataSource") final String dataSource,
-      @JsonProperty("interval") Interval interval,
+      @Nullable @JsonProperty("interval") Interval interval,
+      // Specifying "segments" is intended only for when this FirehoseFactory has split itself,
+      // not for direct end user use.
+      @Nullable @JsonProperty("segments") List<WindowedSegmentId> segmentIds,
       @JsonProperty("filter") DimFilter dimFilter,
       @JsonProperty("dimensions") List<String> dimensions,
       @JsonProperty("metrics") List<String> metrics,
+      @JsonProperty("maxInputSegmentBytesPerTask") Long maxInputSegmentBytesPerTask,
       @JacksonInject IndexIO indexIO,
       @JacksonInject CoordinatorClient coordinatorClient,
       @JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@@ -92,18 +116,42 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
-    Preconditions.checkNotNull(interval, "interval");
+    if ((interval == null && segmentIds == null) || (interval != null && segmentIds != null)) {
+      throw new IAE("Specify exactly one of 'interval' and 'segments'");
+    }
     this.dataSource = dataSource;
     this.interval = interval;
+    this.segmentIds = segmentIds;
     this.dimFilter = dimFilter;
     this.dimensions = dimensions;
     this.metrics = metrics;
+    this.maxInputSegmentBytesPerTask = maxInputSegmentBytesPerTask == null
+                                       ? DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK
+                                       : maxInputSegmentBytesPerTask;
     this.indexIO = Preconditions.checkNotNull(indexIO, "null IndexIO");
     this.coordinatorClient = Preconditions.checkNotNull(coordinatorClient, "null CoordinatorClient");
     this.segmentLoaderFactory = Preconditions.checkNotNull(segmentLoaderFactory, "null SegmentLoaderFactory");
     this.retryPolicyFactory = Preconditions.checkNotNull(retryPolicyFactory, "null RetryPolicyFactory");
   }
 
+  @Override
+  public FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> withSplit(InputSplit<List<WindowedSegmentId>> split)
+  {
+    return new IngestSegmentFirehoseFactory(
+        dataSource,
+        null,
+        split.get(),
+        dimFilter,
+        dimensions,
+        metrics,
+        maxInputSegmentBytesPerTask,
+        indexIO,
+        coordinatorClient,
+        segmentLoaderFactory,
+        retryPolicyFactory
+    );
+  }
+
   @JsonProperty
   public String getDataSource()
   {
@@ -116,6 +164,12 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     return interval;
   }
 
+  @JsonProperty
+  public List<WindowedSegmentId> getSegments()
+  {
+    return segmentIds;
+  }
+
   @JsonProperty("filter")
   public DimFilter getDimensionsFilter()
   {
@@ -134,50 +188,40 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     return metrics;
   }
 
+  @JsonProperty
+  public long getMaxInputSegmentBytesPerTask()
+  {
+    return maxInputSegmentBytesPerTask;
+  }
+
   @Override
   public Firehose connect(InputRowParser inputRowParser, File temporaryDirectory) throws ParseException
   {
-    log.info("Connecting firehose: dataSource[%s], interval[%s]", dataSource, interval);
+    log.info(
+        "Connecting firehose: dataSource[%s], interval[%s], segmentIds[%s]",
+        dataSource,
+        interval,
+        segmentIds
+    );
 
     try {
-      // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
-      // as TaskActionClient.
-      final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
-      List<DataSegment> usedSegments;
-      while (true) {
-        try {
-          usedSegments =
-              coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
-          break;
-        }
-        catch (Throwable e) {
-          log.warn(e, "Exception getting database segments");
-          final Duration delay = retryPolicy.getAndIncrementRetryDelay();
-          if (delay == null) {
-            throw e;
-          } else {
-            final long sleepTime = jitter(delay.getMillis());
-            log.info("Will try again in [%s].", new Duration(sleepTime).toString());
-            try {
-              Thread.sleep(sleepTime);
-            }
-            catch (InterruptedException e2) {
-              throw new RuntimeException(e2);
-            }
-          }
-        }
-      }
+      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = getTimeline();
 
+      // Download all segments locally.
+      // Note: this requires enough local storage space to fit all of the segments, even though
+      // IngestSegmentFirehose iterates over the segments in series. We may want to change this
+      // to download files lazily, perhaps sharing code with PrefetchableTextFilesFirehoseFactory.
       final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory);
       Map<DataSegment, File> segmentFileMap = Maps.newLinkedHashMap();
-      for (DataSegment segment : usedSegments) {
-        segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+      for (TimelineObjectHolder<String, DataSegment> holder : timeLineSegments) {
+        for (PartitionChunk<DataSegment> chunk : holder.getObject()) {
+          final DataSegment segment = chunk.getObject();
+          if (!segmentFileMap.containsKey(segment)) {
+            segmentFileMap.put(segment, segmentLoader.getSegmentFiles(segment));
+          }
+        }
       }
 
-      final List<TimelineObjectHolder<String, DataSegment>> timeLineSegments = VersionedIntervalTimeline
-          .forSegments(usedSegments)
-          .lookup(interval);
-
       final List<String> dims;
       if (dimensions != null) {
         dims = dimensions;
@@ -250,6 +294,179 @@ public class IngestSegmentFirehoseFactory implements FirehoseFactory<InputRowPar
     return retval < 0 ? 0 : retval;
   }
 
+  private List<TimelineObjectHolder<String, DataSegment>> getTimeline()
+  {
+    if (interval == null) {
+      return getTimelineForSegmentIds();
+    } else {
+      return getTimelineForInterval();
+    }
+  }
+
+  private List<TimelineObjectHolder<String, DataSegment>> getTimelineForInterval()
+  {
+    Preconditions.checkNotNull(interval);
+
+    // This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
+    // as TaskActionClient.
+    final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
+    List<DataSegment> usedSegments;
+    while (true) {
+      try {
+        usedSegments =
+            coordinatorClient.getDatabaseSegmentDataSourceSegments(dataSource, Collections.singletonList(interval));
+        break;
+      }
+      catch (Throwable e) {
+        log.warn(e, "Exception getting database segments");
+        final Duration delay = retryPolicy.getAndIncrementRetryDelay();
+        if (delay == null) {
+          throw e;
+        } else {
+          final long sleepTime = jitter(delay.getMillis());
+          log.info("Will try again in [%s].", new Duration(sleepTime).toString());
+          try {
+            Thread.sleep(sleepTime);
+          }
+          catch (InterruptedException e2) {
+            throw new RuntimeException(e2);
+          }
+        }
+      }
+    }
+
+    return VersionedIntervalTimeline.forSegments(usedSegments).lookup(interval);
+  }
+
+  private List<TimelineObjectHolder<String, DataSegment>> getTimelineForSegmentIds()
+  {
+    final SortedMap<Interval, TimelineObjectHolder<String, DataSegment>> timeline = new TreeMap<>(
+        Comparators.intervalsByStartThenEnd()
+    );
+    for (WindowedSegmentId windowedSegmentId : Preconditions.checkNotNull(segmentIds)) {
+      final DataSegment segment = coordinatorClient.getDatabaseSegmentDataSourceSegment(
+          dataSource,
+          windowedSegmentId.getSegmentId()
+      );
+      for (Interval interval : windowedSegmentId.getIntervals()) {
+        final TimelineObjectHolder<String, DataSegment> existingHolder = timeline.get(interval);
+        if (existingHolder != null) {
+          if (!existingHolder.getVersion().equals(segment.getVersion())) {
+            throw new ISE("Timeline segments with the same interval should have the same version: " +
+                          "existing version[%s] vs new segment[%s]", existingHolder.getVersion(), segment);
+          }
+          existingHolder.getObject().add(segment.getShardSpec().createChunk(segment));
+        } else {
+          timeline.put(interval, new TimelineObjectHolder<>(
+              interval,
+              segment.getInterval(),
+              segment.getVersion(),
+              new PartitionHolder<DataSegment>(segment.getShardSpec().createChunk(segment))
+          ));
+        }
+      }
+    }
+
+    // Validate that none of the given windows overlaps (except for when multiple segments share exactly the
+    // same interval).
+    Interval lastInterval = null;
+    for (Interval interval : timeline.keySet()) {
+      if (lastInterval != null) {
+        if (interval.overlaps(lastInterval)) {
+          throw new IAE(
+              "Distinct intervals in input segments may not overlap: [%s] vs [%s]",
+              lastInterval,
+              interval
+          );
+        }
+      }
+      lastInterval = interval;
+    }
+
+    return new ArrayList<>(timeline.values());
+  }
+
+  private void initializeSplitsIfNeeded()
+  {
+    if (splits != null) {
+      return;
+    }
+
+    // isSplittable() ensures this is only called when we have an interval.
+    final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = getTimelineForInterval();
+
+    // We do the simplest possible greedy algorithm here instead of anything cleverer. The general bin packing
+    // problem is NP-hard, and we'd like to get segments from the same interval into the same split so that their
+    // data can combine with each other anyway.
+
+    List<InputSplit<List<WindowedSegmentId>>> newSplits = new ArrayList<>();
+    List<WindowedSegmentId> currentSplit = new ArrayList<>();
+    Map<DataSegment, WindowedSegmentId> windowedSegmentIds = new HashMap<>();
+    long bytesInCurrentSplit = 0;
+    for (TimelineObjectHolder<String, DataSegment> timelineHolder : timelineSegments) {
+      for (PartitionChunk<DataSegment> chunk : timelineHolder.getObject()) {
+        final DataSegment segment = chunk.getObject();
+        final WindowedSegmentId existingWindowedSegmentId = windowedSegmentIds.get(segment);
+        if (existingWindowedSegmentId != null) {
+          // We've already seen this segment in the timeline, so just add this interval to it. It has already
+          // been placed into a split.
+          existingWindowedSegmentId.getIntervals().add(timelineHolder.getInterval());
+        } else {
+          // It's the first time we've seen this segment, so create a new WindowedSegmentId.
+          List<Interval> intervals = new ArrayList<>();
+          // Use the interval that contributes to the timeline, not the entire segment's true interval.
+          intervals.add(timelineHolder.getInterval());
+          final WindowedSegmentId newWindowedSegmentId = new WindowedSegmentId(segment.getId().toString(), intervals);
+          windowedSegmentIds.put(segment, newWindowedSegmentId);
+
+          // Now figure out if it goes in the current split or not.
+          final long segmentBytes = segment.getSize();
+          if (bytesInCurrentSplit + segmentBytes > maxInputSegmentBytesPerTask && !currentSplit.isEmpty()) {
+            // This segment won't fit in the current non-empty split, so this split is done.
+            newSplits.add(new InputSplit<>(currentSplit));
+            currentSplit = new ArrayList<>();
+            bytesInCurrentSplit = 0;
+          }
+          if (segmentBytes > maxInputSegmentBytesPerTask) {
+            // If this segment is itself bigger than our max, just put it in its own split.
+            Preconditions.checkState(currentSplit.isEmpty() && bytesInCurrentSplit == 0);
+            newSplits.add(new InputSplit<>(Collections.singletonList(newWindowedSegmentId)));
+          } else {
+            currentSplit.add(newWindowedSegmentId);
+            bytesInCurrentSplit += segmentBytes;
+          }
+        }
+      }
+    }
+    if (!currentSplit.isEmpty()) {
+      newSplits.add(new InputSplit<>(currentSplit));
+    }
+
+    splits = newSplits;
+  }
+
+  @Override
+  public boolean isSplittable()
+  {
+    // Specifying 'segments' to this factory instead of 'interval' is intended primarily for internal use by
+    // parallel batch ingestion: we don't need to support splitting a list of segments.
+    return interval != null;
+  }
+
+  @Override
+  public Stream<InputSplit<List<WindowedSegmentId>>> getSplits()
+  {
+    initializeSplitsIfNeeded();
+    return splits.stream();
+  }
+
+  @Override
+  public int getNumSplits()
+  {
+    initializeSplitsIfNeeded();
+    return splits.size();
+  }
+
   @VisibleForTesting
   static List<String> getUniqueDimensions(
       List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
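
Editor's note: the greedy grouping rule in initializeSplitsIfNeeded() above can be illustrated in isolation. The following is a minimal sketch, not code from this commit. It assumes segments are reduced to their sizes in bytes and ignores the WindowedSegmentId bookkeeping; the class name GreedySplitSketch and the method split are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-alone illustration; not part of the Druid code base.
final class GreedySplitSketch
{
  // Groups segment sizes (in bytes) into splits, preserving input order.
  static List<List<Long>> split(List<Long> segmentSizes, long maxBytesPerSplit)
  {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long bytesInCurrent = 0;
    for (long size : segmentSizes) {
      // If this segment does not fit in the current non-empty split, close that split.
      if (bytesInCurrent + size > maxBytesPerSplit && !current.isEmpty()) {
        splits.add(current);
        current = new ArrayList<>();
        bytesInCurrent = 0;
      }
      if (size > maxBytesPerSplit) {
        // An oversized segment always gets a split of its own; segments are never divided.
        splits.add(Collections.singletonList(size));
      } else {
        current.add(size);
        bytesInCurrent += size;
      }
    }
    // Emit whatever is left in the last, partially filled split.
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }
}
```

With a limit of 100 bytes, sizes 40, 50, 30, 200, 10 would be grouped as [40, 50], [30], [200], [10]. Because segments are taken in timeline order rather than sorted by size, the grouping is not optimal bin packing, but segments from the same interval tend to land in the same split, which is the trade-off the code comment describes.
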
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
new file mode 100644
index 0000000..c3f04bb
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/firehose/WindowedSegmentId.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.firehose;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.joda.time.Interval;
+
+import java.util.List;
+
+/**
+ * A WindowedSegmentId represents a segment ID plus the list of intervals inside it which contribute to a timeline.
+ * <p>
+ * This class is intended for serialization in specs.
+ */
+public class WindowedSegmentId
+{
+  // This is of the form used by SegmentId.
+  private final String segmentId;
+  private final List<Interval> intervals;
+
+  @JsonCreator
+  public WindowedSegmentId(
+      @JsonProperty("segmentId") String segmentId,
+      @JsonProperty("intervals") List<Interval> intervals
+  )
+  {
+    this.segmentId = Preconditions.checkNotNull(segmentId, "null segmentId");
+    this.intervals = Preconditions.checkNotNull(intervals, "null intervals");
+  }
+
+  @JsonProperty
+  public String getSegmentId()
+  {
+    return segmentId;
+  }
+
+  @JsonProperty
+  public List<Interval> getIntervals()
+  {
+    return intervals;
+  }
+}
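
Editor's note: the windows carried by WindowedSegmentId are validated in getTimelineForSegmentIds(), which rejects distinct intervals that overlap while allowing several segments to share exactly the same interval. The sketch below illustrates that check under simplifying assumptions: the Window class is a hypothetical stand-in for Joda-Time's Interval (half-open, non-empty, epoch millis), and duplicates are skipped explicitly because this sketch uses a list where the real code uses a map keyed by interval.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-alone illustration; not part of the Druid code base.
final class IntervalOverlapSketch
{
  // A non-empty half-open interval [start, end); a simplified stand-in for org.joda.time.Interval.
  static final class Window
  {
    final long start;
    final long end;

    Window(long start, long end)
    {
      this.start = start;
      this.end = end;
    }

    boolean overlaps(Window other)
    {
      return start < other.end && other.start < end;
    }

    boolean sameAs(Window other)
    {
      return start == other.start && end == other.end;
    }
  }

  // Returns true if any two distinct windows overlap. Identical windows are allowed.
  static boolean hasOverlap(List<Window> windows)
  {
    List<Window> sorted = new ArrayList<>(windows);
    // Sort by start, then end, so that checking neighbours is enough for non-empty windows.
    sorted.sort(Comparator.<Window>comparingLong(w -> w.start).thenComparingLong(w -> w.end));
    Window last = null;
    for (Window w : sorted) {
      if (last != null && !w.sameAs(last) && w.overlaps(last)) {
        return true;
      }
      last = w;
    }
    return false;
  }
}
```

Abutting windows such as [0, 10) and [10, 20) are accepted here, as are two copies of [0, 10); [0, 10) together with [5, 15) is rejected.
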
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index 7f44ad6..70e5544 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -197,9 +197,11 @@ public class IngestSegmentFirehoseFactoryTest
             final IngestSegmentFirehoseFactory isfFactory = new IngestSegmentFirehoseFactory(
                 TASK.getDataSource(),
                 Intervals.ETERNITY,
+                null,
                 new SelectorDimFilter(DIM_NAME, DIM_VALUE, null),
                 dim_names,
                 metric_names,
+                null,
                 INDEX_IO,
                 cc,
                 slf,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
index e224b95..119097b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java
@@ -26,8 +26,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.coordinator.CoordinatorClient;
+import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.Firehose;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -72,6 +74,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
 public class IngestSegmentFirehoseFactoryTimelineTest
@@ -101,6 +104,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
   private final File tmpDir;
   private final int expectedCount;
   private final long expectedSum;
+  private final int segmentCount;
 
   private static final ObjectMapper MAPPER;
   private static final IndexIO INDEX_IO;
@@ -118,17 +122,28 @@ public class IngestSegmentFirehoseFactoryTimelineTest
       IngestSegmentFirehoseFactory factory,
       File tmpDir,
       int expectedCount,
-      long expectedSum
+      long expectedSum,
+      int segmentCount
   )
   {
     this.factory = factory;
     this.tmpDir = tmpDir;
     this.expectedCount = expectedCount;
     this.expectedSum = expectedSum;
+    this.segmentCount = segmentCount;
   }
 
   @Test
-  public void testSimple() throws Exception
+  public void test() throws Exception
+  {
+    // Junit 4.12 doesn't have a good way to run tearDown after multiple tests in a Parameterized
+    // class run. (Junit 4.13 adds @AfterParam but isn't released yet.) Fake it by just running
+    // "tests" in series inside one @Test.
+    testSimple();
+    testSplit();
+  }
+
+  private void testSimple() throws Exception
   {
     int count = 0;
     long sum = 0;
@@ -145,6 +160,36 @@ public class IngestSegmentFirehoseFactoryTimelineTest
     Assert.assertEquals("sum", expectedSum, sum);
   }
 
+  private void testSplit() throws Exception
+  {
+    Assert.assertTrue(factory.isSplittable());
+    final int numSplits = factory.getNumSplits();
+    // We set maxInputSegmentBytesPerTask to 1, smaller than any segment, so each segment should become its own split.
+    Assert.assertEquals(segmentCount, numSplits);
+    final List<InputSplit<List<WindowedSegmentId>>> splits =
+        factory.getSplits().collect(Collectors.toList());
+    Assert.assertEquals(numSplits, splits.size());
+
+    int count = 0;
+    long sum = 0;
+
+    for (InputSplit<List<WindowedSegmentId>> split : splits) {
+      final FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>> splitFactory =
+          factory.withSplit(split);
+      try (final Firehose firehose = splitFactory.connect(ROW_PARSER, null)) {
+        while (firehose.hasMore()) {
+          final InputRow row = firehose.nextRow();
+          count++;
+          sum += row.getMetric(METRICS[0]).longValue();
+        }
+      }
+    }
+
+    Assert.assertEquals("count", expectedCount, count);
+    Assert.assertEquals("sum", expectedSum, sum);
+
+  }
+
   @After
   public void tearDown() throws Exception
   {
@@ -285,13 +330,26 @@ public class IngestSegmentFirehoseFactoryTimelineTest
             throw new IllegalArgumentException("WTF");
           }
         }
+
+        @Override
+        public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
+        {
+          return testCase.segments
+              .stream()
+              .filter(s -> s.getId().toString().equals(segmentId))
+              .findAny()
+              .get();  // throwing if not found is exactly what the real code does
+        }
       };
       final IngestSegmentFirehoseFactory factory = new IngestSegmentFirehoseFactory(
           DATA_SOURCE,
           testCase.interval,
+          null,
           new TrueDimFilter(),
           Arrays.asList(DIMENSIONS),
           Arrays.asList(METRICS),
+          // Split as much as possible
+          1L,
           INDEX_IO,
           cc,
           slf,
@@ -304,7 +362,8 @@ public class IngestSegmentFirehoseFactoryTimelineTest
               factory,
               testCase.tmpDir,
               testCase.expectedCount,
-              testCase.expectedSum
+              testCase.expectedSum,
+              testCase.segments.size()
           }
       );
     }
@@ -384,7 +443,7 @@ public class IngestSegmentFirehoseFactoryTimelineTest
           Arrays.asList(METRICS),
           new LinearShardSpec(partitionNum),
           -1,
-          0L
+          2L
       );
     }
   }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
index e457a1c..b3920a1 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
@@ -33,11 +33,16 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
   private static String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";
   private static String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
+  private static String INDEX_INGEST_SEGMENT_DATASOURCE = "wikipedia_parallel_ingest_segment_index_test";
+  private static String INDEX_INGEST_SEGMENT_TASK = "/indexer/wikipedia_parallel_ingest_segment_index_task.json";
 
   @Test
   public void testIndexData() throws Exception
   {
-    try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) {
+    try (final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix());
+         final Closeable ingestSegmentCloseable = unloader(
+             INDEX_INGEST_SEGMENT_DATASOURCE + config.getExtraDatasourceNameSuffix());
+    ) {
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
@@ -53,6 +58,13 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
           REINDEX_QUERIES_RESOURCE,
           true
       );
+
+      doReindexTest(
+          INDEX_DATASOURCE,
+          INDEX_INGEST_SEGMENT_DATASOURCE,
+          INDEX_INGEST_SEGMENT_TASK,
+          REINDEX_QUERIES_RESOURCE
+      );
     }
   }
 }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json
new file mode 100644
index 0000000..535e859
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_ingest_segment_index_task.json
@@ -0,0 +1,63 @@
+{
+  "type": "index_parallel",
+  "spec": {
+    "dataSchema": {
+      "dataSource": "%%REINDEX_DATASOURCE%%",
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        }
+      ],
+      "granularitySpec": {
+        "segmentGranularity": "DAY",
+        "queryGranularity": "second",
+        "intervals": [
+          "2013-08-31/2013-09-02"
+        ]
+      },
+      "parser": {
+        "parseSpec": {
+          "format": "json",
+          "timestampSpec": {
+            "column": "timestamp"
+          },
+          "dimensionsSpec": {
+            "dimensionExclusions": [
+              "robot",
+              "continent"
+            ]
+          }
+        }
+      }
+    },
+    "ioConfig": {
+      "type": "index_parallel",
+      "firehose": {
+        "type": "ingestSegment",
+        "dataSource": "%%DATASOURCE%%",
+        "interval": "2013-08-31/2013-09-02",
+        "maxInputSegmentBytesPerTask": 1
+      }
+    },
+    "tuningConfig": {
+      "type": "index_parallel",
+      "maxNumSubTasks": 10
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
index 4188540..247dee1 100644
--- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
+++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java
@@ -156,4 +156,36 @@ public class CoordinatorClient
       throw new RuntimeException(e);
     }
   }
+
+  public DataSegment getDatabaseSegmentDataSourceSegment(String dataSource, String segmentId)
+  {
+    try {
+      FullResponseHolder response = druidLeaderClient.go(
+          druidLeaderClient.makeRequest(
+              HttpMethod.GET,
+              StringUtils.format(
+                  "/druid/coordinator/v1/metadata/datasources/%s/segments/%s",
+                  StringUtils.urlEncode(dataSource),
+                  StringUtils.urlEncode(segmentId)
+              )
+          )
+      );
+
+      if (!response.getStatus().equals(HttpResponseStatus.OK)) {
+        throw new ISE(
+            "Error while fetching database segment data source segment status[%s] content[%s]",
+            response.getStatus(),
+            response.getContent()
+        );
+      }
+      return jsonMapper.readValue(
+          response.getContent(), new TypeReference<DataSegment>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
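
Note on the test values above: the timeline test passes 1L as the maximum bytes per split ("Split as much as possible"), gives each test segment a size of 2L, and expects testCase.segments.size() splits. The integration task sets "maxInputSegmentBytesPerTask": 1 for the same reason. The sketch below illustrates one way such size-bounded grouping could behave. It is not the code from IngestSegmentFirehoseFactory (which is not shown in this part of the diff). The class SegmentSplitSketch and the method splitBySize are hypothetical names, segments are reduced to their byte sizes, and it assumes that a segment larger than the limit still gets a split of its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of Druid.
public class SegmentSplitSketch
{
  // Greedily groups segment sizes (in bytes) so that each group stays at or
  // below maxBytesPerSplit. A group always holds at least one segment, so an
  // oversized segment ends up alone in its own group (an assumption).
  static List<List<Long>> splitBySize(List<Long> segmentSizes, long maxBytesPerSplit)
  {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long bytesInCurrent = 0;

    for (long size : segmentSizes) {
      // Close the current group if adding this segment would exceed the limit.
      if (!current.isEmpty() && bytesInCurrent + size > maxBytesPerSplit) {
        splits.add(current);
        current = new ArrayList<>();
        bytesInCurrent = 0;
      }
      current.add(size);
      bytesInCurrent += size;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args)
  {
    // Limit of 1 byte with 2-byte segments: one split per segment.
    System.out.println(splitBySize(Arrays.asList(2L, 2L, 2L), 1L));  // [[2], [2], [2]]
    // Limit of 4 bytes: two segments fit together, the third starts a new split.
    System.out.println(splitBySize(Arrays.asList(2L, 2L, 2L), 4L));  // [[2, 2], [2]]
  }
}
```

Under these assumptions, a limit of 1 with nonzero segment sizes always yields one split per segment, which matches the split count the test expects.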

