You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/20 19:07:32 UTC
(pinot) branch master updated: Optimize segment commit to not read partition group metadata (#11943)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8e84fc3fc5 Optimize segment commit to not read partition group metadata (#11943)
8e84fc3fc5 is described below
commit 8e84fc3fc50deeb3a05b24c09732c4116ad3979d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Nov 20 11:07:25 2023 -0800
Optimize segment commit to not read partition group metadata (#11943)
---
.../broker/broker/FakeStreamConsumerFactory.java | 6 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 117 +++++++++++++--------
.../segment/DefaultFlushThresholdUpdater.java | 4 +-
.../realtime/segment/FlushThresholdUpdater.java | 4 +-
.../segment/SegmentFlushThresholdComputer.java | 29 ++---
.../SegmentSizeBasedFlushThresholdUpdater.java | 14 +--
.../PinotLLCRealtimeSegmentManagerTest.java | 9 ++
.../segment/FlushThresholdUpdaterTest.java | 108 ++++---------------
.../segment/SegmentFlushThresholdComputerTest.java | 46 +++-----
.../fakestream/FakeStreamMetadataProvider.java | 8 ++
.../kafka20/KafkaStreamMetadataProvider.java | 19 ++++
.../pulsar/PulsarStreamMetadataProvider.java | 19 +++-
.../pinot/spi/stream/StreamMetadataProvider.java | 11 +-
13 files changed, 186 insertions(+), 208 deletions(-)
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
index 3ada727a13..b0fee61322 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/FakeStreamConsumerFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.broker;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -77,6 +78,11 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
return 1;
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ return Collections.singleton(0);
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis)
throws TimeoutException {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8611c251aa..90cd05d16b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -92,7 +92,9 @@ import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -322,8 +324,7 @@ public class PinotLLCRealtimeSegmentManager {
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
-
+ numPartitionGroups, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -500,6 +501,10 @@ public class PinotLLCRealtimeSegmentManager {
LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
int committingSegmentPartitionGroupId = committingLLCSegment.getPartitionGroupId();
LOGGER.info("Committing segment metadata for segment: {}", committingSegmentName);
+ if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
+ LOGGER.warn("Committing segment: {} was not uploaded to deep store", committingSegmentName);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
+ }
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -518,40 +523,47 @@ public class PinotLLCRealtimeSegmentManager {
*/
// Step-1
+ long startTimeNs1 = System.nanoTime();
SegmentZKMetadata committingSegmentZKMetadata =
updateCommittingSegmentZKMetadata(realtimeTableName, committingSegmentDescriptor);
// Refresh the Broker routing to reflect the changes in the segment ZK metadata
_helixResourceManager.sendSegmentRefreshMessage(realtimeTableName, committingSegmentName, false, true);
- // Using the latest segment of each partition group, creates a list of {@link PartitionGroupConsumptionStatus}
- StreamConfig streamConfig =
- new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
- List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
- getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-
- // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}.
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
- Set<Integer> newPartitionGroupSet =
- newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
- .collect(Collectors.toSet());
- int numPartitionGroups = newPartitionGroupMetadataList.size();
-
+ // Step-2
+ long startTimeNs2 = System.nanoTime();
String newConsumingSegmentName = null;
- if (!isTablePaused(idealState) && newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
- // Only if committingSegment's partitionGroup is present in the newPartitionGroupMetadataList, we create new
- // segment metadata
- String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
- long newSegmentCreationTimeMs = getCurrentTimeMs();
- LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
- committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitionGroups, numReplicas,
- newPartitionGroupMetadataList);
- newConsumingSegmentName = newLLCSegment.getSegmentName();
+ if (!isTablePaused(idealState)) {
+ StreamConfig streamConfig =
+ new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ Set<Integer> partitionIds;
+ try {
+ partitionIds = getPartitionIds(streamConfig);
+ } catch (Exception e) {
+ LOGGER.info("Failed to fetch partition ids from stream metadata provider for table: {}, exception: {}. "
+ + "Reading all partition group metadata to determine partition ids.", realtimeTableName, e.toString());
+ // TODO: Find a better way to determine partition count and if the committing partition group is fully consumed.
+ // We don't need to read partition group metadata for other partition groups.
+ List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+ partitionIds = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId)
+ .collect(Collectors.toSet());
+ }
+ if (partitionIds.contains(committingSegmentPartitionGroupId)) {
+ String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
+ long newSegmentCreationTimeMs = getCurrentTimeMs();
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, partitionIds.size(),
+ numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
+ }
}
// Step-3
+ long startTimeNs3 = System.nanoTime();
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig, _controllerMetrics);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -572,6 +584,15 @@ public class PinotLLCRealtimeSegmentManager {
lock.unlock();
}
+ long endTimeNs = System.nanoTime();
+ LOGGER.info(
+ "Finished committing segment metadata for segment: {}. Time taken for updating committing segment metadata: "
+ + "{}ms; creating new consuming segment ({}) metadata: {}ms; updating ideal state: {}ms; total: {}ms",
+ committingSegmentName, TimeUnit.NANOSECONDS.toMillis(startTimeNs2 - startTimeNs1), newConsumingSegmentName,
+ TimeUnit.NANOSECONDS.toMillis(startTimeNs3 - startTimeNs2),
+ TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs3),
+ TimeUnit.NANOSECONDS.toMillis(endTimeNs - startTimeNs1));
+
// TODO: also create the new partition groups here, instead of waiting till the {@link
// RealtimeSegmentValidationManager} runs
// E.g. If current state is A, B, C, and newPartitionGroupMetadataList contains B, C, D, E,
@@ -581,10 +602,6 @@ public class PinotLLCRealtimeSegmentManager {
// Trigger the metadata event notifier
_metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig);
-
- if (StringUtils.isBlank(committingSegmentDescriptor.getSegmentLocation())) {
- _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.SEGMENT_MISSING_DEEP_STORE_LINK, 1);
- }
}
/**
@@ -649,8 +666,8 @@ public class PinotLLCRealtimeSegmentManager {
*/
private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig streamConfig,
LLCSegmentName newLLCSegmentName, long creationTimeMs, CommittingSegmentDescriptor committingSegmentDescriptor,
- @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, InstancePartitions instancePartitions, int numPartitions,
+ int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
String startOffset = committingSegmentDescriptor.getNextOffset();
@@ -668,7 +685,7 @@ public class PinotLLCRealtimeSegmentManager {
// Add the partition metadata if available
SegmentPartitionMetadata partitionMetadata =
- getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId(), numPartitionGroups);
+ getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId(), numPartitions);
if (partitionMetadata != null) {
newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
}
@@ -676,9 +693,7 @@ public class PinotLLCRealtimeSegmentManager {
// Update the flush threshold
FlushThresholdUpdater flushThresholdUpdater = _flushThresholdUpdateManager.getFlushThresholdUpdater(streamConfig);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata,
- getMaxNumPartitionsPerInstance(instancePartitions, numPartitionGroups, numReplicas),
- partitionGroupMetadataList);
+ committingSegmentZKMetadata, getMaxNumPartitionsPerInstance(instancePartitions, numPartitions, numReplicas));
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
}
@@ -747,6 +762,22 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
+ /**
+ * Fetches the partition ids for the stream. Some stream (e.g. Kinesis) might not support this operation, in which
+ * case exception will be thrown.
+ */
+ @VisibleForTesting
+ Set<Integer> getPartitionIds(StreamConfig streamConfig)
+ throws Exception {
+ String clientId =
+ PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+ + streamConfig.getTopicName();
+ StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
+ return metadataProvider.fetchPartitionIds(5000L);
+ }
+ }
+
/**
* Fetches the latest state of the PartitionGroups for the stream
* If any partition has reached end of life, and all messages of that partition have been consumed by the segment,
@@ -1122,8 +1153,7 @@ public class PinotLLCRealtimeSegmentManager {
new CommittingSegmentDescriptor(latestSegmentName,
(offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas,
- newPartitionGroupMetadataList);
+ committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
segmentAssignment, instancePartitionsMap);
} else { // partition group reached end of life
@@ -1227,7 +1257,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitions, numReplicas, newPartitionGroupMetadataList);
+ numPartitions, numReplicas);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -1248,7 +1278,7 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
- latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas, newPartitionGroupMetadataList);
+ latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1300,7 +1330,7 @@ public class PinotLLCRealtimeSegmentManager {
*/
private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ int numPartitions, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -1313,8 +1343,7 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
- committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
- partitionGroupMetadataList);
+ committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas);
return newSegmentName;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
index e1dc7f9a80..57a4a11743 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/DefaultFlushThresholdUpdater.java
@@ -19,10 +19,8 @@
package org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -40,7 +38,7 @@ public class DefaultFlushThresholdUpdater implements FlushThresholdUpdater {
@Override
public void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ int maxNumPartitionsPerInstance) {
// Configure the segment size flush limit based on the maximum number of partitions allocated to an instance
newSegmentZKMetadata.setSizeThresholdToFlushSegment(_tableFlushSize / maxNumPartitionsPerInstance);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
index 5022ca398a..e322a367e0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdater.java
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -36,5 +34,5 @@ public interface FlushThresholdUpdater {
*/
void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList);
+ int maxNumPartitionsPerInstance);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
index aba18df887..808642a345 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputer.java
@@ -20,11 +20,8 @@ package org.apache.pinot.controller.helix.core.realtime.segment;
import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -58,8 +55,7 @@ class SegmentFlushThresholdComputer {
}
public int computeThreshold(StreamConfig streamConfig, CommittingSegmentDescriptor committingSegmentDescriptor,
- @Nullable SegmentZKMetadata committingSegmentZKMetadata, List<PartitionGroupMetadata> partitionGroupMetadataList,
- String newSegmentName) {
+ @Nullable SegmentZKMetadata committingSegmentZKMetadata, String newSegmentName) {
final long desiredSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
final long optimalSegmentSizeBytesMin = desiredSegmentSizeBytes / 2;
final double optimalSegmentSizeBytesMax = desiredSegmentSizeBytes * 1.5;
@@ -99,24 +95,11 @@ class SegmentFlushThresholdComputer {
committingSegmentSizeBytes);
double currentRatio = (double) numRowsConsumed / committingSegmentSizeBytes;
- // Compute segment size to rows ratio only from the lowest available partition id.
- // If we consider all partitions then it is likely that we will assign a much higher weight to the most
- // recent trend in the table (since it is usually true that all partitions of the same table follow more or
- // less same characteristics at any one point in time).
- // However, when we start a new table or change controller mastership, we can have any partition completing first.
- // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
- int smallestAvailablePartitionGroupId =
- partitionGroupMetadataList.stream().mapToInt(PartitionGroupMetadata::getPartitionGroupId).min().orElse(0);
-
- if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == smallestAvailablePartitionGroupId
- || _latestSegmentRowsToSizeRatio == 0) {
- if (_latestSegmentRowsToSizeRatio > 0) {
- _latestSegmentRowsToSizeRatio =
- CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio
- + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
- } else {
- _latestSegmentRowsToSizeRatio = currentRatio;
- }
+ if (_latestSegmentRowsToSizeRatio > 0) {
+ _latestSegmentRowsToSizeRatio =
+ CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
+ } else {
+ _latestSegmentRowsToSizeRatio = currentRatio;
}
// If the number of rows consumed is less than what we set as target in metadata, then the segment hit time limit.
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 439279afbd..511e85651b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import com.google.common.annotations.VisibleForTesting;
-import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,18 +30,13 @@ import org.slf4j.LoggerFactory;
* previous segment
* The formula used to compute new number of rows is:
* targetNumRows = ideal_segment_size * (a * current_rows_to_size_ratio + b * previous_rows_to_size_ratio)
- * where a = 0.25, b = 0.75, prev ratio= ratio collected over all previous segment completions
+ * where a = 0.1, b = 0.9, prev ratio= ratio collected over all previous segment completions
* This ensures that we take into account the history of the segment size and number rows
*/
public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpdater {
public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
private final SegmentFlushThresholdComputer _flushThresholdComputer;
- @VisibleForTesting
- double getLatestSegmentRowsToSizeRatio() {
- return _flushThresholdComputer.getLatestSegmentRowsToSizeRatio();
- }
-
public SegmentSizeBasedFlushThresholdUpdater() {
_flushThresholdComputer = new SegmentFlushThresholdComputer();
}
@@ -53,10 +45,10 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
@Override
public synchronized void updateFlushThreshold(StreamConfig streamConfig, SegmentZKMetadata newSegmentZKMetadata,
CommittingSegmentDescriptor committingSegmentDescriptor, @Nullable SegmentZKMetadata committingSegmentZKMetadata,
- int maxNumPartitionsPerInstance, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ int maxNumPartitionsPerInstance) {
int threshold =
_flushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, newSegmentZKMetadata.getSegmentName());
+ newSegmentZKMetadata.getSegmentName());
newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index ffc98692a0..92d7ec19b4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -31,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -1220,6 +1221,14 @@ public class PinotLLCRealtimeSegmentManagerTest {
segmentAssignment, instancePartitionsMap);
}
+ @Override
+ Set<Integer> getPartitionIds(StreamConfig streamConfig) {
+ if (_partitionGroupMetadataList != null) {
+ throw new UnsupportedOperationException();
+ }
+ return IntStream.range(0, _numPartitions).boxed().collect(Collectors.toSet());
+ }
+
@Override
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig,
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index fbdaec5d3e..1184f113d2 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -18,14 +18,10 @@
*/
package org.apache.pinot.controller.helix.core.realtime.segment;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.annotations.Test;
@@ -134,9 +130,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
int numRuns = 500;
@@ -148,7 +143,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
// Assert that segment size is in limits
if (run > checkRunsAfter) {
@@ -172,9 +167,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- getPartitionGroupMetadataList(3, 1));
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
assertEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(), streamConfig.getFlushAutotuneInitialRows());
int numRuns = 500;
@@ -186,7 +180,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), numRowsConsumed, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, getPartitionGroupMetadataList(3, 1));
+ committingSegmentZKMetadata, 1);
// Assert that segment size is in limits
if (run > checkRunsAfter) {
@@ -201,16 +195,6 @@ public class FlushThresholdUpdaterTest {
new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, System.currentTimeMillis()).getSegmentName());
}
- private List<PartitionGroupMetadata> getPartitionGroupMetadataList(int numPartitions, int startPartitionId) {
- List<PartitionGroupMetadata> newPartitionGroupMetadataList = new ArrayList<>();
-
- for (int i = 0; i < numPartitions; i++) {
- newPartitionGroupMetadataList.add(new PartitionGroupMetadata(startPartitionId + i, null));
- }
-
- return newPartitionGroupMetadataList;
- }
-
private CommittingSegmentDescriptor getCommittingSegmentDescriptor(long segmentSizeBytes) {
return new CommittingSegmentDescriptor(null, new LongMsgOffset(0).toString(), segmentSizeBytes);
}
@@ -244,8 +228,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment consumes rows less than the threshold
@@ -254,7 +238,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(int) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -264,7 +248,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
assertNotEquals(newSegmentZKMetadata.getSizeThresholdToFlushSegment(),
(int) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
}
@@ -277,8 +261,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
// First segment only consumed 15 rows, so next segment should have size threshold of 10_000
@@ -287,7 +271,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
@@ -296,63 +280,11 @@ public class FlushThresholdUpdaterTest {
committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, SegmentFlushThresholdComputer.MINIMUM_NUM_ROWS_THRESHOLD);
}
- @Test
- public void testNonZeroPartitionUpdates() {
- SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
- StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();
-
- // Start consumption for 2 partitions
- SegmentZKMetadata newSegmentZKMetadataForPartition0 = getNewSegmentZKMetadata(0);
- SegmentZKMetadata newSegmentZKMetadataForPartition1 = getNewSegmentZKMetadata(1);
- CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
- int sizeThresholdForPartition0 = newSegmentZKMetadataForPartition0.getSizeThresholdToFlushSegment();
- int sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
- double sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
- assertEquals(sizeThresholdForPartition0, streamConfig.getFlushAutotuneInitialRows());
- assertEquals(sizeThresholdForPartition1, streamConfig.getFlushAutotuneInitialRows());
- assertEquals(sizeRatio, 0.0);
-
- // First segment from partition 1 should change the size ratio
- committingSegmentDescriptor = getCommittingSegmentDescriptor(128_000_000L);
- SegmentZKMetadata committingSegmentZKMetadata =
- getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition1,
- sizeThresholdForPartition1);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- sizeThresholdForPartition1 = newSegmentZKMetadataForPartition1.getSizeThresholdToFlushSegment();
- sizeRatio = flushThresholdUpdater.getLatestSegmentRowsToSizeRatio();
- assertTrue(sizeRatio > 0.0);
-
- // Second segment update from partition 1 should not change the size ratio
- committingSegmentDescriptor = getCommittingSegmentDescriptor(256_000_000L);
- committingSegmentZKMetadata = getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition1,
- sizeThresholdForPartition1);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition1, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- assertEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
-
- // First segment update from partition 0 should change the size ratio
- committingSegmentZKMetadata = getCommittingSegmentZKMetadata(System.currentTimeMillis(), sizeThresholdForPartition0,
- sizeThresholdForPartition0);
- flushThresholdUpdater
- .updateFlushThreshold(streamConfig, newSegmentZKMetadataForPartition0, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
- assertNotEquals(flushThresholdUpdater.getLatestSegmentRowsToSizeRatio(), sizeRatio);
- }
-
@Test
public void testSegmentSizeBasedUpdaterWithModifications() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
@@ -367,8 +299,8 @@ public class FlushThresholdUpdaterTest {
// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
CommittingSegmentDescriptor committingSegmentDescriptor = getCommittingSegmentDescriptor(0L);
- flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null, 1,
- Collections.emptyList());
+ flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor, null,
+ 1);
int sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold, flushAutotuneInitialRows);
@@ -382,7 +314,7 @@ public class FlushThresholdUpdaterTest {
SegmentZKMetadata committingSegmentZKMetadata =
getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold > numRowsConsumed);
@@ -395,7 +327,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
@@ -406,7 +338,7 @@ public class FlushThresholdUpdaterTest {
committingSegmentDescriptor = getCommittingSegmentDescriptor(committingSegmentSize);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertEquals(sizeThreshold,
(long) (numRowsConsumed * SegmentFlushThresholdComputer.ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT));
@@ -419,7 +351,7 @@ public class FlushThresholdUpdaterTest {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1, Collections.emptyList());
+ committingSegmentZKMetadata, 1);
sizeThreshold = newSegmentZKMetadata.getSizeThresholdToFlushSegment();
assertTrue(sizeThreshold < numRowsConsumed);
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
index 820b40a491..9bdc1656a0 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentFlushThresholdComputerTest.java
@@ -20,11 +20,8 @@ package org.apache.pinot.controller.helix.core.realtime.segment;
import java.time.Clock;
import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.testng.annotations.Test;
@@ -35,6 +32,7 @@ import static org.testng.Assert.assertEquals;
public class SegmentFlushThresholdComputerTest {
+
@Test
public void testUseAutoTuneInitialRowsIfFirstSegmentInPartition() {
int autoTuneInitialRows = 1_000;
@@ -44,11 +42,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushAutotuneInitialRows()).thenReturn(autoTuneInitialRows);
CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
assertEquals(threshold, autoTuneInitialRows);
}
@@ -64,11 +59,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
// segmentSize * 1.5
// 20000 * 1.5
@@ -86,11 +78,8 @@ public class SegmentFlushThresholdComputerTest {
when(streamConfig.getFlushThresholdSegmentSizeBytes()).thenReturn(segmentSizeBytes);
CommittingSegmentDescriptor committingSegmentDescriptor = mock(CommittingSegmentDescriptor.class);
- SegmentZKMetadata committingSegmentZKMetadata = null;
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, null, "newSegmentName");
assertEquals(threshold, 10000);
}
@@ -110,9 +99,8 @@ public class SegmentFlushThresholdComputerTest {
SegmentZKMetadata committingSegmentZKMetadata = mock(SegmentZKMetadata.class);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(segmentSizeThreshold);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "newSegmentName");
+ "newSegmentName");
assertEquals(threshold, segmentSizeThreshold);
}
@@ -137,9 +125,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
currentTime - MILLISECONDS.convert(1, TimeUnit.HOURS));
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs * 1.1
// 10000 * 1.1
@@ -166,9 +153,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getCreationTime()).thenReturn(
currentTime - MILLISECONDS.convert(2, TimeUnit.HOURS));
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / 2) * 1.1
// (30000 / 2) * 1.1
@@ -190,9 +176,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs / 2
// 30000 / 2
@@ -213,9 +198,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// totalDocs + (totalDocs / 2)
// 30000 + (30000 / 2)
@@ -236,9 +220,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(20_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / segmentSize) * flushThresholdSegmentSize
// (30000 / 250000) * 300000
@@ -259,9 +242,8 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(0L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(0);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
int threshold = computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// max((totalDocs / segmentSize) * flushThresholdSegmentSize, 10000)
// max(0, 10000)
@@ -282,17 +264,15 @@ public class SegmentFlushThresholdComputerTest {
when(committingSegmentZKMetadata.getTotalDocs()).thenReturn(30_000L, 50_000L);
when(committingSegmentZKMetadata.getSizeThresholdToFlushSegment()).thenReturn(60_000);
- List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
-
computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (totalDocs / segmentSize)
// (30000 / 200000)
assertEquals(computer.getLatestSegmentRowsToSizeRatio(), 0.15);
computer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
- partitionGroupMetadataList, "events3__0__0__20211222T1646Z");
+ "events3__0__0__20211222T1646Z");
// (0.1 * (totalDocs / segmentSize)) + (0.9 * lastRatio)
// (0.1 * (50000 / 200000)) + (0.9 * 0.15)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 77abf423c9..c59c15a028 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,6 +19,9 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -40,6 +43,11 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
return _numPartitions;
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ return IntStream.range(0, _numPartitions).boxed().collect(Collectors.toSet());
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
if (offsetCriteria.isSmallest()) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 3e8928da12..1086c45d99 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.plugin.stream.kafka20;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
@@ -26,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
@@ -70,6 +72,23 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ try {
+ List<PartitionInfo> partitionInfos = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis));
+ if (CollectionUtils.isEmpty(partitionInfos)) {
+ throw new RuntimeException(String.format("Failed to fetch partition information for topic: %s", _topic));
+ }
+ Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionInfos.size());
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ partitionIds.add(partitionInfo.partition());
+ }
+ return partitionIds;
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
+ }
+
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
index 788e5e5ed3..1413aa4334 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
@@ -19,9 +19,11 @@
package org.apache.pinot.plugin.stream.pulsar;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -31,6 +33,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -64,12 +67,24 @@ public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnection
@Override
public int fetchPartitionCount(long timeoutMillis) {
try {
- return _pulsarClient.getPartitionsForTopic(_topic).get().size();
+ return _pulsarClient.getPartitionsForTopic(_topic).get(timeoutMillis, TimeUnit.MILLISECONDS).size();
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
} catch (Exception e) {
- throw new RuntimeException("Cannot fetch partitions for topic: " + _topic, e);
+ throw new RuntimeException("Failed to fetch partitions for topic: " + _topic, e);
}
}
+ @Override
+ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ int partitionCount = fetchPartitionCount(timeoutMillis);
+ Set<Integer> partitionIds = Sets.newHashSetWithExpectedSize(partitionCount);
+ for (int i = 0; i < partitionCount; i++) {
+ partitionIds.add(i);
+ }
+ return partitionIds;
+ }
+
/**
* Fetch the messageId and use it as offset.
* If offset criteria is smallest, the message id of earliest record in the partition is returned.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 94abc3ca6d..78847c0627 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -43,6 +44,13 @@ public interface StreamMetadataProvider extends Closeable {
*/
int fetchPartitionCount(long timeoutMillis);
+ /**
+ * Fetches the partition ids for a topic given the stream configs.
+ */
+ default Set<Integer> fetchPartitionIds(long timeoutMillis) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Fetches the offset for a given partition and offset criteria
* @param offsetCriteria offset criteria to fetch{@link StreamPartitionMsgOffset}.
@@ -98,5 +106,6 @@ public interface StreamMetadataProvider extends Closeable {
return result;
}
- class UnknownLagState extends PartitionLagState { }
+ class UnknownLagState extends PartitionLagState {
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org