You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/16 13:46:44 UTC
[druid] branch master updated: Allocate numCorePartitions using only used segments (#13070)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9b53b0184f Allocate numCorePartitions using only used segments (#13070)
9b53b0184f is described below
commit 9b53b0184f12e072074e35e58633f442f2acb6da
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Fri Sep 16 19:16:36 2022 +0530
Allocate numCorePartitions using only used segments (#13070)
* Allocate numCorePartitions using only used segments
* Add corePartition checks in existing test
* Separate committedMaxId and overallMaxId
* Fix bug: replace overall with committed
---
.../IndexerSQLMetadataStorageCoordinator.java | 86 ++++++++++++----------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 12 ++-
2 files changed, 60 insertions(+), 38 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 931ae81774..c804638099 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -815,8 +815,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return null;
} else {
- // max partitionId of the shardSpecs which share the same partition space.
- SegmentIdWithShardSpec maxId = null;
+ // max partitionId of the committed shardSpecs which share the same partition space.
+ SegmentIdWithShardSpec committedMaxId = null;
+ // max partitionId of all the shardSpecs, including the pending ones, which share the same partition space.
+ SegmentIdWithShardSpec overallMaxId;
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
@@ -831,8 +833,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
// Note that this will compute the max id of existing, visible, data segments in the time chunk:
- if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
- maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+ if (committedMaxId == null
+ || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
+ committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
@@ -848,7 +851,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
versionOfExistingChunk = null;
}
- // next, we need to enrich the maxId computed before with the information of the pending segments
+
+ // next, we need to enrich the overallMaxId computed with committed segments with the information of the pending segments
// it is possible that a pending segment has a higher id in which case we need that, it will work,
// and it will avoid clashes when inserting the new pending segment later in the caller of this method
final Set<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
@@ -856,29 +860,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
dataSource,
interval
);
- // Make sure we add the maxId we obtained from the segments table:
- if (maxId != null) {
- pendings.add(maxId);
+ // Make sure we add the committed max id we obtained from the segments table:
+ if (committedMaxId != null) {
+ pendings.add(committedMaxId);
}
- // Now compute the maxId with all the information: pendings + segments:
+ // Now compute the overallMaxId with all the information: pendings + segments:
// The versionOfExistingChunks filter is ensure that we pick the max id with the version of the existing chunk
// in the case that there may be a pending segment with a higher version but no corresponding used segments
// which may generate a clash with an existing segment once the new id is generated
- maxId = pendings.stream()
- .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
- .filter(id -> versionOfExistingChunk == null ? true : id.getVersion().equals(versionOfExistingChunk))
- .max((id1, id2) -> {
- final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
- if (versionCompare != 0) {
- return versionCompare;
- } else {
- return Integer.compare(
- id1.getShardSpec().getPartitionNum(),
- id2.getShardSpec().getPartitionNum()
- );
- }
- })
- .orElse(null);
+ overallMaxId = pendings.stream()
+ .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
+ .filter(id -> versionOfExistingChunk == null || id.getVersion()
+ .equals(versionOfExistingChunk))
+ .max((id1, id2) -> {
+ final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
+ if (versionCompare != 0) {
+ return versionCompare;
+ } else {
+ return Integer.compare(
+ id1.getShardSpec().getPartitionNum(),
+ id2.getShardSpec().getPartitionNum()
+ );
+ }
+ })
+ .orElse(null);
// The following code attempts to compute the new version, if this
// new version is not null at the end of next block then it will be
@@ -887,16 +892,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
if (versionOfExistingChunk != null) {
// segment version overrides, so pick that now that we know it exists
newSegmentVersion = versionOfExistingChunk;
- } else if (!pendings.isEmpty() && maxId != null) {
- // there is no visible segments in the time chunk, so pick the maxId of pendings, as computed above
- newSegmentVersion = maxId.getVersion();
+ } else if (!pendings.isEmpty() && overallMaxId != null) {
+ // there are no visible segments in the time chunk, so pick the max id of pendings, as computed above
+ newSegmentVersion = overallMaxId.getVersion();
} else {
// no segments, no pendings, so this must be the very first segment created for this interval
newSegmentVersion = null;
}
- if (maxId == null) {
- // When appending segments, null maxId means that we are allocating the very initial
+ if (overallMaxId == null) {
+ // When appending segments, null overallMaxId means that we are allocating the very initial
// segment for this time chunk.
// This code is executed when the Overlord coordinates segment allocation, which is either you append segments
// or you use segment lock. Since the core partitions set is not determined for appended segments, we set
@@ -912,31 +917,38 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
- } else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(existingVersion) > 0) {
+ } else if (!overallMaxId.getInterval().equals(interval)
+ || overallMaxId.getVersion().compareTo(existingVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
existingVersion,
- maxId
+ overallMaxId
);
return null;
- } else if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
+ } else if (committedMaxId != null
+ && committedMaxId.getShardSpec().getNumCorePartitions()
+ == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
- maxId,
- maxId.getShardSpec()
+ committedMaxId,
+ committedMaxId.getShardSpec()
);
return null;
} else {
+ // The number of core partitions must always be chosen from the set of used segments in the VersionedIntervalTimeline.
+ // When the core partitions have been dropped, using pending segments may lead to an incorrect state
+ // where the chunk is believed to have core partitions and query results are incorrect.
+
return new SegmentIdWithShardSpec(
dataSource,
- maxId.getInterval(),
+ overallMaxId.getInterval(),
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
- maxId.getShardSpec().getPartitionNum() + 1,
- maxId.getShardSpec().getNumCorePartitions()
+ overallMaxId.getShardSpec().getPartitionNum() + 1,
+ committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 6f8d0caaf8..d7000f8f4d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1607,6 +1607,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
+ // Since there are no used core partitions yet
+ Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
// simulate one more load using kafka streaming (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment(
@@ -1619,6 +1621,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
+ // Since there are no used core partitions yet
+ Assert.assertEquals(0, identifier1.getShardSpec().getNumCorePartitions());
// simulate one more load using kafka streaming (as if previous segment was published, note different sequence name)
final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment(
@@ -1631,6 +1635,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
+ // Since there are no used core partitions yet
+ Assert.assertEquals(0, identifier2.getShardSpec().getNumCorePartitions());
// now simulate that one compaction was done (batch) ingestion for same interval (like reindex of the previous three):
DataSegment segment = new DataSegment(
@@ -1640,7 +1646,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
- new LinearShardSpec(0),
+ new NumberedShardSpec(0, 1),
9,
100
);
@@ -1659,6 +1665,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());
+ // Used segment set has 1 core partition
+ Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
// now drop the used segment previously loaded:
markAllSegmentsUnused(ImmutableSet.of(segment));
@@ -1675,6 +1683,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
+ // Since all core partitions have been dropped
+ Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org