You are viewing a plain-text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/12/21 23:16:26 UTC

(pinot) branch master updated: Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)

This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cfce82385 Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
9cfce82385 is described below

commit 9cfce82385dfcd11eed4467c07f2409b76622d57
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Thu Dec 21 15:16:20 2023 -0800

    Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
---
 .../protocols/SegmentCompletionProtocol.java       |  2 ++
 .../resources/LLCSegmentCompletionHandlers.java    |  3 ++-
 .../segment/CommittingSegmentDescriptor.java       | 10 ++++++++++
 .../segment/SegmentFlushThresholdComputer.java     | 14 +++++++++----
 .../segment/SegmentFlushThresholdComputerTest.java | 23 ++++++++++++++++++++++
 .../realtime/RealtimeSegmentDataManager.java       |  2 +-
 6 files changed, 48 insertions(+), 6 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 3b0753919d..1cad2d09f5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -255,6 +255,7 @@ public class SegmentCompletionProtocol {
         _memoryUsedBytes = MEMORY_USED_BYTES_DEFAULT;
         _segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT;
         _streamPartitionMsgOffset = null;
+        _reason = null;
       }
 
       public Params(Params params) {
@@ -269,6 +270,7 @@ public class SegmentCompletionProtocol {
         _memoryUsedBytes = params.getMemoryUsedBytes();
         _segmentSizeBytes = params.getSegmentSizeBytes();
         _streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
+        _reason = params.getReason();
       }
 
       @Deprecated
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index cef0de67b4..993028585c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -363,6 +363,7 @@ public class LLCSegmentCompletionHandlers {
       @QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
       @QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
       @QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+      @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
       FormDataMultiPart metadataFiles) {
     if (instanceId == null || segmentName == null || segmentLocation == null || metadataFiles == null || (offset == -1
         && streamPartitionMsgOffset == null)) {
@@ -376,7 +377,7 @@ public class LLCSegmentCompletionHandlers {
     SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
     requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
         .withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
-        .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
+        .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
     extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
     LOGGER.info("Processing segmentCommitEndWithMetadata:{}", requestParams.toString());
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index c264143024..f26b2d5d39 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -31,6 +31,7 @@ public class CommittingSegmentDescriptor {
   private String _segmentLocation;
   private String _nextOffset;
   private SegmentMetadataImpl _segmentMetadata;
+  private String _stopReason;
 
   public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
       SegmentCompletionProtocol.Request.Params reqParams) {
@@ -38,6 +39,7 @@ public class CommittingSegmentDescriptor {
         new CommittingSegmentDescriptor(reqParams.getSegmentName(), reqParams.getStreamPartitionMsgOffset(),
             reqParams.getSegmentSizeBytes());
     committingSegmentDescriptor.setSegmentLocation(reqParams.getSegmentLocation());
+    committingSegmentDescriptor.setStopReason(reqParams.getReason());
     return committingSegmentDescriptor;
   }
 
@@ -95,4 +97,12 @@ public class CommittingSegmentDescriptor {
   public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
     _segmentMetadata = segmentMetadata;
   }
+
+  public String getStopReason() {
+    return _stopReason;
+  }
+
+  public void setStopReason(String stopReason) {
+    _stopReason = stopReason;
+  }
 }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index 808642a345..2c826dd60e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import java.time.Clock;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 
+
 class SegmentFlushThresholdComputer {
   public static final int MINIMUM_NUM_ROWS_THRESHOLD = 10_000;
   static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
@@ -78,11 +80,15 @@ class SegmentFlushThresholdComputer {
     }
 
     final long committingSegmentSizeBytes = committingSegmentDescriptor.getSegmentSizeBytes();
-    if (committingSegmentSizeBytes <= 0) { // repair segment case
+    if (committingSegmentSizeBytes <= 0 // repair segment case
+        || SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(
+        committingSegmentDescriptor.getStopReason())) {
+      String reason = committingSegmentSizeBytes <= 0 //
+          ? "Committing segment size is not available" //
+          : "Committing segment is due to force-commit";
       final int targetNumRows = committingSegmentZKMetadata.getSizeThresholdToFlushSegment();
-      SegmentSizeBasedFlushThresholdUpdater.LOGGER.info(
-          "Committing segment size is not available, setting thresholds from previous segment for {} as {}",
-          newSegmentName, targetNumRows);
+      SegmentSizeBasedFlushThresholdUpdater.LOGGER.info("{}, setting thresholds from previous segment for {} as {}",
+          reason, newSegmentName, targetNumRows);
       return targetNumRows;
     }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 9bdc1656a0..a9d1c27221 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.testng.annotations.Test;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -105,6 +106,28 @@ public class SegmentFlushThresholdComputerTest {
     assertEquals(threshold, segmentSizeThreshold);
   }
 
+  @Test
+  public void testUseLastSegmentsThresholdIfSegmentIsCommittingDueToForceCommit() {
+    long committingSegmentSizeBytes = 500_000L;
+    int committingSegmentSizeThreshold = 25_000;
+    SegmentFlushThresholdComputer computer = new SegmentFlushThresholdComputer();
+
+    CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
+    when(committingSegmentDescriptor.getSegmentSizeBytes()).thenReturn(committingSegmentSizeBytes);
+    when(committingSegmentDescriptor.getStopReason()).thenReturn(REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+
+    SegmentZKMetadata committingSegmentZKMetadata = mock(SegmentZKMetadata.class);
+    when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(committingSegmentSizeThreshold);
+
+    StreamConfig streamConfig = mock(StreamConfig.class);
+
+    int newSegmentSizeThreshold =
+        computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
+            "newSegmentName");
+
+    assertEquals(newSegmentSizeThreshold, committingSegmentSizeThreshold);
+  }
+
   @Test
   public void testApplyMultiplierToTotalDocsWhenTimeThresholdNotReached() {
     long currentTime = 1640216032391L;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d68c11f8e6..fa1501e868 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1060,7 +1060,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
 
     params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
-        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
+        .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason)
         .withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
         .withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
         .withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org