You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pinot.apache.org by kh...@apache.org on 2021/10/22 05:19:01 UTC
[pinot] branch master updated: Add support for lowest partition id
in case partition id 0 is not available (#7066)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1ee3c02 Add support for lowest partition id in case partition id 0 is not available (#7066)
1ee3c02 is described below
commit 1ee3c020fa421a44f7f199299cc6c400216a972d
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Oct 22 10:47:43 2021 +0530
Add support for lowest partition id in case partition id 0 is not available (#7066)
* Add support for lowest partition id in case partition id 0 is not available
* Remove unused import
* Add complete partition metadata list instead of just current partition
* Add new test for min partition data instead of partition 0
* Fix linting
* Fix linting
* edit comment to reflect new partition id calculation
* Update method for easy int comparison
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 69 +++++++--------
.../segment/DefaultFlushThresholdUpdater.java | 9 +-
.../realtime/segment/FlushThresholdUpdater.java | 7 +-
.../SegmentSizeBasedFlushThresholdUpdater.java | 15 +++-
.../segment/FlushThresholdUpdaterTest.java | 97 +++++++++++++++++-----
5 files changed, 130 insertions(+), 67 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6e72249..7f6996a 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -142,8 +142,8 @@ public class PinotLLCRealtimeSegmentManager {
* check the segment expiration time to see if it is about to be deleted (i.e. less than this threshold). Skip the
* deep store fix if necessary. RetentionManager will delete this kind of segments shortly anyway.
*/
- private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS =
- 60 * 60 * 1000L; // 1 hour
+ private static final long MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS = 60 * 60 * 1000L;
+ // 1 hour
private static final Random RANDOM = new Random();
private final HelixAdmin _helixAdmin;
@@ -182,8 +182,7 @@ public class PinotLLCRealtimeSegmentManager {
}
_tableConfigCache = new TableConfigCache(_propertyStore);
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
- _isDeepStoreLLCSegmentUploadRetryEnabled =
- controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+ _isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
_fileUploadDownloadClient = initFileUploadDownloadClient();
}
@@ -317,7 +316,7 @@ public class PinotLLCRealtimeSegmentManager {
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas);
+ numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
@@ -564,8 +563,8 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups,
- numReplicas);
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
+ newPartitionGroupMetadataList);
newConsumingSegmentName = newLLCSegment.getSegmentName();
}
@@ -663,7 +662,7 @@ public class PinotLLCRealtimeSegmentManager {
private void createNewSegmentZKMetadata(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
@Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas) {
+ int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -692,7 +691,8 @@ public class PinotLLCRealtimeSegmentManager {
FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
committingSegmentZKMetadata,
- getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas));
+ getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
+ partitionGroupMetadataList);
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
@@ -1089,7 +1089,8 @@ public class PinotLLCRealtimeSegmentManager {
new CommittingSegmentDescriptor(latestSegmentName,
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+ newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
segmentAssignment, instancePartitionsMap);
} else { // partition group reached end of life
@@ -1129,7 +1130,8 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
+ newPartitionGroupMetadataList);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1191,7 +1193,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitions, numReplicas);
+ numPartitions, numReplicas, newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -1230,7 +1232,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas) {
+ int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1243,7 +1245,8 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
- committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+ committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+ partitionGroupMetadataList);
return newSegmentName;
}
@@ -1303,8 +1306,7 @@ public class PinotLLCRealtimeSegmentManager {
String realtimeTableName = tableConfig.getTableName();
if (_isStopping) {
- LOGGER.info(
- "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+ LOGGER.info("Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
realtimeTableName);
return;
}
@@ -1314,11 +1316,9 @@ public class PinotLLCRealtimeSegmentManager {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
if (validationConfig.getRetentionTimeUnit() != null && !validationConfig.getRetentionTimeUnit().isEmpty()
&& validationConfig.getRetentionTimeValue() != null && !validationConfig.getRetentionTimeValue().isEmpty()) {
- long retentionMs =
- TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
- .toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
- retentionStrategy = new TimeRetentionStrategy(
- TimeUnit.MILLISECONDS,
+ long retentionMs = TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
+ .toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
+ retentionStrategy = new TimeRetentionStrategy(TimeUnit.MILLISECONDS,
retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
}
@@ -1342,40 +1342,35 @@ public class PinotLLCRealtimeSegmentManager {
}
// Skip the fix for the segment if it is already out of retention.
if (retentionStrategy != null && retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
- LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention",
- segmentName);
+ LOGGER.info("Skipped deep store uploading of LLC segment {} which is already out of retention", segmentName);
continue;
}
LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
// Find servers which have online replica
- List<URI> peerSegmentURIs = PeerServerSegmentFinder
- .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
+ List<URI> peerSegmentURIs =
+ PeerServerSegmentFinder.getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);
if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(
- String.format(
- "Failed to upload segment %s to deep store because no online replica is found",
- segmentName));
+ throw new IllegalStateException(String
+ .format("Failed to upload segment %s to deep store because no online replica is found", segmentName));
}
// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
- LOGGER.info(
- "Ask server to upload LLC segment {} to deep store by this path: {}",
- segmentName, serverUploadRequestUrl);
+ LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
+ serverUploadRequestUrl);
String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
// TODO: add version check when persist segment ZK metadata
persistSegmentZKMetadata(realtimeTableName, segmentZKMetadata, -1);
- LOGGER.info(
- "Successfully uploaded LLC segment {} to deep store with download url: {}",
- segmentName, segmentDownloadUrl);
+ LOGGER.info("Successfully uploaded LLC segment {} to deep store with download url: {}", segmentName,
+ segmentDownloadUrl);
} catch (Exception e) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName,
- ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+ _controllerMetrics
+ .addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
LOGGER.error("Failed to upload segment {} to deep store", segmentName, e);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index 3c1d27b..9c1ddea 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,8 +19,10 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -36,9 +38,10 @@ public class DefaultFlushThresholdUpdater implements FlushThresholdUpdater {
}
@Override
- public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
- CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance) {
+ public void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
+ SegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+ List<PartitionGroupMetadata> partitionGroupMetadataList) {
// Configure the segment size flush limit based on the maximum number of partitions allocated to an instance
newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize / maxNumPartitionsPerInstance);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 379b3b3..c10cbab 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -33,6 +35,7 @@ public interface FlushThresholdUpdater {
* Updates the flush threshold for the given segment ZK metadata
*/
void updateFlushThreshold(PartitionLevelStreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
- CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance);
+ CommittingSegmentDescriptor committingSegmentDescriptor,
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+ List<PartitionGroupMetadata> partitionGroupMetadataList);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 61b2cda..7372db1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -19,9 +19,11 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
@@ -56,7 +58,8 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
@Override
public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamConfig,
SegmentZKMetadata newSegmentZKMetadata, CommittingSegmentDescriptor committingSegmentDescriptor,
- @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance) {
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, int maxNumPartitionsPerInstance,
+ List<PartitionGroupMetadata> partitionGroupMetadataList) {
final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
final long timeThresholdMillis = streamConfig.getFlushThresholdTimeMillis();
final int autotuneInitialRows = streamConfig.getFlushAutotuneInitialRows();
@@ -97,14 +100,18 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
committingSegmentSizeBytes);
double currentRatio = (double) numRowsConsumed / committingSegmentSizeBytes;
- // Compute segment size to rows ratio only from partition 0.
+ // Compute segment size to rows ratio only from the lowest available partition id.
// If we consider all partitions then it is likely that we will assign a much higher weight to the most
// recent trend in the table (since it is usually true that all partitions of the same table follow more or
// less same characteristics at any one point in time).
// However, when we start a new table or change controller mastership, we can have any partition completing first.
// It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
- // FIXME: The stream may not have partition "0"
- if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+ int smallestAvailablePartitionGroupId =
+ partitionGroupMetadataList.stream().mapToInt(PartitionGroupMetadata::getPartitionGroupId).min()
+ .orElseGet(() -> 0);
+
+ if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == smallestAvailablePartitionGroupId
+ || _latestSegmentRowsToSizeRatio == 0) {
if (_latestSegmentRowsToSizeRatio > 0) {
_latestSegmentRowsToSizeRatio =
CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 841c575..b24f7ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -132,7 +136,8 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+ .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
int numRuns = 500;
@@ -144,7 +149,45 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
+
+ // Assert that segment size is in limits
+ if (run > checkRunsAfter) {
+ assertTrue(segmentSizeBytes > segmentSizeLowerLimit && segmentSizeBytes < segmentSizeHigherLimit);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testSegmentSizeBasedFlushThresholdMinPartition() {
+ PartitionLevelStreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
+ long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
+ long segmentSizeLowerLimit = (long) (desiredSegmentSizeBytes * 0.99);
+ long segmentSizeHigherLimit = (long) (desiredSegmentSizeBytes * 1.01);
+
+ for (long[] segmentSizesMB : Arrays
+ .asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB, STEPS_SEGMENT_SIZES_MB)) {
+ SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
+
+ // Start consumption
+ SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
+ CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
+ flushThresholdUpdater
+ .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+ getPartitionGroupMetadataList(3, 1));
+ assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
+
+ int numRuns = 500;
+ int checkRunsAfter = 400;
+ for (int run = 0; run < numRuns; run++) {
+ int numRowsConsumed = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
+ long segmentSizeBytes = getSegmentSizeBytes(numRowsConsumed, segmentSizesMB);
+ committingSegmentDescriptor = getCommittingSegmentDescriptor(segmentSizeBytes);
+ SegmentZKMetadata committingSegmentZKMetadata =
+ getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
+ committingSegmentZKMetadata, 1, getPartitionGroupMetadataList(3, 1));
// Assert that segment size is in limits
if (run > checkRunsAfter) {
@@ -159,6 +202,16 @@ public class FlushThresholdUpdaterTest {
new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, System.currentTimeMillis()).getSegmentName());
}
+ private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int numPartitions, int startPartitionId) {
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList = new ArrayList<>();
+
+ for (int i = 0; i < numPartitions; i++) {
+ newPartitionGroupMetadataList.add(new PartitionGroupMetadata(startPartitionId + i, null));
+ }
+
+ return newPartitionGroupMetadataList;
+ }
+
private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long segmentSizeBytes) {
return new CommittingSegmentDescriptor(null, new LongMsgOffset(0).toString(), segmentSizeBytes);
}
@@ -192,8 +245,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment consumes rows less than the threshold
@@ -202,7 +255,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -212,7 +265,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
(int) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
}
@@ -225,8 +278,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment only consumed 15 rows, so next segment should have size threshold of 10_000
@@ -235,7 +288,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
@@ -244,7 +297,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, SegmentSizeBasedFlushThresholdUpdater.MINIMUM_NUM_ROWS_THRESHOLD);
}
@@ -259,9 +312,11 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata newSegmentZKMetadataForPartition1 = getNewSegmentZKMetadata(1);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1);
+ .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1);
+ .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
int sizeThresholdForPartition0 = newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
int sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
@@ -276,7 +331,7 @@ public class FlushThresholdUpdaterTest {
sizeThresholdForPartition1);
flushThresholdUpdater
.updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
assertTrue(sizeRatio > 0.0);
@@ -287,7 +342,7 @@ public class FlushThresholdUpdaterTest {
sizeThresholdForPartition1);
flushThresholdUpdater
.updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
// First segment update from partition 0 should change the size ratio
@@ -295,7 +350,7 @@ public class FlushThresholdUpdaterTest {
sizeThresholdForPartition0);
flushThresholdUpdater
.updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
}
@@ -313,8 +368,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
+ Collections.emptyList());
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, flushAutotuneInitialRows);
@@ -328,7 +383,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold > numRowsConsumed);
@@ -341,7 +396,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
@@ -352,7 +407,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentDescriptor = getCommittingSegmentDescriptor(committingSegmentSize);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(long) (numRowsConsumed * SegmentSizeBasedFlushThresholdUpdater.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -365,7 +420,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org