Posted to commits@druid.apache.org by ag...@apache.org on 2022/04/15 16:08:14 UTC
[druid] branch master updated: Make tombstones ingestible by having them return an empty result set. (#12392)
This is an automated email from the ASF dual-hosted git repository.
agonzalez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0460d45e92 Make tombstones ingestible by having them return an empty result set. (#12392)
0460d45e92 is described below
commit 0460d45e92a15ebdadb4455afa16a7d977f74388
Author: Agustin Gonzalez <ag...@imply.io>
AuthorDate: Fri Apr 15 09:08:06 2022 -0700
Make tombstones ingestible by having them return an empty result set. (#12392)
* Make tombstones ingestible by having them return an empty result set.
* Spotbug
* Coverage
* Coverage
* Remove unnecessary exception (checkstyle)
* Fix integration test and add one more to test dropExisting set to false over tombstones
* Force dropExisting to true in auto-compaction when the interval contains only tombstones
* Checkstyle, fix unit test
* Changed flag by mistake, fixing it
* Remove method from interface since this method is specific to only DruidSegmentInputEntity
* Fix typo
* Adapt to latest code
* Update comments when only tombstones to compact
* Move empty iterator to a new DruidTombstoneSegmentReader
* Code review feedback
* Checkstyle
* Review feedback
* Coverage
---
.../org/apache/druid/common/guava/GuavaUtils.java | 5 +-
.../org/apache/druid/data/input/InputEntity.java | 1 +
.../druid/indexing/common/task/CompactionTask.java | 165 ++++++++++--------
.../druid/indexing/input/DruidInputSource.java | 1 -
.../indexing/input/DruidSegmentInputEntity.java | 6 +
.../indexing/input/DruidSegmentInputFormat.java | 31 +++-
.../druid/indexing/input/DruidSegmentReader.java | 6 +-
.../input/DruidTombstoneSegmentReader.java | 88 ++++++++++
.../common/task/CompactionTaskRunTest.java | 116 +++++++++++++
.../parallel/DimensionCardinalityReportTest.java | 114 ++++++++++++
.../input/DruidSegmentInputFormatTest.java | 88 ++++++++++
.../indexing/input/DruidSegmentReaderTest.java | 114 +++++++++++-
.../coordinator/duty/ITAutoCompactionTest.java | 193 +++++++++++++++++++--
.../server/coordinator/duty/CompactSegments.java | 20 +++
14 files changed, 844 insertions(+), 104 deletions(-)
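In short, the change teaches DruidSegmentInputFormat to recognize input entities backed by tombstones and hand them a reader whose row iterator is always empty, so compaction and re-ingestion can consume tombstone segments instead of filtering them out upstream. The sketch below is an illustrative, self-contained condensation of that dispatch; the names TombstoneDispatchSketch and rowsFor are invented for the example, and only mirror the DruidSegmentInputEntity.isFromTombstone() / DruidSegmentInputFormat.createReader() / DruidTombstoneSegmentReader pieces in the diff that follows.

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    // Illustrative sketch only; the real classes are DruidSegmentInputEntity,
    // DruidSegmentInputFormat and DruidTombstoneSegmentReader in the diff below.
    class TombstoneDispatchSketch
    {
      // Mirrors the new createReader() dispatch: a tombstone-backed entity yields no rows,
      // a regular entity goes through the normal segment-reading path (stubbed out here).
      static Iterator<Map<String, Object>> rowsFor(boolean entityIsFromTombstone)
      {
        if (entityIsFromTombstone) {
          return Collections.emptyIterator(); // a tombstone has no data rows
        }
        Map<String, Object> row = new HashMap<>();
        row.put("placeholder", 1); // stand-in for rows read from a real segment file
        return Collections.singletonList(row).iterator();
      }

      public static void main(String[] args)
      {
        System.out.println(rowsFor(true).hasNext());  // false: tombstones contribute nothing
        System.out.println(rowsFor(false).hasNext()); // true: real segments still produce rows
      }
    }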
diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
index b3455393e7..f8b9dd03b7 100644
--- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
+++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
@@ -71,7 +71,7 @@ public class GuavaUtils
/**
* If first argument is not null, return it, else return the other argument. Sort of like
- * {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
+ * {@link com.google.common.base.Objects#firstNonNull(T, T)} except will not explode if both arguments are
* null.
*/
@Nullable
@@ -85,7 +85,8 @@ public class GuavaUtils
/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
- * automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
+ * automatically. Especially when we call
+ * {@link static <V> ListenableFuture<List<V>> com.google.common.util.concurrent.Futures#allAsList(Iterable<? extends ListenableFuture <? extends V>> futures)} to create a batch of
* future.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
index 343e245e22..6765ae8271 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputEntity.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java
@@ -136,4 +136,5 @@ public interface InputEntity
{
return Predicates.alwaysFalse();
}
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 66e0c16a70..067f75ec7d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.NonnullPair;
+import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -111,6 +112,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -550,6 +552,7 @@ public class CompactionTask extends AbstractBatchIndexTask
segmentProvider,
lockGranularityInUse
);
+
final Map<DataSegment, File> segmentFileMap = pair.lhs;
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = pair.rhs;
@@ -557,9 +560,10 @@ public class CompactionTask extends AbstractBatchIndexTask
return Collections.emptyList();
}
- // find metadata for interval
+ // find metadata for intervals with real data segments
// queryableIndexAndSegments is sorted by the interval of the dataSegment
- final List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
+ // Note that this list will contain null QueryableIndex values for tombstones
+ final List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments = loadSegments(
timelineSegments,
segmentFileMap,
toolbox.getIndexIO()
@@ -568,8 +572,10 @@ public class CompactionTask extends AbstractBatchIndexTask
final CompactionTuningConfig compactionTuningConfig = partitionConfigurationManager.computeTuningConfig();
if (granularitySpec == null || granularitySpec.getSegmentGranularity() == null) {
+ final List<ParallelIndexIngestionSpec> specs = new ArrayList<>();
+
// original granularity
- final Map<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
+ final Map<Interval, List<Pair<QueryableIndex, DataSegment>>> intervalToSegments = new TreeMap<>(
Comparators.intervalsByStartThenEnd()
);
queryableIndexAndSegments.forEach(
@@ -578,11 +584,11 @@ public class CompactionTask extends AbstractBatchIndexTask
);
// unify overlapping intervals to ensure overlapping segments compacting in the same indexSpec
- List<NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
+ List<NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>>> intervalToSegmentsUnified =
new ArrayList<>();
Interval union = null;
- List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
- for (Entry<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
+ List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+ for (Entry<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegments.entrySet()) {
Interval cur = entry.getKey();
if (union == null) {
union = cur;
@@ -596,12 +602,12 @@ public class CompactionTask extends AbstractBatchIndexTask
segments = new ArrayList<>(entry.getValue());
}
}
+
intervalToSegmentsUnified.add(new NonnullPair<>(union, segments));
- final List<ParallelIndexIngestionSpec> specs = new ArrayList<>(intervalToSegmentsUnified.size());
- for (NonnullPair<Interval, List<NonnullPair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
+ for (NonnullPair<Interval, List<Pair<QueryableIndex, DataSegment>>> entry : intervalToSegmentsUnified) {
final Interval interval = entry.lhs;
- final List<NonnullPair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
+ final List<Pair<QueryableIndex, DataSegment>> segmentsToCompact = entry.rhs;
// If granularitySpec is not null, then set segmentGranularity. Otherwise,
// creates new granularitySpec and set segmentGranularity
Granularity segmentGranularityToUse = GranularityType.fromPeriod(interval.toPeriod()).getDefaultGranularity();
@@ -700,22 +706,19 @@ public class CompactionTask extends AbstractBatchIndexTask
LockGranularity lockGranularityInUse
) throws IOException, SegmentLoadingException
{
- final List<DataSegment> usedSegmentsMinusTombstones =
- segmentProvider.findSegments(toolbox.getTaskActionClient())
- .stream()
- .filter(dataSegment -> !dataSegment.isTombstone()) // skip tombstones
- .collect(Collectors.toList());
- segmentProvider.checkSegments(lockGranularityInUse, usedSegmentsMinusTombstones);
- final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegmentsMinusTombstones);
+ final List<DataSegment> usedSegments =
+ segmentProvider.findSegments(toolbox.getTaskActionClient());
+ segmentProvider.checkSegments(lockGranularityInUse, usedSegments);
+ final Map<DataSegment, File> segmentFileMap = toolbox.fetchSegments(usedSegments);
final List<TimelineObjectHolder<String, DataSegment>> timelineSegments = VersionedIntervalTimeline
- .forSegments(usedSegmentsMinusTombstones)
+ .forSegments(usedSegments)
.lookup(segmentProvider.interval);
return new NonnullPair<>(segmentFileMap, timelineSegments);
}
private static DataSchema createDataSchema(
String dataSource,
- List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
+ List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments,
@Nullable DimensionsSpec dimensionsSpec,
@Nullable ClientCompactionTaskTransformSpec transformSpec,
@Nullable AggregatorFactory[] metricsSpec,
@@ -781,34 +784,36 @@ public class CompactionTask extends AbstractBatchIndexTask
private static void decideRollupAndQueryGranularityCarryOver(
SettableSupplier<Boolean> rollup,
SettableSupplier<Granularity> queryGranularity,
- List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+ List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final SettableSupplier<Boolean> rollupIsValid = new SettableSupplier<>(true);
- for (NonnullPair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
+ for (Pair<QueryableIndex, DataSegment> pair : queryableIndexAndSegments) {
final QueryableIndex index = pair.lhs;
- if (index.getMetadata() == null) {
- throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
- }
- // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
-
- // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
- if (rollupIsValid.get()) {
- Boolean isRollup = index.getMetadata().isRollup();
- if (isRollup == null) {
- rollupIsValid.set(false);
- rollup.set(false);
- } else if (rollup.get() == null) {
- rollup.set(isRollup);
- } else if (!rollup.get().equals(isRollup.booleanValue())) {
- rollupIsValid.set(false);
- rollup.set(false);
+ if (index != null) { // avoid tombstones
+ if (index.getMetadata() == null) {
+ throw new RE("Index metadata doesn't exist for segment[%s]", pair.rhs.getId());
+ }
+ // carry-overs (i.e. query granularity & rollup) are valid iff they are the same in every segment:
+
+ // Pick rollup value if all segments being compacted have the same, non-null, value otherwise set it to false
+ if (rollupIsValid.get()) {
+ Boolean isRollup = index.getMetadata().isRollup();
+ if (isRollup == null) {
+ rollupIsValid.set(false);
+ rollup.set(false);
+ } else if (rollup.get() == null) {
+ rollup.set(isRollup);
+ } else if (!rollup.get().equals(isRollup.booleanValue())) {
+ rollupIsValid.set(false);
+ rollup.set(false);
+ }
}
- }
- // Pick the finer, non-null, of the query granularities of the segments being compacted
- Granularity current = index.getMetadata().getQueryGranularity();
- queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+ // Pick the finer, non-null, of the query granularities of the segments being compacted
+ Granularity current = index.getMetadata().getQueryGranularity();
+ queryGranularity.set(compareWithCurrent(queryGranularity.get(), current));
+ }
}
}
@@ -828,22 +833,28 @@ public class CompactionTask extends AbstractBatchIndexTask
}
private static AggregatorFactory[] createMetricsSpec(
- List<NonnullPair<QueryableIndex, DataSegment>> queryableIndexAndSegments
+ List<Pair<QueryableIndex, DataSegment>> queryableIndexAndSegments
)
{
final List<AggregatorFactory[]> aggregatorFactories = queryableIndexAndSegments
.stream()
+ .filter(pair -> pair.lhs != null) // avoid tombstones
.map(pair -> pair.lhs.getMetadata().getAggregators()) // We have already done null check on index.getMetadata()
.collect(Collectors.toList());
final AggregatorFactory[] mergedAggregators = AggregatorFactory.mergeAggregators(aggregatorFactories);
if (mergedAggregators == null) {
- throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+ Optional<Pair<QueryableIndex, DataSegment>> pair =
+ queryableIndexAndSegments.stream().filter(p -> !p.rhs.isTombstone()).findFirst();
+ if (pair.isPresent()) {
+ // this means that there are true data segments, so something went wrong
+ throw new ISE("Failed to merge aggregators[%s]", aggregatorFactories);
+ }
}
return mergedAggregators;
}
- private static DimensionsSpec createDimensionsSpec(List<NonnullPair<QueryableIndex, DataSegment>> queryableIndices)
+ private static DimensionsSpec createDimensionsSpec(List<Pair<QueryableIndex, DataSegment>> queryableIndices)
{
final BiMap<String, Integer> uniqueDims = HashBiMap.create();
final Map<String, DimensionSchema> dimensionSchemaMap = new HashMap<>();
@@ -859,33 +870,35 @@ public class CompactionTask extends AbstractBatchIndexTask
);
int index = 0;
- for (NonnullPair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
+ for (Pair<QueryableIndex, DataSegment> pair : Lists.reverse(queryableIndices)) {
final QueryableIndex queryableIndex = pair.lhs;
- final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
+ if (queryableIndex != null) { // avoid tombstones
+ final Map<String, DimensionHandler> dimensionHandlerMap = queryableIndex.getDimensionHandlers();
- for (String dimension : queryableIndex.getAvailableDimensions()) {
- final ColumnHolder columnHolder = Preconditions.checkNotNull(
- queryableIndex.getColumnHolder(dimension),
- "Cannot find column for dimension[%s]",
- dimension
- );
-
- if (!uniqueDims.containsKey(dimension)) {
- final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
- dimensionHandlerMap.get(dimension),
- "Cannot find dimensionHandler for dimension[%s]",
+ for (String dimension : queryableIndex.getAvailableDimensions()) {
+ final ColumnHolder columnHolder = Preconditions.checkNotNull(
+ queryableIndex.getColumnHolder(dimension),
+ "Cannot find column for dimension[%s]",
dimension
);
- uniqueDims.put(dimension, index++);
- dimensionSchemaMap.put(
- dimension,
- createDimensionSchema(
- dimension,
- columnHolder.getCapabilities(),
- dimensionHandler.getMultivalueHandling()
- )
- );
+ if (!uniqueDims.containsKey(dimension)) {
+ final DimensionHandler dimensionHandler = Preconditions.checkNotNull(
+ dimensionHandlerMap.get(dimension),
+ "Cannot find dimensionHandler for dimension[%s]",
+ dimension
+ );
+
+ uniqueDims.put(dimension, index++);
+ dimensionSchemaMap.put(
+ dimension,
+ createDimensionSchema(
+ dimension,
+ columnHolder.getCapabilities(),
+ dimensionHandler.getMultivalueHandling()
+ )
+ );
+ }
}
}
}
@@ -905,25 +918,33 @@ public class CompactionTask extends AbstractBatchIndexTask
return new DimensionsSpec(dimensionSchemas);
}
- private static List<NonnullPair<QueryableIndex, DataSegment>> loadSegments(
+ /**
+ * This private method does not load (i.e. does not create a QueryableIndex for) tombstones, since tombstones
+ * do not have a file image and are never pushed to deep storage. Thus, for a tombstone, the returned list
+ * will contain a null in the QueryableIndex slot of the pair (lhs).
+ */
+ private static List<Pair<QueryableIndex, DataSegment>> loadSegments(
List<TimelineObjectHolder<String, DataSegment>> timelineObjectHolders,
Map<DataSegment, File> segmentFileMap,
IndexIO indexIO
) throws IOException
{
- final List<NonnullPair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
+ final List<Pair<QueryableIndex, DataSegment>> segments = new ArrayList<>();
for (TimelineObjectHolder<String, DataSegment> timelineObjectHolder : timelineObjectHolders) {
final PartitionHolder<DataSegment> partitionHolder = timelineObjectHolder.getObject();
for (PartitionChunk<DataSegment> chunk : partitionHolder) {
+ QueryableIndex queryableIndex = null;
final DataSegment segment = chunk.getObject();
- final QueryableIndex queryableIndex = indexIO.loadIndex(
- Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
- );
- segments.add(new NonnullPair<>(queryableIndex, segment));
+ if (!chunk.getObject().isTombstone()) {
+ queryableIndex = indexIO.loadIndex(
+ Preconditions.checkNotNull(segmentFileMap.get(segment), "File for segment %s", segment.getId())
+ );
+ }
+ segments.add(new Pair<>(queryableIndex, segment));
}
}
-
return segments;
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 6f1a6d6446..d10f3d3e9f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -237,7 +237,6 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
//noinspection ConstantConditions
return FluentIterable
.from(partitionHolder)
- .filter(chunk -> !chunk.getObject().isTombstone())
.transform(chunk -> new DruidSegmentInputEntity(segmentCacheManager, chunk.getObject(), holder.getInterval()));
}).iterator();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
index 63f2fe25c5..3396195a21 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputEntity.java
@@ -91,4 +91,10 @@ public class DruidSegmentInputEntity implements InputEntity
}
};
}
+
+ public boolean isFromTombstone()
+ {
+ return segment.isTombstone();
+ }
+
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
index 4d028596ff..732288e854 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.input;
+import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
@@ -55,14 +56,28 @@ public class DruidSegmentInputFormat implements InputFormat
File temporaryDirectory
)
{
- return new DruidSegmentReader(
- source,
- indexIO,
- inputRowSchema.getTimestampSpec(),
- inputRowSchema.getDimensionsSpec(),
- inputRowSchema.getColumnsFilter(),
- dimFilter,
- temporaryDirectory
+ // this method handles the case when the entity comes from a tombstone or from a regular segment
+ Preconditions.checkArgument(
+ source instanceof DruidSegmentInputEntity,
+ DruidSegmentInputEntity.class.getName() + " required, but "
+ + source.getClass().getName() + " provided."
);
+
+ final InputEntityReader retVal;
+ // The cast is safe here because the precondition above has passed
+ if (((DruidSegmentInputEntity) source).isFromTombstone()) {
+ retVal = new DruidTombstoneSegmentReader(source);
+ } else {
+ retVal = new DruidSegmentReader(
+ source,
+ indexIO,
+ inputRowSchema.getTimestampSpec(),
+ inputRowSchema.getDimensionsSpec(),
+ inputRowSchema.getColumnsFilter(),
+ dimFilter,
+ temporaryDirectory
+ );
+ }
+ return retVal;
}
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 7117eea0a9..87181b8cb4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.input;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -72,7 +71,7 @@ import java.util.Set;
public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
{
- private final DruidSegmentInputEntity source;
+ private DruidSegmentInputEntity source;
private final IndexIO indexIO;
private final ColumnsFilter columnsFilter;
private final InputRowSchema inputRowSchema;
@@ -89,7 +88,6 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
final File temporaryDirectory
)
{
- Preconditions.checkArgument(source instanceof DruidSegmentInputEntity);
this.source = (DruidSegmentInputEntity) source;
this.indexIO = indexIO;
this.columnsFilter = columnsFilter;
@@ -105,7 +103,7 @@ public class DruidSegmentReader extends IntermediateRowParsingReader<Map<String,
@Override
protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throws IOException
{
- final CleanableFile segmentFile = source.fetch(temporaryDirectory, null);
+ final CleanableFile segmentFile = source().fetch(temporaryDirectory, null);
final WindowedStorageAdapter storageAdapter = new WindowedStorageAdapter(
new QueryableIndexStorageAdapter(
indexIO.loadIndex(segmentFile.file())
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java
new file mode 100644
index 0000000000..2a80112df8
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidTombstoneSegmentReader.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * This class will return an empty iterator since a tombstone has no data rows...
+ */
+public class DruidTombstoneSegmentReader extends IntermediateRowParsingReader<Map<String, Object>>
+{
+ private DruidSegmentInputEntity source;
+
+ public DruidTombstoneSegmentReader(
+ InputEntity source
+ )
+ {
+ this.source = (DruidSegmentInputEntity) source;
+ if (!this.source.isFromTombstone()) {
+ throw new IAE("DruidSegmentInputEntity must be created from a tombstone but is not.");
+ }
+ }
+
+ @Override
+ protected CloseableIterator<Map<String, Object>> intermediateRowIterator()
+ {
+ return new CloseableIterator<Map<String, Object>>()
+ {
+ @Override
+ public void close()
+ {
+
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ @Override
+ public Map<String, Object> next()
+ {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+
+ @VisibleForTesting
+ @Override
+ protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow)
+ {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+ @Override
+ protected List<Map<String, Object>> toMap(Map<String, Object> intermediateRow)
+ {
+ throw new UnsupportedOperationException(getClass().getName());
+ }
+
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
index 1c3463624e..bfd184e137 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java
@@ -82,6 +82,7 @@ import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
+import org.apache.druid.segment.loading.TombstoneLoadSpec;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.WindowedStorageAdapter;
import org.apache.druid.server.security.AuthTestUtils;
@@ -1099,6 +1100,120 @@ public class CompactionTaskRunTest extends IngestionTestBase
}
+ @Test
+ public void testCompactDatasourceOverIntervalWithOnlyTombstones() throws Exception
+ {
+ // This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
+ if (lockGranularity == LockGranularity.SEGMENT) {
+ return;
+ }
+
+ // The following task creates several (six, at last count) HOUR segments with intervals of
+ // - 2014-01-01T00:00:00/2014-01-01T01:00:00
+ // - 2014-01-01T01:00:00/2014-01-01T02:00:00
+ // - 2014-01-01T02:00:00/2014-01-01T03:00:00
+ // The six segments are:
+ // three rows in hour 00:
+ // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z with two rows
+ // 2014-01-01T00:00:00.000Z_2014-01-01T01:00:00.000Z_1 with one row
+ // three rows in hour 01:
+ // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z with two rows
+ // 2014-01-01T01:00:00.000Z_2014-01-01T02:00:00.000Z_1 with one row
+ // four rows in hour 02:
+ // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z with two rows
+ // 2014-01-01T02:00:00.000Z_2014-01-01T03:00:00.000Z_1 with two rows
+ // there are 10 rows total in the data set
+
+ // maxRowsPerSegment is set to 2 inside the runIndexTask methods
+ Pair<TaskStatus, List<DataSegment>> result = runIndexTask();
+ Assert.assertEquals(6, result.rhs.size());
+
+ final Builder builder = new Builder(
+ DATA_SOURCE,
+ segmentCacheManagerFactory,
+ RETRY_POLICY_FACTORY
+ );
+
+ // Set up partial interval compaction:
+ // Change the granularity from HOUR to MINUTE through compaction for hour 01. There are three rows in the
+ // compaction interval, all with the same timestamp (see TEST_ROWS), so this should generate one segment in the
+ // same, first, minute (the task will now use the default rows per segment since compaction's tuning config is
+ // null) plus 59 tombstones to completely overshadow the existing hour 01 segments. Since the segments outside
+ // the compaction interval should remain unchanged there should be a total of 2 + (1 + 59) + 2 = 64 segments
+
+ // **** PARTIAL COMPACTION: hour -> minute ****
+ final Interval compactionPartialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
+ final CompactionTask partialCompactionTask = builder
+ .segmentGranularity(Granularities.MINUTE)
+ // Set dropExisting to true
+ .inputSpec(new CompactionIntervalSpec(compactionPartialInterval, null), true)
+ .build();
+ final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
+ Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
+
+ // Segments that did not belong to the compaction interval (hours 00 and 02) are expected to be unchanged
+ // add 2 unchanged segments for hour 00:
+ final Set<DataSegment> expectedSegments = new HashSet<>();
+ expectedSegments.addAll(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01T00:00:00/2014-01-01T01:00:00")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ // add 2 unchanged segments for hour 02:
+ expectedSegments.addAll(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01T02:00:00/2014-01-01T03:00:00")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ expectedSegments.addAll(partialCompactionResult.rhs);
+ Assert.assertEquals(64, expectedSegments.size());
+
+ // New segments that were compacted are expected. However, old segments of the compacted interval should be
+ // overshadowed by the new tombstones (59) being created for all minutes other than 01:01
+ final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
+ getStorageCoordinator().retrieveUsedSegmentsForIntervals(
+ DATA_SOURCE,
+ Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
+ Segments.ONLY_VISIBLE
+ )
+ );
+ Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);
+ final List<DataSegment> realSegmentsAfterPartialCompaction =
+ segmentsAfterPartialCompaction.stream()
+ .filter(s -> !s.isTombstone())
+ .collect(Collectors.toList());
+ final List<DataSegment> tombstonesAfterPartialCompaction =
+ segmentsAfterPartialCompaction.stream()
+ .filter(s -> s.isTombstone())
+ .collect(Collectors.toList());
+ Assert.assertEquals(59, tombstonesAfterPartialCompaction.size());
+ Assert.assertEquals(5, realSegmentsAfterPartialCompaction.size());
+ Assert.assertEquals(64, segmentsAfterPartialCompaction.size());
+
+ // Now set up compaction over an interval with only tombstones, keeping the same minute granularity
+ final CompactionTask compactionTaskOverOnlyTombstones = builder
+ .segmentGranularity(null)
+ // Set dropExisting to true
+ // last 59 minutes of our 01, should be all tombstones
+ .inputSpec(new CompactionIntervalSpec(Intervals.of("2014-01-01T01:01:00/2014-01-01T02:00:00"), null), true)
+ .build();
+
+ // **** Compaction over tombstones ****
+ final Pair<TaskStatus, List<DataSegment>> resultOverOnlyTombstones = runTask(compactionTaskOverOnlyTombstones);
+ Assert.assertTrue(resultOverOnlyTombstones.lhs.isSuccess());
+
+ // compaction should not fail but since it is over the same granularity it should leave
+ // the tombstones unchanged
+ Assert.assertEquals(59, resultOverOnlyTombstones.rhs.size());
+ resultOverOnlyTombstones.rhs.forEach(t -> Assert.assertTrue(t.isTombstone()));
+ }
+
@Test
public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompactWithDropExistingFalse() throws Exception
{
@@ -1454,6 +1569,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
final ObjectMapper objectMapper = getObjectMapper();
objectMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
objectMapper.registerSubtypes(LocalDataSegmentPuller.class);
+ objectMapper.registerSubtypes(TombstoneLoadSpec.class);
final TaskToolbox box = createTaskToolbox(objectMapper, task);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
index 29bbe5c97f..876458d917 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionCardinalityReportTest.java
@@ -179,4 +179,118 @@ public class DimensionCardinalityReportTest
intervalToNumShards
);
}
+
+
+ @Test
+ public void testLargeSupervisorDetermineNumShardsFromCardinalityReport()
+ {
+ List<DimensionCardinalityReport> reports = new ArrayList<>();
+
+ HllSketch collector1 = DimensionCardinalityReport.createHllSketchForReport();
+ collector1.update(IndexTask.HASH_FUNCTION.hashLong(1L).asBytes());
+ collector1.update(IndexTask.HASH_FUNCTION.hashLong(200L).asBytes());
+ DimensionCardinalityReport report1 = new DimensionCardinalityReport(
+ "taskA",
+ ImmutableMap.of(
+ Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"),
+ collector1.toCompactByteArray()
+ )
+ );
+ reports.add(report1);
+
+ HllSketch collector2 = DimensionCardinalityReport.createHllSketchForReport();
+ collector2.update(IndexTask.HASH_FUNCTION.hashLong(1000L).asBytes());
+ collector2.update(IndexTask.HASH_FUNCTION.hashLong(30000L).asBytes());
+ DimensionCardinalityReport report2 = new DimensionCardinalityReport(
+ "taskB",
+ ImmutableMap.of(
+ Intervals.of("1970-01-01T00:00:00.000Z/1970-01-02T00:00:00.000Z"),
+ collector2.toCompactByteArray()
+ )
+ );
+ reports.add(report2);
+
+ // Separate interval with only 1 value
+ HllSketch collector3 = DimensionCardinalityReport.createHllSketchForReport();
+ collector3.update(IndexTask.HASH_FUNCTION.hashLong(99000L).asBytes());
+ DimensionCardinalityReport report3 = new DimensionCardinalityReport(
+ "taskC",
+ ImmutableMap.of(
+ Intervals.of("1970-01-02T00:00:00.000Z/1970-01-03T00:00:00.000Z"),
+ collector3.toCompactByteArray()
+ )
+ );
+ reports.add(report3);
+
+ // first interval in test has cardinality 4
+ Map<Interval, Integer> intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ reports,
+ 1
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 4,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
+
+ intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ reports,
+ 2
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 2,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
+
+ intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ reports,
+ 3
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
+
+ intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ reports,
+ 4
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
+
+ intervalToNumShards = ParallelIndexSupervisorTask.determineNumShardsFromCardinalityReport(
+ reports,
+ 5
+ );
+ Assert.assertEquals(
+ ImmutableMap.of(
+ Intervals.of("1970-01-01/P1D"),
+ 1,
+ Intervals.of("1970-01-02/P1D"),
+ 1
+ ),
+ intervalToNumShards
+ );
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
new file mode 100644
index 0000000000..910371a566
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentInputFormatTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.input;
+
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.Intervals;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertThrows;
+
+
+public class DruidSegmentInputFormatTest
+{
+
+ private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
+ new TimestampSpec("ts", "auto", null),
+ new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "name"))),
+ ColumnsFilter.all()
+ );
+
+
+ @Test
+ public void testDruidSegmentInputEntityReader()
+ {
+ DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+ InputEntityReader reader = format.createReader(
+ INPUT_ROW_SCHEMA,
+ DruidSegmentReaderTest.makeInputEntity(Intervals.of("2000/P1D"), null),
+ null
+ );
+ Assert.assertTrue(reader instanceof DruidSegmentReader);
+ }
+
+
+ @Test
+ public void testDruidTombstoneSegmentInputEntityReader()
+ {
+ DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+ InputEntityReader reader = format.createReader(
+ INPUT_ROW_SCHEMA,
+ DruidSegmentReaderTest.makeTombstoneInputEntity(Intervals.of("2000/P1D")),
+ null
+ );
+ Assert.assertTrue(reader instanceof DruidTombstoneSegmentReader);
+ }
+
+ @Test
+ public void testDruidSegmentInputEntityReaderBadEntity()
+ {
+ DruidSegmentInputFormat format = new DruidSegmentInputFormat(null, null);
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+ format.createReader(
+ INPUT_ROW_SCHEMA,
+ new FileEntity(null),
+ null
+ );
+ });
+ String expectedMessage =
+ "org.apache.druid.indexing.input.DruidSegmentInputEntity required, but org.apache.druid.data.input.impl.FileEntity provided.";
+ String actualMessage = exception.getMessage();
+ Assert.assertEquals(expectedMessage, actualMessage);
+ }
+}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 9301b891e4..1638e79e9a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -30,6 +30,7 @@ import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.hll.HyperLogLogCollector;
@@ -52,6 +53,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
@@ -68,6 +70,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import static org.junit.Assert.assertThrows;
+
public class DruidSegmentReaderTest extends NullHandlingTest
{
@Rule
@@ -186,6 +190,42 @@ public class DruidSegmentReaderTest extends NullHandlingTest
);
}
+ @Test
+ public void testDruidTombstoneSegmentReader() throws IOException
+ {
+ final DruidTombstoneSegmentReader reader = new DruidTombstoneSegmentReader(
+ makeTombstoneInputEntity(Intervals.of("2000/P1D"))
+ );
+
+ Assert.assertFalse(reader.intermediateRowIterator().hasNext());
+ Assert.assertEquals(
+ Collections.emptyList(),
+ readRows(reader)
+ );
+ }
+
+ @Test
+ public void testDruidTombstoneSegmentReaderBadEntity()
+ {
+ assertThrows(ClassCastException.class, () -> {
+ new DruidTombstoneSegmentReader(
+ new FileEntity(null));
+ });
+ }
+
+ @Test
+ public void testDruidTombstoneSegmentReaderNotCreatedFromTombstone()
+ {
+ Exception exception = assertThrows(IllegalArgumentException.class, () -> {
+ new DruidTombstoneSegmentReader(makeInputEntity(Intervals.of("2000/P1D")));
+ });
+ String expectedMessage =
+ "DruidSegmentInputEntity must be created from a tombstone but is not.";
+ String actualMessage = exception.getMessage();
+ Assert.assertEquals(expectedMessage, actualMessage);
+
+ }
+
@Test
public void testReaderAutoTimestampFormat() throws IOException
{
@@ -582,6 +622,11 @@ public class DruidSegmentReaderTest extends NullHandlingTest
}
private DruidSegmentInputEntity makeInputEntity(final Interval interval)
+ {
+ return makeInputEntity(interval, segmentDirectory);
+ }
+
+ public static DruidSegmentInputEntity makeInputEntity(final Interval interval, final File segmentDirectory)
{
return new DruidSegmentInputEntity(
new SegmentCacheManager()
@@ -634,7 +679,62 @@ public class DruidSegmentReaderTest extends NullHandlingTest
);
}
- private List<InputRow> readRows(final DruidSegmentReader reader) throws IOException
+ public static DruidSegmentInputEntity makeTombstoneInputEntity(final Interval interval)
+ {
+ return new DruidSegmentInputEntity(
+ new SegmentCacheManager()
+ {
+ @Override
+ public boolean isSegmentCached(DataSegment segment)
+ {
+ throw new UnsupportedOperationException("unused");
+ }
+
+ @Override
+ public File getSegmentFiles(DataSegment segment)
+ {
+ throw new UnsupportedOperationException("unused");
+ }
+
+ @Override
+ public void cleanup(DataSegment segment)
+ {
+ throw new UnsupportedOperationException("unused");
+ }
+
+ @Override
+ public boolean reserve(DataSegment segment)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean release(DataSegment segment)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+ },
+ DataSegment.builder()
+ .dataSource("ds")
+ .interval(Intervals.of("2000/P1D"))
+ .version("1")
+ .shardSpec(new TombstoneShardSpec())
+ .loadSpec(ImmutableMap.of("type", "tombstone"))
+ .size(1)
+ .build(),
+ interval
+ );
+ }
+
+
+ private List<InputRow> readRows(DruidSegmentReader reader) throws IOException
{
final List<InputRow> rows = new ArrayList<>();
try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
@@ -645,6 +745,18 @@ public class DruidSegmentReaderTest extends NullHandlingTest
return rows;
}
+ private List<InputRow> readRows(DruidTombstoneSegmentReader reader) throws IOException
+ {
+ final List<InputRow> rows = new ArrayList<>();
+ try (final CloseableIterator<Map<String, Object>> iterator = reader.intermediateRowIterator()) {
+ while (iterator.hasNext()) {
+ rows.addAll(reader.parseInputRows(iterator.next()));
+ }
+ }
+ return rows;
+ }
+
+
private static HyperLogLogCollector makeHLLC(final String... values)
{
final HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
index 07a9bd62de..7dd7591615 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/coordinator/duty/ITAutoCompactionTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
+import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
@@ -67,6 +68,7 @@ import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.joda.time.chrono.ISOChronology;
@@ -80,6 +82,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -502,10 +505,7 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
1,
1,
0);
- Assert.assertEquals(
- "14906",
- compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize")
- );
+ Assert.assertEquals(compactionResource.getCompactionProgress(fullDatasourceName).get("remainingSegmentSize"), "14906");
// Run compaction again to compact the remaining day
// Remaining day compacted (1 new segment). Now both days compacted (2 total)
forceTriggerAutoCompaction(2);
@@ -530,7 +530,28 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
@Test
public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrue() throws Exception
{
+ // Interval is "2013-08-31/2013-09-02", segment gran is DAY,
+ // "maxRowsPerSegment": 3
+ // input files:
+ // "/resources/data/batch_index/json/wikipedia_index_data1.json",
+ // 3 rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z"
+ // "/resources/data/batch_index/json/wikipedia_index_data2.json",
+ // 3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z"
+ // "/resources/data/batch_index/json/wikipedia_index_data3.json"
+ // 4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z"
+ // Summary of data:
+ // 5 rows @ 2013-08-31 and 5 @ 2013-09-01; only TWO days have data
+ // Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones
+ // 1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each)
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ // 2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments)
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ // Four data segments (two months) and 10 tombstones for remaining months
+ // 3rd compaction: SEMESTER: 5 rows @ 2013-08-31 (two segments), 5 rows @ 2013-09-01 (two segments),
+ // 2 compactions were generated for year 2013, one for each semester of the year to be compacted.
+ //
loadData(INDEX_TASK);
+
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
@@ -538,12 +559,13 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCount(4);
verifyQuery(INDEX_QUERIES_RESOURCE);
+
+ LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true");
Granularity newGranularity = Granularities.YEAR;
// Set dropExisting to true
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
- LOG.info("Auto compaction test with YEAR segment granularity");
-
List<String> expectedIntervalAfterCompaction = new ArrayList<>();
for (String interval : intervalsBeforeCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
@@ -555,30 +577,169 @@ public class ITAutoCompactionTest extends AbstractIndexerTest
verifySegmentsCompacted(1, 1000);
checkCompactionIntervals(expectedIntervalAfterCompaction);
- newGranularity = Granularities.DAY;
+
+ LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true");
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ newGranularity = Granularities.MONTH;
// Set dropExisting to true
submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
- LOG.info("Auto compaction test with DAY segment granularity");
+ // Since dropExisting is set to true...
+ // Again data is only in two days
+ // The earlier segment with YEAR granularity will be completely covered, overshadowed, by the
+ // new MONTH segments for data and tombstones for months with no data
+ // Hence, we will only have the 2013-08 and 2013-09 months with data
+ // plus 10 tombstones
+ final List<String> intervalsAfterYEARCompactionButBeforeMONTHCompaction =
+ coordinator.getSegmentIntervals(fullDatasourceName);
+ expectedIntervalAfterCompaction = new ArrayList<>();
+ for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(12);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifyTombstones(10);
+ verifySegmentsCompacted(12, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is true, over tombstones");
+ // The only reason to use semester rather than quarter or month is to minimize test time while still
+ // ensuring that one of the compactions compacts *only* tombstones. The first semester will
+ // compact only tombstones, so it should result in a tombstone itself.
+ newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
+ // Set dropExisting to true
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
// Since dropExisting is set to true...
+ // The earlier 12 segments with MONTH granularity will be completely covered, overshadowed, by the
+ // new P6M segments for data and tombstones for semesters with no data
+ // Hence, we will have two segments, one tombstone for the first semester and one data segment for the second.
+ forceTriggerAutoCompaction(2); // two semesters compacted
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifyTombstones(1);
+ verifySegmentsCompacted(2, 1000);
+
+ expectedIntervalAfterCompaction =
+ Arrays.asList("2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z",
+ "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z"
+ );
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ // verify that auto-compaction has already completed: triggering it again should create no new compaction tasks
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ forceTriggerAutoCompaction(2);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
+
+ }
+ }
+
+ @Test
+ public void testAutoCompactionDutyWithSegmentGranularityAndWithDropExistingTrueThenFalse() throws Exception
+ {
+ // Interval is "2013-08-31/2013-09-02", segment gran is DAY,
+ // "maxRowsPerSegment": 3
+ // input files:
+ // "/resources/data/batch_index/json/wikipedia_index_data1.json",
+ // 3 rows -> "2013-08-31T01:02:33Z", "2013-08-31T03:32:45Z", "2013-08-31T07:11:21Z"
+ // "/resources/data/batch_index/json/wikipedia_index_data2.json",
+ // 3 rows -> "2013-08-31T11:58:39Z", "2013-08-31T12:41:27Z", "2013-09-01T01:02:33Z"
+ // "/resources/data/batch_index/json/wikipedia_index_data3.json"
+ // 4 rows -> "2013-09-01T03:32:45Z", "2013-09-01T07:11:21Z", "2013-09-01T11:58:39Z", "2013-09-01T12:41:27Z"
+ // Summary of data:
+ // 5 rows @ 2013-08-31 and 5 @ 2013-09-01; only TWO days have data
+ // Initial load/ingestion: DAY, "intervals" : [ "2013-08-31/2013-09-02" ], Four segments, no tombstones
+ // 1st compaction: YEAR: 10 rows during 2013 (4 segments of at most three rows each)
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ // 2nd compaction: MONTH: 5 rows @ 2013-08 (two segments), 5 rows @ 2013-09 (two segments)
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ // Four data segments (two months) and 10 tombstones for remaining months
+ // 3rd compaction: SEMESTER: 5 rows @ 2013-08-31, 5 rows @ 2013-09-01 (two segments),
+ // 2 compactions were generated for year 2013, one for each semester of the year to be compacted.
+ //
+ loadData(INDEX_TASK);
+
+ try (final Closeable ignored = unloader(fullDatasourceName)) {
+ final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
+ intervalsBeforeCompaction.sort(null);
+ // 4 segments across 2 days (4 total)...
+ verifySegmentsCount(4);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+
+
+ LOG.info("Auto compaction test with YEAR segment granularity, dropExisting is true");
+ Granularity newGranularity = Granularities.YEAR;
+ // Set dropExisting to true
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
+
+ List<String> expectedIntervalAfterCompaction = new ArrayList<>();
+ for (String interval : intervalsBeforeCompaction) {
+ for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
+ expectedIntervalAfterCompaction.add(newinterval.toString());
+ }
+ }
+ forceTriggerAutoCompaction(1);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifySegmentsCompacted(1, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+
+ LOG.info("Auto compaction test with MONTH segment granularity, dropExisting is true");
+ // "interval": "2013-01-01T00:00:00.000Z/2014-01-01T00:00:00.000Z",
+ newGranularity = Granularities.MONTH;
+ // Set dropExisting to true
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity, null, null), true);
+
+ // Since dropExisting is set to true...
+ // Again data is only in two days
// The earlier segment with YEAR granularity will be completely covered, overshadowed, by the
- // new DAY segments for data and tombstones for days with no data
- // Hence, we will only have 2013-08-31 to 2013-09-01 and 2013-09-01 to 2013-09-02
- // plus 363 tombstones
- final List<String> intervalsAfterYEARCompactionButBeforeDAYCompaction =
+ // new MONTH segments for data and tombstones for months with no data
+ // Hence, we will only have the 2013-08 and 2013-09 months with data
+ // plus 10 tombstones
+ final List<String> intervalsAfterYEARCompactionButBeforeMONTHCompaction =
coordinator.getSegmentIntervals(fullDatasourceName);
expectedIntervalAfterCompaction = new ArrayList<>();
- for (String interval : intervalsAfterYEARCompactionButBeforeDAYCompaction) {
+ for (String interval : intervalsAfterYEARCompactionButBeforeMONTHCompaction) {
for (Interval newinterval : newGranularity.getIterable(new Interval(interval, ISOChronology.getInstanceUTC()))) {
expectedIntervalAfterCompaction.add(newinterval.toString());
}
}
- forceTriggerAutoCompaction(365);
+ forceTriggerAutoCompaction(12);
+ verifyQuery(INDEX_QUERIES_RESOURCE);
+ verifyTombstones(10);
+ verifySegmentsCompacted(12, 1000);
+ checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ // Now compact again over tombstones but with dropExisting set to false:
+ LOG.info("Auto compaction test with SEMESTER segment granularity, dropExisting is false, over tombstones");
+ newGranularity = new PeriodGranularity(new Period("P6M"), null, DateTimeZone.UTC);
+ // Set dropExisting to false
+ submitCompactionConfig(1000, NO_SKIP_OFFSET, new UserCompactionTaskGranularityConfig(newGranularity,
+ null, null
+ ), false);
+
+ // Since dropExisting is set to false, the first semester (which contains only tombstones) will force
+ // dropExisting to true. Hence, we will have two segments: one tombstone for the first semester and one
+ // data segment for the second.
+ forceTriggerAutoCompaction(2); // two semesters compacted
verifyQuery(INDEX_QUERIES_RESOURCE);
- verifyTombstones(363);
- verifySegmentsCompacted(365, 1000);
+ verifyTombstones(1);
+ verifySegmentsCompacted(2, 1000);
+
+ expectedIntervalAfterCompaction =
+ Arrays.asList(
+ "2013-01-01T00:00:00.000Z/2013-07-01T00:00:00.000Z",
+ "2013-07-01T00:00:00.000Z/2014-01-01T00:00:00.000Z"
+ );
checkCompactionIntervals(expectedIntervalAfterCompaction);
+
+ // verify that auto-compaction has already completed: triggering it again should create no new compaction tasks
+ List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ forceTriggerAutoCompaction(2);
+ List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
+ Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
index e3f47ddf24..ddae02298b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java
@@ -427,6 +427,26 @@ public class CompactSegments implements CoordinatorCustomDuty
dropExisting = config.getIoConfig().isDropExisting();
}
+ // If all the segments found to be compacted are tombstones then dropExisting
+ // needs to be forced to true. This forcing needs to happen when the flag is
+ // null or false; it is needed when it is null to avoid the possibility of the
+ // code defaulting it to false later.
+ // Forcing the flag to true enables the task ingestion code to generate new, compacted, tombstones that
+ // cover the tombstones found to be compacted and to mark them as compacted
+ // (i.e. update their lastCompactionState). If we did not force the flag, then every time this
+ // compact duty ran it would find the same tombstones in the interval (since their
+ // lastCompactionState was never set), repeating this over and over; the duty would make no
+ // progress and would become stuck on this set of tombstones.
+ // This forcing code should be revisited when/if the auto-compaction policy for deciding
+ // which segments to compact changes.
+ if (dropExisting == null || !dropExisting) {
+ if (segmentsToCompact.stream().allMatch(dataSegment -> dataSegment.isTombstone())) {
+ dropExisting = true;
+ LOG.info("Forcing dropExisting to %s since all segments to compact are tombstones", dropExisting);
+ }
+ }
+
// make tuningConfig
final String taskId = indexingServiceClient.compactSegments(
"coordinator-issued",