You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2022/09/16 13:46:44 UTC

[druid] branch master updated: Allocate numCorePartitions using only used segments (#13070)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b53b0184f Allocate numCorePartitions using only used segments (#13070)
9b53b0184f is described below

commit 9b53b0184f12e072074e35e58633f442f2acb6da
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Fri Sep 16 19:16:36 2022 +0530

    Allocate numCorePartitions using only used segments (#13070)
    
    * Allocate numCorePartitions using only used segments
    
    * Add corePartition checks in existing test
    
    * Separate committedMaxId and overallMaxId
    
    * Fix bug: replace overall with committed
---
 .../IndexerSQLMetadataStorageCoordinator.java      | 86 ++++++++++++----------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 12 ++-
 2 files changed, 60 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 931ae81774..c804638099 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -815,8 +815,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       return null;
 
     } else {
-      // max partitionId of the shardSpecs which share the same partition space.
-      SegmentIdWithShardSpec maxId = null;
+      // max partitionId of the committed shardSpecs which share the same partition space.
+      SegmentIdWithShardSpec committedMaxId = null;
+      // max partitionId of the all shardSpecs including the pending ones which share the same partition space.
+      SegmentIdWithShardSpec overallMaxId;
 
       if (!existingChunks.isEmpty()) {
         TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
@@ -831,8 +833,9 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
           // Don't use the stream API for performance.
           // Note that this will compute the max id of existing, visible, data segments in the time chunk:
-          if (maxId == null || maxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
-            maxId = SegmentIdWithShardSpec.fromDataSegment(segment);
+          if (committedMaxId == null
+              || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
+            committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
           }
         }
       }
@@ -848,7 +851,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
         versionOfExistingChunk = null;
       }
 
-      // next, we need to enrich the maxId computed before with the information of the pending segments
+
+      // next, we need to enrich the overallMaxId computed with committed segments with the information of the pending segments
       // it is possible that a pending segment has a higher id in which case we need that, it will work,
       // and it will avoid clashes when inserting the new pending segment later in the caller of this method
       final Set<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
@@ -856,29 +860,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
           dataSource,
           interval
       );
-      // Make sure we add the maxId we obtained from the segments table:
-      if (maxId != null) {
-        pendings.add(maxId);
+      // Make sure we add the committed max id we obtained from the segments table:
+      if (committedMaxId != null) {
+        pendings.add(committedMaxId);
       }
-      //  Now compute the maxId with all the information: pendings + segments:
+      //  Now compute the overallMaxId with all the information: pendings + segments:
       // The versionOfExistingChunks filter is ensure that we pick the max id with the version of the existing chunk
       // in the case that there may be a pending segment with a higher version but no corresponding used segments
       // which may generate a clash with an existing segment once the new id is generated
-      maxId = pendings.stream()
-                      .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
-                      .filter(id -> versionOfExistingChunk == null ? true : id.getVersion().equals(versionOfExistingChunk))
-                      .max((id1, id2) -> {
-                        final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
-                        if (versionCompare != 0) {
-                          return versionCompare;
-                        } else {
-                          return Integer.compare(
-                              id1.getShardSpec().getPartitionNum(),
-                              id2.getShardSpec().getPartitionNum()
-                          );
-                        }
-                      })
-                      .orElse(null);
+      overallMaxId = pendings.stream()
+                             .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
+                             .filter(id -> versionOfExistingChunk == null || id.getVersion()
+                                                                               .equals(versionOfExistingChunk))
+                             .max((id1, id2) -> {
+                               final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
+                               if (versionCompare != 0) {
+                                 return versionCompare;
+                               } else {
+                                 return Integer.compare(
+                                     id1.getShardSpec().getPartitionNum(),
+                                     id2.getShardSpec().getPartitionNum()
+                                 );
+                               }
+                             })
+                             .orElse(null);
 
       // The following code attempts to compute the new version, if this
       // new version is not null at the end of next block then it will be
@@ -887,16 +892,16 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       if (versionOfExistingChunk != null) {
         // segment version overrides, so pick that now that we know it exists
         newSegmentVersion = versionOfExistingChunk;
-      } else if (!pendings.isEmpty() && maxId != null) {
-        // there is no visible segments in the time chunk, so pick the maxId of pendings, as computed above
-        newSegmentVersion = maxId.getVersion();
+      } else if (!pendings.isEmpty() && overallMaxId != null) {
+        // there is no visible segments in the time chunk, so pick the max id of pendings, as computed above
+        newSegmentVersion = overallMaxId.getVersion();
       } else {
         // no segments, no pendings, so this must be the very first segment created for this interval
         newSegmentVersion = null;
       }
 
-      if (maxId == null) {
-        // When appending segments, null maxId means that we are allocating the very initial
+      if (overallMaxId == null) {
+        // When appending segments, null overallMaxId means that we are allocating the very initial
         // segment for this time chunk.
         // This code is executed when the Overlord coordinates segment allocation, which is either you append segments
         // or you use segment lock. Since the core partitions set is not determined for appended segments, we set
@@ -912,31 +917,38 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
             version,
             partialShardSpec.complete(jsonMapper, newPartitionId, 0)
         );
-      } else if (!maxId.getInterval().equals(interval) || maxId.getVersion().compareTo(existingVersion) > 0) {
+      } else if (!overallMaxId.getInterval().equals(interval)
+                 || overallMaxId.getVersion().compareTo(existingVersion) > 0) {
         log.warn(
             "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
             dataSource,
             interval,
             existingVersion,
-            maxId
+            overallMaxId
         );
         return null;
-      } else if (maxId.getShardSpec().getNumCorePartitions() == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
+      } else if (committedMaxId != null
+                 && committedMaxId.getShardSpec().getNumCorePartitions()
+                    == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
         log.warn(
             "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
-            maxId,
-            maxId.getShardSpec()
+            committedMaxId,
+            committedMaxId.getShardSpec()
         );
         return null;
       } else {
+        // The number of core partitions must always be chosen from the set of used segments in the VersionedIntervalTimeline.
+        // When the core partitions have been dropped, using pending segments may lead to an incorrect state
+        // where the chunk is believed to have core partitions and queries results are incorrect.
+
         return new SegmentIdWithShardSpec(
             dataSource,
-            maxId.getInterval(),
+            overallMaxId.getInterval(),
             Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
             partialShardSpec.complete(
                 jsonMapper,
-                maxId.getShardSpec().getPartitionNum() + 1,
-                maxId.getShardSpec().getNumCorePartitions()
+                overallMaxId.getShardSpec().getPartitionNum() + 1,
+                committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
             )
         );
       }
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 6f8d0caaf8..d7000f8f4d 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -1607,6 +1607,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         true
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString());
+    // Since there are no used core partitions yet
+    Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
 
     // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name)
     final SegmentIdWithShardSpec identifier1 = coordinator.allocatePendingSegment(
@@ -1619,6 +1621,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         true
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString());
+    // Since there are no used core partitions yet
+    Assert.assertEquals(0, identifier1.getShardSpec().getNumCorePartitions());
 
     // simulate one more load using kafka streaming (as if previous segment was published, note different sequence name)
     final SegmentIdWithShardSpec identifier2 = coordinator.allocatePendingSegment(
@@ -1631,6 +1635,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         true
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString());
+    // Since there are no used core partitions yet
+    Assert.assertEquals(0, identifier2.getShardSpec().getNumCorePartitions());
 
     // now simulate that one compaction was done (batch) ingestion for same interval (like reindex of the previous three):
     DataSegment segment = new DataSegment(
@@ -1640,7 +1646,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         ImmutableMap.of(),
         ImmutableList.of("dim1"),
         ImmutableList.of("m1"),
-        new LinearShardSpec(0),
+        new NumberedShardSpec(0, 1),
         9,
         100
     );
@@ -1659,6 +1665,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         true
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString());
+    // Used segment set has 1 core partition
+    Assert.assertEquals(1, identifier3.getShardSpec().getNumCorePartitions());
 
     // now drop the used segment previously loaded:
     markAllSegmentsUnused(ImmutableSet.of(segment));
@@ -1675,6 +1683,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         true
     );
     Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", identifier4.toString());
+    // Since all core partitions have been dropped
+    Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions());
 
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org