You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/02 23:52:23 UTC

[incubator-pinot] 06/08: More controller side changes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1cb09cf59d2b4dd9b6ff79fdbe48775ae532ea60
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Dec 31 15:39:44 2020 -0800

    More controller side changes
---
 .../helix/core/PinotHelixResourceManager.java      |  4 +-
 .../helix/core/PinotTableIdealStateBuilder.java    | 16 +++---
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 57 ++++++++--------------
 .../PinotLLCRealtimeSegmentManagerTest.java        |  4 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |  2 +-
 .../impl/fakestream/FakeStreamConsumerFactory.java |  2 +-
 .../fakestream/FakeStreamMetadataProvider.java     | 12 +++--
 .../kafka09/KafkaStreamMetadataProvider.java       | 36 ++++++++++----
 .../kafka09/KafkaPartitionLevelConsumerTest.java   |  4 +-
 .../kafka20/KafkaStreamMetadataProvider.java       |  3 +-
 ...Fetcher.java => PartitionGroupInfoFetcher.java} | 31 +++++-------
 .../pinot/spi/stream/StreamMetadataProvider.java   |  2 +-
 12 files changed, 86 insertions(+), 87 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index ebfbfa1..e80c06b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1337,7 +1337,7 @@ public class PinotHelixResourceManager {
       idealState = PinotTableIdealStateBuilder
           .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
               _enableBatchMessageMode);
-      _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+      _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
       LOGGER.info("Successfully setup table for SHARDED consumers for {} ", realtimeTableName);
     } else {
 
@@ -1366,7 +1366,7 @@ public class PinotHelixResourceManager {
           idealState = PinotTableIdealStateBuilder
               .buildLowLevelRealtimeIdealStateFor(realtimeTableName, realtimeTableConfig, idealState,
                   _enableBatchMessageMode);
-          _pinotLLCRealtimeSegmentManager.setupNewShardedTable(realtimeTableConfig, idealState);
+          _pinotLLCRealtimeSegmentManager.setupNewTable(realtimeTableConfig, idealState);
           LOGGER.info("Successfully added Helix entries for low-level consumers for {} ", realtimeTableName);
         } else {
           LOGGER.info("LLC is already set up for table {}, not configuring again", realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index a7b3c9e..8b200bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -30,10 +30,10 @@ import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
+import org.apache.pinot.spi.stream.PartitionGroupInfoFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -115,15 +115,15 @@ public class PinotTableIdealStateBuilder {
     return idealState;
   }
 
-  public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+  public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
-    PartitionGroupMetadataFetcher partitionGroupMetadataFetcher =
-        new PartitionGroupMetadataFetcher(streamConfig, currentPartitionGroupMetadataList);
+    PartitionGroupInfoFetcher partitionGroupInfoFetcher =
+        new PartitionGroupInfoFetcher(streamConfig, currentPartitionGroupMetadataList);
     try {
-      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+      RetryPolicies.noDelayRetryPolicy(3).attempt(partitionGroupInfoFetcher);
+      return partitionGroupInfoFetcher.getPartitionGroupInfoList();
     } catch (Exception e) {
-      Exception fetcherException = partitionGroupMetadataFetcher.getException();
+      Exception fetcherException = partitionGroupInfoFetcher.getException();
       LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
       throw new RuntimeException(fetcherException);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 423a0b2..d899b4c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -48,7 +48,6 @@ import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
-import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
@@ -208,7 +207,7 @@ public class PinotLLCRealtimeSegmentManager {
   /**
    * Sets up the realtime table ideal state for a table of consumer type SHARDED
    */
-  public void setupNewShardedTable(TableConfig tableConfig, IdealState idealState) {
+  public void setupNewTable(TableConfig tableConfig, IdealState idealState) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -220,18 +219,8 @@ public class PinotLLCRealtimeSegmentManager {
         new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
     // get new partition groups and their metadata
-    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
-        .createStreamMetadataProvider(streamConfig.getTopicName() + "_" + System.currentTimeMillis());
-
-    List<PartitionGroupInfo> newPartitionGroupMetadataList;
-    try {
-      newPartitionGroupMetadataList =
-          streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000);
-    } catch (TimeoutException e) {
-      throw new IllegalStateException(e);
-    }
-    int numPartitionGroups = newPartitionGroupMetadataList.size();
+    List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
+    int numPartitionGroups = newPartitionGroupInfoList.size();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
@@ -242,7 +231,7 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
-    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupMetadataList) {
+    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
       String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo,
           currentTimeMs, instancePartitions, numPartitionGroups, numReplicas);
       updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
@@ -507,18 +496,10 @@ public class PinotLLCRealtimeSegmentManager {
     List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
     PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
         IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
-    StreamMetadataProvider streamMetadataProvider = streamConsumerFactory
-        .createStreamMetadataProvider(streamConfig.getTopicName() + " " + System.currentTimeMillis());
 
     // find new partition groups [A],[B],[C],[D]
-    List<PartitionGroupInfo> newPartitionGroupMetadataList;
-    try {
-      newPartitionGroupMetadataList =
-          streamMetadataProvider.getPartitionGroupMetadataList(currentPartitionGroupMetadataList, 1000);
-    } catch (TimeoutException e) {
-      throw new IllegalStateException(e);
-    }
+    List<PartitionGroupInfo> newPartitionGroupMetadataList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
 
     // create new segment metadata, only if it is not IN_PROGRESS in the current state
     Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
@@ -721,9 +702,9 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   @VisibleForTesting
-  List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig,
+  List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
       List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
-    return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList);
+    return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
   }
 
   @VisibleForTesting
@@ -843,7 +824,7 @@ public class PinotLLCRealtimeSegmentManager {
       if (idealState.isEnabled()) {
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
             getCurrentPartitionGroupMetadataList(idealState);
-        int numPartitions = getPartitionGroupMetadataList(streamConfig, currentPartitionGroupMetadataList).size();
+        int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size();
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
       } else {
         LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
@@ -1005,7 +986,7 @@ public class PinotLLCRealtimeSegmentManager {
     //       and restart consumption from the same offset (if possible) or a newer offset (if realtime stream does not have the same offset).
     //       In latter case, report data loss.
     for (Map.Entry<Integer, LLCRealtimeSegmentZKMetadata> entry : latestSegmentZKMetadataMap.entrySet()) {
-      int partitionId = entry.getKey();
+      int partitionGroupId = entry.getKey();
       LLCRealtimeSegmentZKMetadata latestSegmentZKMetadata = entry.getValue();
       String latestSegmentName = latestSegmentZKMetadata.getSegmentName();
       LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentName);
@@ -1049,10 +1030,10 @@ public class PinotLLCRealtimeSegmentManager {
             StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
             // Start offset must be higher than the start offset of the stream
             StreamPartitionMsgOffset partitionStartOffset =
-                getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionId);
+                getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId);
             if (partitionStartOffset.compareTo(startOffset) > 0) {
               LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
-                  partitionStartOffset, partitionId, realtimeTableName);
+                  partitionStartOffset, partitionGroupId, realtimeTableName);
               _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
               startOffset = partitionStartOffset;
             }
@@ -1089,7 +1070,7 @@ public class PinotLLCRealtimeSegmentManager {
           String previousConsumingSegment = null;
           for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) {
             LLCSegmentName llcSegmentName = new LLCSegmentName(segmentEntry.getKey());
-            if (llcSegmentName.getPartitionGroupId() == partitionId && segmentEntry.getValue()
+            if (llcSegmentName.getPartitionGroupId() == partitionGroupId && segmentEntry.getValue()
                 .containsValue(SegmentStateModel.CONSUMING)) {
               previousConsumingSegment = llcSegmentName.getSegmentName();
               break;
@@ -1098,7 +1079,7 @@ public class PinotLLCRealtimeSegmentManager {
           if (previousConsumingSegment == null) {
             LOGGER
                 .error("Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss",
-                    partitionId, realtimeTableName);
+                    partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
           updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName,
@@ -1111,10 +1092,14 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Set up new partitions if not exist
-    for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
-      if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
+    List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
+    List<PartitionGroupInfo> partitionGroupInfoList =
+        getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+      int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
+      if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
         String newSegmentName =
-            setupNewPartitionGroup(tableConfig, streamConfig, partitionId, currentTimeMs, instancePartitions, numPartitions,
+            setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, currentTimeMs, instancePartitions, numPartitions,
                 numReplicas);
         updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
             instancePartitionsMap);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 743e719..42bdedc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -50,7 +50,6 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.controller.util.SegmentCompletionUtils;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
-import org.apache.pinot.core.realtime.impl.fakestream.FakePartitionGroupMetadata;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -60,6 +59,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
@@ -914,7 +914,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
 
     @Override
-    List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
       return IntStream.range(0, _numPartitions).mapToObj(FakePartitionGroupMetadata::new).collect(Collectors.toList());
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index f00facc..0cd1fba 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1239,7 +1239,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
           // fixme: get this from ideal state
-          int numStreamPartitions = _streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 5000).size();
+          int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size();
           if (numStreamPartitions != numPartitions) {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 54be1b6..6121eef 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -88,7 +88,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
-    int partitionCount = streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10_000).size();
+    int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size();
     System.out.println(partitionCount);
 
     // Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index c96d06a..0de0ce2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -35,9 +36,11 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
  */
 public class FakeStreamMetadataProvider implements StreamMetadataProvider {
   private final int _numPartitions;
+  private StreamConfig _streamConfig;
 
   public FakeStreamMetadataProvider(StreamConfig streamConfig) {
     _numPartitions = FakeStreamConfigUtils.getNumPartitions(streamConfig);
+    _streamConfig = streamConfig;
   }
 
   @Override
@@ -46,11 +49,12 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
   }
 
   @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
+  public List<PartitionGroupInfo> getPartitionGroupInfoList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+      throws TimeoutException {
+    List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>();
     for (int i = 0; i < _numPartitions; i++) {
-      partitionGroupMetadataList.add(new FakePartitionGroupMetadata(i));
+      partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString()));
     }
     return partitionGroupMetadataList;
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 865ae96..2d0ad31 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -54,13 +55,14 @@ import org.slf4j.LoggerFactory;
 public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
 
+  private StreamConfig _streamConfig;
+
   /**
    * Create a partition specific metadata provider
-   * @param streamConfig
-   * @param partition
    */
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl());
+    _streamConfig = streamConfig;
   }
 
   /**
@@ -69,18 +71,21 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
    */
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
     super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl());
+    _streamConfig = streamConfig;
   }
 
   @VisibleForTesting
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition,
       KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
     super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
+    _streamConfig = streamConfig;
   }
 
   @VisibleForTesting
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig,
       KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
     super(clientId, streamConfig, kafkaSimpleConsumerFactory);
+    _streamConfig = streamConfig;
   }
 
   /**
@@ -156,19 +161,30 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
   }
 
   /**
-   * Fetch the partition group metadata list
+   * Fetch the PartitionGroupInfo list.
    * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
-   *                                       Hence current partition groups are not needed to compute the new partition groups
    */
   @Override
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList(
-      @Nullable List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis) {
+  public List<PartitionGroupInfo> getPartitionGroupInfoList(
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
+      throws java.util.concurrent.TimeoutException {
     int partitionCount = fetchPartitionCountInternal(timeoutMillis);
-    List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>(partitionCount);
-    for (int i = 0; i < partitionCount; i++) {
-      partitionGroupMetadataList.add(new KafkaPartitionGroupMetadata(i));
+    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+    // Add a PartitionGroupInfo for each partition group already present in the current metadata,
+    // using that group's end checkpoint as its checkpoint.
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+          currentPartitionGroupMetadata.getEndCheckpoint()));
+    }
+    // Add a PartitionGroupInfo for each new partition,
+    // with the checkpoint derived from the offset criteria in the stream config.
+    for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+      StreamPartitionMsgOffset streamPartitionMsgOffset =
+          fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
     }
-    return partitionGroupMetadataList;
+    return newPartitionGroupInfoList;
   }
 
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 43b72a8..9d3091e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -267,7 +267,7 @@ public class KafkaPartitionLevelConsumerTest {
   }
 
   @Test
-  public void testGetPartitionCount() {
+  public void testGetPartitionCount() throws Exception {
     String streamType = "kafka";
     String streamKafkaTopicName = "theTopic";
     String streamKafkaBrokerList = "abcd:1234,bcde:2345";
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
 
     KafkaStreamMetadataProvider streamMetadataProvider =
         new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
-    Assert.assertEquals(streamMetadataProvider.getPartitionGroupMetadataList(Collections.emptyList(), 10000L), 2);
+    Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2);
   }
 
   @Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index eb606f2..ef22b6a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -58,10 +58,9 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
   /**
    * Fetch the partitionGroupMetadata list.
    * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
-   *                                       Hence current partition groups are not needed to compute the new partition groups
    */
   @Override
-  public List<PartitionGroupInfo> getPartitionGroupMetadataList(
+  public List<PartitionGroupInfo> getPartitionGroupInfoList(
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
       throws TimeoutException {
     int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
similarity index 64%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index e1ce1a6..d13be10 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -27,27 +27,24 @@ import org.slf4j.LoggerFactory;
 /**
  * Fetches the partition count of a stream using the {@link StreamMetadataProvider}
  */
-public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
+public class PartitionGroupInfoFetcher implements Callable<Boolean> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class);
 
-  private int _partitionCount = -1;
-  private List<PartitionGroupMetadata> _partitionGroupMetadataList;
-  private List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
-  private final StreamConfig _streamConfig;
-  private StreamConsumerFactory _streamConsumerFactory;
+  private List<PartitionGroupInfo> _partitionGroupInfoList;
+  private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
+  private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
   private final String _topicName;
 
-  public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
-    _streamConfig = streamConfig;
-    _streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
+  public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
+    _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     _topicName = streamConfig.getTopicName();
     _currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
   }
 
-  public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
-    return _partitionGroupMetadataList;
+  public List<PartitionGroupInfo> getPartitionGroupInfoList() {
+    return _partitionGroupInfoList;
   }
 
   public Exception getException() {
@@ -55,21 +52,19 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
   }
 
   /**
-   * Callable to fetch the number of partitions of the stream given the stream metadata
-   * @return
-   * @throws Exception
+   * Callable to fetch the partition group info for the stream
    */
   @Override
   public Boolean call()
       throws Exception {
 
-    String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName;
+    String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName;
     try (
         StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-      _partitionGroupMetadataList = streamMetadataProvider.getPartitionGroupMetadataList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
+      _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
       if (_exception != null) {
         // We had at least one failure, but succeeded now. Log an info
-        LOGGER.info("Successfully retrieved partition count as {} for topic {}", _partitionCount, _topicName);
+        LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName);
       }
       return Boolean.TRUE;
     } catch (TransientConsumerException e) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index a9cd2d6..f595ea3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -40,7 +40,7 @@ public interface StreamMetadataProvider extends Closeable {
   int fetchPartitionCount(long timeoutMillis);
 
   // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  List<PartitionGroupInfo> getPartitionGroupMetadataList(
+  List<PartitionGroupInfo> getPartitionGroupInfoList(
       List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
       throws TimeoutException;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org