You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in this plain-text rendering.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:55 UTC

[incubator-pinot] 05/47: Separate PartitionGroupInfo and PartitionGroupMetadata

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a7fba5a7ffc843ea576d23e330cd2fd8441ee5fb
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 14:46:46 2020 -0800

    Separate PartitionGroupInfo and PartitionGroupMetadata
---
 .../helix/core/PinotHelixResourceManager.java      |  12 +--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 108 ++++++++-------------
 .../impl/fakestream/FakeStreamConsumerFactory.java |   3 +-
 .../kafka09/KafkaPartitionLevelConsumerTest.java   |   2 +-
 .../kafka20/KafkaStreamMetadataProvider.java       |  28 ++++--
 .../pinot/spi/stream/PartitionGroupInfo.java       |  43 ++++++++
 .../pinot/spi/stream/StreamMetadataProvider.java   |   6 +-
 7 files changed, 115 insertions(+), 87 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f36e4f..f0d52bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -126,12 +126,7 @@ import org.apache.pinot.spi.config.table.TenantConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.StreamConsumerFactory;
-import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1361,9 +1356,8 @@ public class PinotHelixResourceManager {
       idealState = PinotTableIdealStateBuilder
           .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
               _enableBatchMessageMode);
-      _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
-      LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
-      _pinotLLCRealtimeSegmentManager.setupNewShardedTable(rawRealtimeTableConfig, idealState);
+      _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+      LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
     } else {
 
       if (streamConfig.hasHighLevelConsumerType()) {
@@ -1391,7 +1385,7 @@ public class PinotHelixResourceManager {
           idealState = PinotTableIdealStateBuilder
               .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
                   _enableBatchMessageMode);
-          _pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
+          _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
           LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
         } else {
           LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9b03fa4..528125b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -78,6 +79,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
@@ -221,8 +223,14 @@ public class PinotLLCRealtimeSegmentManager {
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
         .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+
+    List<PartitionGroupInfo> newPartitionGroupMetadataList;
+    try {
+      newPartitionGroupMetadataList =
+          streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
+    } catch (TimeoutException e) {
+      throw new IllegalStateException(e);
+    }
     int numPartitionGroups = newPartitionGroupMetadataList.size();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -234,8 +242,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
-    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
-      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata.getPartitionGroupId(),
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+      String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
           currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -277,50 +285,6 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   /**
-   * Sets up the initial segments for a new LLC real-time table.
-   * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC and LLC are configured.
-   */
-  public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
-    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
-
-    String realtimeTableName = tableConfig.getTableName();
-    LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
-
-    // Make sure all the existing segments are HLC segments
-    List<String> currentSegments = getAllSegments(realtimeTableName);
-    for (String segmentName : currentSegments) {
-      // TODO: Should return 4xx HTTP status code. Currently all exceptions are returning 500
-      Preconditions.checkState(SegmentName.isHighLevelConsumerSegmentName(segmentName),
-          "Cannot set up new LLC table: %s with existing non-HLC segment: %s", realtimeTableName, segmentName);
-    }
-
-    _flushThresholdUpdateManager.clearFlushThresholdUpdater(realtimeTableName);
-
-    PartitionLevelStreamConfig streamConfig =
-        new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-    int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
-    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-
-    SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
-    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
-        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
-
-    long currentTimeMs = getCurrentTimeMs();
-    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
-    for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) {
-      String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups,
-              numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
-          instancePartitionsMap);
-    }
-
-    setIdealState(realtimeTableName, idealState);
-  }
-
-  /**
    * Removes all LLC segments from the given IdealState.
    */
   public void removeLLCSegments(IdealState idealState) {
@@ -538,15 +502,23 @@ public class PinotLLCRealtimeSegmentManager {
     // Step-2
 
     // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+
     // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
-    StreamConfig streamConfig = new StreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
         .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
+
     // find new partition groups [A],[B],[C],[D]
-    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+    List<PartitionGroupInfo> newPartitionGroupMetadataList;
+    try {
+      newPartitionGroupMetadataList =
+          streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
+    } catch (TimeoutException e) {
+      throw new IllegalStateException(e);
+    }
 
     // create new segment metadata, only if it is not IN_PROGRESS in the current state
     Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
@@ -555,25 +527,24 @@ public class PinotLLCRealtimeSegmentManager {
     List<String> newConsumingSegmentNames = new ArrayList<>();
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
-      int newPartitionGroupId = partitionGroupMetadata.getPartitionGroupId();
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+      int newPartitionGroupId = partitionGroupInfo.getPartitionGroupId();
       PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
-      if (currentPartitionGroupMetadata == null) { // not present in current state
+      if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
         // make new segment
-        LLCSegmentName newLLCSegmentName =
-            new LLCSegmentName(rawTableName, newPartitionGroupId, STARTING_SEQUENCE_NUMBER, newSegmentCreationTimeMs);
-        createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
-                IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-        newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
+        String newLLCSegmentName =
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
+                instancePartitions, numPartitions, numReplicas);
+        newConsumingSegmentNames.add(newLLCSegmentName);
       } else {
         String currentStatus = currentPartitionGroupMetadata.getStatus();
-        if (!currentStatus.equals(Status.IN_PROGRESS.toString())) { // not IN_PROGRESS anymore in current state
-          // make new segment
+        if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
+          // not IN_PROGRESS anymore in current state. Should be DONE.
+          // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment
+          // todo: skip this if the partition doesn't match with the committing segment?
           LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
               currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-          createNewSegmentZKMetadata(tableConfig, new PartitionLevelStreamConfig(tableConfig.getTableName(),
-                  IngestionConfigUtils.getStreamConfigMap(tableConfig)), newLLCSegmentName, newSegmentCreationTimeMs,
+          createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
               committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
           newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
         }
@@ -1181,19 +1152,20 @@ public class PinotLLCRealtimeSegmentManager {
    * Sets up a new partition.
    * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
    */
-  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId,
+  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, PartitionGroupInfo partitionGroupInfo,
       long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
+    int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
+    String startCheckpoint = partitionGroupInfo.getStartCheckpoint();
     LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     LLCSegmentName newLLCSegmentName =
         new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
-    StreamPartitionMsgOffset startOffset =
-        getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId);
+
     CommittingSegmentDescriptor committingSegmentDescriptor =
-        new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
+        new CommittingSegmentDescriptor(null, startCheckpoint, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 289b226..54be1b6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
+import java.util.Collections;
 import java.util.Set;
 import org.apache.pinot.core.util.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -87,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
-    int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size();
+    int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size();
     System.out.println(partitionCount);
 
     // Partition metadata provider
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index fbdfdfb..43b72a8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
 
     KafkaStreamMetadataProvider streamMetadataProvider =
         new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
-    Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L), 2);
+    Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2);
   }
 
   @Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 187c61b..eb606f2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -29,6 +29,7 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -37,12 +38,15 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
 
+  private StreamConfig _streamConfig;
+
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
     this(clientId, streamConfig, Integer.MIN_VALUE);
   }
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
+    _streamConfig = streamConfig;
   }
 
   @Override
@@ -57,14 +61,26 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
    *                                       Hence current partition groups are not needed to compute the new partition groups
    */
   @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+  public List<PartitionGroupInfo> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+      throws TimeoutException {
     int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
-    for (int i = 0; i < partitionCount; i++) {
-      partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+    // Add a PartitionGroupInfo to the list for each partition group already present in the current state.
+    // Its end checkpoint is used as the new start checkpoint.
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+          currentPartitionGroupMetadata.getEndCheckpoint()));
+    }
+    // Add a PartitionGroupInfo for each new partition,
+    // starting from the offset resolved via the stream config's offset criteria.
+    for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+      StreamPartitionMsgOffset streamPartitionMsgOffset =
+          fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
     }
-    return partitionGroupMetadataList;
+    return newPartitionGroupInfoList;
   }
 
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
new file mode 100644
index 0000000..438e148
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+public class PartitionGroupInfo {
+
+  // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
+  private final int _partitionGroupId;
+  private String _startCheckpoint;
+
+  public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
+    _partitionGroupId = partitionGroupId;
+    _startCheckpoint = startCheckpoint;
+  }
+
+  public void setStartCheckpoint(String startCheckpoint) {
+    _startCheckpoint = startCheckpoint;
+  }
+
+  public int getPartitionGroupId() {
+    return _partitionGroupId;
+  }
+
+  public String getStartCheckpoint() {
+    return _startCheckpoint;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 5b9104e..a9cd2d6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -39,8 +40,9 @@ public interface StreamMetadataProvider extends Closeable {
   int fetchPartitionCount(long timeoutMillis);
 
   // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis);
+  List<PartitionGroupInfo> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+      throws TimeoutException;
 
   // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
   @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org