Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/11/03 00:41:55 UTC

[PR] Optimize segment commit to not read partition group metadata [pinot]

Jackie-Jiang opened a new pull request, #11943:
URL: https://github.com/apache/pinot/pull/11943

   Currently, when committing a real-time segment, the controller needs to read partition group metadata for all partitions from upstream, which can be very slow for streams with many partitions.
   The partition group metadata is used only to extract the partition ids, which can simply be derived from the partition count, except for streams that close partitions, such as Kinesis.
   
   In this PR, we made the following changes:
   1. Read only the partition count from upstream if available
   2. If the partition count is not available, fall back to the current approach
   3. Log the time spent in each step for debugging
   4. In `SegmentFlushThresholdComputer`, remove the logic that counts only the segments with the smallest partition id toward the size ratio. It complicates the handling a lot (and is quite an anti-pattern, since any segment commit requires info from all partitions), and I don't see much value in it. Converging quickly to the size ratio of the recent data trend should be a pro rather than a con, because this ratio is used to decide the segment size needed to consume data for the same period of time
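
Changes 1 and 2 can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `PartitionIdResolver`, `resolve`, and the two fetcher parameters are hypothetical names, and the sketch assumes that a stream which cannot report a partition count signals this by throwing.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not part of Pinot: "count first, full metadata as fallback".
final class PartitionIdResolver {
  private PartitionIdResolver() {
  }

  // Derives ids 0..count-1 from the cheap partition-count call. If that call
  // throws, falls back to the expensive lookup that lists every partition id.
  static Set<Integer> resolve(IntSupplier partitionCountFetcher, Supplier<Set<Integer>> fullMetadataFetcher) {
    try {
      int count = partitionCountFetcher.getAsInt();
      Set<Integer> ids = new LinkedHashSet<>();
      for (int i = 0; i < count; i++) {
        ids.add(i);
      }
      return ids;
    } catch (RuntimeException e) {
      // Assumption: an unavailable count surfaces as an exception
      return fullMetadataFetcher.get();
    }
  }

  public static void main(String[] args) {
    // Stream that reports a count: ids are derived without reading metadata
    System.out.println(resolve(() -> 4, Collections::emptySet));
    // Stream that cannot report a count: ids come from the full metadata read
    System.out.println(resolve(() -> {
      throw new UnsupportedOperationException("count not available");
    }, () -> new LinkedHashSet<>(Arrays.asList(2, 7))));
  }
}
```

The expensive path is only taken when the cheap one fails, so streams with a stable partition count never pay for the per-partition metadata read.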


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1394997095


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -518,40 +523,56 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
      */
 
     // Step-1
+    long startTimeNs1 = System.nanoTime();
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
-    // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-    Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+    // Step-2
+    long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
-      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-      // segment metadata
-      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-      long newSegmentCreationTimeMs = getCurrentTimeMs();
-      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
-          newPartitionGroupMetadataList);
-      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    if (!isTablePaused(idealState)) {
+      int numPartitionGroups;
+      boolean shouldCreateNewConsumingSegment;
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      try {
+        numPartitionGroups = getNumPartitionGroups(streamConfig);
+        shouldCreateNewConsumingSegment = numPartitionGroups > committingSegmentPartitionGroupId;
+      } catch (Exception e) {
+        LOGGER.info("Failed to read partition count from stream metadata provider for table: {}, exception: {}. "
+                + "Reading all partition group metadata to determine partition count and partition group status.",
+            realtimeTableName, e.toString());
+        // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+        //       We don't need to read partition group metadata for other partition groups.
+        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        numPartitionGroups = newPartitionGroupMetadataList.size();
+        shouldCreateNewConsumingSegment = false;
+        for (PartitionGroupMetadata newPartitionGroupMetadata : newPartitionGroupMetadataList) {
+          if (newPartitionGroupMetadata.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+            shouldCreateNewConsumingSegment = true;
+            break;
+          }
+        }
+      }
+      if (shouldCreateNewConsumingSegment) {

Review Comment:
   @jadami10 Added `Set<Integer> fetchPartitionIds(long timeoutMillis)` to `StreamMetadataProvider` with a default implementation. Let me know if it suits your needs.
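
For readers following along, here is a rough sketch of how such a default method can coexist with a provider that uses non-contiguous ids. The two method signatures come from this thread; everything else is an assumption: `SketchStreamMetadataProvider` and `SparseIdProvider` are hypothetical stand-ins, and the default body (ids `0..count-1`) is a guess at the intent, since the actual body is not quoted here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Stand-in for illustration only; the real interface lives in pinot-spi.
interface SketchStreamMetadataProvider {
  int fetchPartitionCount(long timeoutMillis);

  // Assumed default: partition ids are contiguous, 0..count-1
  default Set<Integer> fetchPartitionIds(long timeoutMillis) {
    int count = fetchPartitionCount(timeoutMillis);
    Set<Integer> ids = new HashSet<>();
    for (int i = 0; i < count; i++) {
      ids.add(i);
    }
    return ids;
  }
}

// Hypothetical provider whose partition ids are not contiguous
final class SparseIdProvider implements SketchStreamMetadataProvider {
  private final Set<Integer> _ids = new HashSet<>(Arrays.asList(3, 10, 42));

  @Override
  public int fetchPartitionCount(long timeoutMillis) {
    return _ids.size();
  }

  // Overrides the default so callers see the real ids rather than 0..2
  @Override
  public Set<Integer> fetchPartitionIds(long timeoutMillis) {
    return new HashSet<>(_ids);
  }
}
```

Providers with contiguous ids inherit the default unchanged, while a provider like the hypothetical one above opts out by overriding a single method.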





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1386096273


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########

Review Comment:
   I don't fully follow. What is your concern here?





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "mcvsubbu (via GitHub)" <gi...@apache.org>.
mcvsubbu commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1793234655

   [Without looking at the code changes]
   The smallest partition ID is used because of the algorithm that optimizes the segment size. All partitions commit at roughly the same time, so we will not come up with newer values to correct the segment size soon.




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "mcvsubbu (via GitHub)" <gi...@apache.org>.
mcvsubbu commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1795513639

   I wrote this program to test out a hypothesis. According to it, it takes slightly more iterations to stabilize at the right segment size if we apply the algorithm to all partitions. I tried with numPartitions = 1 and numPartitions = 32.
   
   Assumptions made here:
   - The segment size varies logarithmically with the number of rows (see program). This may be wrong; I am open to corrections.
   - Segment size reported has a noise of 5% 
   
   I think all partitions behave somewhat the same, so we should count each partition once so as to learn best from previous completions. One way we could improve (assuming that partitions complete at roughly the same time) is to pick the partition that completes first (which may not be partition 0, of course). This was considered at the time, but was hard to do, given that partitions may complete within minutes in some cases.
   
   ```
   public class SegSizeAlgorithmTester {
     static Double sizeToRowsRatio = null;
   
     static final double ALPHA = 0.9;
     static final long optimalSegmentSize = 350*1024*1024;
   
     // Assume segment size varies with number of rows as:
     // segmentSize = A + B * ln (numRows/C), with a noise variation of 5%
     static final double A = 4096;
     static final double B = 9.297e+7;
     static final double C = 9.7e+4;
     static final double noisePercent = 5;
   
     final static int nPartitions = 32;
   
     public static void main(String args[]) {
       int nIterations = 0;  // One iteration is completion of all partitions (roughly at the same time)
   
       int reportedNumRows = computeNumRows(0, 0);
       for (int s = 0; s < 50; s++) {
         double reportedSegSize = 0;
         int numRows = 0;
         for (int j = 0; j < nPartitions; j++) {
           // All partitions report the same number of rows
           reportedSegSize = A + B * Math.log((double) reportedNumRows / C);
           reportedSegSize = addNoiseToReportedSegSize(reportedSegSize, noisePercent);
           numRows = computeNumRows(reportedNumRows, (long) reportedSegSize);
         }
         nIterations++;
         double deviationPercent = Math.abs(optimalSegmentSize-reportedSegSize) *100/optimalSegmentSize;
         System.out.println(nIterations + " " + deviationPercent + " " +  reportedNumRows + " " + reportedSegSize);
         reportedNumRows = numRows;
       }
     }
   
     private static double addNoiseToReportedSegSize(double segSize, double noisePercent) {
       double noise = noisePercent * Math.random() * segSize/100.0;
       if (Math.random() > 0.5) {
         return segSize + noise;
       } else {
         return segSize - noise;
       }
     }
   
     private static int computeNumRows(int numRowsConsumed, long segmentSize) {
       if (segmentSize == 0) { // first segment of table
         return 100_000; // First guess on number of rows
       }
   
       double   currentRatio = (double)segmentSize/numRowsConsumed;
       if (sizeToRowsRatio != null) {
         sizeToRowsRatio = sizeToRowsRatio * (1 - ALPHA) + currentRatio * ALPHA;
       } else {
         sizeToRowsRatio = currentRatio;
       }
   
       int newNumRows;
   
       if (segmentSize <= 0.5 * optimalSegmentSize) {
         // Quicker ramp up
         newNumRows = numRowsConsumed * 3/2;
       } else if (segmentSize >= 2 * optimalSegmentSize) {
         // Even quicker ramp down
         newNumRows = numRowsConsumed/2;
       } else {
         newNumRows = (int) (optimalSegmentSize / sizeToRowsRatio);
       }
   
       return newNumRows;
     }
   }
   ```




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1795755694

   @mcvsubbu Thanks for taking the time to write this program!
   
   > According to this, it takes slightly more number of iterations to stabilize to the right segment size if we apply the algorithm for all partitions. I tried with numPartitions = 1 and numPartitions=32.
   
   Conceptually, with `numPartitions=32`, it should take far fewer iterations than with `numPartitions=1` to stabilize at the desired segment size. The reason this didn't show up in the experiment is that we assume **all partitions report the same number of rows** within an iteration. Within the same iteration, a segment committed earlier can contribute to a more accurate segment size ratio that is picked up by the next committing segment, which is not captured in the program.
   The program also assumes all segments commit at roughly the same time, which is not always the case (actually, we probably want to avoid this because it can cause hotspots).
   
   Based on the above analysis, do you think we can simplify this handling to just count all committing segments?
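
The smoothing step at the center of this discussion can be isolated in a small sketch. It reuses the update rule and `ALPHA = 0.9` from the test program above; the class name `RatioSmoothingSketch`, the helper names, and the simplification that every commit observes the same true ratio are assumptions made for illustration only.

```java
// Hypothetical sketch: isolates the exponential smoothing of the size-to-rows ratio.
final class RatioSmoothingSketch {
  // Weight given to the newest observation, as in the test program above
  static final double ALPHA = 0.9;

  private RatioSmoothingSketch() {
  }

  // One smoothing step: blend the previous estimate with the newly observed ratio
  static double update(double previousRatio, double observedRatio) {
    return previousRatio * (1 - ALPHA) + observedRatio * ALPHA;
  }

  // Applies the smoothing step once per committing segment
  static double afterCommits(double initialRatio, double observedRatio, int numCommits) {
    double ratio = initialRatio;
    for (int i = 0; i < numCommits; i++) {
      ratio = update(ratio, observedRatio);
    }
    return ratio;
  }

  public static void main(String[] args) {
    // Stale estimate of 10 bytes/row while the data now sits at 20 bytes/row
    System.out.println("after 1 commit:   " + afterCommits(10.0, 20.0, 1));
    System.out.println("after 32 commits: " + afterCommits(10.0, 20.0, 32));
  }
}
```

Under that simplification, each update shrinks the gap to the observed ratio by a factor of `1 - ALPHA`, so counting more commits moves the estimate toward the recent trend faster. The sketch says nothing about noise or about commits that all use the same stale row target, which is the case the test program models.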




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1385821102


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########

Review Comment:
   Just double-checking: nothing here expects partitionIds to be monotonically increasing, right?
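
To make the concern concrete, here is a minimal sketch comparing the count-based check from the diff (`numPartitionGroups > committingSegmentPartitionGroupId`) with a membership check on the actual ids. The class and method names are hypothetical, and the sparse id set `{0, 5, 9}` is made-up example data.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch, not Pinot code
final class ConsumingSegmentCheckSketch {
  private ConsumingSegmentCheckSketch() {
  }

  // Mirrors the count-based condition in the diff: only valid for ids 0..count-1
  static boolean byPartitionCount(int numPartitionGroups, int committingPartitionGroupId) {
    return numPartitionGroups > committingPartitionGroupId;
  }

  // Membership check: valid for any id scheme
  static boolean byPartitionIds(Set<Integer> partitionIds, int committingPartitionGroupId) {
    return partitionIds.contains(committingPartitionGroupId);
  }

  public static void main(String[] args) {
    Set<Integer> sparseIds = new HashSet<>(Arrays.asList(0, 5, 9));
    int committingId = 5;
    // prints "false": 3 partitions, but id 5 is not below the count
    System.out.println(byPartitionCount(sparseIds.size(), committingId));
    // prints "true": id 5 is a live partition
    System.out.println(byPartitionIds(sparseIds, committingId));
  }
}
```

With contiguous ids the two checks agree; with sparse ids the count-based check would skip creating the next consuming segment for a live partition.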





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1387514958


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########

Review Comment:
   That is indeed a problem. Can you elaborate more on the partitionIds in your stream metadata provider?





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1397594029


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -43,6 +45,18 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {

Review Comment:
   I think the default unfortunately needs to call `fetchStreamPartitionOffset` and get the partition ids from there. That's effectively what the previous behavior was, so otherwise anyone else who, like us, wasn't depending on sequentially increasing partition IDs will break.





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1398503477


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -43,6 +45,18 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {

Review Comment:
   oh good point. then yes, this lgtm





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1387995741


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -518,40 +523,56 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
      */
 
     // Step-1
+    long startTimeNs1 = System.nanoTime();
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
-    // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-    Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+    // Step-2
+    long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
-      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-      // segment metadata
-      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-      long newSegmentCreationTimeMs = getCurrentTimeMs();
-      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
-          newPartitionGroupMetadataList);
-      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    if (!isTablePaused(idealState)) {
+      int numPartitionGroups;
+      boolean shouldCreateNewConsumingSegment;
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      try {
+        numPartitionGroups = getNumPartitionGroups(streamConfig);
+        shouldCreateNewConsumingSegment = numPartitionGroups > committingSegmentPartitionGroupId;
+      } catch (Exception e) {
+        LOGGER.info("Failed to read partition count from stream metadata provider for table: {}, exception: {}. "
+                + "Reading all partition group metadata to determine partition count and partition group status.",
+            realtimeTableName, e.toString());
+        // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+        //       We don't need to read partition group metadata for other partition groups.
+        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        numPartitionGroups = newPartitionGroupMetadataList.size();
+        shouldCreateNewConsumingSegment = false;
+        for (PartitionGroupMetadata newPartitionGroupMetadata : newPartitionGroupMetadataList) {
+          if (newPartitionGroupMetadata.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+            shouldCreateNewConsumingSegment = true;
+            break;
+          }
+        }
+      }
+      if (shouldCreateNewConsumingSegment) {

Review Comment:
   since we consume from multiple kafka clusters, our partitionIds are a custom encoding with half the bits representing the kafka cluster and the other half representing the partition number.
   
   Our "partitioncount" returns the total partitions across all clusters. So we do expect `fetchPartitionCount` to increase. It's just that the IDs themselves aren't ordered.
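
To make the encoding described above concrete, here is a minimal sketch of one way such an id could be packed into a Java `int`. The comment only says that half the bits carry the cluster and half the partition number; the choice of the upper 16 bits for the cluster, and the names `CompositePartitionId`, `encode`, `clusterOf` and `partitionOf`, are assumptions made purely for illustration and are not taken from the plugin being discussed.

```java
// Hypothetical bit layout: upper 16 bits = cluster index, lower 16 bits = partition number.
// The review comment does not say which half holds which field.
final class CompositePartitionId {
  private static final int PARTITION_BITS = 16;
  private static final int PARTITION_MASK = (1 << PARTITION_BITS) - 1;
  private static final int MAX_CLUSTER = 0x7FFF; // keeps the encoded id non-negative

  private CompositePartitionId() {
  }

  static int encode(int clusterIndex, int partition) {
    if (clusterIndex < 0 || clusterIndex > MAX_CLUSTER) {
      throw new IllegalArgumentException("cluster index out of range: " + clusterIndex);
    }
    if (partition < 0 || partition > PARTITION_MASK) {
      throw new IllegalArgumentException("partition out of range: " + partition);
    }
    return (clusterIndex << PARTITION_BITS) | partition;
  }

  static int clusterOf(int id) {
    return id >>> PARTITION_BITS;
  }

  static int partitionOf(int id) {
    return id & PARTITION_MASK;
  }

  public static void main(String[] args) {
    // Two hypothetical clusters with two partitions each, so the total partition count is 4.
    int[] ids = {encode(0, 0), encode(0, 1), encode(1, 0), encode(1, 1)};
    int partitionCount = ids.length;
    for (int id : ids) {
      // Same shape as the check in the diff: partitionCount > partitionGroupId.
      boolean passesCountCheck = partitionCount > id;
      System.out.println("id=" + id + " cluster=" + clusterOf(id) + " partition=" + partitionOf(id)
          + " passesCountCheck=" + passesCountCheck);
    }
  }
}
```

Under this assumed layout, the ids from the second cluster are far larger than the total partition count, so a comparison of the form `numPartitionGroups > committingSegmentPartitionGroupId` would reject them even though the partitions exist.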





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1397830314


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -43,6 +45,18 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {

Review Comment:
   By default this method is unsupported and will fall back to `computePartitionGroupMetadata()` at the caller side. We cannot directly use `computePartitionGroupMetadata()` as the default here because the arguments are different, and we don't want to force the caller to read the consumption status for all existing partitions
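
The "unsupported by default, fall back at the caller" pattern described above could look roughly like the sketch below. It is not the code from the PR: `SketchStreamMetadataProvider`, `PartitionIdResolver` and the `slowPath` supplier are hypothetical names, and the supplier merely stands in for the more expensive partition group metadata read. The diff in this thread catches `Exception`; the sketch catches only `UnsupportedOperationException` to keep the example narrow.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a stream metadata provider interface.
interface SketchStreamMetadataProvider {
  // Optional capability: providers that can list partition ids cheaply override this.
  default Set<Integer> fetchPartitionIds(long timeoutMillis) {
    throw new UnsupportedOperationException("fetchPartitionIds is not supported");
  }
}

final class PartitionIdResolver {
  private PartitionIdResolver() {
  }

  // Tries the cheap call first; uses the slow path only when the provider does not support it.
  static Set<Integer> resolve(SketchStreamMetadataProvider provider, long timeoutMillis,
      Supplier<Set<Integer>> slowPath) {
    try {
      return provider.fetchPartitionIds(timeoutMillis);
    } catch (UnsupportedOperationException e) {
      return slowPath.get();
    }
  }
}
```

With this shape, an existing plugin that never overrides the new method keeps its previous behavior through the slow path, which is the upgrade concern raised in the review.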





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1397778891


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -43,6 +45,18 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {

Review Comment:
   should we still use `fetchStreamPartitionOffset` as the default? this will break anyone upgrading with custom stream ingestion unless they can perfectly coordinate plugin deploys with new pinot deploys.
   
   in our case, we could do it. But plugins live in a separate repo, so it would take a fair bit of coordination to get the latest libraries, update the plugin, deploy both server and plugin, and make sure nothing restarts in the middle





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1791750445

   cc @jadami10 




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1791771062

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11943?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11943](https://app.codecov.io/gh/apache/pinot/pull/11943?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (ce7c148) into [master](https://app.codecov.io/gh/apache/pinot/commit/53e80331a4e8cb1593e01b8133a6d90aa39cef22?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (53e8033) will **decrease** coverage by `14.88%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #11943       +/-   ##
   =============================================
   - Coverage     61.30%   46.42%   -14.88%     
   + Complexity     1140      939      -201     
   =============================================
     Files          2378     1780      -598     
     Lines        128865    93326    -35539     
     Branches      19927    15080     -4847     
   =============================================
   - Hits          78998    43327    -35671     
   - Misses        44147    46957     +2810     
   + Partials       5720     3042     -2678     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.42% <ø> (-14.88%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.42% <ø> (-14.86%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.42% <ø> (-14.88%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.42% <ø> (-14.88%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.42% <ø> (-0.04%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/11943/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   [see 974 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11943/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1387424953


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -518,40 +523,56 @@ private void commitSegmentMetadataInternal(String realtimeTableName,
      */
 
     // Step-1
+    long startTimeNs1 = System.nanoTime();
     SegmentZKMetadata committingSegmentZKMetadata =
         updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
     // Refresh the Broker routing to reflect the changes in the segment ZK metadata
     _helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
 
-    // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
-    StreamConfig streamConfig =
-        new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
-    // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
-    Set<Integer> newPartitionGroupSet =
-        newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
-            .collect(Collectors.toSet());
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+    // Step-2
+    long startTimeNs2 = System.nanoTime();
     String newConsumingSegmentName = null;
-    if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
-      // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
-      // segment metadata
-      String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
-      long newSegmentCreationTimeMs = getCurrentTimeMs();
-      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
-          newPartitionGroupMetadataList);
-      newConsumingSegmentName = newLLCSegment.getSegmentName();
+    if (!isTablePaused(idealState)) {
+      int numPartitionGroups;
+      boolean shouldCreateNewConsumingSegment;
+      StreamConfig streamConfig =
+          new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+      try {
+        numPartitionGroups = getNumPartitionGroups(streamConfig);
+        shouldCreateNewConsumingSegment = numPartitionGroups > committingSegmentPartitionGroupId;
+      } catch (Exception e) {
+        LOGGER.info("Failed to read partition count from stream metadata provider for table: {}, exception: {}. "
+                + "Reading all partition group metadata to determine partition count and partition group status.",
+            realtimeTableName, e.toString());
+        // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+        //       We don't need to read partition group metadata for other partition groups.
+        List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+            getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        numPartitionGroups = newPartitionGroupMetadataList.size();
+        shouldCreateNewConsumingSegment = false;
+        for (PartitionGroupMetadata newPartitionGroupMetadata : newPartitionGroupMetadataList) {
+          if (newPartitionGroupMetadata.getPartitionGroupId() == committingSegmentPartitionGroupId) {
+            shouldCreateNewConsumingSegment = true;
+            break;
+          }
+        }
+      }
+      if (shouldCreateNewConsumingSegment) {

Review Comment:
   the default implementation in the [StreamMetadataProvider](https://github.com/apache/pinot/blob/a986dd15368c0554902c6cca571d0b60a084b547/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java#L82) assumes partitionIds are 0..partitionCount. That's not the case for our partitions. I just want to make sure that's not a problem here.
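
A small sketch of why the contiguous-id assumption matters: if ids are derived from the partition count alone, any id outside `0..partitionCount-1` is silently missed. The class `ContiguousIdAssumption` and the sample ids are hypothetical and only illustrate the concern; they do not reproduce the linked default implementation.

```java
import java.util.Set;
import java.util.TreeSet;

final class ContiguousIdAssumption {
  private ContiguousIdAssumption() {
  }

  // Derives ids purely from the count, i.e. assumes ids are 0, 1, ..., partitionCount - 1.
  static Set<Integer> idsFromCount(int partitionCount) {
    Set<Integer> ids = new TreeSet<>();
    for (int i = 0; i < partitionCount; i++) {
      ids.add(i);
    }
    return ids;
  }

  // Ids the stream actually has that the count-based derivation would not produce.
  static Set<Integer> missedIds(Set<Integer> actualIds) {
    Set<Integer> missed = new TreeSet<>(actualIds);
    missed.removeAll(idsFromCount(actualIds.size()));
    return missed;
  }

  public static void main(String[] args) {
    // Made-up, non-contiguous ids for illustration only.
    Set<Integer> actual = new TreeSet<>(Set.of(0, 1, 65536, 65537));
    System.out.println("derived from count: " + idsFromCount(actual.size()));
    System.out.println("missed by derivation: " + missedIds(actual));
  }
}
```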





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11943:
URL: https://github.com/apache/pinot/pull/11943#discussion_r1397652658


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java:
##########
@@ -43,6 +45,18 @@ public interface StreamMetadataProvider extends Closeable {
    */
   int fetchPartitionCount(long timeoutMillis);
 
+  /**
+   * Fetches the partition ids for a topic given the stream configs.
+   */
+  default Set<Integer> fetchPartitionIds(long timeoutMillis) {

Review Comment:
   Good point! Just realized that we can directly read the custom partition ids in `KafkaStreamMetadataProvider`. Applied the changes





Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1793246547

   > [Without looking at the code changes] Using the smallest partitionID is because of the algorithm that optimizes the segment size. All partition IDs commit roughly at the same time, so we will not come up with newer values to correct the segment size soon.
   
   Let's say I have 8 partitions, and all partitions commit at roughly the same time but partition 0 commits last; we will lose the segment size tuning from the other 7 partitions. This PR changes it so that every segment commit contributes to the segment size tuning, and IMO it will give a better segment size prediction.




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "mcvsubbu (via GitHub)" <gi...@apache.org>.
mcvsubbu commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1793263916

   > > [Without looking at the code changes] Using the smallest partitionID is because of the algorithm that optimizes the segment size. All partition IDs commit roughly at the same time, so we will not come up with newer values to correct the segment size soon.
   > 
   > Let's say I have 8 partitions, and all partitions commit at roughly the same time but partition 0 commits last; we will lose the segment size tuning from the other 7 partitions. This PR changes it so that every segment commit contributes to the segment size tuning, and IMO it will give a better segment size prediction.
   
   No, you want to count only one segment in the group that finishes together. That is because of the formula we use where we pay most attention to the past segments and less attention to the most recent segment size (R_next = R_prev * 0.75 + R_current * 0.25).
   
   (It could be 0.9 and 0.1, not sure).
   
   So, the most recent segment counted 8 times (in your example) will soil the "past" value to be incorrectly weighted as the present value.
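
The concern above can be quantified with a short derivation. It assumes the update rule quoted in the comment, written with a generic current weight $\alpha$, and the idealization that all $k$ segments in a group report the same ratio $R_{\text{current}}$; both the symbol $\alpha$ and the equal-ratio assumption are introduced here for illustration.

```latex
\begin{aligned}
R_{\text{next}} &= (1-\alpha)\,R_{\text{prev}} + \alpha\,R_{\text{current}} \\
R^{(k)} &= (1-\alpha)^{k}\,R_{\text{prev}} + \bigl(1-(1-\alpha)^{k}\bigr)\,R_{\text{current}}
  \qquad \text{(rule applied } k \text{ times)} \\
\alpha = 0.25,\ k = 8 &:\quad (0.75)^{8} \approx 0.100 \\
\alpha = 0.10,\ k = 8 &:\quad (0.90)^{8} \approx 0.430
\end{aligned}
```

So if 8 segments that finish together are each counted, the weight left on the past value after that group drops from 0.75 to about 0.10 with the first pair of weights, or from 0.9 to about 0.43 with the second.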




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11943:
URL: https://github.com/apache/pinot/pull/11943#issuecomment-1793362417

   > No, you want to count only one segment in the group that finishes together. That is because of the formula we use where we pay most attention to the past segments and less attention to the most recent segment size (R_next = R_prev * 0.75 + R_current * 0.25).
   > 
   > (It could be 0.9 and 0.1, not sure).
   > 
   > So, the most recent segment counted 8 times (in your example) will soil the "past" value to be incorrectly weighted as the present value.
   
   The past weight is 0.9 and the current weight is 0.1.
   In the regular case, the ratios for old and new segments should be very close. My point is: when the data distribution changes, why do we want it to pick up the new pattern so slowly? IMO we want to adjust quickly to the recent data distribution for better prediction. The current algorithm gives a more stable ratio, but when the data distribution changes, it takes ~10 rounds to adjust to the new ratio, which can take multiple days. For certain stream types such as Kinesis, the smallest partition might be fully consumed, in which case the ratio can no longer be updated. Given these drawbacks and the extra overhead, I feel counting all segments is a better approach.
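
The trade-off in this exchange can be explored with a small simulation. The sketch below uses the 0.9 / 0.1 weights stated above; everything else is assumed for illustration: the class `SizeRatioSmoothing`, the starting ratio of 1.0, the new ratio of 2.0, and the simplification that every segment in a round observes the same ratio. It is not the logic in `SegmentFlushThresholdComputer`.

```java
final class SizeRatioSmoothing {
  static final double PAST_WEIGHT = 0.9;    // weights as stated in the comment above
  static final double CURRENT_WEIGHT = 0.1;

  private SizeRatioSmoothing() {
  }

  // One application of the weighted update.
  static double update(double previous, double observed) {
    return PAST_WEIGHT * previous + CURRENT_WEIGHT * observed;
  }

  // Ratio after the given number of commit rounds, applying updatesPerRound updates in each round.
  static double ratioAfter(double start, double observed, int rounds, int updatesPerRound) {
    double ratio = start;
    for (int r = 0; r < rounds; r++) {
      for (int u = 0; u < updatesPerRound; u++) {
        ratio = update(ratio, observed);
      }
    }
    return ratio;
  }

  public static void main(String[] args) {
    // Hypothetical shift in data: the learned ratio is 1.0 and new segments observe 2.0.
    for (int round = 1; round <= 10; round++) {
      System.out.printf("round %2d: one segment per round = %.3f, eight segments per round = %.3f%n",
          round, ratioAfter(1.0, 2.0, round, 1), ratioAfter(1.0, 2.0, round, 8));
    }
  }
}
```

With these made-up numbers, counting one segment per round leaves the estimate at about 1.65 after 10 rounds, roughly 65% of the way to the new ratio. Counting eight segments per round converges much faster, at the cost of the stability that the single-segment approach is meant to provide.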




Re: [PR] Optimize segment commit to not read partition group metadata [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #11943:
URL: https://github.com/apache/pinot/pull/11943

