Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/08/09 20:51:23 UTC

[GitHub] gianm closed pull request #6095: Add support 'keepSegmentGranularity' for compactionTask

URL: https://github.com/apache/incubator-druid/pull/6095
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
index 837a48261c3..be801637031 100644
--- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
@@ -101,6 +101,10 @@ public DimensionsSpec(
     }
   }
 
+  public DimensionsSpec(List<DimensionSchema> dimensions)
+  {
+    this(dimensions, null, null);
+  }
 
   @JsonProperty
   public List<DimensionSchema> getDimensions()
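
For reference, the constructor added above simply delegates to the existing three-argument constructor with null dimension exclusions and spatial dimensions, so callers that only care about the dimension list no longer need to pass the trailing nulls. A minimal sketch (not part of the patch; the column names "page" and "added" are placeholders for illustration):

    import io.druid.data.input.impl.DimensionSchema;
    import io.druid.data.input.impl.DimensionsSpec;
    import io.druid.data.input.impl.LongDimensionSchema;
    import io.druid.data.input.impl.StringDimensionSchema;

    import java.util.Arrays;
    import java.util.List;

    class DimensionsSpecExample
    {
      public static void main(String[] args)
      {
        // Placeholder column names, used only for illustration.
        List<DimensionSchema> dims = Arrays.<DimensionSchema>asList(
            new StringDimensionSchema("page"),
            new LongDimensionSchema("added")
        );

        // Equivalent to new DimensionsSpec(dims, null, null).
        DimensionsSpec spec = new DimensionsSpec(dims);
        System.out.println(spec.getDimensionNames());
      }
    }
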
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index f81999f15f2..9f1fb567130 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -28,7 +28,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import io.druid.data.input.impl.DimensionSchema;
 import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
@@ -54,7 +53,7 @@
 import io.druid.java.util.common.JodaUtils;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.RE;
-import io.druid.java.util.common.granularity.NoneGranularity;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.guava.Comparators;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.java.util.common.logger.Logger;
@@ -96,17 +95,19 @@
 {
   private static final Logger log = new Logger(CompactionTask.class);
   private static final String TYPE = "compact";
+  private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
 
   private final Interval interval;
   private final List<DataSegment> segments;
   private final DimensionsSpec dimensionsSpec;
+  private final boolean keepSegmentGranularity;
   private final IndexTuningConfig tuningConfig;
   private final ObjectMapper jsonMapper;
   @JsonIgnore
   private final SegmentProvider segmentProvider;
 
   @JsonIgnore
-  private IndexTask indexTaskSpec;
+  private List<IndexTask> indexTaskSpecs;
 
   @JsonIgnore
   private final AuthorizerMapper authorizerMapper;
@@ -125,6 +126,7 @@ public CompactionTask(
       @Nullable @JsonProperty("interval") final Interval interval,
       @Nullable @JsonProperty("segments") final List<DataSegment> segments,
       @Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
+      @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
       @Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
       @Nullable @JsonProperty("context") final Map<String, Object> context,
       @JacksonInject ObjectMapper jsonMapper,
@@ -144,6 +146,9 @@ public CompactionTask(
     this.interval = interval;
     this.segments = segments;
     this.dimensionsSpec = dimensionsSpec;
+    this.keepSegmentGranularity = keepSegmentGranularity == null
+                                  ? DEFAULT_KEEP_SEGMENT_GRANULARITY
+                                  : keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.jsonMapper = jsonMapper;
     this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
@@ -170,6 +175,12 @@ public DimensionsSpec getDimensionsSpec()
     return dimensionsSpec;
   }
 
+  @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
   @JsonProperty
   public IndexTuningConfig getTuningConfig()
   {
@@ -205,52 +216,65 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
-    if (indexTaskSpec == null) {
-      final IndexIngestionSpec ingestionSpec = createIngestionSchema(
+    if (indexTaskSpecs == null) {
+      indexTaskSpecs = createIngestionSchema(
           toolbox,
           segmentProvider,
           dimensionsSpec,
+          keepSegmentGranularity,
           tuningConfig,
           jsonMapper
-      );
-
-      if (ingestionSpec != null) {
-        indexTaskSpec = new IndexTask(
-            getId(),
-            getGroupId(),
-            getTaskResource(),
-            getDataSource(),
-            ingestionSpec,
-            getContext(),
-            authorizerMapper,
-            chatHandlerProvider,
-            rowIngestionMetersFactory
-        );
-      }
+      ).stream()
+      .map(spec -> new IndexTask(
+          getId(),
+          getGroupId(),
+          getTaskResource(),
+          getDataSource(),
+          spec,
+          getContext(),
+          authorizerMapper,
+          chatHandlerProvider,
+          rowIngestionMetersFactory
+      ))
+      .collect(Collectors.toList());
     }
 
-    if (indexTaskSpec == null) {
+    if (indexTaskSpecs.isEmpty()) {
       log.warn("Interval[%s] has no segments, nothing to do.", interval);
       return TaskStatus.failure(getId());
     } else {
-      final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
-      log.info("Generated compaction task details: " + json);
+      log.info("Generated [%d] compaction task specs", indexTaskSpecs.size());
 
-      return indexTaskSpec.run(toolbox);
+      for (IndexTask eachSpec : indexTaskSpecs) {
+        final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
+        log.info("Running indexSpec: " + json);
+
+        try {
+          final TaskStatus eachResult = eachSpec.run(toolbox);
+          if (!eachResult.isSuccess()) {
+            log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+          }
+        }
+        catch (Exception e) {
+          log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+        }
+      }
+
+      return TaskStatus.success(getId());
     }
   }
 
   /**
    * Generate {@link IndexIngestionSpec} from input segments.
    *
-   * @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
+   * @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec.
    */
-  @Nullable
   @VisibleForTesting
-  static IndexIngestionSpec createIngestionSchema(
+  static List<IndexIngestionSpec> createIngestionSchema(
       TaskToolbox toolbox,
       SegmentProvider segmentProvider,
       DimensionsSpec dimensionsSpec,
+      boolean keepSegmentGranularity,
       IndexTuningConfig tuningConfig,
       ObjectMapper jsonMapper
   ) throws IOException, SegmentLoadingException
@@ -263,33 +287,68 @@ static IndexIngestionSpec createIngestionSchema(
     final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
 
     if (timelineSegments.size() == 0) {
-      return null;
+      return Collections.emptyList();
     }
 
-    final DataSchema dataSchema = createDataSchema(
-        segmentProvider.dataSource,
-        segmentProvider.interval,
-        dimensionsSpec,
-        toolbox.getIndexIO(),
-        jsonMapper,
-        timelineSegments,
-        segmentFileMap
-    );
-    return new IndexIngestionSpec(
-        dataSchema,
-        new IndexIOConfig(
-            new IngestSegmentFirehoseFactory(
-                segmentProvider.dataSource,
-                segmentProvider.interval,
-                null, // no filter
-                // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
-                dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
-                Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
-                toolbox.getIndexIO()
-            ),
-            false
+    if (keepSegmentGranularity) {
+      // if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
+      // task per segment interval.
+      final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
+      for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
+        final DataSchema dataSchema = createDataSchema(
+            segmentProvider.dataSource,
+            holder.getInterval(),
+            Collections.singletonList(holder),
+            dimensionsSpec,
+            toolbox.getIndexIO(),
+            jsonMapper,
+            segmentFileMap
+        );
+
+        specs.add(
+            new IndexIngestionSpec(
+                dataSchema,
+                createIoConfig(toolbox, dataSchema, holder.getInterval()),
+                tuningConfig
+            )
+        );
+      }
+
+      return specs;
+    } else {
+      final DataSchema dataSchema = createDataSchema(
+          segmentProvider.dataSource,
+          segmentProvider.interval,
+          timelineSegments,
+          dimensionsSpec,
+          toolbox.getIndexIO(),
+          jsonMapper,
+          segmentFileMap
+      );
+
+      return Collections.singletonList(
+          new IndexIngestionSpec(
+              dataSchema,
+              createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+              tuningConfig
+          )
+      );
+    }
+  }
+
+  private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+  {
+    return new IndexIOConfig(
+        new IngestSegmentFirehoseFactory(
+            dataSchema.getDataSource(),
+            interval,
+            null, // no filter
+            // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
+            dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+            Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+            toolbox.getIndexIO()
         ),
-        tuningConfig
+        false
     );
   }
 
@@ -308,18 +367,18 @@ static IndexIngestionSpec createIngestionSchema(
 
   private static DataSchema createDataSchema(
       String dataSource,
-      Interval interval,
+      Interval totalInterval,
+      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
       DimensionsSpec dimensionsSpec,
       IndexIO indexIO,
       ObjectMapper jsonMapper,
-      List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
       Map<DataSegment, File> segmentFileMap
   )
       throws IOException
   {
     // find metadata for interval
     final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
-        timelineSegments,
+        timelineObjectHolder,
         segmentFileMap,
         indexIO
     );
@@ -348,10 +407,11 @@ private static DataSchema createDataSchema(
       final Boolean isRollup = pair.lhs.getMetadata().isRollup();
       return isRollup != null && isRollup;
     });
+
     final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
-        new NoneGranularity(),
+        Granularities.NONE,
         rollup,
-        ImmutableList.of(interval)
+        Collections.singletonList(totalInterval)
     );
 
     // find unique dimensions
@@ -444,15 +504,15 @@ private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, Dat
   }
 
   private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
-      List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
+      List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
       Map<DataSegment, File> segmentFileMap,
       IndexIO indexIO
   ) throws IOException
   {
     final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
 
-    for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
-      final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
+    for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
+      final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
       for (PartitionChunk<DataSegment> chunk : partitionHolder) {
         final DataSegment segment = chunk.getObject();
         final QueryableIndex queryableIndex = indexIO.loadIndex(
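
Summarizing the CompactionTask changes above: createIngestionSchema() now branches on the new flag, emitting one IndexIngestionSpec (and therefore one IndexTask) per segment interval when keepSegmentGranularity is true, and a single spec spanning the whole compaction interval when it is false; run() then executes the generated specs sequentially, logging and skipping any that fail. A standalone sketch of that planning decision, not taken from the patch, using a hypothetical planIntervals() helper in place of the real timeline and spec machinery:

    import io.druid.java.util.common.Intervals;
    import org.joda.time.Interval;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class CompactionPlanningSketch
    {
      // Hypothetical helper mirroring the branch added to createIngestionSchema():
      // one ingestion spec per segment interval when keepSegmentGranularity is true,
      // otherwise a single spec covering the whole compaction interval.
      static List<Interval> planIntervals(
          Interval compactionInterval,
          List<Interval> segmentIntervals,
          boolean keepSegmentGranularity
      )
      {
        return keepSegmentGranularity
               ? segmentIntervals
               : Collections.singletonList(compactionInterval);
      }

      public static void main(String[] args)
      {
        final Interval compactionInterval = Intervals.of("2017-01-01/2017-06-01");
        final List<Interval> segmentIntervals = Arrays.asList(
            Intervals.of("2017-01-01/2017-02-01"),
            Intervals.of("2017-02-01/2017-03-01"),
            Intervals.of("2017-03-01/2017-04-01"),
            Intervals.of("2017-04-01/2017-05-01"),
            Intervals.of("2017-05-01/2017-06-01")
        );

        // Five per-interval index tasks vs. one task over the whole interval.
        System.out.println(planIntervals(compactionInterval, segmentIntervals, true).size());   // 5
        System.out.println(planIntervals(compactionInterval, segmentIntervals, false).size());  // 1
      }
    }
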
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 2288393bbe8..6277a9e8f11 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -98,11 +98,16 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -110,13 +115,22 @@
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+@RunWith(Parameterized.class)
 public class CompactionTaskTest
 {
   private static final String DATA_SOURCE = "dataSource";
   private static final String TIMESTAMP_COLUMN = "timestamp";
   private static final String MIXED_TYPE_COLUMN = "string_to_double";
   private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
+  private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
+      Intervals.of("2017-01-01/2017-02-01"),
+      Intervals.of("2017-02-01/2017-03-01"),
+      Intervals.of("2017-03-01/2017-04-01"),
+      Intervals.of("2017-04-01/2017-05-01"),
+      Intervals.of("2017-05-01/2017-06-01")
+  );
   private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
       Intervals.of("2017-01-01/2017-02-01"),
       new StringDimensionSchema(MIXED_TYPE_COLUMN),
@@ -138,6 +152,8 @@
   private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
   private static Map<DataSegment, File> segmentMap;
 
+  private final boolean keepSegmentGranularity;
+
   private TaskToolbox toolbox;
 
   @BeforeClass
@@ -288,6 +304,20 @@ public void setup()
     );
   }
 
+  @Parameters(name = "keepSegmentGranularity={0}")
+  public static Collection<Object[]> parameters()
+  {
+    return ImmutableList.of(
+        new Object[] {false},
+        new Object[] {true}
+    );
+  }
+
+  public CompactionTaskTest(boolean keepSegmentGranularity)
+  {
+    this.keepSegmentGranularity = keepSegmentGranularity;
+  }
+
   @Test
   public void testSerdeWithInterval() throws IOException
   {
@@ -298,6 +328,7 @@ public void testSerdeWithInterval() throws IOException
         COMPACTION_INTERVAL,
         null,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -327,6 +358,7 @@ public void testSerdeWithSegments() throws IOException
         null,
         SEGMENTS,
         null,
+        null,
         createTuningConfig(),
         ImmutableMap.of("testKey", "testContext"),
         objectMapper,
@@ -346,18 +378,51 @@ public void testSerdeWithSegments() throws IOException
   }
 
   @Test
-  public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
+  public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
+  {
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+        toolbox,
+        new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+        null,
+        keepSegmentGranularity,
+        TUNING_CONFIG,
+        objectMapper
+    );
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
+
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
+  }
+
+  @Test
+  public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -387,35 +452,59 @@ public void testCreateIngestionSchemaWithCustomDimensionsSpec() throws IOExcepti
             new DoubleDimensionSchema("double_dim_3"),
             new DoubleDimensionSchema("double_dim_4"),
             new StringDimensionSchema(MIXED_TYPE_COLUMN)
-        ),
-        null,
-        null
+        )
     );
 
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
         customSpec,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
 
-    assertIngestionSchema(ingestionSchema, customSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
+      IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
+      assertIngestionSchema(
+          ingestionSpecs,
+          dimensionsSpecs,
+          SEGMENT_INTERVALS
+      );
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(
+          ingestionSpecs,
+          Collections.singletonList(customSpec),
+          Collections.singletonList(COMPACTION_INTERVAL)
+      );
+    }
   }
 
   @Test
   public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
   {
-    final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+    final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
         toolbox,
         new SegmentProvider(SEGMENTS),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
-    final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+    final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+        keepSegmentGranularity
+    );
 
-    assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+    if (keepSegmentGranularity) {
+      Assert.assertEquals(5, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+    } else {
+      Assert.assertEquals(1, ingestionSpecs.size());
+      assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+    }
   }
 
   @Test
@@ -430,6 +519,7 @@ public void testCreateIngestionSchemaWithDifferentSegmentSet() throws IOExceptio
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
@@ -448,13 +538,14 @@ public void testMissingMetadata() throws IOException, SegmentLoadingException
         toolbox,
         new SegmentProvider(segments),
         null,
+        keepSegmentGranularity,
         TUNING_CONFIG,
         objectMapper
     );
   }
 
   @Test
-  public void testEmptyInterval() throws Exception
+  public void testEmptyInterval()
   {
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
@@ -468,6 +559,7 @@ public void testEmptyInterval() throws Exception
         null,
         null,
         null,
+        null,
         objectMapper,
         AuthTestUtils.TEST_AUTHORIZER_MAPPER,
         new NoopChatHandlerProvider(),
@@ -475,85 +567,118 @@ public void testEmptyInterval() throws Exception
     );
   }
 
-  private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
+  private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
   {
-    return new DimensionsSpec(
-        Lists.newArrayList(
-            new LongDimensionSchema("timestamp"),
-            new StringDimensionSchema("string_dim_4"),
-            new LongDimensionSchema("long_dim_4"),
-            new FloatDimensionSchema("float_dim_4"),
-            new DoubleDimensionSchema("double_dim_4"),
-            new StringDimensionSchema("string_dim_0"),
-            new LongDimensionSchema("long_dim_0"),
-            new FloatDimensionSchema("float_dim_0"),
-            new DoubleDimensionSchema("double_dim_0"),
-            new StringDimensionSchema("string_dim_1"),
-            new LongDimensionSchema("long_dim_1"),
-            new FloatDimensionSchema("float_dim_1"),
-            new DoubleDimensionSchema("double_dim_1"),
-            new StringDimensionSchema("string_dim_2"),
-            new LongDimensionSchema("long_dim_2"),
-            new FloatDimensionSchema("float_dim_2"),
-            new DoubleDimensionSchema("double_dim_2"),
-            new StringDimensionSchema("string_dim_3"),
-            new LongDimensionSchema("long_dim_3"),
-            new FloatDimensionSchema("float_dim_3"),
-            new DoubleDimensionSchema("double_dim_3"),
-            new DoubleDimensionSchema("string_to_double")
-        ),
-        null,
-        null
+    if (keepSegmentGranularity) {
+      return ImmutableList.of(
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    } else {
+      return Collections.singletonList(
+          new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+      );
+    }
+  }
+
+  private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
+  {
+    return Lists.newArrayList(
+        new LongDimensionSchema("timestamp"),
+        new StringDimensionSchema("string_dim_4"),
+        new LongDimensionSchema("long_dim_4"),
+        new FloatDimensionSchema("float_dim_4"),
+        new DoubleDimensionSchema("double_dim_4"),
+        new StringDimensionSchema("string_dim_0"),
+        new LongDimensionSchema("long_dim_0"),
+        new FloatDimensionSchema("float_dim_0"),
+        new DoubleDimensionSchema("double_dim_0"),
+        new StringDimensionSchema("string_dim_1"),
+        new LongDimensionSchema("long_dim_1"),
+        new FloatDimensionSchema("float_dim_1"),
+        new DoubleDimensionSchema("double_dim_1"),
+        new StringDimensionSchema("string_dim_2"),
+        new LongDimensionSchema("long_dim_2"),
+        new FloatDimensionSchema("float_dim_2"),
+        new DoubleDimensionSchema("double_dim_2"),
+        new StringDimensionSchema("string_dim_3"),
+        new LongDimensionSchema("long_dim_3"),
+        new FloatDimensionSchema("float_dim_3"),
+        new DoubleDimensionSchema("double_dim_3"),
+        mixedTypeColumn
     );
   }
 
   private static void assertIngestionSchema(
-      IndexIngestionSpec ingestionSchema,
-      DimensionsSpec expectedDimensionsSpec
+      List<IndexIngestionSpec> ingestionSchemas,
+      List<DimensionsSpec> expectedDimensionsSpecs,
+      List<Interval> expectedSegmentIntervals
   )
   {
-    // assert dataSchema
-    final DataSchema dataSchema = ingestionSchema.getDataSchema();
-    Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
-
-    final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
-    Assert.assertTrue(parser instanceof TransformingInputRowParser);
-    Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
-    Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
-    Assert.assertEquals(
-        new HashSet<>(expectedDimensionsSpec.getDimensions()),
-        new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
-    );
-    final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
-                                                                  .stream()
-                                                                  .map(AggregatorFactory::getCombiningFactory)
-                                                                  .collect(Collectors.toSet());
-    Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
-    Assert.assertEquals(
-        new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
-        dataSchema.getGranularitySpec()
+    Preconditions.checkArgument(
+        ingestionSchemas.size() == expectedDimensionsSpecs.size(),
+        "ingestionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
+        ingestionSchemas.size(),
+        expectedDimensionsSpecs.size()
     );
 
-    // assert ioConfig
-    final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
-    Assert.assertFalse(ioConfig.isAppendToExisting());
-    final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
-    Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
-    final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
-    Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
-    Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
-    Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
-
-    // check the order of dimensions
-    Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
-    // check the order of metrics
-    Assert.assertEquals(
-        Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
-        ingestSegmentFirehoseFactory.getMetrics()
-    );
+    for (int i = 0; i < ingestionSchemas.size(); i++) {
+      final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
+      final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
+
+      // assert dataSchema
+      final DataSchema dataSchema = ingestionSchema.getDataSchema();
+      Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
+
+      final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
+      Assert.assertTrue(parser instanceof TransformingInputRowParser);
+      Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
+      Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensions()),
+          new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
+      );
+      final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
+                                                                    .stream()
+                                                                    .map(AggregatorFactory::getCombiningFactory)
+                                                                    .collect(Collectors.toSet());
+      Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
+      Assert.assertEquals(
+          new ArbitraryGranularitySpec(
+              Granularities.NONE,
+              false,
+              Collections.singletonList(expectedSegmentIntervals.get(i))
+          ),
+          dataSchema.getGranularitySpec()
+      );
 
-    // assert tuningConfig
-    Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+      // assert ioConfig
+      final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
+      Assert.assertFalse(ioConfig.isAppendToExisting());
+      final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
+      Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
+      final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
+      Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
+      Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
+      Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+
+      // check the order of dimensions
+      Assert.assertEquals(
+          new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
+          new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
+      );
+      // check the order of metrics
+      Assert.assertEquals(
+          Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
+          ingestSegmentFirehoseFactory.getMetrics()
+      );
+
+      // assert tuningConfig
+      Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+    }
   }
 
   private static class TestTaskToolbox extends TaskToolbox
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
index c1005153998..1a951b2977a 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
@@ -36,24 +36,50 @@
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_index_test";
   private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
-  private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
 
   @Test
-  public void testCompaction() throws Exception
+  public void testCompactionWithoutKeepSegmentGranularity() throws Exception
   {
     loadData();
     final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-    if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
-      throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
+    final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
+    if (intervalsBeforeCompaction.contains(compactedInterval)) {
+      throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
     }
     try {
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
-      compactData();
+      compactData(false);
       queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
 
       final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
-      if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
-        throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
+      if (!intervalsAfterCompaction.contains(compactedInterval)) {
+        throw new ISE("Compacted segment for interval[%s] does not exist", compactedInterval);
+      }
+    }
+    finally {
+      unloadAndKillData(INDEX_DATASOURCE);
+    }
+  }
+
+  @Test
+  public void testCompactionWithKeepSegmentGranularity() throws Exception
+  {
+    loadData();
+    final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+    try {
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+      compactData(true);
+      queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+
+      final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+      intervalsBeforeCompaction.sort(null);
+      intervalsAfterCompaction.sort(null);
+      if (!intervalsBeforeCompaction.equals(intervalsAfterCompaction)) {
+        throw new ISE(
+            "Intervals before compaction[%s] should be same with those after compaction[%s]",
+            intervalsBeforeCompaction,
+            intervalsAfterCompaction
+        );
       }
     }
     finally {
@@ -73,9 +99,11 @@ private void loadData() throws Exception
     );
   }
 
-  private void compactData() throws Exception
+  private void compactData(boolean keepSegmentGranularity) throws Exception
   {
-    final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
+    final String template = getTaskAsString(COMPACTION_TASK);
+    final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+    final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for compaction task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index bc7f786646f..3fdad69ff5d 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,5 +1,6 @@
 {
   "type" : "compact",
   "dataSource" : "wikipedia_index_test",
-  "interval" : "2013-08-31/2013-09-02"
+  "interval" : "2013-08-31/2013-09-02",
+  "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
 }
\ No newline at end of file
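
The integration test fills in the ${KEEP_SEGMENT_GRANULARITY} placeholder before submitting the spec. A small sketch of what compactData(true) produces; the JSON template is inlined as a Java string here purely for illustration and mirrors the resource file shown above:

    class CompactionTaskSpecTemplateExample
    {
      public static void main(String[] args)
      {
        // Template matching integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json.
        final String template = "{\n"
            + "  \"type\" : \"compact\",\n"
            + "  \"dataSource\" : \"wikipedia_index_test\",\n"
            + "  \"interval\" : \"2013-08-31/2013-09-02\",\n"
            + "  \"keepSegmentGranularity\" : ${KEEP_SEGMENT_GRANULARITY}\n"
            + "}";

        // Same substitution as ITCompactionTaskTest.compactData(true).
        final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(true));
        System.out.println(taskSpec);
      }
    }
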
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index af7b98a6a71..8b3eab89fb2 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -63,7 +63,8 @@
             }
         },
         "tuningConfig": {
-            "type": "index"
+            "type": "index",
+            "targetPartitionSize": 3
         }
     }
 }
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
index e1e32922588..db40fecf439 100644
--- a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
@@ -30,6 +30,7 @@
 {
   private final String dataSource;
   private final List<DataSegment> segments;
+  private final boolean keepSegmentGranularity;
   private final ClientCompactQueryTuningConfig tuningConfig;
   private final Map<String, Object> context;
 
@@ -37,12 +38,14 @@
   public ClientCompactQuery(
       @JsonProperty("dataSource") String dataSource,
       @JsonProperty("segments") List<DataSegment> segments,
+      @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
       @JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
       @JsonProperty("context") Map<String, Object> context
   )
   {
     this.dataSource = dataSource;
     this.segments = segments;
+    this.keepSegmentGranularity = keepSegmentGranularity;
     this.tuningConfig = tuningConfig;
     this.context = context;
   }
@@ -65,6 +68,12 @@ public String getDataSource()
     return segments;
   }
 
+  @JsonProperty
+  public boolean isKeepSegmentGranularity()
+  {
+    return keepSegmentGranularity;
+  }
+
   @JsonProperty
   public ClientCompactQueryTuningConfig getTuningConfig()
   {
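
Because the new field is exposed through a constructor @JsonProperty parameter and a matching getter, it round-trips through Jackson like the existing fields. A minimal serialization sketch, assuming a plain Jackson ObjectMapper is sufficient for this simple bean; the empty segment list and null tuning config/context are used only for illustration:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.client.indexing.ClientCompactQuery;

    import java.util.Collections;

    class ClientCompactQuerySerdeSketch
    {
      public static void main(String[] args) throws Exception
      {
        final ObjectMapper mapper = new ObjectMapper();
        final ClientCompactQuery query = new ClientCompactQuery(
            "wikipedia_index_test",      // dataSource
            Collections.emptyList(),     // segments (empty for brevity)
            true,                        // keepSegmentGranularity
            null,                        // tuningConfig
            null                         // context
        );

        // Expected to include "keepSegmentGranularity":true alongside the other fields.
        System.out.println(mapper.writeValueAsString(query));
      }
    }
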
diff --git a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
index b012db60442..c81bb0f5f4f 100644
--- a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
@@ -96,6 +96,7 @@ public void upgradeSegment(DataSegment dataSegment)
   @Override
   public String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
@@ -112,7 +113,7 @@ public String compactSegments(
     context = context == null ? new HashMap<>() : context;
     context.put("priority", compactionTaskPriority);
 
-    return runTask(new ClientCompactQuery(dataSource, segments, tuningConfig, context));
+    return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
   }
 
   @Override
diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
index 80e47d0d48c..72c9794b366 100644
--- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@
 
   String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 6e368ed21d6..4a3e4900d08 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -150,8 +150,11 @@ private CoordinatorStats doRun(
 
       if (segmentsToCompact.size() > 1) {
         final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+        // Currently set keepSegmentGranularity to false because it breaks the algorithm of CompactionSegmentIterator to
+        // find segments to be compacted.
         final String taskId = indexingServiceClient.compactSegments(
             segmentsToCompact,
+            false,
             config.getTaskPriority(),
             config.getTuningConfig(),
             config.getTaskContext()
diff --git a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
index 030c991cbfe..e7889dc0e2d 100644
--- a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
@@ -57,6 +57,7 @@ public void mergeSegments(List<DataSegment> segments)
   @Override
   public String compactSegments(
       List<DataSegment> segments,
+      boolean keepSegmentGranularity,
       int compactionTaskPriority,
       @Nullable ClientCompactQueryTuningConfig tuningConfig,
       @Nullable Map<String, Object> context
diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52ba82ce1ff..62f79b2c8d1 100644
--- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -64,6 +64,7 @@
     @Override
     public String compactSegments(
         List<DataSegment> segments,
+        boolean keepSegmentGranularity,
         int compactionTaskPriority,
         ClientCompactQueryTuningConfig tuningConfig,
         Map<String, Object> context


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org