Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/20 19:07:32 UTC

(pinot) branch master updated: Optimize segment commit to not read partition group metadata (#11943)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e84fc3fc5 Optimize segment commit to not read partition group metadata (#11943)
8e84fc3fc5 is described below

commit 8e84fc3fc50deeb3a05b24c09732c4116ad3979d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 20 11:07:25 2023 -0800

    Optimize segment commit to not read partition group metadata (#11943)
---
 .../broker/broker/FakeStreamConsumerFactory.java   |   6 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 117 +++++++++++++--------
 .../segment/DefaultFlushThresholdUpdater.java      |   4 +-
 .../realtime/segment/FlushThresholdUpdater.java    |   4 +-
 .../segment/SegmentFlushThresholdComputer.java     |  29 ++---
 .../SegmentSizeBasedFlushThresholdUpdater.java     |  14 +--
 .../PinotLLCRealtimeSegmentManagerTest.java        |   9 ++
 .../segment/FlushThresholdUpdaterTest.java         | 108 ++++---------------
 .../segment/SegmentFlushThresholdComputerTest.java |  46 +++-----
 .../fakestream/FakeStreamMetadataProvider.java     |   8 ++
 .../kafka20/KafkaStreamMetadataProvider.java       |  19 ++++
 .../pulsar/PulsarStreamMetadataProvider.java       |  19 +++-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  11 +-
 13 files changed, 186 insertions(+), 208 deletions(-)
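
The core of the change, condensed: when committing a segment, PinotLLCRealtimeSegmentManager previously built the full partition group metadata list (an expensive per-partition scan) just to decide whether the committing partition group still exists and how many partitions the stream has. It now asks the stream for its partition ids in a single call and keeps the old scan only as a fallback. A minimal sketch of the pattern, using the method names from the diff below:

    Set<Integer> partitionIds;
    try {
      // New fast path: one metadata call for the whole topic
      partitionIds = getPartitionIds(streamConfig);
    } catch (Exception e) {
      // Fallback (pre-#11943 behavior) for providers without fetchPartitionIds
      partitionIds = getNewPartitionGroupMetadataList(streamConfig,
              getPartitionGroupConsumptionStatusList(idealState, streamConfig))
          .stream().map(PartitionGroupMetadata::getPartitionGroupId)
          .collect(Collectors.toSet());
    }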

diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
index 3ada727a13..b0fee61322 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.broker;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
@@ -77,6 +78,11 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
       return 1;
     }
 
+    @Override
+    public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+      return Collections.singleton(0);
+    }
+
     @Override
     public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis)
         throws TimeoutException {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8611c251aa..90cd05d16b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -92,7 +92,9 @@ import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -322,8 +324,7 @@ public class PinotLLCRealtimeSegmentManager {
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
-
+              numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
     }
@@ -500,6 +501,10 @@ public class PinotLLCRealtimeSegmentManager {
     LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
     int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
     LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
+    if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
+      LOGGER.warn("Committing segment: {} was not uploaded to deep store", committingSegmentName);
+      _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
+    }
 
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -518,40 +523,47 @@ public class PinotLLCRealtimeSegmentManager {
      */
 
     // Step-1
+    long startTimeNs1 = System.nanoTime();
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
-    // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-    Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+    // Step-2
+    long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
-      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-      // segment metadata
-      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-      long newSegmentCreationTimeMs = getCurrentTimeMs();
-      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
-          newPartitionGroupMetadataList);
-      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    if (!isTablePaused(idealState)) {
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      Set<Integer> partitionIds;
+      try {
+        partitionIds = getPartitionIds(streamConfig);
+      } catch (Exception e) {
+        LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. "
+            + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString());
+        // TODO: Find a better way to determine the partition count and whether the committing partition group is fully consumed.
+        //       We don't need to read partition group metadata for other partition groups.
+        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+            .collect(Collectors.toSet());
+      }
+      if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+        String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+        long newSegmentCreationTimeMs = getCurrentTimeMs();
+        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+            numReplicas);
+        newConsumingSegmentName = newLLCSegment.getSegmentName();
+      }
     }
 
     // Step-3
+    long startTimeNs3 = System.nanoTime();
     SegmentAssignment segmentAssignment =
         SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -572,6 +584,15 @@ public class PinotLLCRealtimeSegmentManager {
       lock.unlock();
     }
 
+    long endTimeNs = System.nanoTime();
+    LOGGER.info(
+        "Finished committing segment metadata for segment: {}. Time taken for updating committing segment metadata: "
+            + "{}ms; creating new consuming segment ({}) metadata: {}ms; updating ideal state: {}ms; total: {}ms",
+        committingSegmentName, TimeUnit.NANOSECONDS.toMillis(startTimeNs2 - startTimeNs1), newConsumingSegmentName,
+        TimeUnit.NANOSECONDS.toMillis(startTimeNs3 - startTimeNs2),
+        TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs3),
+        TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs1));
+
     // TODO: also create the new partition groups here, instead of waiting till the {@link
     //  RealtimeSegmentValidationManager} runs
     //  E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
@@ -581,10 +602,6 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Trigger the metadata event notifier
     _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
-
-    if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
-      _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
-    }
   }
 
   /**
@@ -649,8 +666,8 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig streamConfig,
       LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions,
+      int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
     String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -668,7 +685,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Add the partition metadata if available
     SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId(), numPartitionGroups);
+        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId(), numPartitions);
     if (partitionMetadata != null) {
       newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
     }
@@ -676,9 +693,7 @@ public class PinotLLCRealtimeSegmentManager {
     // Update the flush threshold
     FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata,
-        getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
-        partitionGroupMetadataList);
+        committingSegmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, numPartitions, numReplicas));
 
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }
@@ -747,6 +762,22 @@ public class PinotLLCRealtimeSegmentManager {
     return commitTimeoutMS;
   }
 
+  /**
+   * Fetches the partition ids for the stream. Some streams (e.g. Kinesis) might not support this operation, in
+   * which case an exception will be thrown.
+   */
+  @VisibleForTesting
+  Set<Integer> getPartitionIds(StreamConfig streamConfig)
+      throws Exception {
+    String clientId =
+        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + streamConfig.getTopicName();
+    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
+      return metadataProvider.fetchPartitionIds(5000L);
+    }
+  }
+
   /**
    * Fetches the latest state of the PartitionGroups for the stream
    * If any partition has reached end of life, and all messages of that partition have been consumed by the segment,
@@ -1122,8 +1153,7 @@ public class PinotLLCRealtimeSegmentManager {
                   new CommittingSegmentDescriptor(latestSegmentName,
                       (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
               createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
-                  newPartitionGroupMetadataList);
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
               updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
                   segmentAssignment, instancePartitionsMap);
             } else { // partition group reached end of life
@@ -1227,7 +1257,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-                numPartitions, numReplicas, newPartitionGroupMetadataList);
+                numPartitions, numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1248,7 +1278,7 @@ public class PinotLLCRealtimeSegmentManager {
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
-        latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas, newPartitionGroupMetadataList);
+        latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
     String newSegmentName = newLLCSegmentName.getSegmentName();
     updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
         instancePartitionsMap);
@@ -1300,7 +1330,7 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+      int numPartitions, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1313,8 +1343,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
-        partitionGroupMetadataList);
+        committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas);
 
     return newSegmentName;
   }
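
With the timing instrumentation added above, each successful commit now emits a single summary line. A hypothetical example (segment names and durations invented for illustration):

    Finished committing segment metadata for segment: myTable__2__14__20231120T1907Z. Time taken for updating committing segment metadata: 35ms; creating new consuming segment (myTable__2__15__20231120T1907Z) metadata: 12ms; updating ideal state: 48ms; total: 95ms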
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index e1dc7f9a80..57a4a11743 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,10 +19,8 @@
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
@@ -40,7 +38,7 @@ public class DefaultFlushThresholdUpdater implements FlushThresholdUpdater {
   @Override
   public void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
       CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
-      int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+      int maxNumPartitionsPerInstance) {
     // Configure the segment size flush limit based on the maximum number of partitions allocated to an instance
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize / maxNumPartitionsPerInstance);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 5022ca398a..e322a367e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
-import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 
 
@@ -36,5 +34,5 @@ public interface FlushThresholdUpdater {
    */
   void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
       CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
-      int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList);
+      int maxNumPartitionsPerInstance);
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index aba18df887..808642a345 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -20,11 +20,8 @@ package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
-import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 
@@ -58,8 +55,7 @@ class SegmentFlushThresholdComputer {
   }
 
   public int computeThreshold(StreamConfig streamConfig, CommittingSegmentDescriptor committingSegmentDescriptor,
-      @Nullable SegmentZKMetadata committingSegmentZKMetadata, List<PartitionGroupMetadata> partitionGroupMetadataList,
-      String newSegmentName) {
+      @Nullable SegmentZKMetadata committingSegmentZKMetadata, String newSegmentName) {
     final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
     final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
     final double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
@@ -99,24 +95,11 @@ class SegmentFlushThresholdComputer {
         committingSegmentSizeBytes);
 
     double currentRatio = (double) numRowsConsumed / committingSegmentSizeBytes;
-    // Compute segment size to rows ratio only from the lowest available partition id.
-    // If we consider all partitions then it is likely that we will assign a much higher weight to the most
-    // recent trend in the table (since it is usually true that all partitions of the same table follow more or
-    // less same characteristics at any one point in time).
-    // However, when we start a new table or change controller mastership, we can have any partition completing first.
-    // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    int smallestAvailablePartitionGroupId =
-        partitionGroupMetadataList.stream().mapToInt(PartitionGroupMetadata::getPartitionGroupId).min().orElse(0);
-
-    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == smallestAvailablePartitionGroupId
-        || _latestSegmentRowsToSizeRatio == 0) {
-      if (_latestSegmentRowsToSizeRatio > 0) {
-        _latestSegmentRowsToSizeRatio =
-            CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio
-                + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
-      } else {
-        _latestSegmentRowsToSizeRatio = currentRatio;
-      }
+    if (_latestSegmentRowsToSizeRatio > 0) {
+      _latestSegmentRowsToSizeRatio =
+          CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
+    } else {
+      _latestSegmentRowsToSizeRatio = currentRatio;
     }
 
     // If the number of rows consumed is less than what we set as target in metadata, then the segment hit time limit.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 439279afbd..511e85651b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -18,11 +18,8 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,18 +30,13 @@ import org.slf4j.LoggerFactory;
  * previous segment
  * The formula used to compute new number of rows is:
  * targetNumRows = ideal_segment_size * (a * current_rows_to_size_ratio + b * previous_rows_to_size_ratio)
- * where a = 0.25, b = 0.75, prev ratio= ratio collected over all previous segment completions
+ * where a = 0.1, b = 0.9, and prev ratio = the ratio collected over all previous segment completions
  * This ensures that we take into account the history of the segment size and number of rows
  */
 public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpdater {
   public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
   private final SegmentFlushThresholdComputer _flushThresholdComputer;
 
-  @VisibleForTesting
-  double getLatestSegmentRowsToSizeRatio() {
-    return _flushThresholdComputer.getLatestSegmentRowsToSizeRatio();
-  }
-
   public SegmentSizeBasedFlushThresholdUpdater() {
     _flushThresholdComputer = new SegmentFlushThresholdComputer();
   }
@@ -53,10 +45,10 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
   @Override
   public synchronized void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
       CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
-      int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+      int maxNumPartitionsPerInstance) {
     int threshold =
         _flushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-            partitionGroupMetadataList, newSegmentZKMetadata.getSegmentName());
+            newSegmentZKMetadata.getSegmentName());
     newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
   }
 }
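
To make the reweighted formula concrete, here is the ratio update worked through with the numbers used in SegmentFlushThresholdComputerTest further down, as a standalone Java sketch (constant names mirror the production class; the row/byte counts come from the test, not production data):

    double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
    double PREVIOUS_SEGMENT_RATIO_WEIGHT = 0.9;

    // First commit seeds the ratio directly: 30000 rows / 200000 bytes
    double ratio = 30_000.0 / 200_000.0;          // 0.15

    // Later commits blend the new observation with the history
    double currentRatio = 50_000.0 / 200_000.0;   // 0.25
    ratio = CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio
        + PREVIOUS_SEGMENT_RATIO_WEIGHT * ratio;  // 0.1 * 0.25 + 0.9 * 0.15 = 0.16

With 90% of the weight on history, one unusual segment moves the learned ratio only slightly, which helps keep the estimate stable now that every partition's commit updates it (the old guard restricting updates to the smallest partition id is removed in SegmentFlushThresholdComputer above).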
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index ffc98692a0..92d7ec19b4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -1220,6 +1221,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
           segmentAssignment, instancePartitionsMap);
     }
 
+    @Override
+    Set<Integer> getPartitionIds(StreamConfig streamConfig) {
+      if (_partitionGroupMetadataList != null) {
+        throw new UnsupportedOperationException();
+      }
+      return IntStream.range(0, _numPartitions).boxed().collect(Collectors.toSet());
+    }
+
     @Override
     List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index fbdaec5d3e..1184f113d2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -18,14 +18,10 @@
  */
 package org.apache.pinot.controller.helix.core.realtime.segment;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.annotations.Test;
@@ -134,9 +130,8 @@ public class FlushThresholdUpdaterTest {
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
       CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-      flushThresholdUpdater
-          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
-              Collections.emptyList());
+      flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+          1);
       assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
 
       int numRuns = 500;
@@ -148,7 +143,7 @@ public class FlushThresholdUpdaterTest {
         SegmentZKMetadata committingSegmentZKMetadata =
             getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
         flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1, Collections.emptyList());
+            committingSegmentZKMetadata, 1);
 
         // Assert that segment size is in limits
         if (run > checkRunsAfter) {
@@ -172,9 +167,8 @@ public class FlushThresholdUpdaterTest {
       // Start consumption
       SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
       CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-      flushThresholdUpdater
-          .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
-              getPartitionGroupMetadataList(3, 1));
+      flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+          1);
       assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
 
       int numRuns = 500;
@@ -186,7 +180,7 @@ public class FlushThresholdUpdaterTest {
         SegmentZKMetadata committingSegmentZKMetadata =
             getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
         flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1, getPartitionGroupMetadataList(3, 1));
+            committingSegmentZKMetadata, 1);
 
         // Assert that segment size is in limits
         if (run > checkRunsAfter) {
@@ -201,16 +195,6 @@ public class FlushThresholdUpdaterTest {
         new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, System.currentTimeMillis()).getSegmentName());
   }
 
-  private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int numPartitions, int startPartitionId) {
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList = new ArrayList<>();
-
-    for (int i = 0; i < numPartitions; i++) {
-      newPartitionGroupMetadataList.add(new PartitionGroupMetadata(startPartitionId + i, null));
-    }
-
-    return newPartitionGroupMetadataList;
-  }
-
   private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long segmentSizeBytes) {
     return new CommittingSegmentDescriptor(null, new LongMsgOffset(0).toString(), segmentSizeBytes);
   }
@@ -244,8 +228,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
-        Collections.emptyList());
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+        1);
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment consumes rows less than the threshold
@@ -254,7 +238,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (int) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -264,7 +248,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
         (int) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
   }
@@ -277,8 +261,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
-        Collections.emptyList());
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+        1);
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
 
     // First segment only consumed 15 rows, so next segment should have size threshold of 10_000
@@ -287,7 +271,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
 
@@ -296,63 +280,11 @@ public class FlushThresholdUpdaterTest {
     committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
   }
 
-  @Test
-  public void testNonZeroPartitionUpdates() {
-    SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
-    StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
-
-    // Start consumption for 2 partitions
-    SegmentZKMetadata newSegmentZKMetadataForPartition0 = getNewSegmentZKMetadata(0);
-    SegmentZKMetadata newSegmentZKMetadataForPartition1 = getNewSegmentZKMetadata(1);
-    CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1,
-            Collections.emptyList());
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1,
-            Collections.emptyList());
-    int sizeThresholdForPartition0 = newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
-    int sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
-    double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
-    assertEquals(sizeThresholdForPartition0, streamConfig.getFlushAutotuneInitialRows());
-    assertEquals(sizeThresholdForPartition1, streamConfig.getFlushAutotuneInitialRows());
-    assertEquals(sizeRatio, 0.0);
-
-    // First segment from partition 1 should change the size ratio
-    committingSegmentDescriptor = getCommittingSegmentDescriptor(128_000_000L);
-    SegmentZKMetadata committingSegmentZKMetadata =
-        getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition1,
-            sizeThresholdForPartition1);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1, Collections.emptyList());
-    sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
-    sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
-    assertTrue(sizeRatio > 0.0);
-
-    // Second segment update from partition 1 should not change the size ratio
-    committingSegmentDescriptor = getCommittingSegmentDescriptor(256_000_000L);
-    committingSegmentZKMetadata = getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition1,
-        sizeThresholdForPartition1);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1, Collections.emptyList());
-    assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
-
-    // First segment update from partition 0 should change the size ratio
-    committingSegmentZKMetadata = getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition0,
-        sizeThresholdForPartition0);
-    flushThresholdUpdater
-        .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor,
-            committingSegmentZKMetadata, 1, Collections.emptyList());
-    assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
-  }
-
   @Test
   public void testSegmentSizeBasedUpdaterWithModifications() {
     SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
@@ -367,8 +299,8 @@ public class FlushThresholdUpdaterTest {
     // Start consumption
     SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
     CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
-    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
-        Collections.emptyList());
+    flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+        1);
     int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold, flushAutotuneInitialRows);
 
@@ -382,7 +314,7 @@ public class FlushThresholdUpdaterTest {
     SegmentZKMetadata committingSegmentZKMetadata =
         getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold > numRowsConsumed);
 
@@ -395,7 +327,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
 
@@ -406,7 +338,7 @@ public class FlushThresholdUpdaterTest {
     committingSegmentDescriptor = getCommittingSegmentDescriptor(committingSegmentSize);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertEquals(sizeThreshold,
         (long) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -419,7 +351,7 @@ public class FlushThresholdUpdaterTest {
         mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
     committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
     flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
-        committingSegmentZKMetadata, 1, Collections.emptyList());
+        committingSegmentZKMetadata, 1);
     sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
     assertTrue(sizeThreshold < numRowsConsumed);
   }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 820b40a491..9bdc1656a0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -20,11 +20,8 @@ package org.apache.pinot.controller.helix.core.realtime.segment;
 
 import java.time.Clock;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;
 
@@ -35,6 +32,7 @@ import static org.testng.Assert.assertEquals;
 
 
 public class SegmentFlushThresholdComputerTest {
+
   @Test
   public void testUseAutoTuneInitialRowsIfFirstSegmentInPartition() {
     int autoTuneInitialRows = 1_000;
@@ -44,11 +42,8 @@ public class SegmentFlushThresholdComputerTest {
     when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(autoTuneInitialRows);
 
     CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
-    SegmentZKMetadata committingSegmentZKMetadata = null;
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "newSegmentName");
+    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
 
     assertEquals(threshold, autoTuneInitialRows);
   }
@@ -64,11 +59,8 @@ public class SegmentFlushThresholdComputerTest {
     when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
 
     CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
-    SegmentZKMetadata committingSegmentZKMetadata = null;
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "newSegmentName");
+    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
 
     // segmentSize * 1.5
     // 20000 * 1.5
@@ -86,11 +78,8 @@ public class SegmentFlushThresholdComputerTest {
     when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
 
     CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
-    SegmentZKMetadata committingSegmentZKMetadata = null;
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "newSegmentName");
+    int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
 
     assertEquals(threshold, 10000);
   }
@@ -110,9 +99,8 @@ public class SegmentFlushThresholdComputerTest {
     SegmentZKMetadata committingSegmentZKMetadata = mock(SegmentZKMetadata.class);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(segmentSizeThreshold);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "newSegmentName");
+        "newSegmentName");
 
     assertEquals(threshold, segmentSizeThreshold);
   }
@@ -137,9 +125,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
         currentTime - MILLISECONDS.convert(1, TimeUnit.HOURS));
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // totalDocs * 1.1
     // 10000 * 1.1
@@ -166,9 +153,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
         currentTime - MILLISECONDS.convert(2, TimeUnit.HOURS));
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // (totalDocs / 2) * 1.1
     // (30000 / 2) * 1.1
@@ -190,9 +176,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // totalDocs / 2
     // 30000 / 2
@@ -213,9 +198,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // totalDocs + (totalDocs / 2)
     // 30000 + (30000 / 2)
@@ -236,9 +220,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // (totalDocs / segmentSize) * flushThresholdSegmentSize
     // (30000 / 250000) * 300000
@@ -259,9 +242,8 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(0L);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(0);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
     int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // max((totalDocs / segmentSize) * flushThresholdSegmentSize, 10000)
     // max(0, 10000)
@@ -282,17 +264,15 @@ public class SegmentFlushThresholdComputerTest {
     when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L, 50_000L);
     when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(60_000);
 
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-
     computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // (totalDocs / segmentSize)
     // (30000 / 200000)
     assertEquals(computer.getLatestSegmentRowsToSizeRatio(), 0.15);
 
     computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
-        partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+        "events3__0__0__20211222T1646Z");
 
     // (0.1 * (totalDocs / segmentSize)) + (0.9 * lastRatio)
     // (0.1 * (50000 / 200000)) + (0.9 * 0.15)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 77abf423c9..c59c15a028 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,6 +19,9 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -40,6 +43,11 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
     return _numPartitions;
   }
 
+  @Override
+  public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+    return IntStream.range(0, _numPartitions).boxed().collect(Collectors.toSet());
+  }
+
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     if (offsetCriteria.isSmallest()) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 3e8928da12..1086c45d99 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.plugin.stream.kafka20;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
@@ -26,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +72,23 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+    try {
+      List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+      if (CollectionUtils.isEmpty(partitionInfos)) {
+        throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+      }
+      Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionInfos.size());
+      for (PartitionInfo partitionInfo : partitionInfos) {
+        partitionIds.add(partitionInfo.partition());
+      }
+      return partitionIds;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
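
The Kafka implementation above maps a slow broker response to TransientConsumerException, which is unchecked and intended to be retryable. A minimal caller sketch, assuming a configured streamConfig (the method name, clientId string, and 5s timeout are illustrative):

    Set<Integer> fetchIdsOrRetryLater(StreamConfig streamConfig)
        throws Exception {
      StreamConsumerFactory factory = StreamConsumerFactoryProvider.create(streamConfig);
      try (StreamMetadataProvider metadataProvider = factory.createStreamMetadataProvider("partition-id-fetcher")) {
        return metadataProvider.fetchPartitionIds(5000L);
      } catch (TransientConsumerException e) {
        // Broker did not respond within the timeout; caller may retry on the next run
        return Collections.emptySet();
      }
    }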
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
index 788e5e5ed3..1413aa4334 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
@@ -19,9 +19,11 @@
 package org.apache.pinot.plugin.stream.pulsar;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -31,6 +33,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -64,12 +67,24 @@ public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnection
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
     try {
-      return _pulsarClient.getPartitionsForTopic(_topic).get().size();
+      return _pulsarClient.getPartitionsForTopic(_topic).get(timeoutMillis, TimeUnit.MILLISECONDS).size();
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
     } catch (Exception e) {
-      throw new RuntimeException("Cannot fetch partitions for topic: " + _topic, e);
+      throw new RuntimeException("Failed to fetch partitions for topic: " + _topic, e);
     }
   }
 
+  @Override
+  public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+    int partitionCount = fetchPartitionCount(timeoutMillis);
+    Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionCount);
+    for (int i = 0; i < partitionCount; i++) {
+      partitionIds.add(i);
+    }
+    return partitionIds;
+  }
+
   /**
    * Fetch the messageId and use it as offset.
    * If offset criteria is smallest, the message id of earliest record in the partition is returned.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 94abc3ca6d..78847c0627 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -43,6 +44,13 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * Fetches the offset for a given partition and offset criteria
    * @param offsetCriteria offset criteria to fetch{@link StreamPartitionMsgOffset}.
@@ -98,5 +106,6 @@ public interface StreamMetadataProvider extends Closeable {
     return result;
   }
 
-  class UnknownLagState extends PartitionLagState { }
+  class UnknownLagState extends PartitionLagState {
+  }
 }
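
For stream plugins that can enumerate partitions cheaply, overriding the new default is a few lines. A hypothetical provider for a stream with a fixed, contiguous partition layout (class name and constructor are invented; offset fetching is stubbed out since it is beside the point here):

    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    import org.apache.pinot.spi.stream.OffsetCriteria;
    import org.apache.pinot.spi.stream.StreamMetadataProvider;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    public class FixedPartitionMetadataProvider implements StreamMetadataProvider {
      private final int _numPartitions;

      public FixedPartitionMetadataProvider(int numPartitions) {
        _numPartitions = numPartitions;
      }

      @Override
      public int fetchPartitionCount(long timeoutMillis) {
        return _numPartitions;
      }

      @Override
      public Set<Integer> fetchPartitionIds(long timeoutMillis) {
        // Ids are assumed contiguous here, as in the Pulsar implementation above
        return IntStream.range(0, _numPartitions).boxed().collect(Collectors.toSet());
      }

      @Override
      public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
        throw new UnsupportedOperationException("Out of scope for this sketch");
      }

      @Override
      public void close() {
      }
    }

Providers that cannot enumerate partitions (the Kinesis case called out in the PinotLLCRealtimeSegmentManager javadoc above) simply inherit the default, and the controller falls back to reading partition group metadata.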

