You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original HTML page) is not preserved in this copy.
Posted to commits@pinot.apache.org by sa...@apache.org on 2023/12/21 23:16:26 UTC
(pinot) branch master updated: Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9cfce82385 Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
9cfce82385 is described below
commit 9cfce82385dfcd11eed4467c07f2409b76622d57
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Thu Dec 21 15:16:20 2023 -0800
Proper computation of realtime "segment.flush.threshold.size" in case of force-commit (#12188)
---
.../protocols/SegmentCompletionProtocol.java | 2 ++
.../resources/LLCSegmentCompletionHandlers.java | 3 ++-
.../segment/CommittingSegmentDescriptor.java | 10 ++++++++++
.../segment/SegmentFlushThresholdComputer.java | 14 +++++++++----
.../segment/SegmentFlushThresholdComputerTest.java | 23 ++++++++++++++++++++++
.../realtime/RealtimeSegmentDataManager.java | 2 +-
6 files changed, 48 insertions(+), 6 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 3b0753919d..1cad2d09f5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -255,6 +255,7 @@ public class SegmentCompletionProtocol {
_memoryUsedBytes = MEMORY_USED_BYTES_DEFAULT;
_segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT;
_streamPartitionMsgOffset = null;
+ _reason = null;
}
public Params(Params params) {
@@ -269,6 +270,7 @@ public class SegmentCompletionProtocol {
_memoryUsedBytes = params.getMemoryUsedBytes();
_segmentSizeBytes = params.getSegmentSizeBytes();
_streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
+ _reason = params.getReason();
}
@Deprecated
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index cef0de67b4..993028585c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -363,6 +363,7 @@ public class LLCSegmentCompletionHandlers {
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
+ @QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
FormDataMultiPart metadataFiles) {
if (instanceId == null || segmentName == null || segmentLocation == null || metadataFiles == null || (offset == -1
&& streamPartitionMsgOffset == null)) {
@@ -376,7 +377,7 @@ public class LLCSegmentCompletionHandlers {
SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
.withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
- .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes);
+ .withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
LOGGER.info("Processing segmentCommitEndWithMetadata:{}", requestParams.toString());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
index c264143024..f26b2d5d39 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/CommittingSegmentDescriptor.java
@@ -31,6 +31,7 @@ public class CommittingSegmentDescriptor {
private String _segmentLocation;
private String _nextOffset;
private SegmentMetadataImpl _segmentMetadata;
+ private String _stopReason;
public static CommittingSegmentDescriptor fromSegmentCompletionReqParams(
SegmentCompletionProtocol.Request.Params reqParams) {
@@ -38,6 +39,7 @@ public class CommittingSegmentDescriptor {
new CommittingSegmentDescriptor(reqParams.getSegmentName(), reqParams.getStreamPartitionMsgOffset(),
reqParams.getSegmentSizeBytes());
committingSegmentDescriptor.setSegmentLocation(reqParams.getSegmentLocation());
+ committingSegmentDescriptor.setStopReason(reqParams.getReason());
return committingSegmentDescriptor;
}
@@ -95,4 +97,12 @@ public class CommittingSegmentDescriptor {
public void setSegmentMetadata(SegmentMetadataImpl segmentMetadata) {
_segmentMetadata = segmentMetadata;
}
+
+ public String getStopReason() {
+ return _stopReason;
+ }
+
+ public void setStopReason(String stopReason) {
+ _stopReason = stopReason;
+ }
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index 808642a345..2c826dd60e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.TimeUtils;
+
class SegmentFlushThresholdComputer {
public static final int MINIMUM_NUM_ROWS_THRESHOLD = 10_000;
static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
@@ -78,11 +80,15 @@ class SegmentFlushThresholdComputer {
}
final long committingSegmentSizeBytes = committingSegmentDescriptor.getSegmentSizeBytes();
- if (committingSegmentSizeBytes <= 0) { // repair segment case
+ if (committingSegmentSizeBytes <= 0 // repair segment case
+ || SegmentCompletionProtocol.REASON_FORCE_COMMIT_MESSAGE_RECEIVED.equals(
+ committingSegmentDescriptor.getStopReason())) {
+ String reason = committingSegmentSizeBytes <= 0 //
+ ? "Committing segment size is not available" //
+ : "Committing segment is due to force-commit";
final int targetNumRows = committingSegmentZKMetadata.getSizeThresholdToFlushSegment();
- SegmentSizeBasedFlushThresholdUpdater.LOGGER.info(
- "Committing segment size is not available, setting thresholds from previous segment for {} as {}",
- newSegmentName, targetNumRows);
+ SegmentSizeBasedFlushThresholdUpdater.LOGGER.info("{}, setting thresholds from previous segment for {} as {}",
+ reason, newSegmentName, targetNumRows);
return targetNumRows;
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 9bdc1656a0..a9d1c27221 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.testng.annotations.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -105,6 +106,28 @@ public class SegmentFlushThresholdComputerTest {
assertEquals(threshold, segmentSizeThreshold);
}
+ @Test
+ public void testUseLastSegmentsThresholdIfSegmentIsCommittingDueToForceCommit() {
+ long committingSegmentSizeBytes = 500_000L;
+ int committingSegmentSizeThreshold = 25_000;
+ SegmentFlushThresholdComputer computer = new SegmentFlushThresholdComputer();
+
+ CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
+ when(committingSegmentDescriptor.getSegmentSizeBytes()).thenReturn(committingSegmentSizeBytes);
+ when(committingSegmentDescriptor.getStopReason()).thenReturn(REASON_FORCE_COMMIT_MESSAGE_RECEIVED);
+
+ SegmentZKMetadata committingSegmentZKMetadata = mock(SegmentZKMetadata.class);
+ when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(committingSegmentSizeThreshold);
+
+ StreamConfig streamConfig = mock(StreamConfig.class);
+
+ int newSegmentSizeThreshold =
+ computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
+ "newSegmentName");
+
+ assertEquals(newSegmentSizeThreshold, committingSegmentSizeThreshold);
+ }
+
@Test
public void testApplyMultiplierToTotalDocsWhenTimeThresholdNotReached() {
long currentTime = 1640216032391L;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d68c11f8e6..fa1501e868 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1060,7 +1060,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
params.withSegmentName(_segmentNameStr).withStreamPartitionMsgOffset(_currentOffset.toString())
- .withNumRows(_numRowsConsumed).withInstanceId(_instanceId)
+ .withNumRows(_numRowsConsumed).withInstanceId(_instanceId).withReason(_stopReason)
.withBuildTimeMillis(_segmentBuildDescriptor.getBuildTimeMillis())
.withSegmentSizeBytes(_segmentBuildDescriptor.getSegmentSizeBytes())
.withWaitTimeMillis(_segmentBuildDescriptor.getWaitTimeMillis());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org