You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 01:13:41 UTC
[incubator-pinot] branch sharded_consumer_type_support updated:
Server side changes and some fixes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support by this push:
new 9aa7e58 Server side changes and some fixes
9aa7e58 is described below
commit 9aa7e58677af4f00969777d547e32c57ec9e16d8
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 17:10:21 2021 -0800
Server side changes and some fixes
---
.../realtime/LLRealtimeSegmentDataManager.java | 43 ++++++++++------------
.../realtime/SegmentBuildTimeLeaseExtender.java | 3 +-
.../realtime/LLRealtimeSegmentDataManagerTest.java | 7 ++--
.../impl/fakestream/FakeStreamConsumerFactory.java | 2 +-
...lakyConsumerRealtimeClusterIntegrationTest.java | 2 +-
.../stream/kafka09/KafkaConsumerFactory.java | 2 +-
.../stream/kafka20/KafkaConsumerFactory.java | 4 +-
.../kafka20/KafkaPartitionLevelConsumer.java | 1 +
.../kafka20/KafkaStreamMetadataProvider.java | 19 +++++++++-
...y.java => PartitionGroupCheckpointFactory.java} | 6 +--
.../pinot/spi/stream/PartitionGroupConsumer.java | 4 +-
.../pinot/spi/stream/PartitionLevelConsumer.java | 8 +++-
.../pinot/spi/stream/StreamConsumerFactory.java | 2 +-
.../stream/StreamPartitionMsgOffsetFactory.java | 6 ++-
14 files changed, 69 insertions(+), 40 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index dabe748..3d84738 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -71,8 +70,9 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.stream.FetchResult;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.MessageBatch;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -83,8 +83,6 @@ import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.pinot.spi.stream.TransientConsumerException;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.joda.time.DateTime;
@@ -152,13 +150,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
public class SegmentBuildDescriptor {
final File _segmentTarFile;
final Map<String, File> _metadataFileMap;
- final StreamPartitionMsgOffset _offset;
+ final Checkpoint _offset;
final long _waitTimeMillis;
final long _buildTimeMillis;
final long _segmentSizeBytes;
public SegmentBuildDescriptor(@Nullable File segmentTarFile, @Nullable Map<String, File> metadataFileMap,
- StreamPartitionMsgOffset offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
+ Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
_segmentTarFile = segmentTarFile;
_metadataFileMap = metadataFileMap;
_offset = _streamPartitionMsgOffsetFactory.create(offset);
@@ -167,7 +165,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentSizeBytes = segmentSizeBytes;
}
- public StreamPartitionMsgOffset getOffset() {
+ public Checkpoint getOffset() {
return _offset;
}
@@ -225,7 +223,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final String _metricKeyName;
private final ServerMetrics _serverMetrics;
private final MutableSegmentImpl _realtimeSegment;
- private StreamPartitionMsgOffset _currentOffset;
+ private Checkpoint _currentOffset;
private volatile State _state;
private volatile int _numRowsConsumed = 0;
private volatile int _numRowsIndexed = 0; // Can be different from _numRowsConsumed when metrics update is enabled.
@@ -237,11 +235,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final SegmentBuildTimeLeaseExtender _leaseExtender;
private SegmentBuildDescriptor _segmentBuildDescriptor;
private StreamConsumerFactory _streamConsumerFactory;
- private StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
+ private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
// Segment end criteria
private volatile long _consumeEndTime = 0;
- private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
+ private Checkpoint _finalOffset; // Used when we want to catch up to this one
private volatile boolean _shouldStop = false;
// It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -271,7 +269,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final String _instanceId;
private final ServerSegmentCompletionProtocolHandler _protocolHandler;
private final long _consumeStartTime;
- private final StreamPartitionMsgOffset _startOffset;
+ private final Checkpoint _startOffset;
private final PartitionLevelStreamConfig _partitionLevelStreamConfig;
private long _lastLogTime = 0;
@@ -370,7 +368,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long idlePipeSleepTimeMillis = 100;
final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
.getFetchTimeoutMillis()); // 3 minute count
- StreamPartitionMsgOffset lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+ Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
.create(_currentOffset); // so that we always update the metric when we enter this method.
long consecutiveIdleCount = 0;
// At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
@@ -383,9 +381,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
- FetchResult fetchResult = _partitionGroupConsumer
+ messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
- messageBatch = fetchResult.getMessages();
consecutiveErrorCount = 0;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
@@ -559,7 +556,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_state = State.HOLDING;
SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
- StreamPartitionMsgOffset rspOffset = extractOffset(response);
+ Checkpoint rspOffset = extractOffset(response);
boolean success;
switch (status) {
case NOT_LEADER:
@@ -665,7 +662,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
@VisibleForTesting
- protected StreamPartitionMsgOffset extractOffset(SegmentCompletionProtocol.Response response) {
+ protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) {
if (response.getStreamPartitionMsgOffset() != null) {
return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
} else {
@@ -721,7 +718,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
@VisibleForTesting
- protected StreamPartitionMsgOffset getCurrentOffset() {
+ protected Checkpoint getCurrentOffset() {
return _currentOffset;
}
@@ -890,14 +887,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
private void closeKafkaConsumers() {
- closePartitionLevelConsumer();
+ closePartitionGroupConsumer();
closeStreamMetadataProvider();
if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
_partitionGroupConsumerSemaphore.release();
}
}
- private void closePartitionLevelConsumer() {
+ private void closePartitionGroupConsumer() {
try {
_partitionGroupConsumer.close();
} catch (Exception e) {
@@ -965,7 +962,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Remove the segment file before we do anything else.
removeSegmentFile();
_leaseExtender.removeSegment(_segmentNameStr);
- final StreamPartitionMsgOffset endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+ final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
segmentLogger
.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
_startOffset, endOffset);
@@ -1042,7 +1039,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
return System.currentTimeMillis();
}
- private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long timeoutMs) {
+ private boolean catchupToFinalOffset(Checkpoint endOffset, long timeoutMs) {
_finalOffset = endOffset;
_consumeEndTime = now() + timeoutMs;
_state = State.CONSUMING_TO_ONLINE;
@@ -1310,10 +1307,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
*/
private void makeStreamConsumer(String reason) {
if (_partitionGroupConsumer != null) {
- closePartitionLevelConsumer();
+ closePartitionGroupConsumer();
}
segmentLogger.info("Creating new stream consumer, reason: {}", reason);
- _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_partitionGroupMetadata);
+ _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupMetadata);
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
index 69d7e80..b1a1342 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,7 +92,7 @@ public class SegmentBuildTimeLeaseExtender {
* @param initialBuildTimeMs is the initial time budget that SegmentCompletionManager has allocated.
* @param offset The offset at which this segment is being built.
*/
- public void addSegment(String segmentId, long initialBuildTimeMs, StreamPartitionMsgOffset offset) {
+ public void addSegment(String segmentId, long initialBuildTimeMs, Checkpoint offset) {
final long initialDelayMs = initialBuildTimeMs * 9 / 10;
final SegmentCompletionProtocol.Request.Params reqParams = new SegmentCompletionProtocol.Request.Params();
reqParams.withStreamPartitionMsgOffset(offset.toString()).withSegmentName(segmentId).withExtraTimeSec(EXTRA_TIME_SECONDS)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index d09bdeb..d7aec8d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -46,6 +46,7 @@ import org.apache.pinot.core.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.PermanentConsumerException;
@@ -193,7 +194,7 @@ public class LLRealtimeSegmentDataManagerTest {
+ " \"status\" : \"CATCH_UP\""
+ "}";
SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+ Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
}
{
@@ -207,7 +208,7 @@ public class LLRealtimeSegmentDataManagerTest {
+ " \"status\" : \"CATCH_UP\""
+ "}";
SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+ Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
}
{
@@ -221,7 +222,7 @@ public class LLRealtimeSegmentDataManagerTest {
+ " \"status\" : \"CATCH_UP\""
+ "}";
SegmentCompletionProtocol.Response response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
- StreamPartitionMsgOffset extractedOffset = segmentDataManager.extractOffset(response);
+ Checkpoint extractedOffset = segmentDataManager.extractOffset(response);
Assert.assertEquals(extractedOffset.compareTo(new LongMsgOffset(offset)), 0);
}
segmentDataManager.destroy();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 6121eef..fbeb808 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -71,7 +71,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
@Override
- public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
return null;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index d917d73..c7523e3 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -121,7 +121,7 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
}
@Override
- public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
return null;
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 82c282c..fe5a461 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -54,7 +54,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
}
@Override
- public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
return null;
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index c73aacb..b6746ff 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -51,7 +51,7 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
}
@Override
- public PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata) {
- return null;
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+ return new KafkaPartitionLevelConsumer(clientId, _streamConfig, metadata.getPartitionGroupId());
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index f9b4365..25b1742 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index ef22b6a..1d3162a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.kafka.common.TopicPartition;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
@@ -76,7 +77,7 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
// use offset criteria from stream config
for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
StreamPartitionMsgOffset streamPartitionMsgOffset =
- fetchStreamPartitionOffset(_streamConfig.getOffsetCriteria(), 5000);
+ fetchStreamPartitionOffsetInternal(i, _streamConfig.getOffsetCriteria(), 5000);
newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
}
return newPartitionGroupInfoList;
@@ -104,6 +105,22 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
return new LongMsgOffset(offset);
}
+ private StreamPartitionMsgOffset fetchStreamPartitionOffsetInternal(int partitionId, @Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) {
+ Preconditions.checkNotNull(offsetCriteria);
+ TopicPartition topicPartition = new TopicPartition(_topic, partitionId);
+ long offset = -1;
+ if (offsetCriteria.isLargest()) {
+ offset = _consumer.endOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(topicPartition);
+ } else if (offsetCriteria.isSmallest()) {
+ offset = _consumer.beginningOffsets(Collections.singletonList(topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(topicPartition);
+ } else {
+ throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria.toString());
+ }
+ return new LongMsgOffset(offset);
+ }
+
@Override
public void close()
throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
similarity index 89%
copy from pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
index d61d32d..14d2f39 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
* An interface to be implemented by streams that are consumed using Pinot LLC consumption.
*/
@InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface PartitionGroupCheckpointFactory {
/**
* Initialization, called once when the factory is created.
* @param streamConfig
@@ -37,7 +37,7 @@ public interface StreamPartitionMsgOffsetFactory {
* @param offsetStr
* @return StreamPartitionMsgOffset
*/
- StreamPartitionMsgOffset create(String offsetStr);
+ Checkpoint create(String offsetStr);
/**
* Construct an offset from another one provided, of the same type.
@@ -45,5 +45,5 @@ public interface StreamPartitionMsgOffsetFactory {
* @param other
* @return
*/
- StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+ Checkpoint create(Checkpoint other);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index bbbdaad..b421268 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -19,8 +19,10 @@
package org.apache.pinot.spi.stream;
import java.io.Closeable;
+import java.util.concurrent.TimeoutException;
public interface PartitionGroupConsumer extends Closeable {
- FetchResult fetchMessages(Checkpoint start, Checkpoint end, long timeout);
+ MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout)
+ throws TimeoutException;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3a0a1d2..3bedc8a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -28,7 +28,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public interface PartitionLevelConsumer extends Closeable {
+public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsumer {
/**
* Is here for backward compatibility for a short time.
@@ -62,4 +62,10 @@ public interface PartitionLevelConsumer extends Closeable {
long endOffsetLong = endOffset == null ? Long.MAX_VALUE : ((LongMsgOffset)endOffset).getOffset();
return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis);
}
+
+ default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis)
+ throws java.util.concurrent.TimeoutException {
+ // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API
+ return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 9caf61b..db48a83 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -75,5 +75,5 @@ public abstract class StreamConsumerFactory {
}
// creates a consumer which consumes from a partition group
- public abstract PartitionGroupConsumer createPartitionGroupConsumer(PartitionGroupMetadata metadata);
+ public abstract PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
index d61d32d..2e3386c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffsetFactory.java
@@ -25,7 +25,7 @@ import org.apache.pinot.spi.annotations.InterfaceStability;
* An interface to be implemented by streams that are consumed using Pinot LLC consumption.
*/
@InterfaceStability.Evolving
-public interface StreamPartitionMsgOffsetFactory {
+public interface StreamPartitionMsgOffsetFactory extends PartitionGroupCheckpointFactory{
/**
* Initialization, called once when the factory is created.
* @param streamConfig
@@ -46,4 +46,8 @@ public interface StreamPartitionMsgOffsetFactory {
* @return
*/
StreamPartitionMsgOffset create(StreamPartitionMsgOffset other);
+
+ default Checkpoint create(Checkpoint other) {
+ return create((StreamPartitionMsgOffset) other);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org