You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:42:53 UTC

[incubator-pinot] 03/47: Rename partitionId to partitionGroupId

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3892fc417c2f7d07e15b78eae1e1b3dd09e60090
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Dec 30 13:46:22 2020 -0800

    Rename partitionId to partitionGroupId
---
 .../segmentselector/RealtimeSegmentSelector.java   |  2 +-
 .../apache/pinot/common/utils/LLCSegmentName.java  | 24 +++++-----
 .../org/apache/pinot/common/utils/SegmentName.java |  2 +-
 .../pinot/common/utils/SegmentNameBuilderTest.java |  6 +--
 .../helix/core/PinotHelixResourceManager.java      |  5 ++-
 .../helix/core/PinotTableIdealStateBuilder.java    | 15 ++++---
 .../segment/RealtimeSegmentAssignment.java         |  6 +--
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |  4 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 51 ++++++++++++----------
 .../SegmentSizeBasedFlushThresholdUpdater.java     |  2 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 16 ++++---
 .../realtime/LLRealtimeSegmentDataManager.java     | 34 ++++++++-------
 .../manager/realtime/RealtimeTableDataManager.java | 13 +++---
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 10 ++---
 .../fakestream/FakePartitionGroupMetadata.java     | 48 ++++++++++++++++++++
 .../impl/fakestream/FakeStreamConsumerFactory.java | 10 +----
 .../fakestream/FakeStreamMetadataProvider.java     | 15 ++++++-
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  9 +---
 ...PartitionLLCRealtimeClusterIntegrationTest.java |  6 +--
 .../stream/kafka09/KafkaConsumerFactory.java       |  9 +---
 .../kafka09/KafkaPartitionGroupMetadata.java       | 48 ++++++++++++++++++++
 .../kafka09/KafkaStreamMetadataProvider.java       | 26 +++++++++++
 .../kafka09/KafkaPartitionLevelConsumerTest.java   |  2 +-
 .../stream/kafka20/KafkaConsumerFactory.java       |  9 +---
 .../kafka20/KafkaPartitionGroupMetadata.java       | 48 ++++++++++++++++++++
 .../kafka20/KafkaStreamMetadataProvider.java       | 21 +++++++++
 ...her.java => PartitionGroupMetadataFetcher.java} | 18 +++++---
 .../pinot/spi/stream/PartitionOffsetFetcher.java   | 15 ++++---
 .../pinot/spi/stream/StreamConsumerFactory.java    |  8 +---
 .../pinot/spi/stream/StreamMetadataProvider.java   |  9 +++-
 30 files changed, 347 insertions(+), 144 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index f462326..2d778c6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -95,7 +95,7 @@ public class RealtimeSegmentSelector implements SegmentSelector {
         if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
           // Keep the first CONSUMING segment for each partition
           LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
-          partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionId(), (k, consumingSegment) -> {
+          partitionIdToFirstConsumingLLCSegmentMap.compute(llcSegmentName.getPartitionGroupId(), (k, consumingSegment) -> {
             if (consumingSegment == null) {
               return llcSegmentName;
             } else {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index adc24ad..a66bb3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -26,7 +26,7 @@ import org.joda.time.DateTimeZone;
 public class LLCSegmentName extends SegmentName implements Comparable {
   private final static String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
   private final String _tableName;
-  private final int _partitionId;
+  private final int _partitionGroupId;
   private final int _sequenceNumber;
   private final String _creationTime;
   private final String _segmentName;
@@ -39,22 +39,22 @@ public class LLCSegmentName extends SegmentName implements Comparable {
     _segmentName = segmentName;
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
     _tableName = parts[0];
-    _partitionId = Integer.parseInt(parts[1]);
+    _partitionGroupId = Integer.parseInt(parts[1]);
     _sequenceNumber = Integer.parseInt(parts[2]);
     _creationTime = parts[3];
   }
 
-  public LLCSegmentName(String tableName, int partitionId, int sequenceNumber, long msSinceEpoch) {
+  public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
     if (!isValidComponentName(tableName)) {
       throw new RuntimeException("Invalid table name " + tableName);
     }
     _tableName = tableName;
-    _partitionId = partitionId;
+    _partitionGroupId = partitionGroupId;
     _sequenceNumber = sequenceNumber;
     // ISO8601 date: 20160120T1234Z
     DateTime dateTime = new DateTime(msSinceEpoch, DateTimeZone.UTC);
     _creationTime = dateTime.toString(DATE_FORMAT);
-    _segmentName = tableName + SEPARATOR + partitionId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
+    _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
   }
 
   /**
@@ -75,13 +75,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
   }
 
   @Override
-  public int getPartitionId() {
-    return _partitionId;
+  public int getPartitionGroupId() {
+    return _partitionGroupId;
   }
 
   @Override
   public String getPartitionRange() {
-    return Integer.toString(getPartitionId());
+    return Integer.toString(getPartitionGroupId());
   }
 
   @Override
@@ -110,9 +110,9 @@ public class LLCSegmentName extends SegmentName implements Comparable {
       throw new RuntimeException(
           "Cannot compare segment names " + this.getSegmentName() + " and " + other.getSegmentName());
     }
-    if (this.getPartitionId() > other.getPartitionId()) {
+    if (this.getPartitionGroupId() > other.getPartitionGroupId()) {
       return 1;
-    } else if (this.getPartitionId() < other.getPartitionId()) {
+    } else if (this.getPartitionGroupId() < other.getPartitionGroupId()) {
       return -1;
     } else {
       if (this.getSequenceNumber() > other.getSequenceNumber()) {
@@ -141,7 +141,7 @@ public class LLCSegmentName extends SegmentName implements Comparable {
 
     LLCSegmentName segName = (LLCSegmentName) o;
 
-    if (_partitionId != segName._partitionId) {
+    if (_partitionGroupId != segName._partitionGroupId) {
       return false;
     }
     if (_sequenceNumber != segName._sequenceNumber) {
@@ -159,7 +159,7 @@ public class LLCSegmentName extends SegmentName implements Comparable {
   @Override
   public int hashCode() {
     int result = _tableName != null ? _tableName.hashCode() : 0;
-    result = 31 * result + _partitionId;
+    result = 31 * result + _partitionGroupId;
     result = 31 * result + _sequenceNumber;
     result = 31 * result + (_creationTime != null ? _creationTime.hashCode() : 0);
     result = 31 * result + (_segmentName != null ? _segmentName.hashCode() : 0);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
index 6763f6d..b0c00ae 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentName.java
@@ -63,7 +63,7 @@ public abstract class SegmentName {
     throw new RuntimeException("No groupId in " + getSegmentName());
   }
 
-  public int getPartitionId() {
+  public int getPartitionGroupId() {
     throw new RuntimeException("No partitionId in " + getSegmentName());
   }
 
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
index f632f51..de606cc 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/SegmentNameBuilderTest.java
@@ -58,7 +58,7 @@ public class SegmentNameBuilderTest {
     // Check partition range
     assertEquals(longNameSegment.getPartitionRange(), "0");
     assertEquals(shortNameSegment.getPartitionRange(), "ALL");
-    assertEquals(llcSegment.getPartitionId(), 0);
+    assertEquals(llcSegment.getPartitionGroupId(), 0);
 
     // Check groupId
     assertEquals(longNameSegment.getGroupId(), "myTable_REALTIME_1234567_0");
@@ -127,14 +127,14 @@ public class SegmentNameBuilderTest {
 
     LLCSegmentName segName1 = new LLCSegmentName(tableName, partitionId, sequenceNumber, msSinceEpoch);
     Assert.assertEquals(segName1.getSegmentName(), segmentName);
-    Assert.assertEquals(segName1.getPartitionId(), partitionId);
+    Assert.assertEquals(segName1.getPartitionGroupId(), partitionId);
     Assert.assertEquals(segName1.getCreationTime(), creationTime);
     Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber);
     Assert.assertEquals(segName1.getTableName(), tableName);
 
     LLCSegmentName segName2 = new LLCSegmentName(segmentName);
     Assert.assertEquals(segName2.getSegmentName(), segmentName);
-    Assert.assertEquals(segName2.getPartitionId(), partitionId);
+    Assert.assertEquals(segName2.getPartitionGroupId(), partitionId);
     Assert.assertEquals(segName2.getCreationTime(), creationTime);
     Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber);
     Assert.assertEquals(segName2.getTableName(), tableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index fa117fa..a04e0bc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -130,6 +130,7 @@ import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -1396,6 +1397,8 @@ public class PinotHelixResourceManager {
    */
   private void setupShardedRealtimeTable(StreamConfig streamConfig, IdealState idealState, int numReplicas) {
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+        .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
 
     // get current partition groups and their metadata - this will be empty when creating the table
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = _pinotLLCRealtimeSegmentManager.getCurrentPartitionGroupMetadataList(idealState);
@@ -1403,7 +1406,7 @@ public class PinotHelixResourceManager {
     // get new partition groups and their metadata,
     // Assume table has 3 shards. Say we get [0], [1], [2] groups (for now assume that each group contains only 1 shard)
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 5000);
 
     // setup segment zk metadata and ideal state for all the new found partition groups
     _pinotLLCRealtimeSegmentManager.setupNewPartitionGroups(newPartitionGroupMetadataList, numReplicas);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index a564542..1e95966 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -32,7 +32,8 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.stream.PartitionCountFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -115,13 +116,15 @@ public class PinotTableIdealStateBuilder {
     pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
   }
 
-  public static int getPartitionCount(StreamConfig streamConfig) {
-    PartitionCountFetcher partitionCountFetcher = new PartitionCountFetcher(streamConfig);
+  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
+        new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList);
     try {
-      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionCountFetcher);
-      return partitionCountFetcher.getPartitionCount();
+      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher);
+      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
     } catch (Exception e) {
-      Exception fetcherException = partitionCountFetcher.getException();
+      Exception fetcherException = partitionGroupMetadataFetcher.getException();
       LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
       throw new RuntimeException(fetcherException);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
index a069734..e27958f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java
@@ -136,7 +136,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
    * Helper method to assign instances for CONSUMING segment based on the segment partition id and instance partitions.
    */
   private List<String> assignConsumingSegment(String segmentName, InstancePartitions instancePartitions) {
-    int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+    int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
 
     int numReplicaGroups = instancePartitions.getNumReplicaGroups();
     if (numReplicaGroups == 1) {
@@ -325,7 +325,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
 
         Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
         for (String segmentName : currentAssignment.keySet()) {
-          int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+          int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
           partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
         }
 
@@ -360,7 +360,7 @@ public class RealtimeSegmentAssignment implements SegmentAssignment {
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionId();
+      int segmentPartitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
       int numPartitions = instancePartitions.getNumPartitions();
       int partitionId = segmentPartitionId % numPartitions;
       return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index a278396..8208d8e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -252,11 +252,11 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
     Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
     for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList) {
       LLCSegmentName llcSegmentName = new LLCSegmentName(metadata.getSegmentName());
-      allPartitions.add(llcSegmentName.getPartitionId());
+      allPartitions.add(llcSegmentName.getPartitionGroupId());
 
       if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
         completedSegmentsMetadataList.add(metadata);
-        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
+        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
           if (latestLLCSegmentName == null) {
             return llcSegmentName;
           } else {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 8a29489..189be8b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -83,6 +83,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -186,8 +187,10 @@ public class PinotLLCRealtimeSegmentManager {
 
     // get new partition groups (honor any groupings which are already consuming - [0], [1], [2])
     StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
+        .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
     List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-        streamConsumerFactory.getPartitionGroupMetadataList(currentPartitionGroupMetadataList);
+        streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
 
     // from the above list, remove the partition groups which are already CONSUMING
     // i.e. newPartitionGroups - currentPartitionGroups. Therefore, ([0], [1], [2]) - ([1], [2]) = ([0])
@@ -292,7 +295,8 @@ public class PinotLLCRealtimeSegmentManager {
     PartitionLevelStreamConfig streamConfig =
         new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
-    int numPartitions = getNumPartitions(streamConfig);
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+    int numPartitionGroups = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -301,9 +305,9 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
-    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+    for (int partitionGroupId = 0; partitionGroupId < numPartitionGroups; partitionGroupId++) {
       String segmentName =
-          setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+          setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupId, currentTimeMs, instancePartitions, numPartitionGroups,
               numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
           instancePartitionsMap);
@@ -635,7 +639,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     // Add the partition metadata if available
     SegmentPartitionMetadata partitionMetadata =
-        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionId());
+        getPartitionMetadataFromTableConfig(tableConfig, newLLCSegmentName.getPartitionGroupId());
     if (partitionMetadata != null) {
       newSegmentZKMetadata.setPartitionMetadata(partitionMetadata);
     }
@@ -705,22 +709,23 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  int getNumPartitions(StreamConfig streamConfig) {
-    return PinotTableIdealStateBuilder.getPartitionCount(streamConfig);
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList);
   }
 
   @VisibleForTesting
   StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
-      int partitionId) {
+      int partitionGroupId) {
     PartitionOffsetFetcher partitionOffsetFetcher =
-        new PartitionOffsetFetcher(offsetCriteria, partitionId, streamConfig);
+        new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig);
     try {
       RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
       return partitionOffsetFetcher.getOffset();
     } catch (Exception e) {
       throw new IllegalStateException(String
           .format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s",
-              streamConfig.getTopicName(), partitionId, offsetCriteria), e);
+              streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e);
     }
   }
 
@@ -768,7 +773,7 @@ public class PinotLLCRealtimeSegmentManager {
     Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
     for (String segmentName : segments) {
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-      latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(), (partitionId, latestLLCSegmentName) -> {
+      latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(), (partitionId, latestLLCSegmentName) -> {
         if (latestLLCSegmentName == null) {
           return llcSegmentName;
         } else {
@@ -821,10 +826,12 @@ public class PinotLLCRealtimeSegmentManager {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
-    int numPartitions = getNumPartitions(streamConfig);
     HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
       assert idealState != null;
       if (idealState.isEnabled()) {
+        List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+            getCurrentPartitionGroupMetadataList(idealState);
+        int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1085,7 +1092,7 @@ public class PinotLLCRealtimeSegmentManager {
           String previousConsumingSegment = null;
           for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
             LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey());
-            if (llcSegmentName.getPartitionId() == partitionId && segmentEntry.getValue()
+            if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue()
                 .containsValue(SegmentStateModel.CONSUMING)) {
               previousConsumingSegment = llcSegmentName.getSegmentName();
               break;
@@ -1110,7 +1117,7 @@ public class PinotLLCRealtimeSegmentManager {
     for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
       if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
         String newSegmentName =
-            setupNewPartition(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
                 numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
@@ -1121,7 +1128,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
-    return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionId(),
+    return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(),
         lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
   }
 
@@ -1129,21 +1136,21 @@ public class PinotLLCRealtimeSegmentManager {
    * Sets up a new partition.
    * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
    */
-  private String setupNewPartition(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionId,
-      long creationTimeMs, InstancePartitions instancePartitions, int numPartitions, int numReplicas) {
+  private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig, int partitionGroupId,
+      long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
-    LOGGER.info("Setting up new partition: {} for table: {}", partitionId, realtimeTableName);
+    LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
 
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     LLCSegmentName newLLCSegmentName =
-        new LLCSegmentName(rawTableName, partitionId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
+        new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
     String newSegmentName = newLLCSegmentName.getSegmentName();
     StreamPartitionMsgOffset startOffset =
-        getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionId);
+        getPartitionOffset(streamConfig, streamConfig.getOffsetCriteria(), partitionGroupId);
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(null, startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
-        committingSegmentDescriptor, null, instancePartitions, numPartitions, numReplicas);
+        committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
 
     return newSegmentName;
   }
@@ -1157,7 +1164,7 @@ public class PinotLLCRealtimeSegmentManager {
     int numPartitions = 0;
     for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
       if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-        numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1);
+        numPartitions = Math.max(numPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
       }
     }
     return numPartitions;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 2e73806..56ae29e 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -102,7 +102,7 @@ public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpda
     // less same characteristics at any one point in time).
     // However, when we start a new table or change controller mastership, we can have any partition completing first.
     // It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
-    if (new LLCSegmentName(newSegmentName).getPartitionId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+    if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
       if (_latestSegmentRowsToSizeRatio > 0) {
         _latestSegmentRowsToSizeRatio =
             CURRENT_SEGMENT_RATIO_WEIGHT * currentRatio + PREVIOUS_SEGMENT_RATIO_WEIGHT * _latestSegmentRowsToSizeRatio;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 4888f17..743e719 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.model.IdealState;
@@ -48,6 +50,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -57,6 +60,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -333,7 +337,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       assertTrue(oldSegmentZKMetadataMap.containsKey(segmentName));
       assertTrue(segmentZKMetadataMap.containsKey(segmentName));
       assertEquals(segmentZKMetadataMap.get(segmentName), oldSegmentZKMetadataMap.get(segmentName));
-      oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionId() + 1);
+      oldNumPartitions = Math.max(oldNumPartitions, new LLCSegmentName(segmentName).getPartitionGroupId() + 1);
     }
 
     // Check that for new partitions, each partition should have exactly 1 new segment in CONSUMING state, and metadata
@@ -341,7 +345,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<Integer, List<String>> partitionIdToSegmentsMap = new HashMap<>();
     for (Map.Entry<String, Map<String, String>> entry : instanceStatesMap.entrySet()) {
       String segmentName = entry.getKey();
-      int partitionId = new LLCSegmentName(segmentName).getPartitionId();
+      int partitionId = new LLCSegmentName(segmentName).getPartitionGroupId();
       partitionIdToSegmentsMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(segmentName);
     }
     for (int partitionId = oldNumPartitions; partitionId < segmentManager._numPartitions; partitionId++) {
@@ -579,7 +583,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
       if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap.containsValue(
           SegmentStateModel.CONSUMING)) {
         LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-        int partitionsId = llcSegmentName.getPartitionId();
+        int partitionsId = llcSegmentName.getPartitionGroupId();
         Map<Integer, String> sequenceNumberToSegmentMap = partitionIdToSegmentsMap.get(partitionsId);
         int sequenceNumber = llcSegmentName.getSequenceNumber();
         assertFalse(sequenceNumberToSegmentMap.containsKey(sequenceNumber));
@@ -910,12 +914,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    int getNumPartitions(StreamConfig streamConfig) {
-      return _numPartitions;
+    List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+      return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
     }
 
     @Override
-    LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionId) {
+    LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) {
       // The criteria for this test should always be SMALLEST (for default streaming config and new added partitions)
       assertTrue(offsetCriteria.isSmallest());
       return PARTITION_OFFSET;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 84d4592..13a9ab2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -214,7 +215,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // Semaphore for each partitionId only, which is to prevent two different Kafka consumers
   // from consuming with the same partitionId in parallel in the same host.
   // See the comments in {@link RealtimeTableDataManager}.
-  private final Semaphore _partitionConsumerSemaphore;
+  private final Semaphore _partitionGroupConsumerSemaphore;
   // A boolean flag to check whether the current thread has acquired the semaphore.
   // This boolean is needed because the semaphore is shared by threads; every thread holding this semaphore can
   // modify the permit. This boolean make sure the semaphore gets released only once when the partition stops consuming.
@@ -247,7 +248,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   private Thread _consumerThread;
   private final String _streamTopic;
-  private final int _streamPartitionId;
+  private final int _partitionGroupId;
   final String _clientId;
   private final LLCSegmentName _llcSegmentName;
   private final RecordTransformer _recordTransformer;
@@ -705,7 +706,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   @Override
   public Map<String, String> getPartitionToCurrentOffset() {
     Map<String, String> partitionToCurrentOffset = new HashMap<>();
-    partitionToCurrentOffset.put(String.valueOf(_streamPartitionId), _currentOffset.toString());
+    partitionToCurrentOffset.put(String.valueOf(_partitionGroupId), _currentOffset.toString());
     return partitionToCurrentOffset;
   }
 
@@ -730,8 +731,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected Semaphore getPartitionConsumerSemaphore() {
-    return _partitionConsumerSemaphore;
+  protected Semaphore getPartitionGroupConsumerSemaphore() {
+    return _partitionGroupConsumerSemaphore;
   }
 
   @VisibleForTesting
@@ -892,7 +893,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     closePartitionLevelConsumer();
     closeStreamMetadataProvider();
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
-      _partitionConsumerSemaphore.release();
+      _partitionGroupConsumerSemaphore.release();
     }
   }
 
@@ -1102,7 +1103,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   // If the transition is OFFLINE to ONLINE, the caller should have downloaded the segment and we don't reach here.
   public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
       RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
-      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionConsumerSemaphore, ServerMetrics serverMetrics,
+      Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore, ServerMetrics serverMetrics,
       @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
     _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
     _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata;
@@ -1129,10 +1130,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _streamTopic = _partitionLevelStreamConfig.getTopicName();
     _segmentNameStr = _segmentZKMetadata.getSegmentName();
     _llcSegmentName = llcSegmentName;
-    _streamPartitionId = _llcSegmentName.getPartitionId();
-    _partitionConsumerSemaphore = partitionConsumerSemaphore;
+    _partitionGroupId = _llcSegmentName.getPartitionGroupId();
+    _partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
     _acquiredConsumerSemaphore = new AtomicBoolean(false);
-    _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId;
+    _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _partitionGroupId;
     segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr);
     _tableStreamName = _tableNameWithType + "_" + _streamTopic;
     _memoryManager = getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
@@ -1210,14 +1211,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     // Create message decoder
     Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
     _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
-    _clientId = _streamTopic + "-" + _streamPartitionId;
+    _clientId = _streamTopic + "-" + _partitionGroupId;
 
     // Create record transformer
     _recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
 
     // Acquire semaphore to create Kafka consumers
     try {
-      _partitionConsumerSemaphore.acquire();
+      _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
     } catch (InterruptedException e) {
       String errorMsg = "InterruptedException when acquiring the partitionConsumerSemaphore";
@@ -1243,7 +1244,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //       long as the partition function is not changed.
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
-          int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+          // fixme: get this from ideal state
+          int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size();
           if (numStreamPartitions != numPartitions) {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
@@ -1261,7 +1263,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         realtimeSegmentConfigBuilder.setPartitionColumn(partitionColumn);
         realtimeSegmentConfigBuilder
             .setPartitionFunction(PartitionFunctionFactory.getPartitionFunction(partitionFunctionName, numPartitions));
-        realtimeSegmentConfigBuilder.setPartitionId(_streamPartitionId);
+        realtimeSegmentConfigBuilder.setPartitionId(_partitionGroupId);
       } else {
         segmentLogger.warn("Cannot partition on multiple columns: {}", columnPartitionMap.keySet());
       }
@@ -1313,7 +1315,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       closePartitionLevelConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _streamPartitionId);
+    _partitionLevelConsumer = _streamConsumerFactory.createPartitionLevelConsumer(_clientId, _partitionGroupId);
   }
 
   /**
@@ -1325,7 +1327,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       closeStreamMetadataProvider();
     }
     segmentLogger.info("Creating new stream metadata provider, reason: {}", reason);
-    _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _streamPartitionId);
+    _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
 
   // This should be done during commit? We may not always commit when we build a segment....
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 33283b9..9850048 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
@@ -89,7 +90,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   // In some streams, it's possible that having multiple consumers (with the same consumer name on the same host) consuming from the same stream partition can lead to bugs.
   // The semaphores will stay in the hash map even if the consuming partitions move to a different host.
   // We expect that there will be a small number of semaphores, but that may be ok.
-  private final Map<Integer, Semaphore> _partitionIdToSemaphoreMap = new ConcurrentHashMap<>();
+  private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();
 
   // The old name of the stats file used to be stats.ser which we changed when we moved all packages
   // from com.linkedin to org.apache because of not being able to deserialize the old files using the newer classes
@@ -274,7 +275,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       llcSegmentName = new LLCSegmentName(segmentName);
       if (_tableUpsertMetadataManager != null) {
         partitionUpsertMetadataManager =
-            _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionId());
+            _tableUpsertMetadataManager.getOrCreatePartitionManager(llcSegmentName.getPartitionGroupId());
       }
     }
 
@@ -307,11 +308,11 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         }
 
         // Generates only one semaphore for every partitionId
-        int partitionId = llcSegmentName.getPartitionId();
-        _partitionIdToSemaphoreMap.putIfAbsent(partitionId, new Semaphore(1));
+        int partitionGroupId = llcSegmentName.getPartitionGroupId();
+        _partitionGroupIdToSemaphoreMap.putIfAbsent(partitionGroupId, new Semaphore(1));
         manager =
             new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
-                indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(partitionId), _serverMetrics,
+                indexLoadingConfig, schema, llcSegmentName, _partitionGroupIdToSemaphoreMap.get(partitionGroupId), _serverMetrics,
                 partitionUpsertMetadataManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
@@ -336,7 +337,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName));
     int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
     String segmentName = immutableSegment.getSegmentName();
-    int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionId();
+    int partitionId = new LLCSegmentName(immutableSegment.getSegmentName()).getPartitionGroupId();
     PartitionUpsertMetadataManager partitionUpsertMetadataManager =
         _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId);
     int numPrimaryKeyColumns = _primaryKeyColumns.size();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 0017c43..d09bdeb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -719,7 +719,7 @@ public class LLRealtimeSegmentDataManagerTest {
     long timeout = 10_000L;
     FakeLLRealtimeSegmentDataManager firstSegmentDataManager = createFakeSegmentManager();
     Assert.assertTrue(firstSegmentDataManager.getAcquiredConsumerSemaphore().get());
-    Semaphore firstSemaphore = firstSegmentDataManager.getPartitionConsumerSemaphore();
+    Semaphore firstSemaphore = firstSegmentDataManager.getPartitionGroupConsumerSemaphore();
     Assert.assertEquals(firstSemaphore.availablePermits(), 0);
     Assert.assertFalse(firstSemaphore.hasQueuedThreads());
 
@@ -751,18 +751,18 @@ public class LLRealtimeSegmentDataManagerTest {
         "Failed to acquire the semaphore for the second segment manager in " + timeout + "ms");
 
     Assert.assertTrue(secondSegmentDataManager.get().getAcquiredConsumerSemaphore().get());
-    Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionConsumerSemaphore();
+    Semaphore secondSemaphore = secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore();
     Assert.assertEquals(firstSemaphore, secondSemaphore);
     Assert.assertEquals(secondSemaphore.availablePermits(), 0);
     Assert.assertFalse(secondSemaphore.hasQueuedThreads());
 
     // Call destroy method the 2nd time on the first segment manager, the permits in semaphore won't increase.
     firstSegmentDataManager.destroy();
-    Assert.assertEquals(firstSegmentDataManager.getPartitionConsumerSemaphore().availablePermits(), 0);
+    Assert.assertEquals(firstSegmentDataManager.getPartitionGroupConsumerSemaphore().availablePermits(), 0);
 
     // The permit finally gets released in the Semaphore.
     secondSegmentDataManager.get().destroy();
-    Assert.assertEquals(secondSegmentDataManager.get().getPartitionConsumerSemaphore().availablePermits(), 1);
+    Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1);
   }
 
   public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
@@ -800,7 +800,7 @@ public class LLRealtimeSegmentDataManagerTest {
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
-          semaphoreMap.get(llcSegmentName.getPartitionId()), serverMetrics,
+          semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics,
           new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics));
       _state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
       _state.setAccessible(true);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
new file mode 100644
index 0000000..78ee12c
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionGroupMetadata.java
@@ -0,0 +1,48 @@
+package org.apache.pinot.core.realtime.impl.fakestream;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+public class FakePartitionGroupMetadata implements PartitionGroupMetadata {
+
+  private final int _groupId;
+  public FakePartitionGroupMetadata(int groupId) {
+    _groupId = groupId;
+  }
+
+  @Override
+  public int getGroupId() {
+    return _groupId; // id supplied at construction
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+
+  }
+
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 9669223..289b226 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.realtime.impl.fakestream;
 
-import java.util.List;
 import java.util.Set;
 import org.apache.pinot.core.util.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -69,14 +68,9 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
     return new FakeStreamMetadataProvider(_streamConfig);
   }
 
-  @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
-    return null;
-  }
 
   @Override
-  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
     return null;
   }
 
@@ -93,7 +87,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
-    int partitionCount = streamMetadataProvider.fetchPartitionCount(10_000);
+    int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(null, 10_000).size();
     System.out.println(partitionCount);
 
     // Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index e0b8ebd..c96d06a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,9 +19,12 @@
 package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -31,7 +34,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  * StreamMetadataProvider implementation for the fake stream
  */
 public class FakeStreamMetadataProvider implements StreamMetadataProvider {
-  private int _numPartitions;
+  private final int _numPartitions;
 
   public FakeStreamMetadataProvider(StreamConfig streamConfig) {
     _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
@@ -42,6 +45,16 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
     return _numPartitions;
   }
 
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+    for (int i = 0; i < _numPartitions; i++) {
+      partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i));
+    }
+    return partitionGroupMetadataList;
+  }
+
   public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
     throw new UnsupportedOperationException("This method is deprecated");
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 808a464..d917d73 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.integration.tests;
 
 import java.lang.reflect.Constructor;
-import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -122,13 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
     }
 
     @Override
-    public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-        List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
-      return null;
-    }
-
-    @Override
-    public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+    public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
       return null;
     }
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
index cd4f9b3..0196bde 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java
@@ -165,7 +165,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       assertNotNull(columnPartitionMetadata);
       assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
       assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
-      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
       assertEquals(columnPartitionMetadata.getPartitions(), Collections.singleton(streamPartitionId));
       numSegmentsForPartition[streamPartitionId]++;
     }
@@ -236,7 +236,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       assertNotNull(columnPartitionMetadata);
       assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
       assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
-      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
       numSegmentsForPartition[streamPartitionId]++;
 
       if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
@@ -313,7 +313,7 @@ public class SegmentPartitionLLCRealtimeClusterIntegrationTest extends BaseClust
       assertNotNull(columnPartitionMetadata);
       assertTrue(columnPartitionMetadata.getFunctionName().equalsIgnoreCase("murmur"));
       assertEquals(columnPartitionMetadata.getNumPartitions(), 2);
-      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionId();
+      int streamPartitionId = new LLCSegmentName(segmentZKMetadata.getSegmentName()).getPartitionGroupId();
       numSegmentsForPartition[streamPartitionId]++;
 
       if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index b8ed19d..82c282c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.plugin.stream.kafka09;
 
-import java.util.List;
 import java.util.Set;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -55,13 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   }
 
   @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
-    return null;
-  }
-
-  @Override
-  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
     return null;
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
new file mode 100644
index 0000000..1d792ac
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionGroupMetadata.java
@@ -0,0 +1,48 @@
+package org.apache.pinot.plugin.stream.kafka09;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
+
+  private final int _groupId;
+  public KafkaPartitionGroupMetadata(int partitionId) {
+    _groupId = partitionId;
+  }
+
+  @Override
+  public int getGroupId() {
+    return _groupId;
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return null;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+
+  }
+
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 06ee697..865ae96 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -22,9 +22,13 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetRequest;
@@ -36,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -84,7 +89,12 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
    * @return
    */
   @Override
+  @Deprecated
   public synchronized int fetchPartitionCount(long timeoutMillis) {
+    return fetchPartitionCountInternal(timeoutMillis);
+  }
+
+  private int fetchPartitionCountInternal(long timeoutMillis) {
     int unknownTopicReplyCount = 0;
     final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10;
     int kafkaErrorCount = 0;
@@ -145,6 +155,22 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
     throw new TimeoutException();
   }
 
+  /**
+   * Fetch one single-partition group per Kafka partition; synchronized to match fetchPartitionCount (same fetch).
+   * @param currentPartitionGroupsMetadata unused: in Kafka each partition group contains a single partition
+   * @param timeoutMillis timeout passed through to the partition count fetch
+   */
+  @Override
+  public synchronized List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+    int partitionCount = fetchPartitionCountInternal(timeoutMillis);
+    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
+    for (int i = 0; i < partitionCount; i++) {
+      partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+    }
+    return partitionGroupMetadataList;
+  }
+
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
       throws java.util.concurrent.TimeoutException {
     throw new UnsupportedOperationException("The use of this method s not supported");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index beb82e5..fbdfdfb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
 
     KafkaStreamMetadataProvider streamMetadataProvider =
         new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
-    Assert.assertEquals(streamMetadataProvider.fetchPartitionCount(10000L), 2);
+    Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(null, 10000L).size(), 2);
   }
 
   @Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 806baff..c73aacb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import java.util.List;
 import java.util.Set;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
@@ -52,13 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   }
 
   @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata) {
-    return null;
-  }
-
-  @Override
-  public PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
     return null;
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
new file mode 100644
index 0000000..31ae75a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionGroupMetadata.java
@@ -0,0 +1,48 @@
+package org.apache.pinot.plugin.stream.kafka20;
+
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+
+
+public class KafkaPartitionGroupMetadata implements PartitionGroupMetadata {
+  // Holds only the partition group id; checkpoint accessors and (de)serialization below are stubs.
+  private final int _groupId;
+  public KafkaPartitionGroupMetadata(int partitionId) {
+    _groupId = partitionId;
+  }
+
+  @Override
+  public int getGroupId() {
+    return _groupId;
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return null; // stub: no start checkpoint is stored by this class
+  }
+
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return null; // stub: no end checkpoint is stored by this class
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    // Intentionally a no-op: the argument is discarded because no checkpoint state is kept.
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+    // Intentionally a no-op: the argument is discarded because no checkpoint state is kept.
+  }
+
+  @Override
+  public byte[] serialize() {
+    return new byte[0]; // stub: always an empty array, the group id is not written
+  }
+
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null; // stub: the blob is ignored and nothing is reconstructed
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c0e2041..187c61b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -21,11 +21,15 @@ package org.apache.pinot.plugin.stream.kafka20;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -42,10 +46,27 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   }
 
   @Override
+  @Deprecated
   public int fetchPartitionCount(long timeoutMillis) {
     return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
   }
 
+  /**
+   * Fetch the partitionGroupMetadata list: one KafkaPartitionGroupMetadata per partition, with ids 0 to count - 1.
+   * @param currentPartitionGroupsMetadata unused: in Kafka each partition group contains a single partition
+   * @param timeoutMillis timeout in milliseconds given to the consumer's partitionsFor lookup
+   */
+  @Override
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+    int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
+    for (int i = 0; i < partitionCount; i++) {
+      partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+    }
+    return partitionGroupMetadataList;
+  }
+
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
       throws java.util.concurrent.TimeoutException {
     throw new UnsupportedOperationException("The use of this method is not supported");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
similarity index 74%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index d523235..e1ce1a6 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionCountFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.spi.stream;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -26,24 +27,27 @@ import org.slf4j.LoggerFactory;
 /**
  * Fetches the partition count of a stream using the {@link StreamMetadataProvider}
  */
-public class PartitionCountFetcher implements Callable<Boolean> {
+public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionCountFetcher.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
 
   private int _partitionCount = -1;
+  private List<PartitionGroupMetadata> _partitionGroupMetadataList;
+  private List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
   private final StreamConfig _streamConfig;
   private StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
   private final String _topicName;
 
-  public PartitionCountFetcher(StreamConfig streamConfig) {
+  public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
     _streamConfig = streamConfig;
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
     _topicName = streamConfig.getTopicName();
+    _currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
   }
 
-  public int getPartitionCount() {
-    return _partitionCount;
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
+    return _partitionGroupMetadataList;
   }
 
   public Exception getException() {
@@ -59,10 +63,10 @@ public class PartitionCountFetcher implements Callable<Boolean> {
   public Boolean call()
       throws Exception {
 
-    String clientId = PartitionCountFetcher.class.getSimpleName() + "-" + _topicName;
+    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
     try (
         StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-      _partitionCount = streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+      _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
       if (_exception != null) {
         // We had at least one failure, but succeeded now. Log an info
         LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
index 1d50160..b92f04d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
@@ -33,16 +33,16 @@ public class PartitionOffsetFetcher implements Callable<Boolean> {
 
   private final String _topicName;
   private final OffsetCriteria _offsetCriteria;
-  private final int _partitionId;
+  private final int _partitionGroupId;
 
   private Exception _exception = null;
   private StreamPartitionMsgOffset _offset;
   private StreamConsumerFactory _streamConsumerFactory;
   StreamConfig _streamConfig;
 
-  public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionId, StreamConfig streamConfig) {
+  public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) {
     _offsetCriteria = offsetCriteria;
-    _partitionId = partitionId;
+    _partitionGroupId = partitionGroupId;
     _streamConfig = streamConfig;
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     _topicName = streamConfig.getTopicName();
@@ -64,18 +64,19 @@ public class PartitionOffsetFetcher implements Callable<Boolean> {
   @Override
   public Boolean call()
       throws Exception {
-    String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionId;
+    String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId;
     try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory
-        .createPartitionMetadataProvider(clientId, _partitionId)) {
+        .createPartitionMetadataProvider(clientId, _partitionGroupId)) {
       _offset =
           streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS);
       if (_exception != null) {
         LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName,
-            _partitionId);
+            _partitionGroupId);
       }
       return Boolean.TRUE;
     } catch (TransientConsumerException e) {
-      LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName, _partitionId,
+      LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName,
+          _partitionGroupId,
           e.getMessage());
       _exception = e;
       return Boolean.FALSE;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 4db0fb1..9caf61b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.spi.stream;
 
-import java.util.List;
 import java.util.Set;
 
 
@@ -42,6 +41,7 @@ public abstract class StreamConsumerFactory {
    * @param partition the partition id of the partition for which this consumer is being created
    * @return
    */
+  @Deprecated
   public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition);
 
   /**
@@ -74,10 +74,6 @@ public abstract class StreamConsumerFactory {
     return new LongMsgOffsetFactory();
   }
 
-  // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  public abstract List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata);
-
   // creates a consumer which consumes from a partition group
-  public abstract PartitionGroupConsumer createConsumer(PartitionGroupMetadata metadata);
+  public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 557ffc4..5b9104e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.util.List;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -32,11 +33,15 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
 public interface StreamMetadataProvider extends Closeable {
   /**
    * Fetches the number of partitions for a topic given the stream configs
-   * @param timeoutMillis
-   * @return
+   * @deprecated use getPartitionGroupMetadataList instead
    */
+  @Deprecated
   int fetchPartitionCount(long timeoutMillis);
 
+  // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
+  List<PartitionGroupMetadata> getPartitionGroupMetadataList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis);
+
   // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
   @Deprecated
   long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org