Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:30 UTC

[incubator-pinot] 40/47: Don't create new CONSUMING segment if shard has reached end of life

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 214c007c2915c8aa149e1e06689e66abaa85b083
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Jan 7 16:07:16 2021 -0800

    Don't create new CONSUMING segment if shard has reached end of life
---
 .../protocols/SegmentCompletionProtocol.java       |   1 +
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 101 ++++++++++-----------
 .../RealtimeSegmentValidationManager.java          |   2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   3 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  13 ++-
 .../plugin/stream/kinesis/KinesisConsumer.java     |   8 +-
 .../stream/kinesis/KinesisConsumerFactory.java     |   2 +-
 .../plugin/stream/kinesis/KinesisRecordsBatch.java |   9 +-
 .../kinesis/KinesisStreamMetadataProvider.java     |  48 +++++++---
 .../org/apache/pinot/spi/stream/MessageBatch.java  |   7 ++
 10 files changed, 115 insertions(+), 79 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..74614df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,7 @@ public class SegmentCompletionProtocol {
 
   public static final String REASON_ROW_LIMIT = "rowLimit";  // Stop reason sent by server as max num rows reached
   public static final String REASON_TIME_LIMIT = "timeLimit";  // Stop reason sent by server as max time reached
+  public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";  // Stop reason sent by server as end of partitionGroup reached
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index bbd1ef3..9fa6850 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -477,7 +477,6 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions
         .checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
             "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
-    int numPartitions = getNumPartitionsFromIdealState(idealState);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -496,18 +495,21 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Step-2
 
-    // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+    // Say we are currently consuming from 2 shards, A and B. Of those, A is the one committing.
 
-    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+    // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-    // find new partition groups [A],[B],[C],[D]
+    // find new partition groups [A],[B],[C],[D] (assume A split into C and D)
+    // If the segment has consumed all of A, we will receive [B],[C],[D]
+    // If the segment has not yet reached the last message of A, we will receive [A],[B],[C],[D]
     List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    int numPartitions = newPartitionGroupInfoList.size();
 
-    // create new segment metadata, only if it is not IN_PROGRESS in the current state
+    // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
     Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
         Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
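
    To make the A/B/C/D example above concrete, here is a short walkthrough
    (illustrative only, not part of this diff; partitionGroupIdOfA is a
    hypothetical variable standing for shard A's partition group id):

        // Suppose shard A split into C and D while A's segment was committing.
        // Case 1: the segment consumed all of A -> list is [B, C, D]; A gets no successor.
        // Case 2: A still has unread messages   -> list is [A, B, C, D]; A rolls over.
        List<PartitionGroupInfo> infos =
            getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
        int numPartitions = infos.size();  // 3 in case 1, 4 in case 2
        boolean shardAStillAlive = infos.stream()
            .anyMatch(info -> info.getPartitionGroupId() == partitionGroupIdOfA);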
 
@@ -519,36 +521,25 @@ public class PinotLLCRealtimeSegmentManager {
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
       if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
         // make new segment
-        // FIXME: flushThreshold of segment is actually (configured threshold/numPartitions)
-        //  In Kinesis, with every split/merge, we get new partitions, and an old partition gets deactivated.
-        //  However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they're active or not.
-        //  So our numPartitions will forever keep increasing.
-        // TODO: can the getPartitionGroupInfo return the active partitions only, based on the checkpoints passed in current?
+        // fixme: letting the validation manager do this would be best; otherwise we risk creating multiple CONSUMING segments
         String newLLCSegmentName =
             setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
                 instancePartitions, numPartitions, numReplicas);
         newConsumingSegmentNames.add(newLLCSegmentName);
       } else {
-        String currentStatus = currentPartitionGroupMetadata.getStatus();
-        if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
-          // not IN_PROGRESS anymore in current state. Should be DONE.
-          // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment
-          // todo: skip this if the partition doesn't match with the committing segment?
+        LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+        // Update this only for the committing segment. All other partitions should get updated by their own commit calls
+        if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
+          Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
           LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
               currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
           createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
               committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
           newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-
-          // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
-          //  the logic in getPartitionGroupInfo to prevent returning of EOLed shards isn't working
-          //  OPTION: Since consumer knows about it, it can pass param in request/committingSegmentDescriptor "isEndOfShard"
-          //  We can set that in metadata for validation manager to skip these partitions
         }
       }
     }
 
-
     // Step-3
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -840,8 +831,9 @@ public class PinotLLCRealtimeSegmentManager {
       if (idealState.isEnabled()) {
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
             getCurrentPartitionGroupMetadataList(idealState);
-        int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size();
-        return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
+        List<PartitionGroupInfo> newPartitionGroupInfoList =
+            getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+        return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList);
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
         return idealState;
@@ -988,11 +980,14 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      IdealState idealState, int numPartitions) {
+      IdealState idealState, List<PartitionGroupInfo> newPartitionGroupInfoList) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    int numPartitions = newPartitionGroupInfoList.size();
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
 
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
     Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1029,7 +1024,7 @@ public class PinotLLCRealtimeSegmentManager {
       Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
       if (instanceStateMap != null) {
         // Latest segment of metadata is in idealstate.
-        if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
+        if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
           if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
 
             // step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE)
@@ -1040,15 +1035,23 @@ public class PinotLLCRealtimeSegmentManager {
             }
             LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
                 latestSegmentName);
-
-            LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-            String newSegmentName = newLLCSegmentName.getSegmentName();
-            CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
-                (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
-            createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
-                committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-            updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
-                segmentAssignment, instancePartitionsMap);
+            if (newPartitionGroupSet.contains(partitionGroupId)) {
+              LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+              String newSegmentName = newLLCSegmentName.getSegmentName();
+              CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
+                  (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+              createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+                  committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
+                  segmentAssignment, instancePartitionsMap);
+            } else { // partition group reached end of life
+              LOGGER.info(
+                  "PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+                      + "Skipping creation of new ZK metadata and new segment in ideal state",
+                  partitionGroupId, latestSegmentName);
+              updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
+                  instancePartitionsMap);
+            }
           }
           // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
         } else { // no replica in CONSUMING state
@@ -1081,11 +1084,14 @@ public class PinotLLCRealtimeSegmentManager {
             updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
                 instancePartitionsMap);
           } else {
-            // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
-            // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
-            // that case, we need to either extend this part to handle the state, or prevent segments from getting into
-            // such state.
-            LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+            if (!newPartitionGroupSet.contains(partitionGroupId)) {
+              // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
+              // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
+              // that case, we need to either extend this part to handle the state, or prevent segments from getting into
+              // such state.
+              LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+            }
+            // else, the partition group has reached end of life. This is an acceptable state
           }
         }
       } else {
@@ -1127,10 +1133,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Set up new partitions if not exist
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-    List<PartitionGroupInfo> partitionGroupInfoList =
-        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
-    for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
       int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
       if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
@@ -1178,18 +1181,6 @@ public class PinotLLCRealtimeSegmentManager {
     return System.currentTimeMillis();
   }
 
-  // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol)
-  //  or return all unique partitions found in ideal state right from the birth of the table
-  private int getNumPartitionsFromIdealState(IdealState idealState) {
-    Set<String> uniquePartitions = new HashSet<>();
-    for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-        uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId()));
-      }
-    }
-    return uniquePartitions.size();
-  }
-
   private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
     if (instancePartitions.getNumReplicaGroups() == 1) {
       // Non-replica-group based
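
    The net effect of the repaired ensureAllPartitionsConsuming logic reduces to
    one rule. A self-contained sketch (simplified types; shouldCreateSuccessor is
    hypothetical shorthand, not a method in this diff):

        import java.util.Set;

        final class EndOfLifeRule {
          // A DONE latest segment gets a successor CONSUMING segment only if its
          // partition group still appears in the latest getPartitionGroupInfoList
          // result; an ended shard is flipped to ONLINE with no replacement.
          static boolean shouldCreateSuccessor(int partitionGroupId, Set<Integer> newPartitionGroupIds) {
            return newPartitionGroupIds.contains(partitionGroupId);
          }
        }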
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d611433..96604dd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
       ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
     super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
-        config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+        6000, pinotHelixResourceManager,
         leadControllerManager, controllerMetrics);
     _llcRealtimeSegmentManager = llcRealtimeSegmentManager;
     _validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 75c8057..0f33556 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -850,7 +850,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     public void ensureAllPartitionsConsuming() {
-      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, _numPartitions);
+      ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
+          getPartitionGroupInfoList(_streamConfig, Collections.emptyList()));
     }
 
     @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 758c656..c889193 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -240,6 +240,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
   private Checkpoint _finalOffset; // Used when we want to catch up to this one
+  private boolean _endOfPartitionGroup = false;
   private volatile boolean _shouldStop = false;
 
   // It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -306,6 +307,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
+        } else if (_endOfPartitionGroup) {
+          segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
+              _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+          _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+          // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
+          //  If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
+          return true;
         }
         return false;
 
@@ -384,6 +392,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         consecutiveErrorCount = 0;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
@@ -1245,9 +1254,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
-          // fixme: get this from ideal state
-          int numStreamPartitions = _streamMetadataProvider
-              .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
+          int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
           if (numStreamPartitions != numPartitions) {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 70d2c8a..5cbd7e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -125,8 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
         nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       }
-
-      return new KinesisRecordsBatch(recordList, next.getKey());
+      return new KinesisRecordsBatch(recordList, next.getKey(), isEndOfShard);
     } catch (IllegalStateException e) {
       LOG.warn("Illegal state exception, connection is broken", e);
       return handleException(kinesisStartCheckpoint, recordList);
@@ -158,10 +157,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
       newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
 
-      return new KinesisRecordsBatch(recordList, shardId);
+      return new KinesisRecordsBatch(recordList, shardId, false);
     } else {
-      return new KinesisRecordsBatch(recordList, shardId);
-
+      return new KinesisRecordsBatch(recordList, shardId, false);
     }
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 631f240..fc9c4af 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -48,7 +48,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
 
   @Override
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
-    return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig));
+    return new KinesisStreamMetadataProvider(clientId, _streamConfig);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fdc883b..b3eb626 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -32,10 +32,12 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   private final List<Record> _recordList;
   private final String _shardId;
+  private final boolean _endOfShard;
 
-  public KinesisRecordsBatch(List<Record> recordList, String shardId) {
+  public KinesisRecordsBatch(List<Record> recordList, String shardId, boolean endOfShard) {
     _recordList = recordList;
     _shardId = shardId;
+    _endOfShard = endOfShard;
   }
 
   @Override
@@ -68,4 +70,9 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   public long getNextStreamMessageOffsetAtIndex(int index) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public boolean isEndOfPartitionGroup() {
+    return _endOfShard;
+  }
 }
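
    For example, the consumer above can now mark the terminal batch of a fully
    drained shard (a minimal sketch; the shard id and empty record list are
    illustrative):

        KinesisRecordsBatch batch = new KinesisRecordsBatch(
            java.util.Collections.emptyList(), "shardId-000000000000", /*endOfShard=*/true);
        // Earlier batches pass false, so isEndOfPartitionGroup() keeps its old
        // always-false behavior until the shard is actually exhausted.
        boolean done = batch.isEndOfPartitionGroup();  // true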
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 6c55a18..8968b56 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,27 +1,45 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
 public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   private final KinesisConnectionHandler _kinesisConnectionHandler;
+  private final StreamConsumerFactory _kinesisStreamConsumerFactory;
+  private final String _clientId;
+  private final int _fetchTimeoutMs;
 
-  public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
+  public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+    KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
     _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+    _kinesisStreamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    _clientId = clientId;
+    _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
   }
 
   @Override
@@ -37,7 +55,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   @Override
   public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
-      throws IOException {
+      throws IOException, TimeoutException {
 
     Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
         currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -45,10 +63,12 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
     for (Shard shard : shards) { // go over all shards
+      KinesisCheckpoint newStartCheckpoint;
+
       String shardId = shard.shardId();
       int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
       PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
-      KinesisCheckpoint newStartCheckpoint;
+
       if (currentPartitionGroupMetadata != null) { // existing shard
         KinesisCheckpoint currentEndCheckpoint = null;
         try {
@@ -59,15 +79,18 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
           String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
           if (endingSequenceNumber != null) { // shard has ended
-            // FIXME: this logic is not working
-            //  was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard.
-            //  But it is much lesser than the endSeqNumOfShard
-            Map<String, String> shardToSequenceNumberMap = new HashMap<>();
-            shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
-            KinesisCheckpoint shardEndCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
-            if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
-              // shard has ended AND we have reached the end checkpoint.
-              // skip this partition group in the result
+            // check if segment has consumed all the messages already
+            PartitionGroupConsumer partitionGroupConsumer =
+                _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
+
+            MessageBatch messageBatch;
+            try {
+              messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs);
+            } finally {
+              partitionGroupConsumer.close();
+            }
+            if (messageBatch.isEndOfPartitionGroup()) {
+              // shard has ended and is fully consumed; skip it in the results
               continue;
             }
           }
@@ -80,6 +103,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
         newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
       }
+
       newPartitionGroupInfos
           .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
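
    The removed FIXME explains the design change: comparing the consumer's last
    sequence number against the shard's endingSequenceNumber did not work (the
    last message's sequence number stays well below the shard's ending number),
    so the provider now asks a throwaway consumer directly. The probe in
    isolation (a sketch; variable names as in the diff above):

        // Fetch once at the committed end checkpoint of an ended shard and let
        // the consumer itself report whether anything is left to read.
        PartitionGroupConsumer probe =
            _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
        boolean fullyConsumed;
        try {
          fullyConsumed = probe.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs).isEndOfPartitionGroup();
        } finally {
          probe.close();
        }
        // fullyConsumed == true -> the shard is skipped in newPartitionGroupInfos

    Note the tradeoff: this spends one fetchMessages call per ended shard on each
    getPartitionGroupInfoList invocation.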
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..02c721f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -81,4 +81,11 @@ public interface MessageBatch<T> {
   default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
+
+  /**
+   * Returns true if the consumer detects that no more records can ever be read from this partition group
+   */
+  default boolean isEndOfPartitionGroup() {
+    return false;
+  }
 }

