Posted to commits@druid.apache.org by gi...@apache.org on 2018/08/09 20:51:23 UTC

[incubator-druid] branch master updated: Add support 'keepSegmentGranularity' for compactionTask (#6095)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d6a02de  Add support 'keepSegmentGranularity' for compactionTask (#6095)
d6a02de is described below

commit d6a02de5b57ab470cd0efd9cc101440dbfa2a6f3
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Aug 9 13:51:20 2018 -0700

    Add support 'keepSegmentGranularity' for compactionTask (#6095)
    
    * Add keepSegmentGranularity for compactionTask
    
    * fix build
    
    * createIoConfig method
    
    * fix build
    
    * fix build
    
    * address comments
    
    * fix build
---
 .../io/druid/data/input/impl/DimensionsSpec.java   |   4 +
 .../druid/indexing/common/task/CompactionTask.java | 178 ++++++++-----
 .../indexing/common/task/CompactionTaskTest.java   | 289 +++++++++++++++------
 .../druid/tests/indexer/ITCompactionTaskTest.java  |  46 +++-
 .../indexer/wikipedia_compaction_task.json         |   3 +-
 .../resources/indexer/wikipedia_index_task.json    |   3 +-
 .../druid/client/indexing/ClientCompactQuery.java  |   9 +
 .../client/indexing/HttpIndexingServiceClient.java |   3 +-
 .../client/indexing/IndexingServiceClient.java     |   1 +
 .../helper/DruidCoordinatorSegmentCompactor.java   |   3 +
 .../client/indexing/NoopIndexingServiceClient.java |   1 +
 .../DruidCoordinatorSegmentCompactorTest.java      |   1 +
 12 files changed, 388 insertions(+), 153 deletions(-)

diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
index 837a482..be80163 100644
--- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
@@ -101,6 +101,10 @@ public class DimensionsSpec
     }
   }
 
+  public DimensionsSpec(List<DimensionSchema> dimensions)
+  {
+    this(dimensions, null, null);
+  }
 
   @JsonProperty
   public List<DimensionSchema> getDimensions()
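
A note on the new DimensionsSpec constructor: it delegates to the existing
three-argument form, passing null for the exclusion and spatial-dimension
lists. A minimal equivalence sketch (the dimension names are illustrative
only):

    List<DimensionSchema> dims = Arrays.asList(
        new StringDimensionSchema("page"),   // illustrative dimension
        new LongDimensionSchema("added")     // illustrative dimension
    );
    // After this change, these two specs are equivalent:
    DimensionsSpec viaShortCtor = new DimensionsSpec(dims);
    DimensionsSpec viaFullCtor = new DimensionsSpec(dims, null, null);
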
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index f81999f..9f1fb56 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
@@ -54,7 +53,7 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.JodaUtils;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.RE;
-import io.druid.java.util.common.granularity.NoneGranularity;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
@@ -96,17 +95,19 @@ public class CompactionTask extends AbstractTask
 {
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";
+  private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
 
   private final Interval interval;
   private final List<DataSegment> segments;
   private final DimensionsSpec dimensionsSpec;
+  private final boolean keepSegmentGranularity;
   private final IndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
 
   @JsonIgnore
-  private IndexTask indexTaskSpec;
+  private List<IndexTask> indexTaskSpecs;
 
   @JsonIgnore
   private final AuthorizerMapper authorizerMapper;
@@ -125,6 +126,7 @@ public class CompactionTask extends AbstractTask
       @Nullable @JsonProperty("interval") final Interval interval,
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
       @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
+      @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
       @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
       @Nullable @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject ObjectMapper jsonMapper,
@@ -144,6 +146,9 @@ public class CompactionTask extends AbstractTask
     this.interval = interval;
     this.segments = segments;
     this.dimensionsSpec = dimensionsSpec;
+    this.keepSegmentGranularity = keepSegmentGranularity == null
+                                  ? DEFAULT_KEEP_SEGMENT_GRANULARITY
+                                  : keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
     this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
@@ -171,6 +176,12 @@ public class CompactionTask extends AbstractTask
   }
 
   @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
+  @JsonProperty
   public IndexTuningConfig getTuningConfig()
   {
     return tuningConfig;
@@ -205,52 +216,65 @@ public class CompactionTask extends AbstractTask
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
-    if (indexTaskSpec == null) {
-      final IndexIngestionSpec ingestionSpec = createIngestionSchema(
+    if (indexTaskSpecs == null) {
+      indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
           dimensionsSpec,
+          keepSegmentGranularity,
           tuningConfig,
           jsonMapper
-      );
-
-      if (ingestionSpec != null) {
-        indexTaskSpec = new IndexTask(
-            getId(),
-            getGroupId(),
-            getTaskResource(),
-            getDataSource(),
-            ingestionSpec,
-            getContext(),
-            authorizerMapper,
-            chatHandlerProvider,
-            rowIngestionMetersFactory
-        );
-      }
+      ).stream()
+      .map(spec -> new IndexTask(
+          getId(),
+          getGroupId(),
+          getTaskResource(),
+          getDataSource(),
+          spec,
+          getContext(),
+          authorizerMapper,
+          chatHandlerProvider,
+          rowIngestionMetersFactory
+      ))
+      .collect(Collectors.toList());
     }
 
-    if (indexTaskSpec == null) {
+    if (indexTaskSpecs.isEmpty()) {
       log.warn("Interval[%s] has no segments, nothing to do.", interval);
       return TaskStatus.failure(getId());
     } else {
-      final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
-      log.info("Generated compaction task details: " + json);
+      log.info("Generated [%d] compaction task specs", indexTaskSpecs.size());
 
-      return indexTaskSpec.run(toolbox);
+      for (IndexTask eachSpec : indexTaskSpecs) {
+        final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
+        log.info("Running indexSpec: " + json);
+
+        try {
+          final TaskStatus eachResult = eachSpec.run(toolbox);
+          if (!eachResult.isSuccess()) {
+            log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+          }
+        }
+        catch (Exception e) {
+          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+        }
+      }
+
+      return TaskStatus.success(getId());
     }
   }
 
   /**
    * Generate {@link IndexIngestionSpec} from input segments.
    *
-   * @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
+   * @return an empty list if input segments don't exist. Otherwise, a list of generated ingestionSpecs.
    */
-  @Nullable
   @VisibleForTesting
-  static IndexIngestionSpec createIngestionSchema(
+  static List<IndexIngestionSpec> createIngestionSchema(
       TaskToolbox toolbox,
       SegmentProvider segmentProvider,
       DimensionsSpec dimensionsSpec,
+      boolean keepSegmentGranularity,
       IndexTuningConfig tuningConfig,
       ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
@@ -263,33 +287,68 @@ public class CompactionTask extends AbstractTask
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
 
     if (timelineSegments.size() == 0) {
-      return null;
+      return Collections.emptyList();
     }
 
-    final DataSchema dataSchema = createDataSchema(
-        segmentProvider.dataSource,
-        segmentProvider.interval,
-        dimensionsSpec,
-        toolbox.getIndexIO(),
-        jsonMapper,
-        timelineSegments,
-        segmentFileMap
-    );
-    return new IndexIngestionSpec(
-        dataSchema,
-        new IndexIOConfig(
-            new IngestSegmentFirehoseFactory(
-                segmentProvider.dataSource,
-                segmentProvider.interval,
-                null, // no filter
-                // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
-                dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
-                Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
-                toolbox.getIndexIO()
-            ),
-            false
+    if (keepSegmentGranularity) {
+      // if keepSegmentGranularity = true, create an indexIngestionSpec per segment interval, so that we can run an index
+      // task per segment interval.
+      final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
+      for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
+        final DataSchema dataSchema = createDataSchema(
+            segmentProvider.dataSource,
+            holder.getInterval(),
+            Collections.singletonList(holder),
+            dimensionsSpec,
+            toolbox.getIndexIO(),
+            jsonMapper,
+            segmentFileMap
+        );
+
+        specs.add(
+            new IndexIngestionSpec(
+                dataSchema,
+                createIoConfig(toolbox, dataSchema, holder.getInterval()),
+                tuningConfig
+            )
+        );
+      }
+
+      return specs;
+    } else {
+      final DataSchema dataSchema = createDataSchema(
+          segmentProvider.dataSource,
+          segmentProvider.interval,
+          timelineSegments,
+          dimensionsSpec,
+          toolbox.getIndexIO(),
+          jsonMapper,
+          segmentFileMap
+      );
+
+      return Collections.singletonList(
+          new IndexIngestionSpec(
+              dataSchema,
+              createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+              tuningConfig
+          )
+      );
+    }
+  }
+
+  private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+  {
+    return new IndexIOConfig(
+        new IngestSegmentFirehoseFactory(
+            dataSchema.getDataSource(),
+            interval,
+            null, // no filter
+            // set dimension and metric names to make sure that the generated dataSchema is used for the firehose
+            dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+            Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+            toolbox.getIndexIO()
         ),
-        tuningConfig
+        false
     );
   }
 
@@ -308,18 +367,18 @@ public class CompactionTask extends AbstractTask
 
   private static DataSchema createDataSchema(
       String dataSource,
-      Interval interval,
+      Interval totalInterval,
+      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
       DimensionsSpec dimensionsSpec,
       IndexIO indexIO,
       ObjectMapper jsonMapper,
-      List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
       Map<DataSegment, File> segmentFileMap
   )
       throws IOException
   {
     // find metadata for interval
     final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
-        timelineSegments,
+        timelineObjectHolder,
         segmentFileMap,
         indexIO
     );
@@ -348,10 +407,11 @@ public class CompactionTask extends AbstractTask
       final Boolean isRollup = pair.lhs.getMetadata().isRollup();
       return isRollup != null && isRollup;
     });
+
     final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
-        new NoneGranularity(),
+        Granularities.NONE,
         rollup,
-        ImmutableList.of(interval)
+        Collections.singletonList(totalInterval)
     );
 
     // find unique dimensions
@@ -444,15 +504,15 @@ public class CompactionTask extends AbstractTask
   }
 
   private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
-      List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
+      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
     final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
 
-    for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
-      final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
+    for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
+      final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
       for (PartitionChunk<DataSegment> chunk : partitionHolder) {
         final DataSegment segment = chunk.getObject();
         final QueryableIndex queryableIndex = indexIO.loadIndex(
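
The heart of the change is the fan-out in createIngestionSchema: with
keepSegmentGranularity = true it builds one IndexIngestionSpec per segment
interval, so each resulting index task preserves that interval's granularity;
with false it builds a single spec spanning the whole compaction interval.
Note also that run() executes the generated index tasks sequentially, only
warns when an individual spec fails, and still reports overall success. A
condensed sketch of the branch, where createSpec is a hypothetical stand-in
for the createDataSchema/createIoConfig assembly above:

    final List<IndexIngestionSpec> specs = new ArrayList<>();
    if (keepSegmentGranularity) {
      // one spec (and later one index task) per segment interval
      for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
        specs.add(createSpec(holder.getInterval(), Collections.singletonList(holder)));
      }
    } else {
      // a single spec covering the entire compaction interval
      specs.add(createSpec(segmentProvider.interval, timelineSegments));
    }
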
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 2288393..6277a9e 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -98,11 +98,16 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -110,13 +115,22 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+@RunWith(Parameterized.class)
 public class CompactionTaskTest
 {
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
   private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
+  private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
+      Intervals.of("2017-01-01/2017-02-01"),
+      Intervals.of("2017-02-01/2017-03-01"),
+      Intervals.of("2017-03-01/2017-04-01"),
+      Intervals.of("2017-04-01/2017-05-01"),
+      Intervals.of("2017-05-01/2017-06-01")
+  );
   private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
       Intervals.of("2017-01-01/2017-02-01"),
       new StringDimensionSchema(MIXED_TYPE_COLUMN),
@@ -138,6 +152,8 @@ public class CompactionTaskTest
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
   private static Map<DataSegment, File> segmentMap;
 
+  private final boolean keepSegmentGranularity;
+
   private TaskToolbox toolbox;
 
   @BeforeClass
@@ -288,6 +304,20 @@ public class CompactionTaskTest
     );
   }
 
+  @Parameters(name = "keepSegmentGranularity={0}")
+  public static Collection<Object[]> parameters()
+  {
+    return ImmutableList.of(
+        new Object[] {false},
+        new Object[] {true}
+    );
+  }
+
+  public CompactionTaskTest(boolean keepSegmentGranularity)
+  {
+    this.keepSegmentGranularity = keepSegmentGranularity;
+  }
+
   @Test
   public void testSerdeWithInterval() throws IOException
   {
@@ -298,6 +328,7 @@ public class CompactionTaskTest
         COMPACTION_INTERVAL,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -327,6 +358,7 @@ public class CompactionTaskTest
         null,
         SEGMENTS,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -346,18 +378,51 @@ public class CompactionTaskTest
   }
 
   @Test
-  public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
+  public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
+  {
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        null,
+        keepSegmentGranularity,
+        TUNING_CONFIG,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -387,35 +452,59 @@ public class CompactionTaskTest
             new DoubleDimensionSchema("double_dim_3"),
             new DoubleDimensionSchema("double_dim_4"),
             new StringDimensionSchema(MIXED_TYPE_COLUMN)
-        ),
-        null,
-        null
+        )
     );
 
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         customSpec,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
 
-    assertIngestionSchema(ingestionSchema, customSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
+      IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
+      assertIngestionSchema(
+          ingestionSpecs,
+          dimensionsSpecs,
+          SEGMENT_INTERVALS
+      );
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          Collections.singletonList(customSpec),
+          Collections.singletonList(COMPACTION_INTERVAL)
+      );
+    }
   }
 
   @Test
   public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(SEGMENTS),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -430,6 +519,7 @@ public class CompactionTaskTest
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
@@ -448,13 +538,14 @@ public class CompactionTaskTest
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
   }
 
   @Test
-  public void testEmptyInterval() throws Exception
+  public void testEmptyInterval()
   {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
@@ -468,6 +559,7 @@ public class CompactionTaskTest
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -475,85 +567,118 @@ public class CompactionTaskTest
     );
   }
 
-  private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
+  private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
   {
-    return new DimensionsSpec(
-        Lists.newArrayList(
-            new LongDimensionSchema("timestamp"),
-            new StringDimensionSchema("string_dim_4"),
-            new LongDimensionSchema("long_dim_4"),
-            new FloatDimensionSchema("float_dim_4"),
-            new DoubleDimensionSchema("double_dim_4"),
-            new StringDimensionSchema("string_dim_0"),
-            new LongDimensionSchema("long_dim_0"),
-            new FloatDimensionSchema("float_dim_0"),
-            new DoubleDimensionSchema("double_dim_0"),
-            new StringDimensionSchema("string_dim_1"),
-            new LongDimensionSchema("long_dim_1"),
-            new FloatDimensionSchema("float_dim_1"),
-            new DoubleDimensionSchema("double_dim_1"),
-            new StringDimensionSchema("string_dim_2"),
-            new LongDimensionSchema("long_dim_2"),
-            new FloatDimensionSchema("float_dim_2"),
-            new DoubleDimensionSchema("double_dim_2"),
-            new StringDimensionSchema("string_dim_3"),
-            new LongDimensionSchema("long_dim_3"),
-            new FloatDimensionSchema("float_dim_3"),
-            new DoubleDimensionSchema("double_dim_3"),
-            new DoubleDimensionSchema("string_to_double")
-        ),
-        null,
-        null
+    if (keepSegmentGranularity) {
+      return ImmutableList.of(
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    } else {
+      return Collections.singletonList(
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    }
+  }
+
+  private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
+  {
+    return Lists.newArrayList(
+        new LongDimensionSchema("timestamp"),
+        new StringDimensionSchema("string_dim_4"),
+        new LongDimensionSchema("long_dim_4"),
+        new FloatDimensionSchema("float_dim_4"),
+        new DoubleDimensionSchema("double_dim_4"),
+        new StringDimensionSchema("string_dim_0"),
+        new LongDimensionSchema("long_dim_0"),
+        new FloatDimensionSchema("float_dim_0"),
+        new DoubleDimensionSchema("double_dim_0"),
+        new StringDimensionSchema("string_dim_1"),
+        new LongDimensionSchema("long_dim_1"),
+        new FloatDimensionSchema("float_dim_1"),
+        new DoubleDimensionSchema("double_dim_1"),
+        new StringDimensionSchema("string_dim_2"),
+        new LongDimensionSchema("long_dim_2"),
+        new FloatDimensionSchema("float_dim_2"),
+        new DoubleDimensionSchema("double_dim_2"),
+        new StringDimensionSchema("string_dim_3"),
+        new LongDimensionSchema("long_dim_3"),
+        new FloatDimensionSchema("float_dim_3"),
+        new DoubleDimensionSchema("double_dim_3"),
+        mixedTypeColumn
     );
   }
 
   private static void assertIngestionSchema(
-      IndexIngestionSpec ingestionSchema,
-      DimensionsSpec expectedDimensionsSpec
+      List<IndexIngestionSpec> ingestionSchemas,
+      List<DimensionsSpec> expectedDimensionsSpecs,
+      List<Interval> expectedSegmentIntervals
   )
   {
-    // assert dataSchema
-    final DataSchema dataSchema = ingestionSchema.getDataSchema();
-    Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
-
-    final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
-    Assert.assertTrue(parser instanceof TransformingInputRowParser);
-    Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
-    Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
-    Assert.assertEquals(
-        new HashSet<>(expectedDimensionsSpec.getDimensions()),
-        new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
-    );
-    final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
-                                                                  .stream()
-                                                                  .map(AggregatorFactory::getCombiningFactory)
-                                                                  .collect(Collectors.toSet());
-    Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
-    Assert.assertEquals(
-        new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
-        dataSchema.getGranularitySpec()
+    Preconditions.checkArgument(
+        ingestionSchemas.size() == expectedDimensionsSpecs.size(),
+        "ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
+        ingestionSchemas.size(),
+        expectedDimensionsSpecs.size()
     );
 
-    // assert ioConfig
-    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
-    Assert.assertFalse(ioConfig.isAppendToExisting());
-    final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
-    Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
-    final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
-    Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
-    Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
-    Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
-
-    // check the order of dimensions
-    Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
-    // check the order of metrics
-    Assert.assertEquals(
-        Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
-        ingestSegmentFirehoseFactory.getMetrics()
-    );
+    for (int i = 0; i < ingestionSchemas.size(); i++) {
+      final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
+      final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
+
+      // assert dataSchema
+      final DataSchema dataSchema = ingestionSchema.getDataSchema();
+      Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
+
+      final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
+      Assert.assertTrue(parser instanceof TransformingInputRowParser);
+      Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
+      Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensions()),
+          new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
+      );
+      final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
+                                                                    .stream()
+                                                                    .map(AggregatorFactory::getCombiningFactory)
+                                                                    .collect(Collectors.toSet());
+      Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
+      Assert.assertEquals(
+          new ArbitraryGranularitySpec(
+              Granularities.NONE,
+              false,
+              Collections.singletonList(expectedSegmentIntervals.get(i))
+          ),
+          dataSchema.getGranularitySpec()
+      );
 
-    // assert tuningConfig
-    Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+      // assert ioConfig
+      final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
+      Assert.assertFalse(ioConfig.isAppendToExisting());
+      final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
+      Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
+      final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
+      Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
+      Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
+      Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+
+      // check dimensions (compared as sets; order is not asserted here)
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
+          new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
+      );
+      // check the order of metrics
+      Assert.assertEquals(
+          Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
+          ingestSegmentFirehoseFactory.getMetrics()
+      );
+
+      // assert tuningConfig
+      Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+    }
   }
 
   private static class TestTaskToolbox extends TaskToolbox
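
The test class is now parameterized, so every test runs twice: once with
keepSegmentGranularity = false and once with true, with the @Parameters name
pattern labeling each run. For reference, a generic JUnit 4 skeleton of the
pattern (not Druid-specific; the class and field names are made up):

    @RunWith(Parameterized.class)
    public class ExampleTest
    {
      @Parameters(name = "flag={0}")
      public static Collection<Object[]> parameters()
      {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
      }

      private final boolean flag;

      // JUnit instantiates the class once per parameter set
      public ExampleTest(boolean flag)
      {
        this.flag = flag;
      }
    }
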
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
index c100515..1a951b2 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
@@ -36,24 +36,50 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
   private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
-  private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
 
   @Test
-  public void testCompaction() throws Exception
+  public void testCompactionWithoutKeepSegmentGranularity() throws Exception
   {
     loadData();
     final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-    if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
-      throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
+    final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
+    if (intervalsBeforeCompaction.contains(compactedInterval)) {
+      throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
     }
     try {
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
-      compactData();
+      compactData(false);
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
 
       final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-      if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
-        throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
+      if (!intervalsAfterCompaction.contains(compactedInterval)) {
+        throw new ISE("Compacted segment for interval[%s] does not exist", compactedInterval);
+      }
+    }
+    finally {
+      unloadAndKillData(INDEX_DATASOURCE);
+    }
+  }
+
+  @Test
+  public void testCompactionWithKeepSegmentGranularity() throws Exception
+  {
+    loadData();
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    try {
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      compactData(true);
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+
+      final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+      intervalsBeforeCompaction.sort(null);
+      intervalsAfterCompaction.sort(null);
+      if (!intervalsBeforeCompaction.equals(intervalsAfterCompaction)) {
+        throw new ISE(
+            "Intervals before compaction[%s] should be same with those after compaction[%s]",
+            intervalsBeforeCompaction,
+            intervalsAfterCompaction
+        );
       }
     }
     finally {
@@ -73,9 +99,11 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
     );
   }
 
-  private void compactData() throws Exception
+  private void compactData(boolean keepSegmentGranularity) throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
+    final String template = getTaskAsString(COMPACTION_TASK);
+    final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index bc7f786..3fdad69 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,5 +1,6 @@
 {
   "type" : "compact",
   "dataSource" : "wikipedia_index_test",
-  "interval" : "2013-08-31/2013-09-02"
+  "interval" : "2013-08-31/2013-09-02",
+  "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
 }
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index af7b98a..8b3eab8 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -63,7 +63,8 @@
             }
         },
         "tuningConfig": {
-            "type": "index"
+            "type": "index",
+            "targetPartitionSize": 3
         }
     }
 }
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
index e1e3292..db40fec 100644
--- a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
@@ -30,6 +30,7 @@ public class ClientCompactQuery
 {
   private final String dataSource;
   private final List<DataSegment> segments;
+  private final boolean keepSegmentGranularity;
   private final ClientCompactQueryTuningConfig tuningConfig;
   private final Map<String, Object> context;
 
@@ -37,12 +38,14 @@ public class ClientCompactQuery
   public ClientCompactQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
+      @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
       @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
       @JsonProperty("context") Map<String, Object> context
   )
   {
     this.dataSource = dataSource;
     this.segments = segments;
+    this.keepSegmentGranularity = keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.context = context;
   }
@@ -66,6 +69,12 @@ public class ClientCompactQuery
   }
 
   @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
+  @JsonProperty
   public ClientCompactQueryTuningConfig getTuningConfig()
   {
     return tuningConfig;
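
On the client side, the flag threads through ClientCompactQuery as a new
constructor argument between segments and tuningConfig. An illustrative
construction (the dataSource and context values are made up):

    final ClientCompactQuery query = new ClientCompactQuery(
        "wikipedia",                       // dataSource (illustrative)
        segmentsToCompact,                 // segments to compact
        true,                              // keepSegmentGranularity
        null,                              // tuningConfig may be null
        ImmutableMap.of("priority", 25)    // context (illustrative)
    );
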
diff --git a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
index b012db6..c81bb0f 100644
--- a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
@@ -96,6 +96,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
   @Override
   public String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
@@ -112,7 +113,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
     context = context == null ? new HashMap<>() : context;
     context.put("priority", compactionTaskPriority);
 
-    return runTask(new ClientCompactQuery(dataSource, segments, tuningConfig, context));
+    return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
   }
 
   @Override
diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
index 80e47d0..72c9794 100644
--- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
 
   String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 6e368ed..4a3e490 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -150,8 +150,11 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
 
       if (segmentsToCompact.size() > 1) {
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+        // Currently keepSegmentGranularity is set to false because it breaks the algorithm that
+        // CompactionSegmentIterator uses to find segments to compact.
         final String taskId = indexingServiceClient.compactSegments(
             segmentsToCompact,
+            false,
             config.getTaskPriority(),
             config.getTuningConfig(),
             config.getTaskContext()
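
For other callers of compactSegments, the new boolean slots in directly after
the segment list; the coordinator itself pins it to false for the reason noted
in the comment above. An illustrative call (the priority and null arguments
are made up):

    final String taskId = indexingServiceClient.compactSegments(
        segmentsToCompact,
        true,    // keepSegmentGranularity; the coordinator passes false
        25,      // compactionTaskPriority (illustrative)
        null,    // tuningConfig is @Nullable
        null     // context is @Nullable
    );
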
diff --git a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
index 030c991..e7889dc 100644
--- a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
@@ -57,6 +57,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
   @Override
   public String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52ba82c..62f79b2 100644
--- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -64,6 +64,7 @@ public class DruidCoordinatorSegmentCompactorTest
     @Override
     public String compactSegments(
         List<DataSegment> segments,
+        boolean keepSegmentGranularity,
         int compactionTaskPriority,
         ClientCompactQueryTuningConfig tuningConfig,
         Map<String, Object> context

