You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2021/06/16 13:10:54 UTC

[incubator-pinot] branch threshold_updater_patch created (now c42d296)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a change to branch threshold_updater_patch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at c42d296  Add support for lowest partition id in case partition id 0 is not available

This branch includes the following new commits:

     new c42d296  Add support for lowest partition id in case partition id 0 is not available

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Add support for lowest partition id in case partition id 0 is not available

Posted by kh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch threshold_updater_patch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit c42d296786f1dc1bb3773d7cb2ac532e57e5dc33
Author: KKcorps <kh...@gmail.com>
AuthorDate: Wed Jun 16 18:37:37 2021 +0530

    Add support for lowest partition id in case partition id 0 is not available
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 96 ++++++++++++----------
 .../segment/DefaultFlushThresholdUpdater.java      |  5 +-
 .../realtime/segment/FlushThresholdUpdater.java    |  5 +-
 .../SegmentSizeBasedFlushThresholdUpdater.java     | 17 +++-
 .../segment/FlushThresholdUpdaterTest.java         | 46 ++++++-----
 5 files changed, 101 insertions(+), 68 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d26a434..c9182a4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -254,10 +254,11 @@ public class PinotLLCRealtimeSegmentManager {
 
     _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
 
-    PartitionLevelStreamConfig streamConfig =
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
@@ -268,8 +269,9 @@ public class PinotLLCRealtimeSegmentManager {
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
-      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata,
-          currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
+      String segmentName =
+          setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+              numPartitionGroups, numReplicas);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -504,7 +506,8 @@ public class PinotLLCRealtimeSegmentManager {
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
     Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+            .collect(Collectors.toSet());
     int numPartitionGroups = newPartitionGroupMetadataList.size();
 
     // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new segment metadata
@@ -515,7 +518,8 @@ public class PinotLLCRealtimeSegmentManager {
       LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
           committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
       createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas);
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
+          newPartitionGroupMetadataList);
       newConsumingSegmentName = newLLCSegment.getSegmentName();
     }
 
@@ -609,7 +613,7 @@ public class PinotLLCRealtimeSegmentManager {
   private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
       @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
     String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -637,7 +641,9 @@ public class PinotLLCRealtimeSegmentManager {
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas));
+        committingSegmentZKMetadata,
+        getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
+        partitionGroupMetadataList);
 
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
@@ -706,8 +712,8 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
       List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
-    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
-        currentPartitionGroupConsumptionStatusList);
+    return PinotTableIdealStateBuilder
+        .getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
   }
 
   /**
@@ -756,15 +762,15 @@ public class PinotLLCRealtimeSegmentManager {
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
       latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
         if (latestLLCSegmentName == null) {
-              return llcSegmentName;
-            } else {
-              if (llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
-                return llcSegmentName;
-              } else {
-                return latestLLCSegmentName;
-              }
-            }
-          });
+          return llcSegmentName;
+        } else {
+          if (llcSegmentName.getSequenceNumber() > latestLLCSegmentName.getSequenceNumber()) {
+            return llcSegmentName;
+          } else {
+            return latestLLCSegmentName;
+          }
+        }
+      });
     }
 
     Map<Integer, LLCRealtimeSegmentZKMetadata> latestSegmentZKMetadataMap = new HashMap<>();
@@ -815,7 +821,6 @@ public class PinotLLCRealtimeSegmentManager {
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
-
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
         return idealState;
@@ -876,14 +881,17 @@ public class PinotLLCRealtimeSegmentManager {
       for (String segmentNameStr : instanceStatesMap.keySet()) {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
         if (llcSegmentName.getPartitionGroupId() == partitionId && llcSegmentName.getSequenceNumber() == seqNum) {
-          String errorMsg = String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
+          String errorMsg =
+              String.format("Segment %s is a duplicate of existing segment %s", newSegmentName, segmentNameStr);
           LOGGER.error(errorMsg);
           throw new HelixHelper.PermanentUpdaterException(errorMsg);
         }
       }
       // Assign instances to the new segment and add instances as state CONSUMING
-      List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
-      instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
+      List<String> instancesAssigned =
+          segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap);
+      instanceStatesMap.put(newSegmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING));
       LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned);
     }
   }
@@ -964,7 +972,8 @@ public class PinotLLCRealtimeSegmentManager {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
     int numPartitions = newPartitionGroupMetadataList.size();
     Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet());
+        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+            .collect(Collectors.toSet());
 
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1016,17 +1025,18 @@ public class PinotLLCRealtimeSegmentManager {
 
               LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
               String newSegmentName = newLLCSegmentName.getSegmentName();
-              CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-                  (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+              CommittingSegmentDescriptor committingSegmentDescriptor =
+                  new CommittingSegmentDescriptor(latestSegmentName,
+                      (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+                  newPartitionGroupMetadataList);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
-              LOGGER.info(
-                  "PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
-                      + "Skipping creation of new ZK metadata and new segment in ideal state",
-                  partitionGroupId, latestSegmentName);
+              LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+                      + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId,
+                  latestSegmentName);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
                   instancePartitionsMap);
             }
@@ -1058,7 +1068,8 @@ public class PinotLLCRealtimeSegmentManager {
             CommittingSegmentDescriptor committingSegmentDescriptor =
                 new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0);
             createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+                newPartitionGroupMetadataList);
             String newSegmentName = newLLCSegmentName.getSegmentName();
             updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
                 instancePartitionsMap);
@@ -1068,7 +1079,8 @@ public class PinotLLCRealtimeSegmentManager {
               // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
               // that case, we need to either extend this part to handle the state, or prevent segments from getting into
               // such state.
-              LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+              LOGGER
+                  .error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
             }
             // else, the partition group has reached end of life. This is an acceptable state
           }
@@ -1116,8 +1128,8 @@ public class PinotLLCRealtimeSegmentManager {
       int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions,
-                numReplicas);
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
+                numPartitions, numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1154,8 +1166,9 @@ public class PinotLLCRealtimeSegmentManager {
    * Sets up a new partition group.
    * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
    */
-  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata,
-      long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
+  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
+      PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
+      int numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1166,15 +1179,14 @@ public class PinotLLCRealtimeSegmentManager {
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor =
-        new CommittingSegmentDescriptor(null, startOffset, 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+        Collections.singletonList(partitionGroupMetadata));
 
     return newSegmentName;
   }
 
-
   @VisibleForTesting
   long getCurrentTimeMs() {
     return System.currentTimeMillis();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index 5a68497..fda9882 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
@@ -38,7 +40,8 @@ public class DefaultFlushThresholdUpdater implements FlushThresholdUpdater {
   @Override
   public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
       LLCRealtimeSegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance) {
+      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
     // Configure the segment size flush limit based on the maximum number of partitions allocated to an instance
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize / maxNumPartitionsPerInstance);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 544975a..87e8312 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 
 
@@ -34,5 +36,6 @@ public interface FlushThresholdUpdater {
    */
   void updateFlushThreshold(PartitionLevelStreamConfig streamConfig, LLCRealtimeSegmentZKMetadata newSegmentZKMetadata,
       CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance);
+      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList);
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index ba74503..44dd957 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -19,9 +19,13 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.slf4j.Logger;
@@ -55,7 +59,8 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
   @Override
   public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
       LLCRealtimeSegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance) {
+      @Nullable LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
     final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
     final long timeThresholdMillis = streamConfig.getFlushThresholdTimeMillis();
     final int autotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
@@ -102,8 +107,14 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    // FIXME: The stream may not have partition "0"
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+
+    // Partition group id 0 might not always be available, so use the smallest available partition group id (0 if the list is empty) to decide when to update the ratio
+    int smallestAvailablePartitionGroupId =
+        partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+            .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() -> 0);
+
+    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == smallestAvailablePartitionGroupId
+        || _latestSegmentRowsToSizeRatio == 0) {
       if (_latestSegmentRowsToSizeRatio > 0) {
         _latestSegmentRowsToSizeRatio =
             CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 396e4bd..811867a 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import java.util.Arrays;
+import java.util.Collections;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
@@ -132,7 +133,8 @@ public class FlushThresholdUpdaterTest {
       LLCRealtimeSegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
       CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
       flushThresholdUpdater
-          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+              Collections.emptyList());
       assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
 
       int numRuns = 500;
@@ -144,7 +146,7 @@ public class FlushThresholdUpdaterTest {
         LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata =
             getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
         flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
 
         // Assert that segment size is in limits
         if (run > checkRunsAfter) {
@@ -194,8 +196,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     LLCRealtimeSegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment consumes rows less than the threshold
@@ -204,7 +206,7 @@ public class FlushThresholdUpdaterTest {
     LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -214,7 +216,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
         (int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
   }
@@ -227,8 +229,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     LLCRealtimeSegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment only consumed 15 rows, so next segment should have size threshold of 10_000
@@ -237,7 +239,7 @@ public class FlushThresholdUpdaterTest {
     LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
 
@@ -246,7 +248,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
   }
@@ -261,9 +263,11 @@ public class FlushThresholdUpdaterTest {
     LLCRealtimeSegmentZKMetadata newSegmentZKMetadataForPartition1 = getNewSegmentZKMetadata(1);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
     flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1);
+        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1,
+            Collections.emptyList());
     flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1);
+        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1,
+            Collections.emptyList());
     int sizeThresholdForPartition0 = newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
     int sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
     double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
@@ -278,7 +282,7 @@ public class FlushThresholdUpdaterTest {
             sizeThresholdForPartition1);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
     sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
     assertTrue(sizeRatio > 0.0);
@@ -289,7 +293,7 @@ public class FlushThresholdUpdaterTest {
         sizeThresholdForPartition1);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
 
     // First segment update from partition 0 should change the size ratio
@@ -297,7 +301,7 @@ public class FlushThresholdUpdaterTest {
         sizeThresholdForPartition0);
     flushThresholdUpdater
         .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1);
+            committingSegmentZKMetadata, 1, Collections.emptyList());
     assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
   }
 
@@ -315,8 +319,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     LLCRealtimeSegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+        Collections.emptyList());
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, flushAutotuneInitialRows);
 
@@ -330,7 +334,7 @@ public class FlushThresholdUpdaterTest {
     LLCRealtimeSegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold > numRowsConsumed);
 
@@ -343,7 +347,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
 
@@ -354,7 +358,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentDescriptor = getCommittingSegmentDescriptor(committingSegmentSize);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (long) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -367,7 +371,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1);
+        committingSegmentZKMetadata, 1, Collections.emptyList());
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org