You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:34 UTC

[incubator-pinot] 44/47: End-of-shard as end criteria AND consume shards in order

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 751e21205fa53b8c6db01c8ba26aa8b3d5ace424
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Fri Jan 15 17:38:28 2021 -0800

    End-of-shard as end criteria AND consume shards in order
---
 .../segment/LLCRealtimeSegmentZKMetadata.java      |   5 -
 .../protocols/SegmentCompletionProtocol.java       |   2 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  31 +++---
 .../RealtimeSegmentValidationManager.java          |   2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   2 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  35 ++++---
 .../kinesis/KinesisStreamMetadataProvider.java     | 106 ++++++++++++---------
 .../pinot/spi/stream/PartitionGroupInfo.java       |   6 +-
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  16 ++--
 .../pinot/spi/stream/StreamMetadataProvider.java   |   2 +-
 10 files changed, 119 insertions(+), 88 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
index b8b8d95..7cb19a7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
@@ -87,11 +87,6 @@ public class LLCRealtimeSegmentZKMetadata extends RealtimeSegmentZKMetadata {
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = super.toZNRecord();
     znRecord.setSimpleField(START_OFFSET, _startOffset);
-    if (_endOffset == null) {
-      // TODO Issue 5359 Keep this until all components have upgraded to a version that can handle _offset being null
-      // For backward compatibility until all components have been upgraded to deal with null value for _endOffset
-      _endOffset = Long.toString(Long.MAX_VALUE);
-    }
     znRecord.setSimpleField(END_OFFSET, _endOffset);
     znRecord.setIntField(NUM_REPLICAS, _numReplicas);
     znRecord.setSimpleField(DOWNLOAD_URL, _downloadUrl);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..6dcbda2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,8 @@ public class SegmentCompletionProtocol {
 
   public static final String REASON_ROW_LIMIT = "rowLimit";  // Stop reason sent by server as max num rows reached
   public static final String REASON_TIME_LIMIT = "timeLimit";  // Stop reason sent by server as max time reached
+  public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
+      // Stop reason sent by server as end of partitionGroup reached
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 27d487b..72caaf4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -166,7 +167,8 @@ public class PinotLLCRealtimeSegmentManager {
    * Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata}
    * for latest segment of each partition group
    */
-  public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+  public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState,
+      StreamConfig streamConfig) {
     List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
 
     // From all segment names in the ideal state, find unique partition group ids and their latest segment
@@ -185,6 +187,8 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Create a PartitionGroupMetadata for each latest segment
+    PartitionGroupCheckpointFactory checkpointFactory =
+        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
     for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
       int partitionGroupId = entry.getKey();
       LLCSegmentName llcSegmentName = entry.getValue();
@@ -195,7 +199,9 @@ public class PinotLLCRealtimeSegmentManager {
           (LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
       PartitionGroupMetadata partitionGroupMetadata =
           new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(),
-              llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(),
+              checkpointFactory.create(llRealtimeSegmentZKMetadata.getStartOffset()),
+              llRealtimeSegmentZKMetadata.getEndOffset() == null ? null
+                  : checkpointFactory.create(llRealtimeSegmentZKMetadata.getEndOffset()),
               llRealtimeSegmentZKMetadata.getStatus().toString());
       partitionGroupMetadataList.add(partitionGroupMetadata);
     }
@@ -498,9 +504,10 @@ public class PinotLLCRealtimeSegmentManager {
     // Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
 
     // Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+        getCurrentPartitionGroupMetadataList(idealState, streamConfig);
 
     // Find new partition groups [A],[B],[C],[D] (assume A split into C D)
     // If segment has consumed all of A, we will receive B,C,D
@@ -610,9 +617,7 @@ public class PinotLLCRealtimeSegmentManager {
       int numPartitions, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffsetFactory offsetFactory =
-        StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
-    StreamPartitionMsgOffset startOffset = offsetFactory.create(committingSegmentDescriptor.getNextOffset());
+    String startOffset = committingSegmentDescriptor.getNextOffset();
     LOGGER
         .info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
             segmentName, startOffset, creationTimeMs);
@@ -621,7 +626,7 @@ public class PinotLLCRealtimeSegmentManager {
     newSegmentZKMetadata.setTableName(realtimeTableName);
     newSegmentZKMetadata.setSegmentName(segmentName);
     newSegmentZKMetadata.setCreationTime(creationTimeMs);
-    newSegmentZKMetadata.setStartOffset(startOffset.toString());
+    newSegmentZKMetadata.setStartOffset(startOffset);
     // Leave maxOffset as null.
     newSegmentZKMetadata.setNumReplicas(numReplicas);
     newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
@@ -808,7 +813,7 @@ public class PinotLLCRealtimeSegmentManager {
       assert idealState != null;
       if (idealState.isEnabled()) {
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
-            getCurrentPartitionGroupMetadataList(idealState);
+            getCurrentPartitionGroupMetadataList(idealState, streamConfig);
         List<PartitionGroupInfo> newPartitionGroupInfoList =
             getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList);
@@ -1121,7 +1126,7 @@ public class PinotLLCRealtimeSegmentManager {
     return idealState;
   }
 
-  private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
+  private Checkpoint getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
     Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
     streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
             .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
@@ -1130,12 +1135,10 @@ public class PinotLLCRealtimeSegmentManager {
         new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
     List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo =
         getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
-    StreamPartitionMsgOffset partitionStartOffset = null;
+    Checkpoint partitionStartOffset = null;
     for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) {
       if (info.getPartitionGroupId() == partitionGroupId) {
-        StreamPartitionMsgOffsetFactory factory =
-            StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
-        partitionStartOffset = factory.create(info.getStartCheckpoint());
+        partitionStartOffset = info.getStartCheckpoint();
         break;
       }
     }
@@ -1155,7 +1158,7 @@ public class PinotLLCRealtimeSegmentManager {
       long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
-    String startCheckpoint = partitionGroupInfo.getStartCheckpoint();
+    String startCheckpoint = partitionGroupInfo.getStartCheckpoint().toString();
     LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 96604dd..d611433 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        6000, pinotHelixResourceManager,
+        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index c19a845..ecbf2ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -920,7 +920,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
       return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
-          PARTITION_OFFSET.toString()))
+          PARTITION_OFFSET))
           .collect(Collectors.toList());
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index e6e1402..1083757 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -160,7 +160,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
-      _offset = _streamPartitionMsgOffsetFactory.create(offset);
+      _offset = _checkpointFactory.create(offset);
       _buildTimeMillis = buildTimeMillis;
       _waitTimeMillis = waitTimeMillis;
       _segmentSizeBytes = segmentSizeBytes;
@@ -235,11 +235,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final SegmentVersion _segmentVersion;
   private final SegmentBuildTimeLeaseExtender _leaseExtender;
   private SegmentBuildDescriptor _segmentBuildDescriptor;
-  private StreamConsumerFactory _streamConsumerFactory;
-  private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
+  private final StreamConsumerFactory _streamConsumerFactory;
+  private final PartitionGroupCheckpointFactory _checkpointFactory;
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
+  private boolean _endOfPartitionGroup = false;
   private Checkpoint _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
@@ -307,6 +308,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
+        } else if (_endOfPartitionGroup) {
+          segmentLogger.info(
+              "Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
+              _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+          _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+          // fixme: Handle creating a segment with 0 rows.
+          //  Happens if endOfPartitionGroup reached but no rows were consumed
+          return true;
         }
         return false;
 
@@ -370,7 +379,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+    Checkpoint lastUpdatedOffset = _checkpointFactory
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
@@ -385,6 +394,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         consecutiveErrorCount = 0;
       } catch (TimeoutException e) {
         handleTransientStreamErrors(e);
@@ -411,7 +421,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 //        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset.getOffset());
 //        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset.getOffset());
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
-        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+        lastUpdatedOffset = _checkpointFactory.create(_currentOffset);
       } else {
         // We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
         // Create a new stream consumer wrapper, in case we are stuck on something.
@@ -669,10 +679,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   @VisibleForTesting
   protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) {
     if (response.getStreamPartitionMsgOffset() != null) {
-      return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
+      return _checkpointFactory.create(response.getStreamPartitionMsgOffset());
     } else {
       // TODO Issue 5359 Remove this once the protocol is upgraded on server and controller
-      return _streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset()));
+      return _checkpointFactory.create(Long.toString(response.getOffset()));
     }
   }
 
@@ -967,7 +977,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
       _leaseExtender.removeSegment(_segmentNameStr);
-      final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+      final Checkpoint endOffset = _checkpointFactory.create(llcMetadata.getEndOffset());
       segmentLogger
           .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
               _startOffset, endOffset);
@@ -1127,14 +1137,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _partitionLevelStreamConfig =
         new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
-    _streamPartitionMsgOffsetFactory =
+    _checkpointFactory =
         StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
     _partitionGroupId = _llcSegmentName.getPartitionGroupId();
     _partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(),
-        _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+        _checkpointFactory.create(_segmentZKMetadata.getStartOffset()),
+        _segmentZKMetadata.getEndOffset() == null ? null : _checkpointFactory.create(_segmentZKMetadata.getEndOffset()),
         _segmentZKMetadata.getStatus().toString());
     _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
@@ -1279,8 +1290,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     }
 
     _realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
-    _startOffset = _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset());
-    _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
+    _startOffset = _checkpointFactory.create(_segmentZKMetadata.getStartOffset());
+    _currentOffset = _checkpointFactory.create(_startOffset);
     _resourceTmpDir = new File(resourceDataDir, "_tmp");
     if (!_resourceTmpDir.exists()) {
       _resourceTmpDir.mkdirs();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index b22bbe4..42150a3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -21,12 +21,15 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -69,7 +72,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   /**
    * This call returns all active shards, taking into account the consumption status for those shards.
    * PartitionGroupInfo is returned for a shard if:
-   * 1. It is a branch new shard i.e. no partitionGroupMetadata was found for it in the current list
+   * 1. It is a brand new shard AND its parent has been consumed completely
    * 2. It is still being actively consumed from i.e. the consuming partition has not reached the end of the shard
    */
   @Override
@@ -77,54 +80,57 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
       throws IOException, TimeoutException {
 
-    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream()
-        .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
-    List<Shard> shards = _kinesisConnectionHandler.getShards();
-    for (Shard shard : shards) {
-      KinesisCheckpoint newStartCheckpoint;
-
-      String shardId = shard.shardId();
-      int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
-      PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
-
-      if (currentPartitionGroupMetadata != null) { // existing shard
-        KinesisCheckpoint currentEndCheckpoint = null;
-        try {
-          currentEndCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
-        } catch (Exception e) {
-          // ignore. No end checkpoint yet for IN_PROGRESS segment
-        }
-        if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing/committed segment
-          String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
-          if (endingSequenceNumber != null) { // shard has ended
-            // check if segment has consumed all the messages already
-            PartitionGroupConsumer partitionGroupConsumer =
-                _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
-
-            MessageBatch messageBatch;
-            try {
-              messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs);
-            } finally {
-              partitionGroupConsumer.close();
-            }
-            if (messageBatch.isEndOfPartitionGroup()) {
-              // shard has ended. Skip it from results
-              continue;
-            }
+
+    Map<String, Shard> shardIdToShardMap =
+        _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s));
+    Set<String> shardsInCurrent = new HashSet<>();
+    Set<String> shardsEnded = new HashSet<>();
+
+    // TODO: Once we start supporting multiple shards in a PartitionGroup,
+    //  we need to iterate over all shards to check if any of them have reached end
+
+    // Process existing shards. Add them to new list if still consuming from them
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) currentPartitionGroupMetadata.getStartCheckpoint();
+      String shardId = kinesisStartCheckpoint.getShardToStartSequenceMap().keySet().iterator().next();
+      Shard shard = shardIdToShardMap.get(shardId);
+      shardsInCurrent.add(shardId);
+
+      Checkpoint newStartCheckpoint;
+      Checkpoint currentEndCheckpoint = currentPartitionGroupMetadata.getEndCheckpoint();
+      if (currentEndCheckpoint != null) { // Segment DONE (committing/committed)
+        String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+        if (endingSequenceNumber != null) { // Shard has ended, check if we're also done consuming it
+          if (consumedEndOfShard(currentEndCheckpoint, currentPartitionGroupMetadata)) {
+            shardsEnded.add(shardId);
+            continue; // Shard ended and we're done consuming it. Skip
           }
-          newStartCheckpoint = currentEndCheckpoint;
-        } else {
-          newStartCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
         }
-      } else { // new shard
+        newStartCheckpoint = currentEndCheckpoint;
+      } else { // Segment IN_PROGRESS
+        newStartCheckpoint = currentPartitionGroupMetadata.getStartCheckpoint();
+      }
+      newPartitionGroupInfos.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), newStartCheckpoint));
+    }
+
+    // Add new shards. Either the parent is null (new table case, very first shards) OR the parent shard has reached end-of-life and has been completely consumed.
+    for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
+      String newShardId = entry.getKey();
+      if (shardsInCurrent.contains(newShardId)) {
+        continue;
+      }
+      Checkpoint newStartCheckpoint;
+      Shard newShard = entry.getValue();
+      String parentShardId = newShard.parentShardId();
+
+      if (parentShardId == null || shardsEnded.contains(parentShardId)) {
         Map<String, String> shardToSequenceNumberMap = new HashMap<>();
-        shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
+        shardToSequenceNumberMap.put(newShardId, newShard.sequenceNumberRange().startingSequenceNumber());
         newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+        int partitionGroupId = getPartitionGroupIdFromShardId(newShardId);
+        newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint));
       }
-
-      newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
     return newPartitionGroupInfos;
   }
@@ -138,6 +144,20 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
   }
 
+  private boolean consumedEndOfShard(Checkpoint startCheckpoint, PartitionGroupMetadata partitionGroupMetadata)
+      throws IOException, TimeoutException {
+    PartitionGroupConsumer partitionGroupConsumer =
+        _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, partitionGroupMetadata);
+
+    MessageBatch messageBatch;
+    try {
+      messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, null, _fetchTimeoutMs);
+    } finally {
+      partitionGroupConsumer.close();
+    }
+    return messageBatch.isEndOfPartitionGroup();
+  }
+
   @Override
   public void close() {
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
index 758953d..b06e878 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -27,9 +27,9 @@ package org.apache.pinot.spi.stream;
 public class PartitionGroupInfo {
 
   private final int _partitionGroupId;
-  private final String _startCheckpoint;
+  private final Checkpoint _startCheckpoint;
 
-  public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
+  public PartitionGroupInfo(int partitionGroupId, Checkpoint startCheckpoint) {
     _partitionGroupId = partitionGroupId;
     _startCheckpoint = startCheckpoint;
   }
@@ -38,7 +38,7 @@ public class PartitionGroupInfo {
     return _partitionGroupId;
   }
 
-  public String getStartCheckpoint() {
+  public Checkpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index a99a82b..1ac12fb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -22,12 +22,12 @@ public class PartitionGroupMetadata {
 
   private final int _partitionGroupId;
   private int _sequenceNumber;
-  private String _startCheckpoint;
-  private String _endCheckpoint;
+  private Checkpoint _startCheckpoint;
+  private Checkpoint _endCheckpoint;
   private String _status;
 
-  public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint,
-      String endCheckpoint, String status) {
+  public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, Checkpoint startCheckpoint,
+      Checkpoint endCheckpoint, String status) {
     _partitionGroupId = partitionGroupId;
     _sequenceNumber = sequenceNumber;
     _startCheckpoint = startCheckpoint;
@@ -47,19 +47,19 @@ public class PartitionGroupMetadata {
     _sequenceNumber = sequenceNumber;
   }
 
-  public String getStartCheckpoint() {
+  public Checkpoint getStartCheckpoint() {
     return _startCheckpoint;
   }
 
-  public void setStartCheckpoint(String startCheckpoint) {
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
     _startCheckpoint = startCheckpoint;
   }
 
-  public String getEndCheckpoint() {
+  public Checkpoint getEndCheckpoint() {
     return _endCheckpoint;
   }
 
-  public void setEndCheckpoint(String endCheckpoint) {
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
     _endCheckpoint = endCheckpoint;
   }
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index cecc708..4b2751c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -82,7 +82,7 @@ public interface StreamMetadataProvider extends Closeable {
           streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
       StreamPartitionMsgOffset streamPartitionMsgOffset =
           partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis);
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset));
     }
     return newPartitionGroupInfoList;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org