You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/11/09 05:17:44 UTC
[druid] branch master updated: Support changing dimension schema in Auto Compaction (#11874)
This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ddc68c6 Support changing dimension schema in Auto Compaction (#11874)
ddc68c6 is described below
commit ddc68c6a816bb1d0a0d657c80feee5e7fbd199b7
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Mon Nov 8 21:17:08 2021 -0800
Support changing dimension schema in Auto Compaction (#11874)
* add impl
* add unit tests
* fix checkstyle
* add impl
* add impl
* add impl
* add impl
* add impl
* add impl
* fix test
* add IT
* add IT
* fix docs
* add test
* address comments
* fix conflict
---
.../NewestSegmentFirstPolicyBenchmark.java | 1 +
.../org/apache/druid/timeline/CompactionState.java | 14 +-
.../org/apache/druid/timeline/DataSegmentTest.java | 99 +++++++++++++
docs/configuration/index.md | 12 +-
docs/ingestion/compaction.md | 13 +-
.../common/task/AbstractBatchIndexTask.java | 21 ++-
.../druid/indexing/common/task/IndexTask.java | 3 +-
.../parallel/ParallelIndexSupervisorTask.java | 3 +-
.../task/ClientCompactionTaskQuerySerdeTest.java | 9 ++
.../common/task/CompactionTaskParallelRunTest.java | 6 +
.../common/task/CompactionTaskRunTest.java | 2 +
.../apache/druid/testing/utils/CompactionUtil.java | 1 +
.../coordinator/duty/ITAutoCompactionTest.java | 61 +++++++-
.../duty/ITAutoCompactionUpgradeTest.java | 1 +
.../wikipedia_index_task_with_dimension_spec.json | 86 ++++++++++++
.../ClientCompactionTaskDimensionsSpec.java | 88 ++++++++++++
.../client/indexing/ClientCompactionTaskQuery.java | 14 +-
.../client/indexing/HttpIndexingServiceClient.java | 2 +
.../client/indexing/IndexingServiceClient.java | 1 +
.../coordinator/DataSourceCompactionConfig.java | 12 ++
.../UserCompactionTaskDimensionsConfig.java | 88 ++++++++++++
.../server/coordinator/duty/CompactSegments.java | 20 ++-
.../duty/NewestSegmentFirstIterator.java | 28 ++++
.../ClientCompactionTaskDimensionsSpecTest.java | 71 ++++++++++
.../client/indexing/NoopIndexingServiceClient.java | 1 +
.../DataSourceCompactionConfigTest.java | 41 ++++++
.../UserCompactionTaskDimensionsConfigTest.java | 71 ++++++++++
.../coordinator/duty/CompactSegmentsTest.java | 153 +++++++++++++++++++--
.../coordinator/duty/KillCompactionConfigTest.java | 3 +
.../duty/NewestSegmentFirstIteratorTest.java | 9 ++
.../duty/NewestSegmentFirstPolicyTest.java | 141 ++++++++++++++++---
.../CoordinatorCompactionConfigsResourceTest.java | 4 +
.../druid/sql/calcite/schema/SystemSchemaTest.java | 1 +
website/.spelling | 1 +
34 files changed, 1031 insertions(+), 50 deletions(-)
diff --git a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
index b93053e..4af61c6 100644
--- a/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/server/coordinator/NewestSegmentFirstPolicyBenchmark.java
@@ -101,6 +101,7 @@ public class NewestSegmentFirstPolicyBenchmark
null,
null,
null,
+ null,
null
)
);
diff --git a/core/src/main/java/org/apache/druid/timeline/CompactionState.java b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
index 8588717..fff8234 100644
--- a/core/src/main/java/org/apache/druid/timeline/CompactionState.java
+++ b/core/src/main/java/org/apache/druid/timeline/CompactionState.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import java.util.Map;
@@ -40,6 +41,7 @@ import java.util.Objects;
public class CompactionState
{
private final PartitionsSpec partitionsSpec;
+ private final DimensionsSpec dimensionsSpec;
// org.apache.druid.segment.IndexSpec cannot be used here because it's in the 'processing' module which
// has a dependency on the 'core' module where this class is.
private final Map<String, Object> indexSpec;
@@ -50,11 +52,13 @@ public class CompactionState
@JsonCreator
public CompactionState(
@JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
+ @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("indexSpec") Map<String, Object> indexSpec,
@JsonProperty("granularitySpec") Map<String, Object> granularitySpec
)
{
this.partitionsSpec = partitionsSpec;
+ this.dimensionsSpec = dimensionsSpec;
this.indexSpec = indexSpec;
this.granularitySpec = granularitySpec;
}
@@ -66,6 +70,12 @@ public class CompactionState
}
@JsonProperty
+ public DimensionsSpec getDimensionsSpec()
+ {
+ return dimensionsSpec;
+ }
+
+ @JsonProperty
public Map<String, Object> getIndexSpec()
{
return indexSpec;
@@ -88,6 +98,7 @@ public class CompactionState
}
CompactionState that = (CompactionState) o;
return Objects.equals(partitionsSpec, that.partitionsSpec) &&
+ Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(granularitySpec, that.granularitySpec);
}
@@ -95,7 +106,7 @@ public class CompactionState
@Override
public int hashCode()
{
- return Objects.hash(partitionsSpec, indexSpec, granularitySpec);
+ return Objects.hash(partitionsSpec, dimensionsSpec, indexSpec, granularitySpec);
}
@Override
@@ -103,6 +114,7 @@ public class CompactionState
{
return "CompactionState{" +
"partitionsSpec=" + partitionsSpec +
+ ", dimensionsSpec=" + dimensionsSpec +
", indexSpec=" + indexSpec +
", granularitySpec=" + granularitySpec +
'}';
diff --git a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
index 66c3400..0ed35da 100644
--- a/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
+++ b/core/src/test/java/org/apache/druid/timeline/DataSegmentTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.RangeSet;
import org.apache.druid.TestObjectMapper;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
@@ -120,6 +121,7 @@ public class DataSegmentTest
new NumberedShardSpec(3, 0),
new CompactionState(
new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "bar", "foo")), null, null),
ImmutableMap.of(),
ImmutableMap.of()
),
@@ -142,6 +144,7 @@ public class DataSegmentTest
Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
Assert.assertEquals(1, objectMap.get("size"));
+ Assert.assertEquals(4, ((Map) objectMap.get("lastCompactionState")).size());
DataSegment deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
@@ -154,6 +157,7 @@ public class DataSegmentTest
Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
Assert.assertEquals(segment.getId(), deserializedSegment.getId());
+ Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());
deserializedSegment = MAPPER.readValue(MAPPER.writeValueAsString(segment), DataSegment.class);
Assert.assertEquals(0, segment.compareTo(deserializedSegment));
@@ -166,6 +170,100 @@ public class DataSegmentTest
}
@Test
+ public void testDeserializationDataSegmentLastCompactionStateWithoutDimensionsSpec() throws Exception
+ {
+ final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+ final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
+ DataSegment segment = new DataSegment(
+ "something",
+ interval,
+ "1",
+ loadSpec,
+ Arrays.asList("dim1", "dim2"),
+ Arrays.asList("met1", "met2"),
+ new NumberedShardSpec(3, 0),
+ new CompactionState(
+ new HashedPartitionsSpec(100000, null, ImmutableList.of("dim1")),
+ null,
+ ImmutableMap.of(),
+ ImmutableMap.of()
+ ),
+ TEST_VERSION,
+ 1
+ );
+ String lastCompactionStateWithoutDimensionsSpec = "{"
+ + "\"dataSource\": \"something\","
+ + "\"interval\": \"2011-10-01T00:00:00.000Z/2011-10-02T00:00:00.000Z\","
+ + "\"version\": \"1\","
+ + "\"loadSpec\": {"
+ + " \"something\": \"or_other\""
+ + "},"
+ + "\"dimensions\": \"dim1,dim2\","
+ + "\"metrics\": \"met1,met2\","
+ + "\"shardSpec\": {"
+ + " \"type\": \"numbered\","
+ + " \"partitionNum\": 3,"
+ + " \"partitions\": 0"
+ + "},"
+ + "\"lastCompactionState\": {"
+ + " \"partitionsSpec\": {"
+ + " \"type\": \"hashed\","
+ + " \"numShards\": null,"
+ + " \"partitionDimensions\": [\"dim1\"],"
+ + " \"partitionFunction\": \"murmur3_32_abs\","
+ + " \"maxRowsPerSegment\": 100000"
+ + " },"
+ + " \"indexSpec\": {},"
+ + " \"granularitySpec\": {}"
+ + "},"
+ + "\"binaryVersion\": 9,"
+ + "\"size\": 1,"
+ + "\"identifier\": \"something_2011-10-01T00:00:00.000Z_2011-10-02T00:00:00.000Z_1_3\""
+ + "}";
+
+ final Map<String, Object> objectMap = MAPPER.readValue(
+ lastCompactionStateWithoutDimensionsSpec,
+ JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+ );
+ Assert.assertEquals(11, objectMap.size());
+ Assert.assertEquals("something", objectMap.get("dataSource"));
+ Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+ Assert.assertEquals("1", objectMap.get("version"));
+ Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+ Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+ Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+ Assert.assertEquals(ImmutableMap.of("type", "numbered", "partitionNum", 3, "partitions", 0), objectMap.get("shardSpec"));
+ Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+ Assert.assertEquals(1, objectMap.get("size"));
+ Assert.assertEquals(3, ((Map) objectMap.get("lastCompactionState")).size());
+
+ DataSegment deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+ Assert.assertEquals(segment.getDataSource(), deserializedSegment.getDataSource());
+ Assert.assertEquals(segment.getInterval(), deserializedSegment.getInterval());
+ Assert.assertEquals(segment.getVersion(), deserializedSegment.getVersion());
+ Assert.assertEquals(segment.getLoadSpec(), deserializedSegment.getLoadSpec());
+ Assert.assertEquals(segment.getDimensions(), deserializedSegment.getDimensions());
+ Assert.assertEquals(segment.getMetrics(), deserializedSegment.getMetrics());
+ Assert.assertEquals(segment.getShardSpec(), deserializedSegment.getShardSpec());
+ Assert.assertEquals(segment.getSize(), deserializedSegment.getSize());
+ Assert.assertEquals(segment.getId(), deserializedSegment.getId());
+ Assert.assertEquals(segment.getLastCompactionState(), deserializedSegment.getLastCompactionState());
+ Assert.assertNotNull(segment.getLastCompactionState());
+ Assert.assertNull(segment.getLastCompactionState().getDimensionsSpec());
+ Assert.assertNotNull(deserializedSegment.getLastCompactionState());
+ Assert.assertNull(deserializedSegment.getLastCompactionState().getDimensionsSpec());
+
+ deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+ Assert.assertEquals(0, segment.compareTo(deserializedSegment));
+
+ deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+ Assert.assertEquals(0, deserializedSegment.compareTo(segment));
+
+ deserializedSegment = MAPPER.readValue(lastCompactionStateWithoutDimensionsSpec, DataSegment.class);
+ Assert.assertEquals(segment.hashCode(), deserializedSegment.hashCode());
+ }
+
+ @Test
public void testIdentifier()
{
final DataSegment segment = DataSegment.builder()
@@ -232,6 +330,7 @@ public class DataSegmentTest
{
final CompactionState compactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null),
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 1eba2dc..e1a49b3 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -945,7 +945,8 @@ A description of the compaction config is:
|`skipOffsetFromLatest`|The offset for searching segments to be compacted in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Strongly recommended to set for realtime dataSources. See [Data handling with compaction](../ingestion/compaction.md#data-handling-with-compaction)|no (default = "P1D")|
|`tuningConfig`|Tuning config for compaction tasks. See below [Compaction Task TuningConfig](#automatic-compaction-tuningconfig).|no|
|`taskContext`|[Task context](../ingestion/tasks.md#context) for compaction tasks.|no|
-|`granularitySpec`|Custom `granularitySpec` to describe the `segmentGranularity` for the compacted segments. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`granularitySpec`|Custom `granularitySpec`. See [Automatic compaction granularitySpec](#automatic-compaction-granularityspec)|No|
+|`dimensionsSpec`|Custom `dimensionsSpec`. See [Automatic compaction dimensionsSpec](#automatic-compaction-dimensions-spec)|No|
|`ioConfig`|IO config for compaction tasks. See below [Compaction Task IOConfig](#automatic-compaction-ioconfig).|no|
An example of compaction config is:
@@ -988,9 +989,6 @@ The below is a list of the supported configurations for auto compaction.
|`chatHandlerNumRetries`|Retries for reporting the pushed segments in worker tasks.|no (default = 5)|
###### Automatic compaction granularitySpec
-You can optionally use the `granularitySpec` object to configure the segment granularity of the compacted segments.
-
-`granularitySpec` takes the following keys:
|Field|Description|Required|
|-----|-----------|--------|
@@ -998,6 +996,12 @@ You can optionally use the `granularitySpec` object to configure the segment gra
|`queryGranularity`|The resolution of timestamp storage within each segment. Defaults to 'null', which preserves the original query granularity. Accepts all [Query granularity](../querying/granularities.md) values.|No|
|`rollup`|Whether to enable ingestion-time rollup or not. Defaults to 'null', which preserves the original setting. Note that once data is rollup, individual records can no longer be recovered. |No|
+###### Automatic compaction dimensions spec
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`dimensions`| A list of dimension names or objects. Defaults to 'null', which preserves the original dimensions. Note that setting this will cause segments manually compacted with `dimensionExclusions` to be compacted again.|No|
+
###### Automatic compaction IOConfig
Auto compaction supports a subset of the [IOConfig for Parallel task](../ingestion/native-batch.md).
diff --git a/docs/ingestion/compaction.md b/docs/ingestion/compaction.md
index fa63a63..3762fd5 100644
--- a/docs/ingestion/compaction.md
+++ b/docs/ingestion/compaction.md
@@ -109,7 +109,7 @@ To perform a manual compaction, you submit a compaction task. Compaction tasks m
|`id`|Task id|No|
|`dataSource`|Data source name to compact|Yes|
|`ioConfig`|I/O configuration for compaction task. See [Compaction I/O configuration](#compaction-io-configuration) for details.|Yes|
-|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one.|No|
+|`dimensionsSpec`|Custom dimensions spec. The compaction task uses the specified dimensions spec if it exists instead of generating one. See [Compaction dimensionsSpec](#compaction-dimensions-spec) for details.|No|
|`metricsSpec`|Custom metrics spec. The compaction task uses the specified metrics spec rather than generating one.|No|
|`segmentGranularity`|When set, the compaction task changes the segment granularity for the given interval. Deprecated. Use `granularitySpec`. |No.|
|`tuningConfig`|[Parallel indexing task tuningConfig](native-batch.md#tuningconfig). `awaitSegmentAvailabilityTimeoutMillis` in the tuning config is not currently supported for compaction tasks. Do not set it to a non-zero value.|No|
@@ -181,6 +181,17 @@ Druid supports two supported `inputSpec` formats:
|`type`|Task type. Should be `segments`|Yes|
|`segments`|A list of segment IDs|Yes|
+
+### Compaction dimensions spec
+You can optionally use the `dimensionsSpec` object to configure the dimensions of the compacted segments.
+
+`dimensionsSpec` takes the following keys:
+
+|Field|Description|Required|
+|-----|-----------|--------|
+|`dimensions`| A list of dimension names or objects. Cannot have the same column in both `dimensions` and `dimensionExclusions`. Defaults to `null`, which preserves the original dimensions.|No|
+|`dimensionExclusions`| The names of dimensions to exclude from compaction. Only names are supported here, not objects. This list is only used if the dimensions list is null or empty; otherwise it is ignored. Defaults to `[]`.|No|
+
### Compaction granularity spec
You can optionally use the `granularitySpec` object to configure the segment granularity and the query granularity of the compacted segments. Their syntax is as follows:
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index a289221..7faaa09 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
@@ -56,6 +57,8 @@ import org.apache.druid.segment.handoff.SegmentHandoffNotifier;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.IngestionSpec;
+import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -479,14 +482,22 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
public static Function<Set<DataSegment>, Set<DataSegment>> compactionStateAnnotateFunction(
boolean storeCompactionState,
TaskToolbox toolbox,
- IndexTuningConfig tuningConfig,
- GranularitySpec granularitySpec
+ IngestionSpec ingestionSpec
)
{
if (storeCompactionState) {
- final Map<String, Object> indexSpecMap = tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper());
- final Map<String, Object> granularitySpecMap = granularitySpec.asMap(toolbox.getJsonMapper());
- final CompactionState compactionState = new CompactionState(tuningConfig.getPartitionsSpec(), indexSpecMap, granularitySpecMap);
+ TuningConfig tuningConfig = ingestionSpec.getTuningConfig();
+ GranularitySpec granularitySpec = ingestionSpec.getDataSchema().getGranularitySpec();
+ // We do not need to store dimensionExclusions and spatialDimensions since auto compaction does not support them
+ DimensionsSpec dimensionsSpec = ingestionSpec.getDataSchema().getDimensionsSpec() == null
+ ? null
+ : new DimensionsSpec(ingestionSpec.getDataSchema().getDimensionsSpec().getDimensions(), null, null);
+ final CompactionState compactionState = new CompactionState(
+ tuningConfig.getPartitionsSpec(),
+ dimensionsSpec,
+ tuningConfig.getIndexSpec().asMap(toolbox.getJsonMapper()),
+ granularitySpec.asMap(toolbox.getJsonMapper())
+ );
return segments -> segments
.stream()
.map(s -> s.withLastCompactionState(compactionState))
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index a993b53..ba59ba1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -925,8 +925,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
- ingestionSchema.getTuningConfig(),
- ingestionSchema.getDataSchema().getGranularitySpec()
+ ingestionSchema
);
// Probably we can publish atomicUpdateGroup along with segments.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index ec5a5df..c185d71 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -1051,8 +1051,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
final Function<Set<DataSegment>, Set<DataSegment>> annotateFunction = compactionStateAnnotateFunction(
storeCompactionState,
toolbox,
- ingestionSchema.getTuningConfig(),
- ingestionSchema.getDataSchema().getGranularitySpec()
+ ingestionSchema
);
Set<DataSegment> segmentsFoundForDrop = null;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
index 8d78d32..c1a3363 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -34,6 +35,7 @@ import org.apache.druid.client.indexing.ClientTaskQuery;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.guice.GuiceAnnotationIntrospector;
import org.apache.druid.guice.GuiceInjectableValues;
import org.apache.druid.guice.GuiceInjectors;
@@ -117,6 +119,7 @@ public class ClientCompactionTaskQuerySerdeTest
100
),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
+ new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
ImmutableMap.of("key", "value")
);
@@ -211,6 +214,10 @@ public class ClientCompactionTaskQuerySerdeTest
task.getIoConfig().isDropExisting()
);
Assert.assertEquals(query.getContext(), task.getContext());
+ Assert.assertEquals(
+ query.getDimensionsSpec().getDimensions(),
+ task.getDimensionsSpec().getDimensions()
+ );
}
@Test
@@ -269,6 +276,7 @@ public class ClientCompactionTaskQuerySerdeTest
)
)
.granularitySpec(new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true))
+ .dimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), ImmutableList.of("__time", "val"), null))
.build();
final ClientCompactionTaskQuery expected = new ClientCompactionTaskQuery(
@@ -312,6 +320,7 @@ public class ClientCompactionTaskQuerySerdeTest
100
),
new ClientCompactionTaskGranularitySpec(Granularities.DAY, Granularities.HOUR, true),
+ new ClientCompactionTaskDimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))),
new HashMap<>()
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index e9ec434..c4f96c2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -158,6 +158,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
// Expect compaction state to exist as store compaction state by default
CompactionState expectedState = new CompactionState(
new DynamicPartitionsSpec(null, Long.MAX_VALUE),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
@@ -198,6 +199,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new HashedPartitionsSpec(null, 3, null),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
@@ -238,6 +240,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
@@ -281,6 +284,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
@@ -321,6 +325,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new SingleDimensionPartitionsSpec(7, null, "dim", false),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
@@ -364,6 +369,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
Assert.assertSame(DimensionRangeShardSpec.class, segment.getShardSpec().getClass());
CompactionState expectedState = new CompactionState(
new DimensionRangePartitionsSpec(7, null, Arrays.asList("dim1", "dim2"), false),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index eebd875..b1e3c03 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -194,6 +194,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
// Expected compaction state to exist after compaction as we store compaction state by default
return new CompactionState(
new DynamicPartitionsSpec(5000000, Long.MAX_VALUE),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
mapper.readValue(mapper.writeValueAsString(new IndexSpec()), Map.class),
mapper.readValue(
mapper.writeValueAsString(
@@ -338,6 +339,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
);
CompactionState expectedState = new CompactionState(
new HashedPartitionsSpec(null, 3, null),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim")), null, null),
compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper()),
getObjectMapper().readValue(
getObjectMapper().writeValueAsString(
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
index 5ccd5f4..6dcf790 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/CompactionUtil.java
@@ -69,6 +69,7 @@ public class CompactionUtil
1
),
null,
+ null,
new UserCompactionTaskIOConfig(true),
null
);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 157b8d5..c7e5634 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -38,6 +39,7 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.coordinator.AutoCompactionSnapshot;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -76,6 +78,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private static final Logger LOG = new Logger(ITAutoCompactionTest.class);
private static final String INDEX_TASK = "/indexer/wikipedia_index_task.json";
private static final String INDEX_TASK_WITH_GRANULARITY_SPEC = "/indexer/wikipedia_index_task_with_granularity_spec.json";
+ private static final String INDEX_TASK_WITH_DIMENSION_SPEC = "/indexer/wikipedia_index_task_with_dimension_spec.json";
private static final String INDEX_ROLLUP_QUERIES_RESOURCE = "/indexer/wikipedia_index_rollup_queries.json";
private static final String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json";
private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000;
@@ -175,7 +178,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
LOG.info("Auto compaction test with hash partitioning");
final HashedPartitionsSpec hashedPartitionsSpec = new HashedPartitionsSpec(null, 3, null);
- submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
+ submitCompactionConfig(hashedPartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false);
// 2 segments published per day after compaction.
forceTriggerAutoCompaction(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
@@ -190,7 +193,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
"city",
false
);
- submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, false);
+ submitCompactionConfig(rangePartitionsSpec, NO_SKIP_OFFSET, 1, null, null, false);
forceTriggerAutoCompaction(2);
verifyQuery(INDEX_QUERIES_RESOURCE);
verifySegmentsCompacted(rangePartitionsSpec, 2);
@@ -679,6 +682,51 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
}
}
+ /**
+ * Exercises auto compaction with a dimensionsSpec that keeps only the "language" dimension.
+ * The rollup query is expected to go from two rows before compaction to a single row afterwards, and the
+ * number of completed tasks is expected to stay the same when compaction is triggered a second time.
+ */
+ @Test
+ public void testAutoCompactionDutyWithDimensionsSpec() throws Exception
+ {
+ // Index data with dimensions "page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous",
+ // "namespace", "continent", "country", "region", "city"
+ loadData(INDEX_TASK_WITH_DIMENSION_SPEC);
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ // NOTE(review): intervalsBeforeCompaction is fetched and sorted but never read again in this test;
+ // confirm whether an interval check after compaction was intended.
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // Expect 4 segments in total before compaction
+ verifySegmentsCount(4);
+
+ // Before compaction the rollup query is expected to return two rows (57.0 and 459.0)
+ Map<String, Object> expectedResult = ImmutableMap.of(
+ "%%EXPECTED_COUNT_RESULT%%", 2,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(57.0), ImmutableList.of(459.0))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+
+ // Compact with a dimensionsSpec that lists only the "language" dimension
+ submitCompactionConfig(
+ MAX_ROWS_PER_SEGMENT_COMPACTED,
+ NO_SKIP_OFFSET,
+ null,
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
+ false
+ );
+ forceTriggerAutoCompaction(2);
+
+ // After compaction the two rows are expected to roll up into one (57.0 + 459.0 = 516.0)
+ expectedResult = ImmutableMap.of(
+ "%%EXPECTED_COUNT_RESULT%%", 1,
+ "%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(516.0))))
+ );
+ verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, expectedResult);
+ verifySegmentsCompacted(2, MAX_ROWS_PER_SEGMENT_COMPACTED);
+
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ // Verify that already-compacted segments are not compacted again: triggering compaction a second time
+ // must leave the number of completed tasks unchanged
+ forceTriggerAutoCompaction(2);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+ }
+ }
+
private void loadData(String indexTask) throws Exception
{
loadData(indexTask, ImmutableMap.of());
@@ -752,7 +800,12 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, boolean dropExisting) throws Exception
{
- submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dropExisting);
+ submitCompactionConfig(maxRowsPerSegment, skipOffsetFromLatest, granularitySpec, null, dropExisting);
+ }
+
+ private void submitCompactionConfig(Integer maxRowsPerSegment, Period skipOffsetFromLatest, UserCompactionTaskGranularityConfig granularitySpec, UserCompactionTaskDimensionsConfig dimensionsSpec, boolean dropExisting) throws Exception
+ {
+ submitCompactionConfig(new DynamicPartitionsSpec(maxRowsPerSegment, null), skipOffsetFromLatest, 1, granularitySpec, dimensionsSpec, dropExisting);
}
private void submitCompactionConfig(
@@ -760,6 +813,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
Period skipOffsetFromLatest,
int maxNumConcurrentSubTasks,
UserCompactionTaskGranularityConfig granularitySpec,
+ UserCompactionTaskDimensionsConfig dimensionsSpec,
boolean dropExisting
) throws Exception
{
@@ -789,6 +843,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
1
),
granularitySpec,
+ dimensionsSpec,
!dropExisting ? null : new UserCompactionTaskIOConfig(true),
null
);
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
index a4b1ed6..0ca16cf 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionUpgradeTest.java
@@ -97,6 +97,7 @@ public class ITAutoCompactionUpgradeTest extends AbstractIndexerTest
1
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
+ null,
new UserCompactionTaskIOConfig(true),
null
);
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json
new file mode 100644
index 0000000..1fa8b4e
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task_with_dimension_spec.json
@@ -0,0 +1,86 @@
+{
+ "type": "index",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "%%DATASOURCE%%",
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ },
+ {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ },
+ {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ },
+ {
+ "name": "thetaSketch",
+ "type": "thetaSketch",
+ "fieldName": "user"
+ },
+ {
+ "name": "quantilesDoublesSketch",
+ "type": "quantilesDoublesSketch",
+ "fieldName": "delta"
+ },
+ {
+ "name": "HLLSketchBuild",
+ "type": "HLLSketchBuild",
+ "fieldName": "user"
+ }
+ ],
+ "granularitySpec": {
+ "segmentGranularity": "DAY",
+ "queryGranularity": "DAY",
+ "intervals" : [ "2013-08-31/2013-09-02" ]
+ },
+ "parser": {
+ "parseSpec": {
+ "format" : "json",
+ "timestampSpec": {
+ "column": "timestamp"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "page",
+ {"type": "string", "name": "language", "createBitmapIndex": false},
+ "user",
+ "unpatrolled",
+ "newPage",
+ "robot",
+ "anonymous",
+ "namespace",
+ "continent",
+ "country",
+ "region",
+ "city"
+ ]
+ }
+ }
+ }
+ },
+ "ioConfig": {
+ "type": "index",
+ "firehose": {
+ "type": "local",
+ "baseDir": "/resources/data/batch_index/json",
+ "filter": "wikipedia_index_data*"
+ }
+ },
+ "tuningConfig": {
+ "type": "index",
+ "maxRowsPerSegment": 3,
+ "awaitSegmentAvailabilityTimeoutMillis": %%SEGMENT_AVAIL_TIMEOUT_MILLIS%%
+ }
+ }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java
new file mode 100644
index 0000000..4937bce
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpec.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParserUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Dimension configuration of a compaction task.
+ * <p>
+ * The single JSON property, {@code dimensions}, is named after the matching property of
+ * {@link org.apache.druid.data.input.impl.DimensionsSpec}. This lets end users describe the dimensions of a
+ * compaction task with the same JSON structure they use for any other ingestion task.
+ */
+public class ClientCompactionTaskDimensionsSpec
+{
+  @Nullable
+  private final List<DimensionSchema> dimensions;
+
+  /**
+   * @param dimensions dimension schemas, or null. When non-null, the dimension names are checked with
+   *                   {@link ParserUtils#validateFields} before the list is stored.
+   */
+  @JsonCreator
+  public ClientCompactionTaskDimensionsSpec(
+      @Nullable @JsonProperty("dimensions") List<DimensionSchema> dimensions
+  )
+  {
+    if (dimensions != null) {
+      ParserUtils.validateFields(
+          dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList())
+      );
+    }
+    this.dimensions = dimensions;
+  }
+
+  /**
+   * @return the dimension schemas given at construction, possibly null
+   */
+  @Nullable
+  @JsonProperty
+  public List<DimensionSchema> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    // Only an instance of exactly this class with an equal dimension list is considered equal.
+    return o != null
+           && getClass() == o.getClass()
+           && Objects.equals(dimensions, ((ClientCompactionTaskDimensionsSpec) o).dimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimensions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ClientCompactionTaskDimensionsSpec{" +
+           "dimensions=" + dimensions +
+           '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
index 62e39d6..89e5049 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/ClientCompactionTaskQuery.java
@@ -39,6 +39,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
private final ClientCompactionIOConfig ioConfig;
private final ClientCompactionTaskQueryTuningConfig tuningConfig;
private final ClientCompactionTaskGranularitySpec granularitySpec;
+ private final ClientCompactionTaskDimensionsSpec dimensionsSpec;
private final Map<String, Object> context;
@JsonCreator
@@ -48,6 +49,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
@JsonProperty("ioConfig") ClientCompactionIOConfig ioConfig,
@JsonProperty("tuningConfig") ClientCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") ClientCompactionTaskGranularitySpec granularitySpec,
+ @JsonProperty("dimensionsSpec") ClientCompactionTaskDimensionsSpec dimensionsSpec,
@JsonProperty("context") Map<String, Object> context
)
{
@@ -56,6 +58,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
this.ioConfig = ioConfig;
this.tuningConfig = tuningConfig;
this.granularitySpec = granularitySpec;
+ this.dimensionsSpec = dimensionsSpec;
this.context = context;
}
@@ -99,12 +102,17 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
}
@JsonProperty
+ public ClientCompactionTaskDimensionsSpec getDimensionsSpec()
+ {
+ return dimensionsSpec;
+ }
+
+ @JsonProperty
public Map<String, Object> getContext()
{
return context;
}
-
@Override
public boolean equals(Object o)
{
@@ -120,13 +128,14 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) &&
+ Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
Objects.equals(context, that.context);
}
@Override
public int hashCode()
{
- return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, context);
+ return Objects.hash(id, dataSource, ioConfig, tuningConfig, granularitySpec, dimensionsSpec, context);
}
@Override
@@ -138,6 +147,7 @@ public class ClientCompactionTaskQuery implements ClientTaskQuery
", ioConfig=" + ioConfig +
", tuningConfig=" + tuningConfig +
", granularitySpec=" + granularitySpec +
+ ", dimensionsSpec=" + dimensionsSpec +
", context=" + context +
'}';
}
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index afab3b5..c2f80a0 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -81,6 +81,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
)
@@ -103,6 +104,7 @@ public class HttpIndexingServiceClient implements IndexingServiceClient
new ClientCompactionIOConfig(ClientCompactionIntervalSpec.fromSegments(segments), dropExisting),
tuningConfig,
granularitySpec,
+ dimensionsSpec,
context
);
return runTask(taskId, taskQuery);
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index 84f9f55..d249989 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -41,6 +41,7 @@ public interface IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
index dfe1f57..33d8043 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DataSourceCompactionConfig.java
@@ -47,6 +47,7 @@ public class DataSourceCompactionConfig
private final Period skipOffsetFromLatest;
private final UserCompactionTaskQueryTuningConfig tuningConfig;
private final UserCompactionTaskGranularityConfig granularitySpec;
+ private final UserCompactionTaskDimensionsConfig dimensionsSpec;
private final UserCompactionTaskIOConfig ioConfig;
private final Map<String, Object> taskContext;
@@ -59,6 +60,7 @@ public class DataSourceCompactionConfig
@JsonProperty("skipOffsetFromLatest") @Nullable Period skipOffsetFromLatest,
@JsonProperty("tuningConfig") @Nullable UserCompactionTaskQueryTuningConfig tuningConfig,
@JsonProperty("granularitySpec") @Nullable UserCompactionTaskGranularityConfig granularitySpec,
+ @JsonProperty("dimensionsSpec") @Nullable UserCompactionTaskDimensionsConfig dimensionsSpec,
@JsonProperty("ioConfig") @Nullable UserCompactionTaskIOConfig ioConfig,
@JsonProperty("taskContext") @Nullable Map<String, Object> taskContext
)
@@ -75,6 +77,7 @@ public class DataSourceCompactionConfig
this.tuningConfig = tuningConfig;
this.ioConfig = ioConfig;
this.granularitySpec = granularitySpec;
+ this.dimensionsSpec = dimensionsSpec;
this.taskContext = taskContext;
}
@@ -133,6 +136,13 @@ public class DataSourceCompactionConfig
@JsonProperty
@Nullable
+ public UserCompactionTaskDimensionsConfig getDimensionsSpec()
+ {
+ return dimensionsSpec;
+ }
+
+ @JsonProperty
+ @Nullable
public Map<String, Object> getTaskContext()
{
return taskContext;
@@ -155,6 +165,7 @@ public class DataSourceCompactionConfig
Objects.equals(skipOffsetFromLatest, that.skipOffsetFromLatest) &&
Objects.equals(tuningConfig, that.tuningConfig) &&
Objects.equals(granularitySpec, that.granularitySpec) &&
+ Objects.equals(dimensionsSpec, that.dimensionsSpec) &&
Objects.equals(ioConfig, that.ioConfig) &&
Objects.equals(taskContext, that.taskContext);
}
@@ -170,6 +181,7 @@ public class DataSourceCompactionConfig
skipOffsetFromLatest,
tuningConfig,
granularitySpec,
+ dimensionsSpec,
ioConfig,
taskContext
);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java
new file mode 100644
index 0000000..bee7f35
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfig.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParserUtils;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * User-facing dimension configuration for compaction tasks.
+ * <p>
+ * The single JSON property, {@code dimensions}, is named after the matching property of
+ * {@link org.apache.druid.data.input.impl.DimensionsSpec}. This lets end users describe the dimensions of a
+ * compaction task with the same JSON structure they use for any other ingestion task.
+ */
+public class UserCompactionTaskDimensionsConfig
+{
+  @Nullable
+  private final List<DimensionSchema> dimensions;
+
+  /**
+   * @param dimensions dimension schemas, or null. When non-null, the dimension names are checked with
+   *                   {@link ParserUtils#validateFields} before the list is stored.
+   */
+  @JsonCreator
+  public UserCompactionTaskDimensionsConfig(
+      @Nullable @JsonProperty("dimensions") List<DimensionSchema> dimensions
+  )
+  {
+    if (dimensions != null) {
+      ParserUtils.validateFields(
+          dimensions.stream().map(DimensionSchema::getName).collect(Collectors.toList())
+      );
+    }
+    this.dimensions = dimensions;
+  }
+
+  /**
+   * @return the dimension schemas given at construction, possibly null
+   */
+  @Nullable
+  @JsonProperty
+  public List<DimensionSchema> getDimensions()
+  {
+    return dimensions;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    // Only an instance of exactly this class with an equal dimension list is considered equal.
+    return o != null
+           && getClass() == o.getClass()
+           && Objects.equals(dimensions, ((UserCompactionTaskDimensionsConfig) o).dimensions);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(dimensions);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "UserCompactionTaskDimensionsConfig{" +
+           "dimensions=" + dimensions +
+           '}';
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index b58910a..adf3c96 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -22,6 +22,7 @@ package org.apache.druid.server.coordinator.duty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -341,16 +342,26 @@ public class CompactSegments implements CoordinatorDuty
snapshotBuilder.incrementSegmentCountCompacted(segmentsToCompact.size());
final DataSourceCompactionConfig config = compactionConfigs.get(dataSourceName);
- ClientCompactionTaskGranularitySpec queryGranularitySpec;
+ // Create granularitySpec to send to compaction task
+ ClientCompactionTaskGranularitySpec granularitySpec;
if (config.getGranularitySpec() != null) {
- queryGranularitySpec = new ClientCompactionTaskGranularitySpec(
+ granularitySpec = new ClientCompactionTaskGranularitySpec(
config.getGranularitySpec().getSegmentGranularity(),
config.getGranularitySpec().getQueryGranularity(),
config.getGranularitySpec().isRollup()
);
} else {
- queryGranularitySpec = null;
+ granularitySpec = null;
+ }
+ // Create dimensionsSpec to send to compaction task
+ ClientCompactionTaskDimensionsSpec dimensionsSpec;
+ if (config.getDimensionsSpec() != null) {
+ dimensionsSpec = new ClientCompactionTaskDimensionsSpec(
+ config.getDimensionsSpec().getDimensions()
+ );
+ } else {
+ dimensionsSpec = null;
}
Boolean dropExisting = null;
@@ -364,7 +375,8 @@ public class CompactSegments implements CoordinatorDuty
segmentsToCompact,
config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
- queryGranularitySpec,
+ granularitySpec,
+ dimensionsSpec,
dropExisting,
newAutoCompactionContext(config.getTaskContext())
);
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
index 4225fec..11dd2c4 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIterator.java
@@ -26,6 +26,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
@@ -424,6 +426,11 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
existingGranularitySpec.isRollup() :
null;
if (existingRollup == null || !config.getGranularitySpec().isRollup().equals(existingRollup)) {
+ log.info(
+ "Configured rollup[%s] is different from the rollup[%s] of segments. Needs compaction",
+ config.getGranularitySpec().isRollup(),
+ existingRollup
+ );
return true;
}
}
@@ -435,6 +442,27 @@ public class NewestSegmentFirstIterator implements CompactionSegmentIterator
existingGranularitySpec.getQueryGranularity() :
null;
if (!config.getGranularitySpec().getQueryGranularity().equals(existingQueryGranularity)) {
+ log.info(
+ "Configured queryGranularity[%s] is different from the queryGranularity[%s] of segments. Needs compaction",
+ config.getGranularitySpec().getQueryGranularity(),
+ existingQueryGranularity
+ );
+ return true;
+ }
+ }
+ }
+
+ if (config.getDimensionsSpec() != null) {
+ final DimensionsSpec existingDimensionsSpec = lastCompactionState.getDimensionsSpec();
+ // Checks for list of dimensions
+ if (config.getDimensionsSpec().getDimensions() != null) {
+ final List<DimensionSchema> existingDimensions = existingDimensionsSpec != null ?
+ existingDimensionsSpec.getDimensions() :
+ null;
+ if (!config.getDimensionsSpec().getDimensions().equals(existingDimensions)) {
+ log.info(
+ "Configured dimensionsSpec is different from the dimensionsSpec of segments. Needs compaction"
+ );
return true;
}
}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java
new file mode 100644
index 0000000..6b2df99
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/client/indexing/ClientCompactionTaskDimensionsSpecTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client.indexing;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Unit tests for {@link ClientCompactionTaskDimensionsSpec}: the equals/hashCode contract, a JSON round trip, and
+ * rejection of duplicate dimension names.
+ */
+public class ClientCompactionTaskDimensionsSpecTest
+{
+  /**
+   * Checks equals/hashCode with EqualsVerifier. Two distinct {@link DimensionSchema} instances are supplied as
+   * prefab values, and {@code usingGetClass()} matches the {@code getClass()} comparison used by the class.
+   */
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(ClientCompactionTaskDimensionsSpec.class)
+                  .withPrefabValues(
+                      DimensionSchema.class,
+                      new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true),
+                      new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true)
+                  )
+                  .usingGetClass()
+                  .verify();
+  }
+
+  /**
+   * Serializes a spec to JSON bytes, reads it back and expects an instance equal to the original.
+   */
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ClientCompactionTaskDimensionsSpec expected = new ClientCompactionTaskDimensionsSpec(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))
+    );
+    final ObjectMapper mapper = new ObjectMapper();
+    final byte[] json = mapper.writeValueAsBytes(expected);
+    // readValue(byte[], Class<T>) already returns T, so no cast is needed.
+    final ClientCompactionTaskDimensionsSpec fromJson = mapper.readValue(
+        json,
+        ClientCompactionTaskDimensionsSpec.class
+    );
+    Assert.assertEquals(expected, fromJson);
+  }
+
+  /**
+   * A dimension list that repeats the name "dim" must make the constructor throw {@link ParseException}.
+   */
+  @Test(expected = ParseException.class)
+  public void testInvalidDimensionsField()
+  {
+    // The instance is never used; only the constructor's validation matters here.
+    final ClientCompactionTaskDimensionsSpec ignored = new ClientCompactionTaskDimensionsSpec(
+        DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim"))
+    );
+  }
+}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 2035dba..5bc7b87 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -51,6 +51,7 @@ public class NoopIndexingServiceClient implements IndexingServiceClient
int compactionTaskPriority,
@Nullable ClientCompactionTaskQueryTuningConfig tuningConfig,
@Nullable ClientCompactionTaskGranularitySpec granularitySpec,
+ @Nullable ClientCompactionTaskDimensionsSpec dimensionsSpec,
@Nullable Boolean dropExisting,
@Nullable Map<String, Object> context
)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
index 4f8fd12..27732ef 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DataSourceCompactionConfigTest.java
@@ -20,8 +20,10 @@
package org.apache.druid.server.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.SegmentsSplitHintSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.HumanReadableBytes;
@@ -59,6 +61,7 @@ public class DataSourceCompactionConfigTest
null,
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -86,6 +89,7 @@ public class DataSourceCompactionConfigTest
null,
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -130,6 +134,7 @@ public class DataSourceCompactionConfigTest
),
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -174,6 +179,7 @@ public class DataSourceCompactionConfigTest
),
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
@@ -240,6 +246,7 @@ public class DataSourceCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -267,6 +274,7 @@ public class DataSourceCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(null, Granularities.YEAR, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -297,6 +305,7 @@ public class DataSourceCompactionConfigTest
null,
null,
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -324,6 +333,7 @@ public class DataSourceCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(null, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -351,6 +361,7 @@ public class DataSourceCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(null, null, true),
null,
+ null,
ImmutableMap.of("key", "val")
);
final String json = OBJECT_MAPPER.writeValueAsString(config);
@@ -380,6 +391,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
+ null,
new UserCompactionTaskIOConfig(true),
ImmutableMap.of("key", "val")
);
@@ -408,6 +420,7 @@ public class DataSourceCompactionConfigTest
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
+ null,
new UserCompactionTaskIOConfig(null),
ImmutableMap.of("key", "val")
);
@@ -424,4 +437,32 @@ public class DataSourceCompactionConfigTest
Assert.assertEquals(config.getGranularitySpec(), fromJson.getGranularitySpec());
Assert.assertEquals(config.getIoConfig(), fromJson.getIoConfig());
}
+
+ @Test
+ public void testSerdeDimensionsSpec() throws IOException
+ {
+ // Serialize a config that carries a dimensions config, read it back, and check the restored values.
+ final DataSourceCompactionConfig original = new DataSourceCompactionConfig(
+ "dataSource",
+ null,
+ 500L,
+ null,
+ new Period(3600),
+ null,
+ null,
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo"))),
+ null,
+ ImmutableMap.of("key", "val")
+ );
+ final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+ final DataSourceCompactionConfig restored = OBJECT_MAPPER.readValue(serialized, DataSourceCompactionConfig.class);
+ Assert.assertEquals(original.getDimensionsSpec(), restored.getDimensionsSpec());
+ Assert.assertEquals(original.getDataSource(), restored.getDataSource());
+ Assert.assertEquals(25, restored.getTaskPriority());
+ Assert.assertEquals(original.getInputSegmentSizeBytes(), restored.getInputSegmentSizeBytes());
+ Assert.assertEquals(original.getMaxRowsPerSegment(), restored.getMaxRowsPerSegment());
+ Assert.assertEquals(original.getSkipOffsetFromLatest(), restored.getSkipOffsetFromLatest());
+ Assert.assertEquals(original.getTuningConfig(), restored.getTuningConfig());
+ Assert.assertEquals(original.getTaskContext(), restored.getTaskContext());
+ }
}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java
new file mode 100644
index 0000000..4862b45
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/coordinator/UserCompactionTaskDimensionsConfigTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/** Tests equality, JSON round-tripping and input validation of {@link UserCompactionTaskDimensionsConfig}. */
+public class UserCompactionTaskDimensionsConfigTest
+{
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(UserCompactionTaskDimensionsConfig.class)
+ .withPrefabValues(
+ DimensionSchema.class,
+ new StringDimensionSchema("bar", DimensionSchema.MultiValueHandling.ofDefault(), true),
+ new StringDimensionSchema("foo", DimensionSchema.MultiValueHandling.ofDefault(), true)
+ )
+ .usingGetClass()
+ .verify();
+ }
+
+ @Test
+ public void testSerde() throws IOException
+ {
+ final UserCompactionTaskDimensionsConfig expected = new UserCompactionTaskDimensionsConfig(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim"))
+ );
+ final ObjectMapper mapper = new ObjectMapper(); // plain Jackson mapper; nothing extra is registered
+ final byte[] json = mapper.writeValueAsBytes(expected);
+ final UserCompactionTaskDimensionsConfig fromJson =
+ mapper.readValue(json, UserCompactionTaskDimensionsConfig.class);
+ Assert.assertEquals(expected, fromJson);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testInvalidDimensionsField()
+ {
+ // "dim" is listed twice; this statement is the only one that can raise the expected ParseException.
+ new UserCompactionTaskDimensionsConfig(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("ts", "dim", "dim"))
+ );
+ }
+}
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
index 494f567..2da165a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.indexing.ClientCompactionIOConfig;
import org.apache.druid.client.indexing.ClientCompactionIntervalSpec;
+import org.apache.druid.client.indexing.ClientCompactionTaskDimensionsSpec;
import org.apache.druid.client.indexing.ClientCompactionTaskGranularitySpec;
import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
@@ -38,6 +39,7 @@ import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.client.indexing.TaskPayloadResponse;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.discovery.DruidLeaderClient;
import org.apache.druid.discovery.DruidNodeDiscovery;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
@@ -66,6 +68,7 @@ import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskIOConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
@@ -387,12 +390,12 @@ public class CompactSegmentsTest
DataSegment afterNoon = createSegment(dataSourceName, j, false, k);
if (j == 3) {
// Make two intervals on this day compacted (two compacted intervals back-to-back)
- beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
- afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+ beforeNoon = beforeNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
+ afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
}
if (j == 1) {
// Make one interval on this day compacted
- afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, ImmutableMap.of(), ImmutableMap.of()));
+ afterNoon = afterNoon.withLastCompactionState(new CompactionState(partitionsSpec, null, ImmutableMap.of(), ImmutableMap.of()));
}
segments.add(beforeNoon);
segments.add(afterNoon);
@@ -671,6 +674,7 @@ public class CompactSegmentsTest
),
null,
null,
+ null,
null
)
);
@@ -685,6 +689,7 @@ public class CompactSegmentsTest
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// Only the same amount of segments as the original PARTITION_PER_TIME_INTERVAL since segment granulartity is the same
@@ -726,6 +731,7 @@ public class CompactSegmentsTest
null
),
null,
+ null,
new UserCompactionTaskIOConfig(true),
null
)
@@ -738,6 +744,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
dropExistingCapture.capture(),
ArgumentMatchers.any()
);
@@ -779,6 +786,7 @@ public class CompactSegmentsTest
),
null,
null,
+ null,
null
)
);
@@ -790,6 +798,7 @@ public class CompactSegmentsTest
ArgumentMatchers.anyInt(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
dropExistingCapture.capture(),
ArgumentMatchers.any()
);
@@ -831,6 +840,7 @@ public class CompactSegmentsTest
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
+ null,
null
)
);
@@ -845,6 +855,7 @@ public class CompactSegmentsTest
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@@ -857,6 +868,119 @@ public class CompactSegmentsTest
}
@Test
+ public void testCompactWithDimensionSpec()
+ {
+ // The dimensions given in the auto-compaction config must reach the indexing service client unchanged.
+ final HttpIndexingServiceClient indexingClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments duty = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingClient);
+ final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ final List<DataSourceCompactionConfig> configs = new ArrayList<>();
+ configs.add(
+ new DataSourceCompactionConfig(
+ DATA_SOURCE_PREFIX + 0,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ tuningConfig,
+ null,
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
+ null,
+ null
+ )
+ );
+ doCompactSegments(duty, configs);
+ final ArgumentCaptor<ClientCompactionTaskDimensionsSpec> dimensionsCaptor =
+ ArgumentCaptor.forClass(ClientCompactionTaskDimensionsSpec.class);
+ Mockito.verify(indexingClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ dimensionsCaptor.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertNotNull(dimensionsCaptor.getValue());
+ Assert.assertEquals(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), dimensionsCaptor.getValue().getDimensions());
+ }
+
+ @Test
+ public void testCompactWithoutDimensionSpec()
+ {
+ // With no dimensions config set, the indexing service client must be handed a null dimensions spec.
+ final HttpIndexingServiceClient indexingClient = Mockito.mock(HttpIndexingServiceClient.class);
+ final CompactSegments duty = new CompactSegments(COORDINATOR_CONFIG, JSON_MAPPER, indexingClient);
+ final UserCompactionTaskQueryTuningConfig tuningConfig = new UserCompactionTaskQueryTuningConfig(
+ null,
+ null,
+ null,
+ null,
+ partitionsSpec,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 3,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+ final List<DataSourceCompactionConfig> configs = new ArrayList<>();
+ configs.add(
+ new DataSourceCompactionConfig(
+ DATA_SOURCE_PREFIX + 0,
+ 0,
+ 500L,
+ null,
+ new Period("PT0H"), // smaller than segment interval
+ tuningConfig,
+ null,
+ null,
+ null,
+ null
+ )
+ );
+ doCompactSegments(duty, configs);
+ final ArgumentCaptor<ClientCompactionTaskDimensionsSpec> dimensionsCaptor =
+ ArgumentCaptor.forClass(ClientCompactionTaskDimensionsSpec.class);
+ Mockito.verify(indexingClient).compactSegments(
+ ArgumentMatchers.anyString(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.anyInt(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ dimensionsCaptor.capture(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ Assert.assertNull(dimensionsCaptor.getValue());
+ }
+
+ @Test
public void testCompactWithRollupInGranularitySpec()
{
final HttpIndexingServiceClient mockIndexingServiceClient = Mockito.mock(HttpIndexingServiceClient.class);
@@ -891,6 +1015,7 @@ public class CompactSegmentsTest
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, true),
null,
+ null,
null
)
);
@@ -905,6 +1030,7 @@ public class CompactSegmentsTest
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Assert.assertEquals(datasourceToSegments.get(dataSource).size(), segmentsCaptor.getValue().size());
@@ -947,6 +1073,7 @@ public class CompactSegmentsTest
),
null,
new ClientCompactionTaskGranularitySpec(Granularities.DAY, null, null),
+ null,
null
)
);
@@ -983,6 +1110,7 @@ public class CompactSegmentsTest
),
new UserCompactionTaskGranularityConfig(Granularities.YEAR, null, null),
null,
+ null,
null
)
);
@@ -1002,6 +1130,7 @@ public class CompactSegmentsTest
ArgumentMatchers.any(),
granularitySpecArgumentCaptor.capture(),
ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
ArgumentMatchers.any()
);
// All segments is compact at the same time since we changed the segment granularity to YEAR and all segment
@@ -1400,6 +1529,7 @@ public class CompactSegmentsTest
),
null,
null,
+ null,
null
)
);
@@ -1502,7 +1632,7 @@ public class CompactSegmentsTest
compactSegments(
timeline,
segments,
- compactionTaskQuery.getTuningConfig()
+ compactionTaskQuery
);
return createStringFullResponseHolder(jsonMapper.writeValueAsString(ImmutableMap.of("task", taskQuery.getId())));
}
@@ -1515,7 +1645,7 @@ public class CompactSegmentsTest
private void compactSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
List<DataSegment> segments,
- ClientCompactionTaskQueryTuningConfig tuningConfig
+ ClientCompactionTaskQuery clientCompactionTaskQuery
)
{
Preconditions.checkArgument(segments.size() > 1);
@@ -1539,13 +1669,13 @@ public class CompactSegmentsTest
final String version = "newVersion_" + compactVersionSuffix++;
final long segmentSize = segments.stream().mapToLong(DataSegment::getSize).sum() / 2;
final PartitionsSpec compactionPartitionsSpec;
- if (tuningConfig.getPartitionsSpec() instanceof DynamicPartitionsSpec) {
+ if (clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec() instanceof DynamicPartitionsSpec) {
compactionPartitionsSpec = new DynamicPartitionsSpec(
- tuningConfig.getPartitionsSpec().getMaxRowsPerSegment(),
- ((DynamicPartitionsSpec) tuningConfig.getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
+ clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec().getMaxRowsPerSegment(),
+ ((DynamicPartitionsSpec) clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec()).getMaxTotalRowsOr(Long.MAX_VALUE)
);
} else {
- compactionPartitionsSpec = tuningConfig.getPartitionsSpec();
+ compactionPartitionsSpec = clientCompactionTaskQuery.getTuningConfig().getPartitionsSpec();
}
for (int i = 0; i < 2; i++) {
@@ -1559,6 +1689,11 @@ public class CompactSegmentsTest
shardSpecFactory.apply(i, 2),
new CompactionState(
compactionPartitionsSpec,
+ clientCompactionTaskQuery.getDimensionsSpec() == null ? null : new DimensionsSpec(
+ clientCompactionTaskQuery.getDimensionsSpec().getDimensions(),
+ null,
+ null
+ ),
ImmutableMap.of(
"bitmap",
ImmutableMap.of("type", "roaring", "compressRunOnSerialization", true),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
index a7d1d37..674c7f2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillCompactionConfigTest.java
@@ -232,6 +232,7 @@ public class KillCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
@@ -244,6 +245,7 @@ public class KillCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
CoordinatorCompactionConfig originalCurrentConfig = CoordinatorCompactionConfig.from(ImmutableList.of(inactiveDatasourceConfig, activeDatasourceConfig));
@@ -348,6 +350,7 @@ public class KillCompactionConfigTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
index 092bc9f..1dc6908 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstIteratorTest.java
@@ -93,6 +93,7 @@ public class NewestSegmentFirstIteratorTest
null,
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -133,6 +134,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -173,6 +175,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -213,6 +216,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -253,6 +257,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -293,6 +298,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -333,6 +339,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -373,6 +380,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
@@ -413,6 +421,7 @@ public class NewestSegmentFirstIteratorTest
),
null,
null,
+ null,
null
);
Assert.assertEquals(
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
index a202b6d..82a6573 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/NewestSegmentFirstPolicyTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.druid.client.indexing.ClientCompactionTaskQueryTuningConfig;
+import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
@@ -37,6 +38,7 @@ import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
+import org.apache.druid.server.coordinator.UserCompactionTaskDimensionsConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
@@ -696,13 +698,13 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
)
);
@@ -729,13 +731,13 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
)
);
@@ -762,13 +764,13 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
)
);
@@ -805,13 +807,13 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("segmentGranularity", "day"))
)
);
@@ -848,7 +850,7 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
)
);
@@ -900,7 +902,7 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, null)
+ new CompactionState(partitionsSpec, null, indexSpec, null)
)
);
@@ -954,19 +956,19 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "false"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "false"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("rollup", "true"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("rollup", "true"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of())
)
);
@@ -1014,19 +1016,19 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "day"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "day"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of("queryGranularity", "minute"))
),
new SegmentGenerateSpec(
Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, indexSpec, ImmutableMap.of())
+ new CompactionState(partitionsSpec, null, indexSpec, ImmutableMap.of())
)
);
@@ -1058,6 +1060,100 @@ public class NewestSegmentFirstPolicyTest
}
@Test
+ public void testIteratorReturnsSegmentsAsSegmentsWasCompactedAndHaveDifferentDimensions()
+ {
+ // Same indexSpec as what is set in the auto compaction config
+ Map<String, Object> indexSpec = mapper.convertValue(new IndexSpec(), new TypeReference<Map<String, Object>>() {});
+ // Same partitionsSpec as what is set in the auto compaction config
+ PartitionsSpec partitionsSpec = NewestSegmentFirstIterator.findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig.from(null, null));
+
+ // Create segments that were compacted (CompactionState != null) and have
+ // Dimensions=["bar", "foo"] for interval 2017-10-01T00:00:00/2017-10-02T00:00:00,
+ // Dimensions=["foo"] for interval 2017-10-02T00:00:00/2017-10-03T00:00:00,
+ // Dimensions=null for interval 2017-10-03T00:00:00/2017-10-04T00:00:00 (dimensions were not set during last compaction)
+ // and dimensionsSpec=null for interval 2017-10-04T00:00:00/2017-10-05T00:00:00 (dimensionsSpec was not set during last compaction)
+ final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo")), null, null), indexSpec, null)
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")), null, null), indexSpec, null)
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, new DimensionsSpec(null, null, null), indexSpec, null)
+ ),
+ new SegmentGenerateSpec(
+ Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"),
+ new Period("P1D"),
+ null,
+ new CompactionState(partitionsSpec, null, indexSpec, null)
+ )
+ );
+
+ // Auto compaction config sets Dimensions=["foo"]
+ CompactionSegmentIterator iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+ 130000,
+ new Period("P0D"),
+ null,
+ new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("foo")))
+ )),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ // Expect 2017-10-04/2017-10-05, then 2017-10-03/2017-10-04, then 2017-10-01/2017-10-02, in that order; 2017-10-02/2017-10-03 already has Dimensions=["foo"] and is not returned.
+ Assert.assertTrue(iterator.hasNext());
+ List<DataSegment> expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-04T00:00:00/2017-10-05T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ Assert.assertTrue(iterator.hasNext());
+ expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-03T00:00:00/2017-10-04T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ Assert.assertTrue(iterator.hasNext());
+ expectedSegmentsToCompact = new ArrayList<>(
+ timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2017-10-01T00:00:00/2017-10-02T00:00:00"), Partitions.ONLY_COMPLETE)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(expectedSegmentsToCompact),
+ ImmutableSet.copyOf(iterator.next())
+ );
+ // No more
+ Assert.assertFalse(iterator.hasNext());
+
+ // Auto compaction config sets Dimensions=null
+ iterator = policy.reset(
+ ImmutableMap.of(DATA_SOURCE, createCompactionConfig(
+ 130000,
+ new Period("P0D"),
+ null,
+ new UserCompactionTaskDimensionsConfig(null)
+ )),
+ ImmutableMap.of(DATA_SOURCE, timeline),
+ Collections.emptyMap()
+ );
+ // Nothing is returned when the config leaves the dimensions unset
+ Assert.assertFalse(iterator.hasNext());
+ }
+
+ @Test
public void testIteratorReturnsSegmentsSmallerSegmentGranularityCoveringMultipleSegmentsInTimeline()
{
final VersionedIntervalTimeline<String, DataSegment> timeline = createTimeline(
@@ -1101,7 +1197,7 @@ public class NewestSegmentFirstPolicyTest
Intervals.of("2017-10-02T00:00:00/2017-10-03T00:00:00"),
new Period("P1D"),
null,
- new CompactionState(partitionsSpec, newIndexSpecMap, null)
+ new CompactionState(partitionsSpec, null, newIndexSpecMap, null)
)
);
@@ -1234,6 +1330,16 @@ public class NewestSegmentFirstPolicyTest
UserCompactionTaskGranularityConfig granularitySpec
)
{
+ return createCompactionConfig(inputSegmentSizeBytes, skipOffsetFromLatest, granularitySpec, null);
+ }
+
+ private DataSourceCompactionConfig createCompactionConfig(
+ long inputSegmentSizeBytes,
+ Period skipOffsetFromLatest,
+ UserCompactionTaskGranularityConfig granularitySpec,
+ UserCompactionTaskDimensionsConfig dimensionsSpec
+ )
+ {
return new DataSourceCompactionConfig(
DATA_SOURCE,
0,
@@ -1242,6 +1348,7 @@ public class NewestSegmentFirstPolicyTest
skipOffsetFromLatest,
null,
granularitySpec,
+ dimensionsSpec,
null,
null
);
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
index 353550d..fc16507 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java
@@ -57,6 +57,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
private static final byte[] OLD_CONFIG_IN_BYTES = {1, 2, 3};
@@ -152,6 +153,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, true),
null,
+ null,
ImmutableMap.of("key", "val")
);
String author = "maytas";
@@ -192,6 +194,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
@@ -313,6 +316,7 @@ public class CoordinatorCompactionConfigsResourceTest
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null, null),
null,
+ null,
ImmutableMap.of("key", "val")
);
String author = "maytas";
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
index 3936855..72740a5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java
@@ -275,6 +275,7 @@ public class SystemSchemaTest extends CalciteTestBase
private final CompactionState expectedCompactionState = new CompactionState(
new DynamicPartitionsSpec(null, null),
+ null,
Collections.singletonMap("test", "map"),
Collections.singletonMap("test2", "map2")
);
diff --git a/website/.spelling b/website/.spelling
index c2b9dd9..9d474f7 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1775,6 +1775,7 @@ cpuacct
dataSourceName
datetime
defaultHistory
+dimensionsSpec
doubleMax
doubleMin
doubleSum
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org