You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 02:16:36 UTC

[incubator-pinot] branch sharded_consumer_type_support updated: default methods to avoid interface changes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support by this push:
     new 6f4336e  default methods to avoid interface changes
6f4336e is described below

commit 6f4336e070522a60e787fc5b948d6e00c7eda1d8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 18:16:09 2021 -0800

    default methods to avoid interface changes
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  3 +-
 .../impl/fakestream/FakeStreamConsumerFactory.java |  8 +---
 .../fakestream/FakeStreamMetadataProvider.java     | 11 -----
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  5 ---
 .../stream/kafka09/KafkaConsumerFactory.java       |  7 ---
 .../kafka09/KafkaStreamMetadataProvider.java       | 46 +------------------
 .../kafka09/KafkaPartitionLevelConsumerTest.java   |  2 +-
 .../stream/kafka20/KafkaConsumerFactory.java       |  7 ---
 .../kafka20/KafkaStreamMetadataProvider.java       | 52 ----------------------
 .../spi/stream/PartitionGroupInfoFetcher.java      |  4 +-
 .../pinot/spi/stream/StreamConsumerFactory.java    |  4 +-
 .../pinot/spi/stream/StreamMetadataProvider.java   | 36 ++++++++++++---
 12 files changed, 43 insertions(+), 142 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 3d84738..7d82b4e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1240,7 +1240,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         int numPartitions = columnPartitionConfig.getNumPartitions();
         try {
           // fixme: get this from ideal state
-          int numStreamPartitions = _streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 5000).size();
+          int numStreamPartitions = _streamMetadataProvider
+              .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
           if (numStreamPartitions != numPartitions) {
             segmentLogger.warn(
                 "Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index fbeb808..b0dc7eb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -69,12 +69,6 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
     return new FakeStreamMetadataProvider(_streamConfig);
   }
 
-
-  @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
-    return null;
-  }
-
   public static void main(String[] args)
       throws Exception {
     String clientId = "client_id_localhost_tester";
@@ -88,7 +82,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
     // stream metadata provider
     StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(clientId);
-    int partitionCount = streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10_000).size();
+    int partitionCount = streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10_000).size();
     System.out.println(partitionCount);
 
     // Partition metadata provider
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index 0de0ce2..61aa01f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -48,17 +48,6 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
     return _numPartitions;
   }
 
-  @Override
-  public List<PartitionGroupInfo> getPartitionGroupInfoList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
-      throws TimeoutException {
-    List<PartitionGroupInfo> partitionGroupMetadataList = new ArrayList<>();
-    for (int i = 0; i < _numPartitions; i++) {
-      partitionGroupMetadataList.add(new PartitionGroupInfo(i, fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000).toString()));
-    }
-    return partitionGroupMetadataList;
-  }
-
   public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) throws TimeoutException {
     throw new UnsupportedOperationException("This method is deprecated");
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index c7523e3..4503de0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -119,10 +119,5 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
     public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
       throw new UnsupportedOperationException();
     }
-
-    @Override
-    public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
-      return null;
-    }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index fe5a461..615e354 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.plugin.stream.kafka09;
 
 import java.util.Set;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -52,9 +50,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
-
-  @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
-    return null;
-  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
index 2d0ad31..06ee697 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamMetadataProvider.java
@@ -22,13 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
-import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetRequest;
@@ -40,8 +36,6 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -55,14 +49,13 @@ import org.slf4j.LoggerFactory;
 public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implements StreamMetadataProvider {
   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaStreamMetadataProvider.class);
 
-  private StreamConfig _streamConfig;
-
   /**
    * Create a partition specific metadata provider
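+   * @param streamConfig stream config passed to the underlying Kafka connection handler
+   * @param partition the partition this metadata provider is specific to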
    */
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition, new KafkaSimpleConsumerFactoryImpl());
-    _streamConfig = streamConfig;
   }
 
   /**
@@ -71,21 +64,18 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
    */
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
     super(clientId, streamConfig, new KafkaSimpleConsumerFactoryImpl());
-    _streamConfig = streamConfig;
   }
 
   @VisibleForTesting
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition,
       KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
     super(clientId, streamConfig, partition, kafkaSimpleConsumerFactory);
-    _streamConfig = streamConfig;
   }
 
   @VisibleForTesting
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig,
       KafkaSimpleConsumerFactory kafkaSimpleConsumerFactory) {
     super(clientId, streamConfig, kafkaSimpleConsumerFactory);
-    _streamConfig = streamConfig;
   }
 
   /**
@@ -94,12 +84,7 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
    * @return
    */
   @Override
-  @Deprecated
   public synchronized int fetchPartitionCount(long timeoutMillis) {
-    return fetchPartitionCountInternal(timeoutMillis);
-  }
-
-  private int fetchPartitionCountInternal(long timeoutMillis) {
     int unknownTopicReplyCount = 0;
     final int MAX_UNKNOWN_TOPIC_REPLY_COUNT = 10;
     int kafkaErrorCount = 0;
@@ -160,33 +145,6 @@ public class KafkaStreamMetadataProvider extends KafkaConnectionHandler implemen
     throw new TimeoutException();
   }
 
-  /**
-   * Fetch the partitionGroupMetadata list.
-   * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
-   */
-  @Override
-  public List<PartitionGroupInfo> getPartitionGroupInfoList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
-      throws java.util.concurrent.TimeoutException {
-    int partitionCount = fetchPartitionCountInternal(timeoutMillis);
-    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
-
-    // add a PartitionGroupInfo into the list foreach partition already present in current.
-    // the end checkpoint is set as checkpoint
-    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
-          currentPartitionGroupMetadata.getEndCheckpoint()));
-    }
-    // add PartitiongroupInfo for new partitions
-    // use offset criteria from stream config
-    for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
-      StreamPartitionMsgOffset streamPartitionMsgOffset =
-          fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
-    }
-    return newPartitionGroupInfoList;
-  }
-
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
       throws java.util.concurrent.TimeoutException {
     throw new UnsupportedOperationException("The use of this method s not supported");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
index 9d3091e..90dc5ad 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/test/java/org/apache/pinot/plugin/stream/kafka09/KafkaPartitionLevelConsumerTest.java
@@ -291,7 +291,7 @@ public class KafkaPartitionLevelConsumerTest {
 
     KafkaStreamMetadataProvider streamMetadataProvider =
         new KafkaStreamMetadataProvider(clientId, streamConfig, mockKafkaSimpleConsumerFactory);
-    Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList(Collections.emptyList(), 10000L), 2);
+    Assert.assertEquals(streamMetadataProvider.getPartitionGroupInfoList("clientId", streamConfig, Collections.emptyList(), 10000), 2);
   }
 
   @Test
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index b6746ff..e0d1015 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.plugin.stream.kafka20;
 
 import java.util.Set;
-import org.apache.pinot.spi.stream.PartitionGroupConsumer;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
@@ -49,9 +47,4 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
     return new KafkaStreamMetadataProvider(clientId, _streamConfig);
   }
-
-  @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
-    return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId());
-  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 1d3162a..38c49f5 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -21,17 +21,11 @@ package org.apache.pinot.plugin.stream.kafka20;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
-import org.apache.pinot.spi.stream.PartitionGroupInfo;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -39,15 +33,12 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 
 public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler implements StreamMetadataProvider {
 
-  private StreamConfig _streamConfig;
-
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
     this(clientId, streamConfig, Integer.MIN_VALUE);
   }
 
   public KafkaStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
-    _streamConfig = streamConfig;
   }
 
   @Override
@@ -56,33 +47,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
   }
 
-  /**
-   * Fetch the partitionGroupMetadata list.
-   * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
-   */
-  @Override
-  public List<PartitionGroupInfo> getPartitionGroupInfoList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
-      throws TimeoutException {
-    int partitionCount = _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
-    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
-
-    // add a PartitionGroupInfo into the list foreach partition already present in current.
-    // the end checkpoint is set as checkpoint
-    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
-          currentPartitionGroupMetadata.getEndCheckpoint()));
-    }
-    // add PartitiongroupInfo for new partitions
-    // use offset criteria from stream config
-    for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
-      StreamPartitionMsgOffset streamPartitionMsgOffset =
-          fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000);
-      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
-    }
-    return newPartitionGroupInfoList;
-  }
-
   public synchronized long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
       throws java.util.concurrent.TimeoutException {
     throw new UnsupportedOperationException("The use of this method is not supported");
@@ -105,22 +69,6 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     return new LongMsgOffset(offset);
   }
 
-  private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
-    Preconditions.checkNotNull(offsetCriteria);
-    TopicPartition topicPartition = new TopicPartition(_topic, partitionId);
-    long offset = -1;
-    if (offsetCriteria.isLargest()) {
-      offset =  _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
-          .get(topicPartition);
-    } else if (offsetCriteria.isSmallest()) {
-      offset =  _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
-          .get(topicPartition);
-    } else {
-      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
-    }
-    return new LongMsgOffset(offset);
-  }
-
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index d13be10..f2d3f17 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -32,6 +32,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupInfoFetcher.class);
 
   private List<PartitionGroupInfo> _partitionGroupInfoList;
+  private final StreamConfig _streamConfig;
   private final List<PartitionGroupMetadata> _currentPartitionGroupMetadata;
   private final StreamConsumerFactory _streamConsumerFactory;
   private Exception _exception;
@@ -40,6 +41,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
   public PartitionGroupInfoFetcher(StreamConfig streamConfig, List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
     _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     _topicName = streamConfig.getTopicName();
+    _streamConfig = streamConfig;
     _currentPartitionGroupMetadata = currentPartitionGroupMetadataList;
   }
 
@@ -61,7 +63,7 @@ public class PartitionGroupInfoFetcher implements Callable<Boolean> {
     String clientId = PartitionGroupInfoFetcher.class.getSimpleName() + "-" + _topicName;
     try (
         StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) {
-      _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(_currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000L);
+      _partitionGroupInfoList = streamMetadataProvider.getPartitionGroupInfoList(clientId, _streamConfig, _currentPartitionGroupMetadata, /*maxWaitTimeMs=*/5000);
       if (_exception != null) {
         // We had at least one failure, but succeeded now. Log an info
         LOGGER.info("Successfully retrieved partition group info for topic {}", _topicName);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index db48a83..f993fed 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -75,5 +75,7 @@ public abstract class StreamConsumerFactory {
   }
 
   // creates a consumer which consumes from a partition group
-  public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata);
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+    return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId());
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index f595ea3..572cd02 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
@@ -39,11 +41,6 @@ public interface StreamMetadataProvider extends Closeable {
   @Deprecated
   int fetchPartitionCount(long timeoutMillis);
 
-  // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
-  List<PartitionGroupInfo> getPartitionGroupInfoList(
-      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, long timeoutMillis)
-      throws TimeoutException;
-
   // Issue 5953 Retain this interface for 0.5.0, remove in 0.6.0
   @Deprecated
   long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
@@ -60,4 +57,33 @@ public interface StreamMetadataProvider extends Closeable {
     long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
     return new LongMsgOffset(offset);
   }
+
+  /**
+   * Fetch the PartitionGroupInfo list.
+   * @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
+   */
+  default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+      throws TimeoutException {
+    int partitionCount = fetchPartitionCount(timeoutMillis);
+    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+    // add a PartitionGroupInfo to the list for each partition group already present in the current metadata.
+    // its end checkpoint is used as the checkpoint of the new PartitionGroupInfo
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+          currentPartitionGroupMetadata.getEndCheckpoint()));
+    }
+    // add a PartitionGroupInfo for each new partition
+    // use offset criteria from stream config
+    StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
+      StreamMetadataProvider partitionMetadataProvider =
+          streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
+      StreamPartitionMsgOffset streamPartitionMsgOffset =
+          partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis);
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
+    }
+    return newPartitionGroupInfoList;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org