You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/01/24 16:27:18 UTC

[pinot] branch master updated: Smallest offset for new partitionGroups (#8053)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e4b1ea  Smallest offset for new partitionGroups (#8053)
4e4b1ea is described below

commit 4e4b1eafc98aebac44b5fd198084e6b76d8aa91d
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Jan 24 08:26:39 2022 -0800

    Smallest offset for new partitionGroups (#8053)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 32 +++++++++-------------
 .../org/apache/pinot/spi/stream/StreamConfig.java  |  8 +++++-
 2 files changed, 20 insertions(+), 20 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c8d49d3..5c7edba 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -317,7 +317,7 @@ public class PinotLLCRealtimeSegmentManager {
     for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
       String segmentName =
           setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
+              numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
 
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -875,8 +875,12 @@ public class PinotLLCRealtimeSegmentManager {
       if (idealState.isEnabled()) {
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
             getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+        // Read the smallest offset when a new partition is detected
+        OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+        streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+        streamConfig.setOffsetCriteria(originalOffsetCriteria);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList);
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1196,7 +1200,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
-                numPartitions, numReplicas, newPartitionGroupMetadataList, true);
+                numPartitions, numReplicas, newPartitionGroupMetadataList);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
       }
@@ -1206,14 +1210,11 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
-    Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
-    streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
-            .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
-        OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
-    StreamConfig smallestOffsetCriteriaStreamConfig =
-        new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
+    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
     List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
-        getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
+        getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
+    streamConfig.setOffsetCriteria(originalOffsetCriteria);
     StreamPartitionMsgOffset partitionStartOffset = null;
     for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
       if (info.getPartitionGroupId() == partitionGroupId) {
@@ -1235,16 +1236,10 @@ public class PinotLLCRealtimeSegmentManager {
    */
   private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
-      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
-      boolean isLiveTable) {
+      int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
-    StreamPartitionMsgOffset startOffset;
-    if (isLiveTable) {
-      startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
-    } else {
-      startOffset = partitionGroupMetadata.getStartOffset();
-    }
+    String startOffset = partitionGroupMetadata.getStartOffset().toString();
     LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
@@ -1252,8 +1247,7 @@ public class PinotLLCRealtimeSegmentManager {
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
-    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
-        startOffset.toString(), 0);
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
         partitionGroupMetadataList);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 34e3a62..73fb3c6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -63,7 +63,6 @@ public class StreamConfig {
   private final String _tableNameWithType;
   private final List<ConsumerType> _consumerTypes = new ArrayList<>();
   private final String _consumerFactoryClassName;
-  private final OffsetCriteria _offsetCriteria;
   private final String _decoderClass;
   private final Map<String, String> _decoderProperties = new HashMap<>();
 
@@ -79,6 +78,9 @@ public class StreamConfig {
 
   private final Map<String, String> _streamConfigMap = new HashMap<>();
 
+  // Allow overriding it to use different offset criteria
+  private OffsetCriteria _offsetCriteria;
+
   /**
    * Initializes a StreamConfig using the map of stream configs from the table config
    */
@@ -284,6 +286,10 @@ public class StreamConfig {
     return _offsetCriteria;
   }
 
+  public void setOffsetCriteria(OffsetCriteria offsetCriteria) {
+    _offsetCriteria = offsetCriteria;
+  }
+
   public String getDecoderClass() {
     return _decoderClass;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org