Posted to commits@pinot.apache.org by kh...@apache.org on 2021/10/22 05:19:01 UTC

[pinot] branch master updated: Add support for lowest partition id in case partition id 0 is not available (#7066)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ee3c02  Add support for lowest partition id in case partition id 0 is not available (#7066)
1ee3c02 is described below

commit 1ee3c020fa421a44f7f199299cc6c400216a972d
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Oct 22 10:47:43 2021 +0530

    Add support for lowest partition id in case partition id 0 is not available (#7066)
    
    * Add support for lowest partition id in case partition id 0 is not available
    
    * Remove unused import
    
    * Add complete partition metadata list instead of just current partition
    
    * Add new test for min partition data instead of partition 0
    
    * Fix linting
    
    * Fix linting
    
    * Edit comment to reflect new partition id calculation
    
    * Update method for easy int comparison
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 69 +++++++--------
 .../segment/DefaultFlushThresholdUpdater.java      |  9 +-
 .../realtime/segment/FlushThresholdUpdater.java    |  7 +-
 .../SegmentSizeBasedFlushThresholdUpdater.java     | 15 +++-
 .../segment/FlushThresholdUpdaterTest.java         | 97 +++++++++++++++++-----
 5 files changed, 130 insertions(+), 67 deletions(-)
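
In short: SegmentSizeBasedFlushThresholdUpdater previously learned its
rows-to-size ratio only from segments of partition 0, so a stream whose
lowest live partition id is not 0 could effectively freeze the ratio. The
commit threads the full partition group metadata list down to the flush
threshold updaters so the ratio can instead be keyed off the lowest
available partition id. A condensed sketch of the new gate, using
simplified stand-in types rather than the actual Pinot classes:

    import java.util.List;

    // Simplified stand-in for org.apache.pinot.spi.stream.PartitionGroupMetadata.
    record PartitionGroup(int partitionGroupId) {
    }

    final class RatioUpdateGate {
      // True when this segment should refresh the learned rows-to-size ratio:
      // it belongs to the lowest available partition id, or no ratio exists yet.
      static boolean shouldUpdateRatio(int segmentPartitionGroupId,
          List<PartitionGroup> partitionGroups, double latestRatio) {
        int lowestId = partitionGroups.stream()
            .mapToInt(PartitionGroup::partitionGroupId)
            .min()
            .orElse(0); // empty list falls back to 0, the pre-commit behavior
        return segmentPartitionGroupId == lowestId || latestRatio == 0;
      }
    }
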

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6e72249..7f6996a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -142,8 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
    * check the segment expiration time to see if it is about to be deleted (i.e. less than this threshold). Skip the
    * deep store fix if necessary. RetentionManager will delete this kind of segments shortly anyway.
    */
-  private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS =
-      60 * 60 * 1000L; // 1 hour
+  private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS = 60 * 60 * 1000L;
+  // 1 hour
   private static final Random RANDOM = new Random();
 
   private final HelixAdmin _helixAdmin;
@@ -182,8 +182,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
     _tableConfigCache = new TableConfigCache(_propertyStore);
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
-    _isDeepStoreLLCSegmentUploadRetryEnabled =
-        controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+    _isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
     if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
       _fileUploadDownloadClient = initFileUploadDownloadClient();
     }
@@ -317,7 +316,7 @@ public class PinotLLCRealtimeSegmentManager {
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas);
+              numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -564,8 +563,8 @@ public class PinotLLCRealtimeSegmentManager {
       LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
           committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
       createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups,
-          numReplicas);
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
+          newPartitionGroupMetadataList);
       newConsumingSegmentName = newLLCSegment.getSegmentName();
     }
 
@@ -663,7 +662,7 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
       @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
     String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -692,7 +691,8 @@ public class PinotLLCRealtimeSegmentManager {
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
         committingSegmentZKMetadata,
-        getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas));
+        getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
+        partitionGroupMetadataList);
 
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
@@ -1089,7 +1089,8 @@ public class PinotLLCRealtimeSegmentManager {
                   new CommittingSegmentDescriptor(latestSegmentName,
                       (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+                  newPartitionGroupMetadataList);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
@@ -1129,7 +1130,8 @@ public class PinotLLCRealtimeSegmentManager {
             CommittingSegmentDescriptor committingSegmentDescriptor =
                 new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0);
             createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+                newPartitionGroupMetadataList);
             String newSegmentName = newLLCSegmentName.getSegmentName();
             updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
                 instancePartitionsMap);
@@ -1191,7 +1193,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-                numPartitions, numReplicas);
+                numPartitions, numReplicas, newPartitionGroupMetadataList);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1230,7 +1232,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1243,7 +1245,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+        partitionGroupMetadataList);
 
     return newSegmentName;
   }
@@ -1303,8 +1306,7 @@ public class PinotLLCRealtimeSegmentManager {
     String realtimeTableName = tableConfig.getTableName();
 
     if (_isStopping) {
-      LOGGER.info(
-          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+      LOGGER.info("Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
           realtimeTableName);
       return;
     }
@@ -1314,11 +1316,9 @@ public class PinotLLCRealtimeSegmentManager {
     SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
     if (validationConfig.getRetentionTimeUnit() != null && !validationConfig.getRetentionTimeUnit().isEmpty()
         && validationConfig.getRetentionTimeValue() != null && !validationConfig.getRetentionTimeValue().isEmpty()) {
-      long retentionMs =
-          TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
-              .toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
-      retentionStrategy = new TimeRetentionStrategy(
-          TimeUnit.MILLISECONDS,
+      long retentionMs = TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
+          .toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
+      retentionStrategy = new TimeRetentionStrategy(TimeUnit.MILLISECONDS,
           retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
     }
 
@@ -1342,40 +1342,35 @@ public class PinotLLCRealtimeSegmentManager {
         }
         // Skip the fix for the segment if it is already out of retention.
         if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
-          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
-              segmentName);
+          LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
           continue;
         }
         LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
 
         // Find servers which have online replica
-        List<URI> peerSegmentURIs = PeerServerSegmentFinder
-            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+        List<URI> peerSegmentURIs =
+            PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
         if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(
-              String.format(
-                  "Failed to upload segment %s to deep store because no online replica is found",
-                  segmentName));
+          throw new IllegalStateException(String
+              .format("Failed to upload segment %s to deep store because no online replica is found", segmentName));
         }
 
         // Randomly ask one server to upload
         URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
         String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
-        LOGGER.info(
-            "Ask server to upload LLC segment {} to deep store by this path: {}",
-            segmentName, serverUploadRequestUrl);
+        LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
+            serverUploadRequestUrl);
         String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
         LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
         // Update segment ZK metadata by adding the download URL
         segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
         // TODO: add version check when persist segment ZK metadata
         persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
-        LOGGER.info(
-            "Successfully uploaded LLC segment {} to deep store with download url: {}",
-            segmentName, segmentDownloadUrl);
+        LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
+            segmentDownloadUrl);
       } catch (Exception e) {
-        _controllerMetrics.addMeteredTableValue(realtimeTableName,
-            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+        _controllerMetrics
+            .addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
         LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
       }
     }
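
The manager-side changes above are plumbing plus formatter churn: both
segment-creation paths (setupNewPartitionGroup and the segment-commit path)
now hand the partition group metadata list to createNewSegmentZKMetadata,
which forwards it to the flush threshold updater. Roughly, with argument
lists abbreviated to the relevant parameter:

    // setupNewPartitionGroup(..., partitionGroupMetadataList)
    //   -> createNewSegmentZKMetadata(..., partitionGroupMetadataList)
    //        -> flushThresholdUpdater.updateFlushThreshold(...,
    //               partitionGroupMetadataList);
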
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index 3c1d27b..9c1ddea 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
@@ -36,9 +38,10 @@ public class DefaultFlushThresholdUpdater implements FlushThresholdUpdater {
   }
 
   @Override
-  public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
-      CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
-      int maxNumPartitionsPerInstance) {
+  public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
+      SegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
     // Configure the segment size flush limit based on the maximum number of partitions allocated to an instance
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize / maxNumPartitionsPerInstance);
   }
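
Note that DefaultFlushThresholdUpdater accepts the new parameter but ignores
it: its threshold remains the fixed table-level flush size divided across the
partitions hosted on one instance. With hypothetical numbers:

    // Hypothetical values illustrating DefaultFlushThresholdUpdater's rule.
    int tableFlushSize = 1_000_000;          // rows, from table config
    int maxNumPartitionsPerInstance = 4;
    int rowThreshold = tableFlushSize / maxNumPartitionsPerInstance; // 250_000
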
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 379b3b3..c10cbab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
@@ -33,6 +35,7 @@ public interface FlushThresholdUpdater {
    * Updates the flush threshold for the given segment ZK metadata
    */
   void updateFlushThreshold(PartitionLevelStreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
-      CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
-      int maxNumPartitionsPerInstance);
+      CommittingSegmentDescriptor committingSegmentDescriptor,
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList);
 }
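
This is a source-incompatible change for any out-of-tree FlushThresholdUpdater
implementation, which must add the trailing list parameter. A hypothetical
no-op implementation showing the new signature (package and imports as in the
interface above):

    package org.apache.pinot.controller.helix.core.realtime.segment;

    import java.util.List;
    import javax.annotation.Nullable;
    import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
    import org.apache.pinot.spi.stream.PartitionGroupMetadata;
    import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;

    public class NoOpFlushThresholdUpdater implements FlushThresholdUpdater {
      @Override
      public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
          SegmentZKMetadata newSegmentZKMetadata,
          CommittingSegmentDescriptor committingSegmentDescriptor,
          @Nullable SegmentZKMetadata committingSegmentZKMetadata,
          int maxNumPartitionsPerInstance,
          List<PartitionGroupMetadata> partitionGroupMetadataList) {
        // Intentionally empty: this exists only to demonstrate the signature.
      }
    }
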
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 61b2cda..7372db1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -19,9 +19,11 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
@@ -56,7 +58,8 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
   @Override
   public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
       SegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance) {
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
     final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
     final long timeThresholdMillis = streamConfig.getFlushThresholdTimeMillis();
     final int autotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
@@ -97,14 +100,18 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
         committingSegmentSizeBytes);
 
     double currentRatio = (double) numRowsConsumed / committingSegmentSizeBytes;
-    // Compute segment size to rows ratio only from partition 0.
+    // Compute segment size to rows ratio only from the lowest available partition id.
     // If we consider all partitions then it is likely that we will assign a much higher weight to the most
     // recent trend in the table (since it is usually true that all partitions of the same table follow more or
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    // FIXME: The stream may not have partition "0"
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+    int smallestAvailablePartitionGroupId =
+        partitionGroupMetadataList.stream().mapToInt(PartitionGroupMetadata::getPartitionGroupId).min()
+            .orElseGet(() -> 0);
+
+    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == smallestAvailablePartitionGroupId
+        || _latestSegmentRowsToSizeRatio == 0) {
       if (_latestSegmentRowsToSizeRatio > 0) {
         _latestSegmentRowsToSizeRatio =
             CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
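
A worked example of the new gate: if the stream's live partition ids are
{3, 4, 5} (say partitions 0-2 were deleted), the minimum over the metadata
list is 3, so segments of partition group 3 now refresh
_latestSegmentRowsToSizeRatio on every commit. Under the old hardcoded check
against partition 0, such a stream could only set the ratio through the
_latestSegmentRowsToSizeRatio == 0 escape hatch and then froze it. When the
list is empty, .orElseGet(() -> 0) preserves the old default (the updated
tests pass Collections.emptyList() for exactly this case); .orElse(0) would
behave identically here. The same reduction in isolation:

    import java.util.List;

    // e.g. in a scratch method or jshell:
    List<Integer> liveIds = List.of(3, 4, 5);
    int minId = liveIds.stream().mapToInt(Integer::intValue).min().orElse(0); // 3
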
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 841c575..b24f7ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -132,7 +136,8 @@ public class FlushThresholdUpdaterTest {
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
       CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
       flushThresholdUpdater
-          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+              Collections.emptyList());
       assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
 
       int numRuns = 500;
@@ -144,7 +149,45 @@ public class FlushThresholdUpdaterTest {
         SegmentZKMetadata committingSegmentZKMetadata =
             getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
         flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
+
+        // Assert that segment size is in limits
+        if (run > checkRunsAfter) {
+          assertTrue(segmentSizeBytes > segmentSizeLowerLimit && segmentSizeBytes < segmentSizeHigherLimit);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testSegmentSizeBasedFlushThresholdMinPartition() {
+    PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+    long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
+    long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
+    long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
+
+    for (long[] segmentSizesMB : Arrays
+        .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+      SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
+
+      // Start consumption
+      SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
+      CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
+      flushThresholdUpdater
+          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+              getPartitionGroupMetadataList(3, 1));
+      assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
+
+      int numRuns = 500;
+      int checkRunsAfter = 400;
+      for (int run = 0; run < numRuns; run++) {
+        int numRowsConsumed = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
+        long segmentSizeBytes = getSegmentSizeBytes(numRowsConsumed, segmentSizesMB);
+        committingSegmentDescriptor = getCommittingSegmentDescriptor(segmentSizeBytes);
+        SegmentZKMetadata committingSegmentZKMetadata =
+            getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
+        flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
+            committingSegmentZKMetadata, 1, getPartitionGroupMetadataList(3, 1));
 
         // Assert that segment size is in limits
         if (run > checkRunsAfter) {
@@ -159,6 +202,16 @@ public class FlushThresholdUpdaterTest {
         new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, System.currentTimeMillis()).getSegmentName());
   }
 
+  private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int numPartitions, int startPartitionId) {
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList = new ArrayList<>();
+
+    for (int i = 0; i < numPartitions; i++) {
+      newPartitionGroupMetadataList.add(new PartitionGroupMetadata(startPartitionId + i, null));
+    }
+
+    return newPartitionGroupMetadataList;
+  }
+
   private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long segmentSizeBytes) {
     return new CommittingSegmentDescriptor(null, new LongMsgOffset(0).toString(), segmentSizeBytes);
   }
@@ -192,8 +245,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment consumes rows less than the threshold
@@ -202,7 +255,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -212,7 +265,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
         (int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
   }
@@ -225,8 +278,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment only consumed 15 rows, so next segment should have size threshold of 10_000
@@ -235,7 +288,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
 
@@ -244,7 +297,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
   }
@@ -259,9 +312,11 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata newSegmentZKMetadataForPartition1 = getNewSegmentZKMetadata(1);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
     flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1);
+        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1,
+            Collections.emptyList());
     flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1);
+        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1,
+            Collections.emptyList());
     int sizeThresholdForPartition0 = newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
     int sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
     double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
@@ -276,7 +331,7 @@ public class FlushThresholdUpdaterTest {
             sizeThresholdForPartition1);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
     sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
     assertTrue(sizeRatio > 0.0);
@@ -287,7 +342,7 @@ public class FlushThresholdUpdaterTest {
         sizeThresholdForPartition1);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
 
     // First segment update from partition 0 should change the size ratio
@@ -295,7 +350,7 @@ public class FlushThresholdUpdaterTest {
         sizeThresholdForPartition0);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
   }
 
@@ -313,8 +368,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, flushAutotuneInitialRows);
 
@@ -328,7 +383,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold > numRowsConsumed);
 
@@ -341,7 +396,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
 
@@ -352,7 +407,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentDescriptor = getCommittingSegmentDescriptor(committingSegmentSize);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (long) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -365,7 +420,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
   }
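
What the new test exercises, in terms of the helper values above:
getPartitionGroupMetadataList(3, 1) builds partition group ids {1, 2, 3}, and
getNewSegmentZKMetadata(1) puts the consuming segment on partition 1, so the
segment's partition id equals the minimum id, the ratio is updated on every
run, and segment sizes converge within the same +/-1% band as the original
partition-0 test. A hypothetical check mirroring that setup:

    import java.util.List;

    // Partition ids as produced by getPartitionGroupMetadataList(3, 1).
    List<Integer> ids = List.of(1, 2, 3);
    int minId = ids.stream().mapToInt(Integer::intValue).min().orElse(0);
    assert minId == 1; // matches getNewSegmentZKMetadata(1)'s partition id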
