Posted to commits@druid.apache.org by gi...@apache.org on 2018/08/09 20:51:23 UTC
[incubator-druid] branch master updated: Add support 'keepSegmentGranularity' for compactionTask (#6095)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new d6a02de Add support 'keepSegmentGranularity' for compactionTask (#6095)
d6a02de is described below
commit d6a02de5b57ab470cd0efd9cc101440dbfa2a6f3
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu Aug 9 13:51:20 2018 -0700
Add support 'keepSegmentGranularity' for compactionTask (#6095)
* Add keepSegmentGranularity for compactionTask
* fix build
* createIoConfig method
* fix build
* fix build
* address comments
* fix build
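For reference, the new flag goes directly into the compaction task spec and defaults to true when omitted. A minimal spec might look like the following (the dataSource and interval are taken from the integration test touched by this commit):

    {
      "type" : "compact",
      "dataSource" : "wikipedia_index_test",
      "interval" : "2013-08-31/2013-09-02",
      "keepSegmentGranularity" : true
    }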
---
.../io/druid/data/input/impl/DimensionsSpec.java | 4 +
.../druid/indexing/common/task/CompactionTask.java | 178 ++++++++-----
.../indexing/common/task/CompactionTaskTest.java | 289 +++++++++++++++------
.../druid/tests/indexer/ITCompactionTaskTest.java | 46 +++-
.../indexer/wikipedia_compaction_task.json | 3 +-
.../resources/indexer/wikipedia_index_task.json | 3 +-
.../druid/client/indexing/ClientCompactQuery.java | 9 +
.../client/indexing/HttpIndexingServiceClient.java | 3 +-
.../client/indexing/IndexingServiceClient.java | 1 +
.../helper/DruidCoordinatorSegmentCompactor.java | 3 +
.../client/indexing/NoopIndexingServiceClient.java | 1 +
.../DruidCoordinatorSegmentCompactorTest.java | 1 +
12 files changed, 388 insertions(+), 153 deletions(-)
diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
index 837a482..be80163 100644
--- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
+++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java
@@ -101,6 +101,10 @@ public class DimensionsSpec
}
}
+ public DimensionsSpec(List<DimensionSchema> dimensions)
+ {
+ this(dimensions, null, null);
+ }
@JsonProperty
public List<DimensionSchema> getDimensions()
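The new single-argument constructor above simply calls the existing three-argument constructor, passing null for the other two arguments. A hedged usage sketch (the wrapper class and the "page" column are illustrative, not part of this commit):

    import com.google.common.collect.ImmutableList;
    import io.druid.data.input.impl.DimensionsSpec;
    import io.druid.data.input.impl.StringDimensionSchema;

    public class DimensionsSpecExample
    {
      public static void main(String[] args)
      {
        // Shorthand for new DimensionsSpec(dims, null, null), per the delegating constructor above.
        final DimensionsSpec spec = new DimensionsSpec(
            ImmutableList.of(new StringDimensionSchema("page"))
        );
        System.out.println(spec.getDimensions());
      }
    }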
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
index f81999f..9f1fb56 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java
@@ -28,7 +28,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
@@ -54,7 +53,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.JodaUtils;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
-import io.druid.java.util.common.granularity.NoneGranularity;
+import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.jackson.JacksonUtils;
import io.druid.java.util.common.logger.Logger;
@@ -96,17 +95,19 @@ public class CompactionTask extends AbstractTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final String TYPE = "compact";
+ private static final boolean DEFAULT_KEEP_SEGMENT_GRANULARITY = true;
private final Interval interval;
private final List<DataSegment> segments;
private final DimensionsSpec dimensionsSpec;
+ private final boolean keepSegmentGranularity;
private final IndexTuningConfig tuningConfig;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final SegmentProvider segmentProvider;
@JsonIgnore
- private IndexTask indexTaskSpec;
+ private List<IndexTask> indexTaskSpecs;
@JsonIgnore
private final AuthorizerMapper authorizerMapper;
@@ -125,6 +126,7 @@ public class CompactionTask extends AbstractTask
@Nullable @JsonProperty("interval") final Interval interval,
@Nullable @JsonProperty("segments") final List<DataSegment> segments,
@Nullable @JsonProperty("dimensions") final DimensionsSpec dimensionsSpec,
+ @Nullable @JsonProperty("keepSegmentGranularity") final Boolean keepSegmentGranularity,
@Nullable @JsonProperty("tuningConfig") final IndexTuningConfig tuningConfig,
@Nullable @JsonProperty("context") final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@@ -144,6 +146,9 @@ public class CompactionTask extends AbstractTask
this.interval = interval;
this.segments = segments;
this.dimensionsSpec = dimensionsSpec;
+ this.keepSegmentGranularity = keepSegmentGranularity == null
+ ? DEFAULT_KEEP_SEGMENT_GRANULARITY
+ : keepSegmentGranularity;
this.tuningConfig = tuningConfig;
this.jsonMapper = jsonMapper;
this.segmentProvider = segments == null ? new SegmentProvider(dataSource, interval) : new SegmentProvider(segments);
@@ -171,6 +176,12 @@ public class CompactionTask extends AbstractTask
}
@JsonProperty
+ public boolean isKeepSegmentGranularity()
+ {
+ return keepSegmentGranularity;
+ }
+
+ @JsonProperty
public IndexTuningConfig getTuningConfig()
{
return tuningConfig;
@@ -205,52 +216,65 @@ public class CompactionTask extends AbstractTask
@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
- if (indexTaskSpec == null) {
- final IndexIngestionSpec ingestionSpec = createIngestionSchema(
+ if (indexTaskSpecs == null) {
+ indexTaskSpecs = createIngestionSchema(
toolbox,
segmentProvider,
dimensionsSpec,
+ keepSegmentGranularity,
tuningConfig,
jsonMapper
- );
-
- if (ingestionSpec != null) {
- indexTaskSpec = new IndexTask(
- getId(),
- getGroupId(),
- getTaskResource(),
- getDataSource(),
- ingestionSpec,
- getContext(),
- authorizerMapper,
- chatHandlerProvider,
- rowIngestionMetersFactory
- );
- }
+ ).stream()
+ .map(spec -> new IndexTask(
+ getId(),
+ getGroupId(),
+ getTaskResource(),
+ getDataSource(),
+ spec,
+ getContext(),
+ authorizerMapper,
+ chatHandlerProvider,
+ rowIngestionMetersFactory
+ ))
+ .collect(Collectors.toList());
}
- if (indexTaskSpec == null) {
+ if (indexTaskSpecs.isEmpty()) {
log.warn("Interval[%s] has no segments, nothing to do.", interval);
return TaskStatus.failure(getId());
} else {
- final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(indexTaskSpec);
- log.info("Generated compaction task details: " + json);
+ log.info("Generated [%d] compaction task specs", indexTaskSpecs.size());
- return indexTaskSpec.run(toolbox);
+ for (IndexTask eachSpec : indexTaskSpecs) {
+ final String json = jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(eachSpec);
+ log.info("Running indexSpec: " + json);
+
+ try {
+ final TaskStatus eachResult = eachSpec.run(toolbox);
+ if (!eachResult.isSuccess()) {
+ log.warn("Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+ }
+ }
+ catch (Exception e) {
+ log.warn(e, "Failed to run indexSpec: [%s].\nTrying the next indexSpec.", json);
+ }
+ }
+
+ return TaskStatus.success(getId());
}
}
/**
* Generate {@link IndexIngestionSpec} from input segments.
*
- * @return null if input segments don't exist. Otherwise, a generated ingestionSpec.
+ * @return an empty list if input segments don't exist. Otherwise, a generated ingestionSpec.
*/
- @Nullable
@VisibleForTesting
- static IndexIngestionSpec createIngestionSchema(
+ static List<IndexIngestionSpec> createIngestionSchema(
TaskToolbox toolbox,
SegmentProvider segmentProvider,
DimensionsSpec dimensionsSpec,
+ boolean keepSegmentGranularity,
IndexTuningConfig tuningConfig,
ObjectMapper jsonMapper
) throws IOException, SegmentLoadingException
@@ -263,33 +287,68 @@ public class CompactionTask extends AbstractTask
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
if (timelineSegments.size() == 0) {
- return null;
+ return Collections.emptyList();
}
- final DataSchema dataSchema = createDataSchema(
- segmentProvider.dataSource,
- segmentProvider.interval,
- dimensionsSpec,
- toolbox.getIndexIO(),
- jsonMapper,
- timelineSegments,
- segmentFileMap
- );
- return new IndexIngestionSpec(
- dataSchema,
- new IndexIOConfig(
- new IngestSegmentFirehoseFactory(
- segmentProvider.dataSource,
- segmentProvider.interval,
- null, // no filter
- // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
- dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
- Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
- toolbox.getIndexIO()
- ),
- false
+ if (keepSegmentGranularity) {
+ // if keepSegmentGranularity = true, create indexIngestionSpec per segment interval, so that we can run an index
+ // task per segment interval.
+ final List<IndexIngestionSpec> specs = new ArrayList<>(timelineSegments.size());
+ for (TimelineObjectHolder<String, DataSegment> holder : timelineSegments) {
+ final DataSchema dataSchema = createDataSchema(
+ segmentProvider.dataSource,
+ holder.getInterval(),
+ Collections.singletonList(holder),
+ dimensionsSpec,
+ toolbox.getIndexIO(),
+ jsonMapper,
+ segmentFileMap
+ );
+
+ specs.add(
+ new IndexIngestionSpec(
+ dataSchema,
+ createIoConfig(toolbox, dataSchema, holder.getInterval()),
+ tuningConfig
+ )
+ );
+ }
+
+ return specs;
+ } else {
+ final DataSchema dataSchema = createDataSchema(
+ segmentProvider.dataSource,
+ segmentProvider.interval,
+ timelineSegments,
+ dimensionsSpec,
+ toolbox.getIndexIO(),
+ jsonMapper,
+ segmentFileMap
+ );
+
+ return Collections.singletonList(
+ new IndexIngestionSpec(
+ dataSchema,
+ createIoConfig(toolbox, dataSchema, segmentProvider.interval),
+ tuningConfig
+ )
+ );
+ }
+ }
+
+ private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
+ {
+ return new IndexIOConfig(
+ new IngestSegmentFirehoseFactory(
+ dataSchema.getDataSource(),
+ interval,
+ null, // no filter
+ // set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
+ dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
+ Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
+ toolbox.getIndexIO()
),
- tuningConfig
+ false
);
}
@@ -308,18 +367,18 @@ public class CompactionTask extends AbstractTask
private static DataSchema createDataSchema(
String dataSource,
- Interval interval,
+ Interval totalInterval,
+ List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolder,
DimensionsSpec dimensionsSpec,
IndexIO indexIO,
ObjectMapper jsonMapper,
- List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
Map<DataSegment, File> segmentFileMap
)
throws IOException
{
// find metadata for interval
final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
- timelineSegments,
+ timelineObjectHolder,
segmentFileMap,
indexIO
);
@@ -348,10 +407,11 @@ public class CompactionTask extends AbstractTask
final Boolean isRollup = pair.lhs.getMetadata().isRollup();
return isRollup != null && isRollup;
});
+
final GranularitySpec granularitySpec = new ArbitraryGranularitySpec(
- new NoneGranularity(),
+ Granularities.NONE,
rollup,
- ImmutableList.of(interval)
+ Collections.singletonList(totalInterval)
);
// find unique dimensions
@@ -444,15 +504,15 @@ public class CompactionTask extends AbstractTask
}
private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
- List<TimelineObjectHolder<String, DataSegment>> timelineSegments,
+ List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
- for (TimelineObjectHolder<String, DataSegment> timelineSegment : timelineSegments) {
- final PartitionHolder<DataSegment> partitionHolder = timelineSegment.getObject();
+ for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
+ final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
final DataSegment segment = chunk.getObject();
final QueryableIndex queryableIndex = indexIO.loadIndex(
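To make the keepSegmentGranularity branch above concrete: when the flag is true, createIngestionSchema emits one IndexIngestionSpec (and hence one IndexTask) per segment interval; when false, it emits a single spec spanning the whole compaction interval. A minimal sketch of the resulting spec count, reusing the five monthly intervals from CompactionTaskTest below (the sketch class itself is hypothetical; Intervals is the java-util helper the tests use):

    import io.druid.java.util.common.Intervals;
    import java.util.Arrays;
    import java.util.List;
    import org.joda.time.Interval;

    public class SpecCountSketch
    {
      public static void main(String[] args)
      {
        final List<Interval> segmentIntervals = Arrays.asList(
            Intervals.of("2017-01-01/2017-02-01"),
            Intervals.of("2017-02-01/2017-03-01"),
            Intervals.of("2017-03-01/2017-04-01"),
            Intervals.of("2017-04-01/2017-05-01"),
            Intervals.of("2017-05-01/2017-06-01")
        );
        final boolean keepSegmentGranularity = true;
        // One ingestion spec per segment interval when keeping granularity; one spec overall otherwise.
        final int expectedSpecs = keepSegmentGranularity ? segmentIntervals.size() : 1;
        System.out.println("expected ingestion specs: " + expectedSpecs); // prints 5
      }
    }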
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
index 2288393..6277a9e 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java
@@ -98,11 +98,16 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -110,13 +115,22 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+@RunWith(Parameterized.class)
public class CompactionTaskTest
{
private static final String DATA_SOURCE = "dataSource";
private static final String TIMESTAMP_COLUMN = "timestamp";
private static final String MIXED_TYPE_COLUMN = "string_to_double";
private static final Interval COMPACTION_INTERVAL = Intervals.of("2017-01-01/2017-06-01");
+ private static final List<Interval> SEGMENT_INTERVALS = ImmutableList.of(
+ Intervals.of("2017-01-01/2017-02-01"),
+ Intervals.of("2017-02-01/2017-03-01"),
+ Intervals.of("2017-03-01/2017-04-01"),
+ Intervals.of("2017-04-01/2017-05-01"),
+ Intervals.of("2017-05-01/2017-06-01")
+ );
private static final Map<Interval, DimensionSchema> MIXED_TYPE_COLUMN_MAP = ImmutableMap.of(
Intervals.of("2017-01-01/2017-02-01"),
new StringDimensionSchema(MIXED_TYPE_COLUMN),
@@ -138,6 +152,8 @@ public class CompactionTaskTest
private static ObjectMapper objectMapper = setupInjectablesInObjectMapper(new DefaultObjectMapper());
private static Map<DataSegment, File> segmentMap;
+ private final boolean keepSegmentGranularity;
+
private TaskToolbox toolbox;
@BeforeClass
@@ -288,6 +304,20 @@ public class CompactionTaskTest
);
}
+ @Parameters(name = "keepSegmentGranularity={0}")
+ public static Collection<Object[]> parameters()
+ {
+ return ImmutableList.of(
+ new Object[] {false},
+ new Object[] {true}
+ );
+ }
+
+ public CompactionTaskTest(boolean keepSegmentGranularity)
+ {
+ this.keepSegmentGranularity = keepSegmentGranularity;
+ }
+
@Test
public void testSerdeWithInterval() throws IOException
{
@@ -298,6 +328,7 @@ public class CompactionTaskTest
COMPACTION_INTERVAL,
null,
null,
+ null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -327,6 +358,7 @@ public class CompactionTaskTest
null,
SEGMENTS,
null,
+ null,
createTuningConfig(),
ImmutableMap.of("testKey", "testContext"),
objectMapper,
@@ -346,18 +378,51 @@ public class CompactionTaskTest
}
@Test
- public void testCreateIngestionSchema() throws IOException, SegmentLoadingException
+ public void testCreateIngestionSchemaWithKeepSegmentGranularity() throws IOException, SegmentLoadingException
+ {
+ final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
+ toolbox,
+ new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
+ null,
+ keepSegmentGranularity,
+ TUNING_CONFIG,
+ objectMapper
+ );
+ final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+ keepSegmentGranularity
+ );
+
+ if (keepSegmentGranularity) {
+ Assert.assertEquals(5, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+ }
+ }
+
+ @Test
+ public void testCreateIngestionSchemaWithIgnoreSegmentGranularity() throws IOException, SegmentLoadingException
{
- final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+ final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
null,
+ keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
- final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+ final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+ keepSegmentGranularity
+ );
- assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+ if (keepSegmentGranularity) {
+ Assert.assertEquals(5, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+ }
}
@Test
@@ -387,35 +452,59 @@ public class CompactionTaskTest
new DoubleDimensionSchema("double_dim_3"),
new DoubleDimensionSchema("double_dim_4"),
new StringDimensionSchema(MIXED_TYPE_COLUMN)
- ),
- null,
- null
+ )
);
- final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+ final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(DATA_SOURCE, COMPACTION_INTERVAL),
customSpec,
+ keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
- assertIngestionSchema(ingestionSchema, customSpec);
+ if (keepSegmentGranularity) {
+ Assert.assertEquals(5, ingestionSpecs.size());
+ final List<DimensionsSpec> dimensionsSpecs = new ArrayList<>(5);
+ IntStream.range(0, 5).forEach(i -> dimensionsSpecs.add(customSpec));
+ assertIngestionSchema(
+ ingestionSpecs,
+ dimensionsSpecs,
+ SEGMENT_INTERVALS
+ );
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(
+ ingestionSpecs,
+ Collections.singletonList(customSpec),
+ Collections.singletonList(COMPACTION_INTERVAL)
+ );
+ }
}
@Test
public void testCreateIngestionSchemaWithCustomSegments() throws IOException, SegmentLoadingException
{
- final IndexIngestionSpec ingestionSchema = CompactionTask.createIngestionSchema(
+ final List<IndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
toolbox,
new SegmentProvider(SEGMENTS),
null,
+ keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
- final DimensionsSpec expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration();
+ final List<DimensionsSpec> expectedDimensionsSpec = getExpectedDimensionsSpecForAutoGeneration(
+ keepSegmentGranularity
+ );
- assertIngestionSchema(ingestionSchema, expectedDimensionsSpec);
+ if (keepSegmentGranularity) {
+ Assert.assertEquals(5, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, SEGMENT_INTERVALS);
+ } else {
+ Assert.assertEquals(1, ingestionSpecs.size());
+ assertIngestionSchema(ingestionSpecs, expectedDimensionsSpec, Collections.singletonList(COMPACTION_INTERVAL));
+ }
}
@Test
@@ -430,6 +519,7 @@ public class CompactionTaskTest
toolbox,
new SegmentProvider(segments),
null,
+ keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
@@ -448,13 +538,14 @@ public class CompactionTaskTest
toolbox,
new SegmentProvider(segments),
null,
+ keepSegmentGranularity,
TUNING_CONFIG,
objectMapper
);
}
@Test
- public void testEmptyInterval() throws Exception
+ public void testEmptyInterval()
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage(CoreMatchers.containsString("must specify a nonempty interval"));
@@ -468,6 +559,7 @@ public class CompactionTaskTest
null,
null,
null,
+ null,
objectMapper,
AuthTestUtils.TEST_AUTHORIZER_MAPPER,
new NoopChatHandlerProvider(),
@@ -475,85 +567,118 @@ public class CompactionTaskTest
);
}
- private static DimensionsSpec getExpectedDimensionsSpecForAutoGeneration()
+ private static List<DimensionsSpec> getExpectedDimensionsSpecForAutoGeneration(boolean keepSegmentGranularity)
{
- return new DimensionsSpec(
- Lists.newArrayList(
- new LongDimensionSchema("timestamp"),
- new StringDimensionSchema("string_dim_4"),
- new LongDimensionSchema("long_dim_4"),
- new FloatDimensionSchema("float_dim_4"),
- new DoubleDimensionSchema("double_dim_4"),
- new StringDimensionSchema("string_dim_0"),
- new LongDimensionSchema("long_dim_0"),
- new FloatDimensionSchema("float_dim_0"),
- new DoubleDimensionSchema("double_dim_0"),
- new StringDimensionSchema("string_dim_1"),
- new LongDimensionSchema("long_dim_1"),
- new FloatDimensionSchema("float_dim_1"),
- new DoubleDimensionSchema("double_dim_1"),
- new StringDimensionSchema("string_dim_2"),
- new LongDimensionSchema("long_dim_2"),
- new FloatDimensionSchema("float_dim_2"),
- new DoubleDimensionSchema("double_dim_2"),
- new StringDimensionSchema("string_dim_3"),
- new LongDimensionSchema("long_dim_3"),
- new FloatDimensionSchema("float_dim_3"),
- new DoubleDimensionSchema("double_dim_3"),
- new DoubleDimensionSchema("string_to_double")
- ),
- null,
- null
+ if (keepSegmentGranularity) {
+ return ImmutableList.of(
+ new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+ new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+ new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+ new DimensionsSpec(getDimensionSchema(new StringDimensionSchema("string_to_double"))),
+ new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+ );
+ } else {
+ return Collections.singletonList(
+ new DimensionsSpec(getDimensionSchema(new DoubleDimensionSchema("string_to_double")))
+ );
+ }
+ }
+
+ private static List<DimensionSchema> getDimensionSchema(DimensionSchema mixedTypeColumn)
+ {
+ return Lists.newArrayList(
+ new LongDimensionSchema("timestamp"),
+ new StringDimensionSchema("string_dim_4"),
+ new LongDimensionSchema("long_dim_4"),
+ new FloatDimensionSchema("float_dim_4"),
+ new DoubleDimensionSchema("double_dim_4"),
+ new StringDimensionSchema("string_dim_0"),
+ new LongDimensionSchema("long_dim_0"),
+ new FloatDimensionSchema("float_dim_0"),
+ new DoubleDimensionSchema("double_dim_0"),
+ new StringDimensionSchema("string_dim_1"),
+ new LongDimensionSchema("long_dim_1"),
+ new FloatDimensionSchema("float_dim_1"),
+ new DoubleDimensionSchema("double_dim_1"),
+ new StringDimensionSchema("string_dim_2"),
+ new LongDimensionSchema("long_dim_2"),
+ new FloatDimensionSchema("float_dim_2"),
+ new DoubleDimensionSchema("double_dim_2"),
+ new StringDimensionSchema("string_dim_3"),
+ new LongDimensionSchema("long_dim_3"),
+ new FloatDimensionSchema("float_dim_3"),
+ new DoubleDimensionSchema("double_dim_3"),
+ mixedTypeColumn
);
}
private static void assertIngestionSchema(
- IndexIngestionSpec ingestionSchema,
- DimensionsSpec expectedDimensionsSpec
+ List<IndexIngestionSpec> ingestionSchemas,
+ List<DimensionsSpec> expectedDimensionsSpecs,
+ List<Interval> expectedSegmentIntervals
)
{
- // assert dataSchema
- final DataSchema dataSchema = ingestionSchema.getDataSchema();
- Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
-
- final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
- Assert.assertTrue(parser instanceof TransformingInputRowParser);
- Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
- Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
- Assert.assertEquals(
- new HashSet<>(expectedDimensionsSpec.getDimensions()),
- new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
- );
- final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
- .stream()
- .map(AggregatorFactory::getCombiningFactory)
- .collect(Collectors.toSet());
- Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
- Assert.assertEquals(
- new ArbitraryGranularitySpec(Granularities.NONE, false, ImmutableList.of(COMPACTION_INTERVAL)),
- dataSchema.getGranularitySpec()
+ Preconditions.checkArgument(
+ ingestionSchemas.size() == expectedDimensionsSpecs.size(),
+ "ingesionSchemas.size()[%s] should be same with expectedDimensionsSpecs.size()[%s]",
+ ingestionSchemas.size(),
+ expectedDimensionsSpecs.size()
);
- // assert ioConfig
- final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
- Assert.assertFalse(ioConfig.isAppendToExisting());
- final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
- Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
- final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
- Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
- Assert.assertEquals(COMPACTION_INTERVAL, ingestSegmentFirehoseFactory.getInterval());
- Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
-
- // check the order of dimensions
- Assert.assertEquals(expectedDimensionsSpec.getDimensionNames(), ingestSegmentFirehoseFactory.getDimensions());
- // check the order of metrics
- Assert.assertEquals(
- Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
- ingestSegmentFirehoseFactory.getMetrics()
- );
+ for (int i = 0; i < ingestionSchemas.size(); i++) {
+ final IndexIngestionSpec ingestionSchema = ingestionSchemas.get(i);
+ final DimensionsSpec expectedDimensionsSpec = expectedDimensionsSpecs.get(i);
+
+ // assert dataSchema
+ final DataSchema dataSchema = ingestionSchema.getDataSchema();
+ Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource());
+
+ final InputRowParser parser = objectMapper.convertValue(dataSchema.getParser(), InputRowParser.class);
+ Assert.assertTrue(parser instanceof TransformingInputRowParser);
+ Assert.assertTrue(((TransformingInputRowParser) parser).getParser() instanceof NoopInputRowParser);
+ Assert.assertTrue(parser.getParseSpec() instanceof TimeAndDimsParseSpec);
+ Assert.assertEquals(
+ new HashSet<>(expectedDimensionsSpec.getDimensions()),
+ new HashSet<>(parser.getParseSpec().getDimensionsSpec().getDimensions())
+ );
+ final Set<AggregatorFactory> expectedAggregators = AGGREGATORS.values()
+ .stream()
+ .map(AggregatorFactory::getCombiningFactory)
+ .collect(Collectors.toSet());
+ Assert.assertEquals(expectedAggregators, new HashSet<>(Arrays.asList(dataSchema.getAggregators())));
+ Assert.assertEquals(
+ new ArbitraryGranularitySpec(
+ Granularities.NONE,
+ false,
+ Collections.singletonList(expectedSegmentIntervals.get(i))
+ ),
+ dataSchema.getGranularitySpec()
+ );
- // assert tuningConfig
- Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+ // assert ioConfig
+ final IndexIOConfig ioConfig = ingestionSchema.getIOConfig();
+ Assert.assertFalse(ioConfig.isAppendToExisting());
+ final FirehoseFactory firehoseFactory = ioConfig.getFirehoseFactory();
+ Assert.assertTrue(firehoseFactory instanceof IngestSegmentFirehoseFactory);
+ final IngestSegmentFirehoseFactory ingestSegmentFirehoseFactory = (IngestSegmentFirehoseFactory) firehoseFactory;
+ Assert.assertEquals(DATA_SOURCE, ingestSegmentFirehoseFactory.getDataSource());
+ Assert.assertEquals(expectedSegmentIntervals.get(i), ingestSegmentFirehoseFactory.getInterval());
+ Assert.assertNull(ingestSegmentFirehoseFactory.getDimensionsFilter());
+
+ // check the order of dimensions
+ Assert.assertEquals(
+ new HashSet<>(expectedDimensionsSpec.getDimensionNames()),
+ new HashSet<>(ingestSegmentFirehoseFactory.getDimensions())
+ );
+ // check the order of metrics
+ Assert.assertEquals(
+ Lists.newArrayList("agg_4", "agg_3", "agg_2", "agg_1", "agg_0"),
+ ingestSegmentFirehoseFactory.getMetrics()
+ );
+
+ // assert tuningConfig
+ Assert.assertEquals(createTuningConfig(), ingestionSchema.getTuningConfig());
+ }
}
private static class TestTaskToolbox extends TaskToolbox
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
index c100515..1a951b2 100644
--- a/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITCompactionTaskTest.java
@@ -36,24 +36,50 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static String INDEX_DATASOURCE = "wikipedia_index_test";
private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json";
- private static String COMPACTED_INTERVAL = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
@Test
- public void testCompaction() throws Exception
+ public void testCompactionWithoutKeepSegmentGranularity() throws Exception
{
loadData();
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
- if (intervalsBeforeCompaction.contains(COMPACTED_INTERVAL)) {
- throw new ISE("Containing a segment for the compacted interval[%s] before compaction", COMPACTED_INTERVAL);
+ final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z";
+ if (intervalsBeforeCompaction.contains(compactedInterval)) {
+ throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval);
}
try {
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
- compactData();
+ compactData(false);
queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
- if (!intervalsAfterCompaction.contains(COMPACTED_INTERVAL)) {
- throw new ISE("Compacted segment for interval[%s] does not exist", COMPACTED_INTERVAL);
+ if (!intervalsAfterCompaction.contains(compactedInterval)) {
+ throw new ISE("Compacted segment for interval[%s] does not exist", compactedInterval);
+ }
+ }
+ finally {
+ unloadAndKillData(INDEX_DATASOURCE);
+ }
+ }
+
+ @Test
+ public void testCompactionWithKeepSegmentGranularity() throws Exception
+ {
+ loadData();
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+ try {
+ queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+ compactData(true);
+ queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2);
+
+ final List<String> intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE);
+ intervalsBeforeCompaction.sort(null);
+ intervalsAfterCompaction.sort(null);
+ if (!intervalsBeforeCompaction.equals(intervalsAfterCompaction)) {
+ throw new ISE(
+ "Intervals before compaction[%s] should be same with those after compaction[%s]",
+ intervalsBeforeCompaction,
+ intervalsAfterCompaction
+ );
}
}
finally {
@@ -73,9 +99,11 @@ public class ITCompactionTaskTest extends AbstractIndexerTest
);
}
- private void compactData() throws Exception
+ private void compactData(boolean keepSegmentGranularity) throws Exception
{
- final String taskID = indexer.submitTask(getTaskAsString(COMPACTION_TASK));
+ final String template = getTaskAsString(COMPACTION_TASK);
+ final String taskSpec = template.replace("${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity));
+ final String taskID = indexer.submitTask(taskSpec);
LOG.info("TaskID for compaction task %s", taskID);
indexer.waitUntilTaskCompletes(taskID);
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
index bc7f786..3fdad69 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json
@@ -1,5 +1,6 @@
{
"type" : "compact",
"dataSource" : "wikipedia_index_test",
- "interval" : "2013-08-31/2013-09-02"
+ "interval" : "2013-08-31/2013-09-02",
+ "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY}
}
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
index af7b98a..8b3eab8 100644
--- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json
@@ -63,7 +63,8 @@
}
},
"tuningConfig": {
- "type": "index"
+ "type": "index",
+ "targetPartitionSize": 3
}
}
}
\ No newline at end of file
diff --git a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
index e1e3292..db40fec 100644
--- a/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
+++ b/server/src/main/java/io/druid/client/indexing/ClientCompactQuery.java
@@ -30,6 +30,7 @@ public class ClientCompactQuery
{
private final String dataSource;
private final List<DataSegment> segments;
+ private final boolean keepSegmentGranularity;
private final ClientCompactQueryTuningConfig tuningConfig;
private final Map<String, Object> context;
@@ -37,12 +38,14 @@ public class ClientCompactQuery
public ClientCompactQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("segments") List<DataSegment> segments,
+ @JsonProperty("keepSegmentGranularity") boolean keepSegmentGranularity,
@JsonProperty("tuningConfig") ClientCompactQueryTuningConfig tuningConfig,
@JsonProperty("context") Map<String, Object> context
)
{
this.dataSource = dataSource;
this.segments = segments;
+ this.keepSegmentGranularity = keepSegmentGranularity;
this.tuningConfig = tuningConfig;
this.context = context;
}
@@ -66,6 +69,12 @@ public class ClientCompactQuery
}
@JsonProperty
+ public boolean isKeepSegmentGranularity()
+ {
+ return keepSegmentGranularity;
+ }
+
+ @JsonProperty
public ClientCompactQueryTuningConfig getTuningConfig()
{
return tuningConfig;
diff --git a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
index b012db6..c81bb0f 100644
--- a/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/HttpIndexingServiceClient.java
@@ -96,6 +96,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
+ boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
@@ -112,7 +113,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
context = context == null ? new HashMap<>() : context;
context.put("priority", compactionTaskPriority);
- return runTask(new ClientCompactQuery(dataSource, segments, tuningConfig, context));
+ return runTask(new ClientCompactQuery(dataSource, segments, keepSegmentGranularity, tuningConfig, context));
}
@Override
diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
index 80e47d0..72c9794 100644
--- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java
@@ -40,6 +40,7 @@ public interface IndexingServiceClient
String compactSegments(
List<DataSegment> segments,
+ boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
index 6e368ed..4a3e490 100644
--- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
+++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java
@@ -150,8 +150,11 @@ public class DruidCoordinatorSegmentCompactor implements DruidCoordinatorHelper
if (segmentsToCompact.size() > 1) {
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
+ // keepSegmentGranularity is currently hard-coded to false here, because keeping segment granularity breaks the
+ // algorithm CompactionSegmentIterator uses to find segments to compact.
final String taskId = indexingServiceClient.compactSegments(
segmentsToCompact,
+ false,
config.getTaskPriority(),
config.getTuningConfig(),
config.getTaskContext()
diff --git a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
index 030c991..e7889dc 100644
--- a/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/io/druid/client/indexing/NoopIndexingServiceClient.java
@@ -57,6 +57,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
@Override
public String compactSegments(
List<DataSegment> segments,
+ boolean keepSegmentGranularity,
int compactionTaskPriority,
@Nullable ClientCompactQueryTuningConfig tuningConfig,
@Nullable Map<String, Object> context
diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
index 52ba82c..62f79b2 100644
--- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java
@@ -64,6 +64,7 @@ public class DruidCoordinatorSegmentCompactorTest
@Override
public String compactSegments(
List<DataSegment> segments,
+ boolean keepSegmentGranularity,
int compactionTaskPriority,
ClientCompactQueryTuningConfig tuningConfig,
Map<String, Object> context