You are viewing a plain-text version of this content. The canonical link to the original (a hyperlink in the source page) is not reproduced in this copy.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 01:13:41 UTC

[incubator-pinot] branch sharded_consumer_type_support updated: Server side changes and some fixes

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support by this push:
     new 9aa7e58  Server side changes and some fixes
9aa7e58 is described below

commit 9aa7e58677af4f00969777d547e32c57ec9e16d8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 17:10:21 2021 -0800

    Server side changes and some fixes
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 43 ++++++++++------------
 .../realtime/SegmentBuildTimeLeaseExtender.java    |  3 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  7 ++--
 .../impl/fakestream/FakeStreamConsumerFactory.java |  2 +-
 ...lakyConsumerRealtimeClusterIntegrationTest.java |  2 +-
 .../stream/kafka09/KafkaConsumerFactory.java       |  2 +-
 .../stream/kafka20/KafkaConsumerFactory.java       |  4 +-
 .../kafka20/KafkaPartitionLevelConsumer.java       |  1 +
 .../kafka20/KafkaStreamMetadataProvider.java       | 19 +++++++++-
 ...y.java => PartitionGroupCheckpointFactory.java} |  6 +--
 .../pinot/spi/stream/PartitionGroupConsumer.java   |  4 +-
 .../pinot/spi/stream/PartitionLevelConsumer.java   |  8 +++-
 .../pinot/spi/stream/StreamConsumerFactory.java    |  2 +-
 .../stream/StreamPartitionMsgOffsetFactory.java    |  6 ++-
 14 files changed, 69 insertions(+), 40 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index dabe748..3d84738 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,7 +34,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
@@ -71,8 +70,9 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.FetchResult;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -83,8 +83,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
 import org.apache.pinot.spi.stream.TransientConsumerException;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.joda.time.DateTime;
@@ -152,13 +150,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   public class SegmentBuildDescriptor {
     final File _segmentTarFile;
     final Map<String, File> _metadataFileMap;
-    final StreamPartitionMsgOffset _offset;
+    final Checkpoint _offset;
     final long _waitTimeMillis;
     final long _buildTimeMillis;
     final long _segmentSizeBytes;
 
     public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap,
-        StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
+        Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
       _segmentTarFile = segmentTarFile;
       _metadataFileMap = metadataFileMap;
       _offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -167,7 +165,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       _segmentSizeBytes = segmentSizeBytes;
     }
 
-    public StreamPartitionMsgOffset getOffset() {
+    public Checkpoint getOffset() {
       return _offset;
     }
 
@@ -225,7 +223,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _metricKeyName;
   private final ServerMetrics _serverMetrics;
   private final MutableSegmentImpl _realtimeSegment;
-  private StreamPartitionMsgOffset _currentOffset;
+  private Checkpoint _currentOffset;
   private volatile State _state;
   private volatile int _numRowsConsumed = 0;
   private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled.
@@ -237,11 +235,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final SegmentBuildTimeLeaseExtender _leaseExtender;
   private SegmentBuildDescriptor _segmentBuildDescriptor;
   private StreamConsumerFactory _streamConsumerFactory;
-  private StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+  private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
-  private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
+  private Checkpoint _finalOffset; // Used when we want to catch up to this one
   private volatile boolean _shouldStop = false;
 
   // It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -271,7 +269,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final String _instanceId;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final long _consumeStartTime;
-  private final StreamPartitionMsgOffset _startOffset;
+  private final Checkpoint _startOffset;
   private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
 
   private long _lastLogTime = 0;
@@ -370,7 +368,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     final long idlePipeSleepTimeMillis = 100;
     final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
         .getFetchTimeoutMillis());  // 3 minute count
-    StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+    Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
         .create(_currentOffset);  // so that we always update the metric when we enter this method.
     long consecutiveIdleCount = 0;
     // At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
@@ -383,9 +381,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // Update _currentOffset upon return from this method
       MessageBatch messageBatch;
       try {
-        FetchResult fetchResult = _partitionGroupConsumer
+        messageBatch = _partitionGroupConsumer
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
-        messageBatch = fetchResult.getMessages();
         consecutiveErrorCount = 0;
       } catch (TransientConsumerException e) {
         handleTransientStreamErrors(e);
@@ -559,7 +556,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           _state = State.HOLDING;
           SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
           SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
-          StreamPartitionMsgOffset rspOffset = extractOffset(response);
+          Checkpoint rspOffset = extractOffset(response);
           boolean success;
           switch (status) {
             case NOT_LEADER:
@@ -665,7 +662,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected StreamPartitionMsgOffset extractOffset(SegmentCompletionProtocol.Response response) {
+  protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) {
     if (response.getStreamPartitionMsgOffset() != null) {
       return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
     } else {
@@ -721,7 +718,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   @VisibleForTesting
-  protected StreamPartitionMsgOffset getCurrentOffset() {
+  protected Checkpoint getCurrentOffset() {
     return _currentOffset;
   }
 
@@ -890,14 +887,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   private void closeKafkaConsumers() {
-    closePartitionLevelConsumer();
+    closePartitionGroupConsumer();
     closeStreamMetadataProvider();
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
       _partitionGroupConsumerSemaphore.release();
     }
   }
 
-  private void closePartitionLevelConsumer() {
+  private void closePartitionGroupConsumer() {
     try {
       _partitionGroupConsumer.close();
     } catch (Exception e) {
@@ -965,7 +962,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       // Remove the segment file before we do anything else.
       removeSegmentFile();
       _leaseExtender.removeSegment(_segmentNameStr);
-      final StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+      final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
       segmentLogger
           .info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
               _startOffset, endOffset);
@@ -1042,7 +1039,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return System.currentTimeMillis();
   }
 
-  private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) {
+  private boolean catchupToFinalOffset(Checkpoint endOffset, long timeoutMs) {
     _finalOffset = endOffset;
     _consumeEndTime = now() + timeoutMs;
     _state = State.CONSUMING_TO_ONLINE;
@@ -1310,10 +1307,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
    */
   private void makeStreamConsumer(String reason) {
     if (_partitionGroupConsumer != null) {
-      closePartitionLevelConsumer();
+      closePartitionGroupConsumer();
     }
     segmentLogger.info("Creating new stream consumer, reason: {}", reason);
-    _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
+    _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupMetadata);
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 69d7e80..b1a1342 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,7 +92,7 @@ public class SegmentBuildTimeLeaseExtender {
    * @param initialBuildTimeMs is the initial time budget that SegmentCompletionManager has allocated.
    * @param offset The offset at which this segment is being built.
    */
-  public void addSegment(String segmentId, long initialBuildTimeMs, StreamPartitionMsgOffset offset) {
+  public void addSegment(String segmentId, long initialBuildTimeMs, Checkpoint offset) {
     final long initialDelayMs = initialBuildTimeMs * 9 / 10;
     final SegmentCompletionProtocol.Request.Params reqParams = new SegmentCompletionProtocol.Request.Params();
     reqParams.withStreamPartitionMsgOffset(offset.toString()).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index d09bdeb..d7aec8d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -193,7 +194,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
     }
     {
@@ -207,7 +208,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
     }
     {
@@ -221,7 +222,7 @@ public class LLRealtimeSegmentDataManagerTest {
               + "  \"status\" : \"CATCH_UP\""
               + "}";
       SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
-      StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+      Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
       Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
     }
     segmentDataManager.destroy();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 6121eef..fbeb808 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -71,7 +71,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
 
   @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
     return null;
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index d917d73..c7523e3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -121,7 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
     }
 
     @Override
-    public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+    public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
       return null;
     }
   }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 82c282c..fe5a461 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -54,7 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   }
 
   @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
     return null;
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index c73aacb..b6746ff 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -51,7 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
   }
 
   @Override
-  public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
-    return null;
+  public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+    return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId());
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index f9b4365..25b1742 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.spi.stream.Checkpoint;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index ef22b6a..1d3162a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
@@ -76,7 +77,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     // use offset criteria from stream config
     for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
       StreamPartitionMsgOffset streamPartitionMsgOffset =
-          fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+          fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000);
       newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
     }
     return newPartitionGroupInfoList;
@@ -104,6 +105,22 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     return new LongMsgOffset(offset);
   }
 
+  private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+    Preconditions.checkNotNull(offsetCriteria);
+    TopicPartition topicPartition = new TopicPartition(_topic, partitionId);
+    long offset = -1;
+    if (offsetCriteria.isLargest()) {
+      offset =  _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(topicPartition);
+    } else if (offsetCriteria.isSmallest()) {
+      offset =  _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
+          .get(topicPartition);
+    } else {
+      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+    }
+    return new LongMsgOffset(offset);
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
similarity index 89%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
index d61d32d..14d2f39 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  * An interface to be implemented by streams that are consumed using Pinot LLC consumption.
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface PartitionGroupCheckpointFactory {
   /**
    * Initialization, called once when the factory is created.
    * @param streamConfig
@@ -37,7 +37,7 @@ public interface StreamPartitionMsgOffsetFactory {
    * @param offsetStr
    * @return StreamPartitionMsgOffset
    */
-  StreamPartitionMsgOffset create(String offsetStr);
+  Checkpoint create(String offsetStr);
 
   /**
    * Construct an offset from another one provided, of the same type.
@@ -45,5 +45,5 @@ public interface StreamPartitionMsgOffsetFactory {
    * @param other
    * @return
    */
-  StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+  Checkpoint create(Checkpoint other);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index bbbdaad..b421268 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.spi.stream;
 
 import java.io.Closeable;
+import java.util.concurrent.TimeoutException;
 
 
 public interface PartitionGroupConsumer extends Closeable {
-  FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
+  MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout)
+      throws TimeoutException;
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3a0a1d2..3bedc8a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public interface PartitionLevelConsumer extends Closeable {
+public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsumer {
 
   /**
    * Is here for backward compatibility for a short time.
@@ -62,4 +62,10 @@ public interface PartitionLevelConsumer extends Closeable {
     long endOffsetLong = endOffset == null ? Long.MAX_VALUE : ((LongMsgOffset)endOffset).getOffset();
     return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis);
   }
+
+  default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis)
+      throws java.util.concurrent.TimeoutException {
+    // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API
+    return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis);
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 9caf61b..db48a83 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -75,5 +75,5 @@ public abstract class StreamConsumerFactory {
   }
 
   // creates a consumer which consumes from a partition group
-  public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata);
+  public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata);
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
index d61d32d..2e3386c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
  * An interface to be implemented by streams that are consumed using Pinot LLC consumption.
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface StreamPartitionMsgOffsetFactory extends PartitionGroupCheckpointFactory{
   /**
    * Initialization, called once when the factory is created.
    * @param streamConfig
@@ -46,4 +46,8 @@ public interface StreamPartitionMsgOffsetFactory {
    * @return
    */
   StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+
+  default Checkpoint create(Checkpoint other) {
+    return create((StreamPartitionMsgOffset) other);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org