You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:32 UTC

[incubator-pinot] 42/47: Cleanup, javadocs, comments

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ce1a6462084dfa05d9b8c2b57a23a9c8274725e4
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Fri Jan 8 18:28:04 2021 -0800

    Cleanup, javadocs, comments
---
 .../protocols/SegmentCompletionProtocol.java       |  1 -
 .../helix/core/PinotTableIdealStateBuilder.java    |  8 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 76 ++++++++++---------
 .../PinotLLCRealtimeSegmentManagerTest.java        | 21 ++----
 .../realtime/LLRealtimeSegmentDataManager.java     | 21 +++---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 10 ++-
 .../pinot/plugin/stream/kinesis/KinesisConfig.java |  3 +
 .../stream/kinesis/KinesisConnectionHandler.java   | 17 +++--
 .../plugin/stream/kinesis/KinesisConsumer.java     | 20 +++--
 .../stream/kinesis/KinesisConsumerFactory.java     |  8 +-
 .../stream/kinesis/KinesisMsgOffsetFactory.java    |  4 +
 .../plugin/stream/kinesis/KinesisRecordsBatch.java |  6 +-
 .../kinesis/KinesisStreamMetadataProvider.java     | 27 +++----
 .../org/apache/pinot/spi/stream/Checkpoint.java    |  5 ++
 .../stream/PartitionGroupCheckpointFactory.java    | 12 +--
 .../pinot/spi/stream/PartitionGroupConsumer.java   | 16 +++-
 .../pinot/spi/stream/PartitionGroupInfo.java       | 13 ++--
 .../spi/stream/PartitionGroupInfoFetcher.java      |  2 +-
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  4 -
 .../pinot/spi/stream/PartitionLevelConsumer.java   |  6 +-
 .../pinot/spi/stream/PartitionOffsetFetcher.java   | 88 ----------------------
 .../pinot/spi/stream/StreamConsumerFactory.java    | 10 ++-
 .../pinot/spi/stream/StreamMetadataProvider.java   |  9 +--
 23 files changed, 170 insertions(+), 217 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 74614df..dd1330d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,7 +138,6 @@ public class SegmentCompletionProtocol {
 
   public static final String REASON_ROW_LIMIT = "rowLimit";  // Stop reason sent by server as max num rows reached
   public static final String REASON_TIME_LIMIT = "timeLimit";  // Stop reason sent by server as max time reached
-  public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";  // Stop reason sent by server as end of partitionGroup reached
 
   // Canned responses
   public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 68bcf57..98fbd5d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -117,6 +117,12 @@ public class PinotTableIdealStateBuilder {
     pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
   }
 
+  /**
   * Fetches the list of {@link PartitionGroupInfo} for the stream, with the help of the current partitionGroups metadata.
   * This call only skips partitions which have reached end-of-life and whose messages have all been consumed.
   * The current partition group metadata is used to determine the offsets that have already been consumed for each partition,
   * and to learn about existing partition groupings, which should not be disturbed.
+   */
   public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
     PartitionGroupInfoFetcher partitionGroupInfoFetcher =
@@ -126,7 +132,7 @@ public class PinotTableIdealStateBuilder {
       return partitionGroupInfoFetcher.getPartitionGroupInfoList();
     } catch (Exception e) {
       Exception fetcherException = partitionGroupInfoFetcher.getException();
-      LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
+      LOGGER.error("Could not get partition group info for {}", streamConfig.getTopicName(), fetcherException);
       throw new RuntimeException(fetcherException);
     }
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9a0786b..27d487b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -24,7 +24,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -77,11 +76,11 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -164,17 +163,18 @@ public class PinotLLCRealtimeSegmentManager {
 
 
   /**
-   * Using the ideal state and segment metadata, return a list of the current partition groups
+   * Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata}
+   * for latest segment of each partition group
    */
   public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
     List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
 
-    // from all segment names in the ideal state, find unique groups
-    Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>();
+    // From all segment names in the ideal state, find unique partition group ids and their latest segment
+    Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegment = new HashMap<>();
     for (String segment : idealState.getPartitionSet()) {
       LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
       int partitionGroupId = llcSegmentName.getPartitionGroupId();
-      groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
+      partitionGroupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
         if (latestSegment == null) {
           return llcSegmentName;
         } else {
@@ -184,8 +184,8 @@ public class PinotLLCRealtimeSegmentManager {
       });
     }
 
-    // create a PartitionGroupMetadata for each latest segment
-    for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) {
+    // Create a PartitionGroupMetadata for each latest segment
+    for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
       int partitionGroupId = entry.getKey();
       LLCSegmentName llcSegmentName = entry.getValue();
       RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider
@@ -258,10 +258,8 @@ public class PinotLLCRealtimeSegmentManager {
     PartitionLevelStreamConfig streamConfig =
         new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    // get new partition groups and their metadata
     List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
     int numPartitionGroups = newPartitionGroupInfoList.size();
-
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -699,27 +697,16 @@ public class PinotLLCRealtimeSegmentManager {
     return commitTimeoutMS;
   }
 
+  /**
   * Fetches the latest state of the PartitionGroups for the stream.
   * If a partition has reached end-of-life and all of its messages have been consumed by the segment, it is omitted from the result.
+   */
   @VisibleForTesting
   List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
     return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
   }
 
-  @VisibleForTesting
-  StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
-      int partitionGroupId) {
-    PartitionOffsetFetcher partitionOffsetFetcher =
-        new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig);
-    try {
-      RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
-      return partitionOffsetFetcher.getOffset();
-    } catch (Exception e) {
-      throw new IllegalStateException(String
-          .format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s",
-              streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e);
-    }
-  }
-
   /**
    * An instance is reporting that it has stopped consuming a topic due to some error.
    * If the segment is in CONSUMING state, mark the state of the segment to be OFFLINE in idealstate.
@@ -1052,26 +1039,26 @@ public class PinotLLCRealtimeSegmentManager {
 
             // Create a new segment to re-consume from the previous start offset
             LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-            StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+            Checkpoint startCheckpoint = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+            Checkpoint partitionGroupStartCheckpoint = getPartitionGroupStartCheckpoint(streamConfig, partitionGroupId);
+
             // Start offset must be higher than the start offset of the stream
-            StreamPartitionMsgOffset partitionStartOffset =
-                getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId);
-            if (partitionStartOffset.compareTo(startOffset) > 0) {
-              LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
-                  partitionStartOffset, partitionGroupId, realtimeTableName);
+            if (partitionGroupStartCheckpoint.compareTo(startCheckpoint) > 0) {
+              LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startCheckpoint,
+                  partitionGroupStartCheckpoint, partitionGroupId, realtimeTableName);
               _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-              startOffset = partitionStartOffset;
+              startCheckpoint = partitionGroupStartCheckpoint;
             }
 
             CommittingSegmentDescriptor committingSegmentDescriptor =
-                new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0);
+                new CommittingSegmentDescriptor(latestSegmentName, startCheckpoint.toString(), 0);
             createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
                 committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
             String newSegmentName = newLLCSegmentName.getSegmentName();
             updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
                 instancePartitionsMap);
           } else {
-            if (!newPartitionGroupSet.contains(partitionGroupId)) {
+            if (newPartitionGroupSet.contains(partitionGroupId)) {
               // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
               // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
               // that case, we need to either extend this part to handle the state, or prevent segments from getting into
@@ -1134,6 +1121,27 @@ public class PinotLLCRealtimeSegmentManager {
     return idealState;
   }
 
+  private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
+    Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
+    streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
+            .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
+        OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
+    StreamConfig smallestOffsetCriteriaStreamConfig =
+        new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
+    List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo =
+        getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
+    StreamPartitionMsgOffset partitionStartOffset = null;
+    for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) {
+      if (info.getPartitionGroupId() == partitionGroupId) {
+        StreamPartitionMsgOffsetFactory factory =
+            StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+        partitionStartOffset = factory.create(info.getStartCheckpoint());
+        break;
+      }
+    }
+    return partitionStartOffset;
+  }
+
   private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
     return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(),
         lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 0f33556..c19a845 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -908,32 +908,23 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
-        List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+        String newSegmentName, SegmentAssignment segmentAssignment,
         Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName,
-          null, segmentAssignment, instancePartitionsMap);
-      for (String segmentName : newSegmentNames) {
-        updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null,
-            segmentName, segmentAssignment, instancePartitionsMap);
-      }
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
+          segmentAssignment, instancePartitionsMap);
+      updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+          segmentAssignment, instancePartitionsMap);
     }
 
     @Override
     List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
       return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
-          getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString()))
+          PARTITION_OFFSET.toString()))
           .collect(Collectors.toList());
     }
 
     @Override
-    LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) {
-      // The criteria for this test should always be SMALLEST (for default streaming config and new added partitions)
-      assertTrue(offsetCriteria.isSmallest());
-      return PARTITION_OFFSET;
-    }
-
-    @Override
     boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) {
       return _exceededMaxSegmentCompletionTime;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bc49830..e6e1402 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
@@ -240,7 +241,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
   private Checkpoint _finalOffset; // Used when we want to catch up to this one
-  private boolean _endOfPartitionGroup = false;
   private volatile boolean _shouldStop = false;
 
   // It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -307,12 +307,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
           _stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
           return true;
-        } else if (_endOfPartitionGroup) {
-          // FIXME: handle numDocsIndexed == 0 case
-          segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
-              _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
-          _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
-          return true;
         }
         return false;
 
@@ -391,8 +385,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       try {
         messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
-        _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         consecutiveErrorCount = 0;
+      } catch (TimeoutException e) {
+        handleTransientStreamErrors(e);
+        continue;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
         continue;
@@ -1253,7 +1249,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
-          int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+          // TODO: currentPartitionGroupMetadata should be fetched from idealState + segmentZkMetadata, so that we get back accurate partitionGroups info
+          //  However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has a single partition
+          //  Fix this before opening support for partitioning in Kinesis
+          int numStreamPartitions = _streamMetadataProvider
+              .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig,
+                  Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();
           if (numStreamPartitions != numPartitions) {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
@@ -1335,7 +1336,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       closeStreamMetadataProvider();
     }
     segmentLogger.info("Creating new stream metadata provider, reason: {}", reason);
-    _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
+    _streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(_clientId);
   }
 
   // This should be done during commit? We may not always commit when we build a segment....
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 517f8c0..e1f8b05 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,12 +22,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.JsonUtils;
 
 
+/**
+ * A {@link Checkpoint} implementation for the Kinesis partition group consumption
 * A partition group consists of one or more shards. The KinesisCheckpoint maintains a map from each shard to its start sequence number.
+ */
 public class KinesisCheckpoint implements StreamPartitionMsgOffset {
-  private Map<String, String> _shardToStartSequenceMap;
+  private final Map<String, String> _shardToStartSequenceMap;
 
   public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
     _shardToStartSequenceMap = shardToStartSequenceMap;
@@ -68,6 +73,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
 
   @Override
   public int compareTo(Object o) {
-    return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
+    return this._shardToStartSequenceMap.values().iterator().next()
+        .compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 529f34f..fbe369f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -23,6 +23,9 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
+/**
+ * Kinesis stream specific config
+ */
 public class KinesisConfig {
   public static final String STREAM = "stream";
   public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 4d968f6..61d065e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -27,14 +27,13 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
+/**
+ * Manages the Kinesis stream connection, given the stream name and AWS region
+ */
 public class KinesisConnectionHandler {
   KinesisClient _kinesisClient;
-  private String _stream;
-  private String _awsRegion;
-
-  public KinesisConnectionHandler() {
-
-  }
+  private final String _stream;
+  private final String _awsRegion;
 
   public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
@@ -42,12 +41,18 @@ public class KinesisConnectionHandler {
     createConnection();
   }
 
+  /**
+   * Lists all shards of the stream
+   */
   public List<Shard> getShards() {
     ListShardsResponse listShardsResponse =
         _kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
     return listShardsResponse.shards();
   }
 
+  /**
+   * Creates a Kinesis client for the stream
+   */
   public void createConnection() {
     if (_kinesisClient == null) {
       _kinesisClient =
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 5cbd7e6..9c56f95 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -43,6 +43,9 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
+/**
+ * A {@link PartitionGroupConsumer} implementation for the Kinesis stream
+ */
 public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
   private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
   String _stream;
@@ -58,16 +61,19 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     _executorService = Executors.newSingleThreadExecutor();
   }
 
+  /**
+   * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
+   */
   @Override
-  public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeoutMs) {
+  public KinesisRecordsBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs) {
     List<Record> recordList = new ArrayList<>();
     Future<KinesisRecordsBatch> kinesisFetchResultFuture =
-        _executorService.submit(() -> getResult(start, end, recordList));
+        _executorService.submit(() -> getResult(startCheckpoint, endCheckpoint, recordList));
 
     try {
       return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
-      return handleException((KinesisCheckpoint) start, recordList);
+      return handleException((KinesisCheckpoint) startCheckpoint, recordList);
     }
   }
 
@@ -81,6 +87,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       }
 
       //TODO: iterate upon all the shardIds in the map
+      // Okay for now, since we have assumed that every partition group contains a single shard
       Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
       String shardIterator = getShardIterator(next.getKey(), next.getValue());
 
@@ -156,14 +163,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
       String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
       newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
-
-      return new KinesisRecordsBatch(recordList, shardId, false);
-    } else {
-      return new KinesisRecordsBatch(recordList, shardId, false);
     }
+    return new KinesisRecordsBatch(recordList, shardId, false);
   }
 
-  public String getShardIterator(String shardId, String sequenceNumber) {
+  private String getShardIterator(String shardId, String sequenceNumber) {
 
     GetShardIteratorRequest.Builder requestBuilder =
         GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index fc9c4af..6792fb9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -28,6 +28,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 
 
+/**
+ * {@link StreamConsumerFactory} implementation for the Kinesis stream
+ */
 public class KinesisConsumerFactory extends StreamConsumerFactory {
 
   @Override
@@ -43,7 +46,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
 
   @Override
   public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
-    return null;
+    throw new UnsupportedOperationException();
   }
 
   @Override
@@ -52,7 +55,8 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
   }
 
   @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+      PartitionGroupMetadata partitionGroupMetadata) {
     return new KinesisConsumer(new KinesisConfig(_streamConfig));
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
index f234bae..8f6b932 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
@@ -1,11 +1,15 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.io.IOException;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 
 
+/**
+ * An implementation of the {@link PartitionGroupCheckpointFactory} for Kinesis stream
+ */
 public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory {
 
   KinesisConfig _kinesisConfig;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index b3eb626..83228ec 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -22,13 +22,14 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.RowMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
+/**
+ * A {@link MessageBatch} for collecting records from the Kinesis stream
+ */
 public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   private final List<Record> _recordList;
   private final String _shardId;
@@ -49,6 +50,7 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
   public byte[] getMessageAtIndex(int index) {
     return _recordList.get(index).data().asByteArray();
   }
+
   @Override
   public int getMessageOffsetAtIndex(int index) {
     return ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 8968b56..1083969 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,21 +1,14 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -28,6 +21,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
+/**
+ * A {@link StreamMetadataProvider} implementation for the Kinesis stream
+ */
 public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
   private final KinesisConnectionHandler _kinesisConnectionHandler;
   private final StreamConsumerFactory _kinesisStreamConsumerFactory;
@@ -52,17 +48,23 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * This call returns all active shards, taking into account the consumption status for those shards.
+   * PartitionGroupInfo is returned for a shard if:
+   * 1. It is a brand new shard i.e. no partitionGroupMetadata was found for it in the current list
+   * 2. It is still being actively consumed from i.e. the consuming partition has not reached the end of the shard
+   */
   @Override
   public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
       throws IOException, TimeoutException {
 
-    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
-        currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+    Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream()
+        .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
 
     List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
     List<Shard> shards = _kinesisConnectionHandler.getShards();
-    for (Shard shard : shards) { // go over all shards
+    for (Shard shard : shards) {
       KinesisCheckpoint newStartCheckpoint;
 
       String shardId = shard.shardId();
@@ -76,7 +78,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         } catch (Exception e) {
           // ignore. No end checkpoint yet for IN_PROGRESS segment
         }
-        if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
+        if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing/committed segment
           String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
           if (endingSequenceNumber != null) { // shard has ended
             // check if segment has consumed all the messages already
@@ -104,8 +106,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
         newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
       }
 
-      newPartitionGroupInfos
-          .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
+      newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
     }
     return newPartitionGroupInfos;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
index bae8832..b7a9dba 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -18,7 +18,12 @@
  */
 package org.apache.pinot.spi.stream;
 
+/**
+ * Keeps track of the consumption for a PartitionGroup
+ */
 public interface Checkpoint extends Comparable {
+
   String serialize();
+
   Checkpoint deserialize(String checkpointStr);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
index 14d2f39..4bd7839 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
@@ -18,32 +18,22 @@
  */
 package org.apache.pinot.spi.stream;
 
-import org.apache.pinot.spi.annotations.InterfaceStability;
-
-
 /**
  * An interface to be implemented by streams that are consumed using Pinot LLC consumption.
  */
-@InterfaceStability.Evolving
 public interface PartitionGroupCheckpointFactory {
   /**
    * Initialization, called once when the factory is created.
-   * @param streamConfig
    */
   void init(StreamConfig streamConfig);
 
   /**
-   * Construct an offset from the string provided.
-   * @param offsetStr
-   * @return StreamPartitionMsgOffset
+   * Construct a checkpoint from the string provided.
    */
   Checkpoint create(String offsetStr);
 
   /**
    * Construct an offset from another one provided, of the same type.
-   *
-   * @param other
-   * @return
    */
   Checkpoint create(Checkpoint other);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index b421268..72b59d7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,7 +22,21 @@ import java.io.Closeable;
 import java.util.concurrent.TimeoutException;
 
 
+/**
+ * Consumer interface for consuming from a partition group of a stream
+ */
 public interface PartitionGroupConsumer extends Closeable {
-  MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout)
+
+  /**
+   * Fetch messages and offsets from the stream partition group
+   *
+   * @param startCheckpoint The offset of the first message desired, inclusive
+   * @param endCheckpoint The offset of the last message desired, exclusive, or null
+   * @param timeoutMs Timeout in milliseconds
+   * @throws java.util.concurrent.TimeoutException If the operation could not be completed within {@code timeoutMs}
+   * milliseconds
+   * @return An iterable containing messages fetched from the stream partition and their offsets
+   */
+  MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs)
       throws TimeoutException;
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
index 438e148..758953d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -18,21 +18,22 @@
  */
 package org.apache.pinot.spi.stream;
 
+/**
+ * A PartitionGroup is a group of partitions/shards that the same consumer should consume from.
+ * This class is a container for the metadata of a partition group. It consists of
+ * 1. A unique partition group id for this partition group
+ * 2. The start checkpoint to begin consumption for this partition group
+ */
 public class PartitionGroupInfo {
 
-  // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
   private final int _partitionGroupId;
-  private String _startCheckpoint;
+  private final String _startCheckpoint;
 
   public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
     _partitionGroupId = partitionGroupId;
     _startCheckpoint = startCheckpoint;
   }
 
-  public void setStartCheckpoint(String startCheckpoint) {
-    _startCheckpoint = startCheckpoint;
-  }
-
   public int getPartitionGroupId() {
     return _partitionGroupId;
   }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index f2d3f17..9c746e8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Fetches the partition count of a stream using the {@link StreamMetadataProvider}
+ * Creates a list of PartitionGroupInfo for all partition groups of the stream using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupInfoFetcher implements Callable<Boolean> {
 
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index aaf20b6..a99a82b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,12 +18,8 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
-
-
 public class PartitionGroupMetadata {
 
-  // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
   private final int _partitionGroupId;
   private int _sequenceNumber;
   private String _startCheckpoint;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3bedc8a..3f5b230 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -63,9 +63,9 @@ public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsume
     return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis);
   }
 
-  default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis)
+  default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs)
       throws java.util.concurrent.TimeoutException {
-    // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API
-    return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis);
+    return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint,
+        timeoutMs);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
deleted file mode 100644
index b92f04d..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.concurrent.Callable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Fetches the partition offset for a stream given the offset criteria, using the {@link StreamMetadataProvider}
- */
-public class PartitionOffsetFetcher implements Callable<Boolean> {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionOffsetFetcher.class);
-  private static final int STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS = 10000;
-
-  private final String _topicName;
-  private final OffsetCriteria _offsetCriteria;
-  private final int _partitionGroupId;
-
-  private Exception _exception = null;
-  private StreamPartitionMsgOffset _offset;
-  private StreamConsumerFactory _streamConsumerFactory;
-  StreamConfig _streamConfig;
-
-  public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) {
-    _offsetCriteria = offsetCriteria;
-    _partitionGroupId = partitionGroupId;
-    _streamConfig = streamConfig;
-    _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    _topicName = streamConfig.getTopicName();
-  }
-
-  public StreamPartitionMsgOffset getOffset() {
-    return _offset;
-  }
-
-  public Exception getException() {
-    return _exception;
-  }
-
-  /**
-   * Callable to fetch the offset of the partition given the stream metadata and offset criteria
-   * @return
-   * @throws Exception
-   */
-  @Override
-  public Boolean call()
-      throws Exception {
-    String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId;
-    try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory
-        .createPartitionMetadataProvider(clientId, _partitionGroupId)) {
-      _offset =
-          streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS);
-      if (_exception != null) {
-        LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName,
-            _partitionGroupId);
-      }
-      return Boolean.TRUE;
-    } catch (TransientConsumerException e) {
-      LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName,
-          _partitionGroupId,
-          e.getMessage());
-      _exception = e;
-      return Boolean.FALSE;
-    } catch (Exception e) {
-      _exception = e;
-      throw e;
-    }
-  }
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index f993fed..ac928c5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -41,7 +41,6 @@ public abstract class StreamConsumerFactory {
    * @param partition the partition id of the partition for which this consumer is being created
    * @return
    */
-  @Deprecated
   public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition);
 
   /**
@@ -74,8 +73,11 @@ public abstract class StreamConsumerFactory {
     return new LongMsgOffsetFactory();
   }
 
-  // creates a consumer which consumes from a partition group
-  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
-    return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId());
+  /**
+   * Creates a partition group consumer, which can fetch messages from a partition group
+   */
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+      PartitionGroupMetadata partitionGroupMetadata) {
+    return createPartitionLevelConsumer(clientId, partitionGroupMetadata.getPartitionGroupId());
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index be2e819..cecc708 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -60,7 +60,7 @@ public interface StreamMetadataProvider extends Closeable {
   }
 
   /**
-   * Fetch the partitionGroupMetadata list.
+   * Fetch the list of partition group info for the latest state of the stream
    * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
    */
   default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
@@ -69,14 +69,13 @@ public interface StreamMetadataProvider extends Closeable {
     int partitionCount = fetchPartitionCount(timeoutMillis);
     List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
 
-    // add a PartitionGroupInfo into the list foreach partition already present in current.
-    // the end checkpoint is set as checkpoint
+    // Add a PartitionGroupInfo into the list for each partition already present in current.
     for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
       newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
           currentPartitionGroupMetadata.getEndCheckpoint()));
     }
-    // add PartitiongroupInfo for new partitions
-    // use offset criteria from stream config
+    // Add PartitionGroupInfo for new partitions
+    // Use offset criteria from stream config
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
       StreamMetadataProvider partitionMetadataProvider =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org