You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/01/24 16:27:18 UTC
[pinot] branch master updated: Smallest offset for new partitionGroups (#8053)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4e4b1ea Smallest offset for new partitionGroups (#8053)
4e4b1ea is described below
commit 4e4b1eafc98aebac44b5fd198084e6b76d8aa91d
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Jan 24 08:26:39 2022 -0800
Smallest offset for new partitionGroups (#8053)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 32 +++++++++-------------
.../org/apache/pinot/spi/stream/StreamConfig.java | 8 +++++-
2 files changed, 20 insertions(+), 20 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c8d49d3..5c7edba 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -317,7 +317,7 @@ public class PinotLLCRealtimeSegmentManager {
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
+ numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
@@ -875,8 +875,12 @@ public class PinotLLCRealtimeSegmentManager {
if (idealState.isEnabled()) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ // Read the smallest offset when a new partition is detected
+ OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+ streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1196,7 +1200,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitions, numReplicas, newPartitionGroupMetadataList, true);
+ numPartitions, numReplicas, newPartitionGroupMetadataList);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -1206,14 +1210,11 @@ public class PinotLLCRealtimeSegmentManager {
}
private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
- Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
- streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
- .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
- OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
- StreamConfig smallestOffsetCriteriaStreamConfig =
- new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
+ OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+ streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
- getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
+ getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
StreamPartitionMsgOffset partitionStartOffset = null;
for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
if (info.getPartitionGroupId() == partitionGroupId) {
@@ -1235,16 +1236,10 @@ public class PinotLLCRealtimeSegmentManager {
*/
private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
- boolean isLiveTable) {
+ int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
- StreamPartitionMsgOffset startOffset;
- if (isLiveTable) {
- startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
- } else {
- startOffset = partitionGroupMetadata.getStartOffset();
- }
+ String startOffset = partitionGroupMetadata.getStartOffset().toString();
LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
@@ -1252,8 +1247,7 @@ public class PinotLLCRealtimeSegmentManager {
new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
- startOffset.toString(), 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
partitionGroupMetadataList);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 34e3a62..73fb3c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -63,7 +63,6 @@ public class StreamConfig {
private final String _tableNameWithType;
private final List<ConsumerType> _consumerTypes = new ArrayList<>();
private final String _consumerFactoryClassName;
- private final OffsetCriteria _offsetCriteria;
private final String _decoderClass;
private final Map<String, String> _decoderProperties = new HashMap<>();
@@ -79,6 +78,9 @@ public class StreamConfig {
private final Map<String, String> _streamConfigMap = new HashMap<>();
+ // Allow overriding it to use different offset criteria
+ private OffsetCriteria _offsetCriteria;
+
/**
* Initializes a StreamConfig using the map of stream configs from the table config
*/
@@ -284,6 +286,10 @@ public class StreamConfig {
return _offsetCriteria;
}
+ public void setOffsetCriteria(OffsetCriteria offsetCriteria) {
+ _offsetCriteria = offsetCriteria;
+ }
+
public String getDecoderClass() {
return _decoderClass;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org