You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:34 UTC
[incubator-pinot] 44/47: End-of-shard as end criteria AND consume
shards in order
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 751e21205fa53b8c6db01c8ba26aa8b3d5ace424
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Fri Jan 15 17:38:28 2021 -0800
End-of-shard as end criteria AND consume shards in order
---
.../segment/LLCRealtimeSegmentZKMetadata.java | 5 -
.../protocols/SegmentCompletionProtocol.java | 2 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 31 +++---
.../RealtimeSegmentValidationManager.java | 2 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 2 +-
.../realtime/LLRealtimeSegmentDataManager.java | 35 ++++---
.../kinesis/KinesisStreamMetadataProvider.java | 106 ++++++++++++---------
.../pinot/spi/stream/PartitionGroupInfo.java | 6 +-
.../pinot/spi/stream/PartitionGroupMetadata.java | 16 ++--
.../pinot/spi/stream/StreamMetadataProvider.java | 2 +-
10 files changed, 119 insertions(+), 88 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
index b8b8d95..7cb19a7 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/LLCRealtimeSegmentZKMetadata.java
@@ -87,11 +87,6 @@ public class LLCRealtimeSegmentZKMetadata extends RealtimeSegmentZKMetadata {
public ZNRecord toZNRecord() {
ZNRecord znRecord = super.toZNRecord();
znRecord.setSimpleField(START_OFFSET, _startOffset);
- if (_endOffset == null) {
- // TODO Issue 5359 Keep this until all components have upgraded to a version that can handle _offset being null
- // For backward compatibility until all components have been upgraded to deal with null value for _endOffset
- _endOffset = Long.toString(Long.MAX_VALUE);
- }
znRecord.setSimpleField(END_OFFSET, _endOffset);
znRecord.setIntField(NUM_REPLICAS, _numReplicas);
znRecord.setSimpleField(DOWNLOAD_URL, _downloadUrl);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..6dcbda2 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,8 @@ public class SegmentCompletionProtocol {
public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached
public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached
+ public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup";
+ // Stop reason sent by server as end of partitionGroup reached
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 27d487b..72caaf4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -78,6 +78,7 @@ import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -166,7 +167,8 @@ public class PinotLLCRealtimeSegmentManager {
* Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata}
* for latest segment of each partition group
*/
- public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
+ public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState,
+ StreamConfig streamConfig) {
List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
// From all segment names in the ideal state, find unique partition group ids and their latest segment
@@ -185,6 +187,8 @@ public class PinotLLCRealtimeSegmentManager {
}
// Create a PartitionGroupMetadata for each latest segment
+ PartitionGroupCheckpointFactory checkpointFactory =
+ StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
int partitionGroupId = entry.getKey();
LLCSegmentName llcSegmentName = entry.getValue();
@@ -195,7 +199,9 @@ public class PinotLLCRealtimeSegmentManager {
(LLCRealtimeSegmentZKMetadata) realtimeSegmentZKMetadata;
PartitionGroupMetadata partitionGroupMetadata =
new PartitionGroupMetadata(partitionGroupId, llcSegmentName.getSequenceNumber(),
- llRealtimeSegmentZKMetadata.getStartOffset(), llRealtimeSegmentZKMetadata.getEndOffset(),
+ checkpointFactory.create(llRealtimeSegmentZKMetadata.getStartOffset()),
+ llRealtimeSegmentZKMetadata.getEndOffset() == null ? null
+ : checkpointFactory.create(llRealtimeSegmentZKMetadata.getEndOffset()),
llRealtimeSegmentZKMetadata.getStatus().toString());
partitionGroupMetadataList.add(partitionGroupMetadata);
}
@@ -498,9 +504,10 @@ public class PinotLLCRealtimeSegmentManager {
// Example: Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
// Get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
+ getCurrentPartitionGroupMetadataList(idealState, streamConfig);
// Find new partition groups [A],[B],[C],[D] (assume A split into C D)
// If segment has consumed all of A, we will receive B,C,D
@@ -610,9 +617,7 @@ public class PinotLLCRealtimeSegmentManager {
int numPartitions, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
String segmentName = newLLCSegmentName.getSegmentName();
- StreamPartitionMsgOffsetFactory offsetFactory =
- StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
- StreamPartitionMsgOffset startOffset = offsetFactory.create(committingSegmentDescriptor.getNextOffset());
+ String startOffset = committingSegmentDescriptor.getNextOffset();
LOGGER
.info("Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
segmentName, startOffset, creationTimeMs);
@@ -621,7 +626,7 @@ public class PinotLLCRealtimeSegmentManager {
newSegmentZKMetadata.setTableName(realtimeTableName);
newSegmentZKMetadata.setSegmentName(segmentName);
newSegmentZKMetadata.setCreationTime(creationTimeMs);
- newSegmentZKMetadata.setStartOffset(startOffset.toString());
+ newSegmentZKMetadata.setStartOffset(startOffset);
// Leave maxOffset as null.
newSegmentZKMetadata.setNumReplicas(numReplicas);
newSegmentZKMetadata.setStatus(Status.IN_PROGRESS);
@@ -808,7 +813,7 @@ public class PinotLLCRealtimeSegmentManager {
assert idealState != null;
if (idealState.isEnabled()) {
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
- getCurrentPartitionGroupMetadataList(idealState);
+ getCurrentPartitionGroupMetadataList(idealState, streamConfig);
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList);
@@ -1121,7 +1126,7 @@ public class PinotLLCRealtimeSegmentManager {
return idealState;
}
- private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
+ private Checkpoint getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
.constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
@@ -1130,12 +1135,10 @@ public class PinotLLCRealtimeSegmentManager {
new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo =
getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
- StreamPartitionMsgOffset partitionStartOffset = null;
+ Checkpoint partitionStartOffset = null;
for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) {
if (info.getPartitionGroupId() == partitionGroupId) {
- StreamPartitionMsgOffsetFactory factory =
- StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
- partitionStartOffset = factory.create(info.getStartCheckpoint());
+ partitionStartOffset = info.getStartCheckpoint();
break;
}
}
@@ -1155,7 +1158,7 @@ public class PinotLLCRealtimeSegmentManager {
long creationTimeMs, InstancePartitions instancePartitions, int numPartitionGroups, int numReplicas) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
- String startCheckpoint = partitionGroupInfo.getStartCheckpoint();
+ String startCheckpoint = partitionGroupInfo.getStartCheckpoint().toString();
LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 96604dd..d611433 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
- 6000, pinotHelixResourceManager,
+ config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index c19a845..ecbf2ef 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -920,7 +920,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
- PARTITION_OFFSET.toString()))
+ PARTITION_OFFSET))
.collect(Collectors.toList());
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index e6e1402..1083757 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -160,7 +160,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
Checkpoint offset, long buildTimeMillis, long waitTimeMillis, long segmentSizeBytes) {
_segmentTarFile = segmentTarFile;
_metadataFileMap = metadataFileMap;
- _offset = _streamPartitionMsgOffsetFactory.create(offset);
+ _offset = _checkpointFactory.create(offset);
_buildTimeMillis = buildTimeMillis;
_waitTimeMillis = waitTimeMillis;
_segmentSizeBytes = segmentSizeBytes;
@@ -235,11 +235,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final SegmentVersion _segmentVersion;
private final SegmentBuildTimeLeaseExtender _leaseExtender;
private SegmentBuildDescriptor _segmentBuildDescriptor;
- private StreamConsumerFactory _streamConsumerFactory;
- private PartitionGroupCheckpointFactory _streamPartitionMsgOffsetFactory;
+ private final StreamConsumerFactory _streamConsumerFactory;
+ private final PartitionGroupCheckpointFactory _checkpointFactory;
// Segment end criteria
private volatile long _consumeEndTime = 0;
+ private boolean _endOfPartitionGroup = false;
private Checkpoint _finalOffset; // Used when we want to catch up to this one
private volatile boolean _shouldStop = false;
@@ -307,6 +308,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
+ } else if (_endOfPartitionGroup) {
+ segmentLogger.info(
+ "Stopping consumption due to end of partitionGroup reached: numRowsIndexed={}, numRowsConsumed={}, segmentMaxRowCount={}",
+ _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+ _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+ // fixme: Handle creating a segment with 0 rows.
+ // Happens if endOfPartitionGroup reached but no rows were consumed
+ return true;
}
return false;
@@ -370,7 +379,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
final long idlePipeSleepTimeMillis = 100;
final long maxIdleCountBeforeStatUpdate = (3 * 60 * 1000) / (idlePipeSleepTimeMillis + _partitionLevelStreamConfig
.getFetchTimeoutMillis()); // 3 minute count
- Checkpoint lastUpdatedOffset = _streamPartitionMsgOffsetFactory
+ Checkpoint lastUpdatedOffset = _checkpointFactory
.create(_currentOffset); // so that we always update the metric when we enter this method.
long consecutiveIdleCount = 0;
// At this point, we know that we can potentially move the offset, so the old saved segment file is not valid
@@ -385,6 +394,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+ _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
consecutiveErrorCount = 0;
} catch (TimeoutException e) {
handleTransientStreamErrors(e);
@@ -411,7 +421,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED, _currentOffset.getOffset());
// _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED, _currentOffset.getOffset());
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
- lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+ lastUpdatedOffset = _checkpointFactory.create(_currentOffset);
} else {
// We did not consume any rows. Update the partition-consuming metric only if we have been idling for a long time.
// Create a new stream consumer wrapper, in case we are stuck on something.
@@ -669,10 +679,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
@VisibleForTesting
protected Checkpoint extractOffset(SegmentCompletionProtocol.Response response) {
if (response.getStreamPartitionMsgOffset() != null) {
- return _streamPartitionMsgOffsetFactory.create(response.getStreamPartitionMsgOffset());
+ return _checkpointFactory.create(response.getStreamPartitionMsgOffset());
} else {
// TODO Issue 5359 Remove this once the protocol is upgraded on server and controller
- return _streamPartitionMsgOffsetFactory.create(Long.toString(response.getOffset()));
+ return _checkpointFactory.create(Long.toString(response.getOffset()));
}
}
@@ -967,7 +977,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Remove the segment file before we do anything else.
removeSegmentFile();
_leaseExtender.removeSegment(_segmentNameStr);
- final Checkpoint endOffset = _streamPartitionMsgOffsetFactory.create(llcMetadata.getEndOffset());
+ final Checkpoint endOffset = _checkpointFactory.create(llcMetadata.getEndOffset());
segmentLogger
.info("State: {}, transitioning from CONSUMING to ONLINE (startOffset: {}, endOffset: {})", _state.toString(),
_startOffset, endOffset);
@@ -1127,14 +1137,15 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_partitionLevelStreamConfig =
new PartitionLevelStreamConfig(_tableNameWithType, IngestionConfigUtils.getStreamConfigMap(_tableConfig));
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig);
- _streamPartitionMsgOffsetFactory =
+ _checkpointFactory =
StreamConsumerFactoryProvider.create(_partitionLevelStreamConfig).createStreamMsgOffsetFactory();
_streamTopic = _partitionLevelStreamConfig.getTopicName();
_segmentNameStr = _segmentZKMetadata.getSegmentName();
_llcSegmentName = llcSegmentName;
_partitionGroupId = _llcSegmentName.getPartitionGroupId();
_partitionGroupMetadata = new PartitionGroupMetadata(_partitionGroupId, _llcSegmentName.getSequenceNumber(),
- _segmentZKMetadata.getStartOffset(), _segmentZKMetadata.getEndOffset(),
+ _checkpointFactory.create(_segmentZKMetadata.getStartOffset()),
+ _segmentZKMetadata.getEndOffset() == null ? null : _checkpointFactory.create(_segmentZKMetadata.getEndOffset()),
_segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
@@ -1279,8 +1290,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
_realtimeSegment = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), serverMetrics);
- _startOffset = _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset());
- _currentOffset = _streamPartitionMsgOffsetFactory.create(_startOffset);
+ _startOffset = _checkpointFactory.create(_segmentZKMetadata.getStartOffset());
+ _currentOffset = _checkpointFactory.create(_startOffset);
_resourceTmpDir = new File(resourceDataDir, "_tmp");
if (!_resourceTmpDir.exists()) {
_resourceTmpDir.mkdirs();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index b22bbe4..42150a3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -21,12 +21,15 @@ package org.apache.pinot.plugin.stream.kinesis;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -69,7 +72,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
/**
* This call returns all active shards, taking into account the consumption status for those shards.
* PartitionGroupInfo is returned for a shard if:
- * 1. It is a branch new shard i.e. no partitionGroupMetadata was found for it in the current list
+ * 1. It is a brand new shard AND its parent has been consumed completely
* 2. It is still being actively consumed from i.e. the consuming partition has not reached the end of the shard
*/
@Override
@@ -77,54 +80,57 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
throws IOException, TimeoutException {
- Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream()
- .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
-
List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
- List<Shard> shards = _kinesisConnectionHandler.getShards();
- for (Shard shard : shards) {
- KinesisCheckpoint newStartCheckpoint;
-
- String shardId = shard.shardId();
- int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
- PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
-
- if (currentPartitionGroupMetadata != null) { // existing shard
- KinesisCheckpoint currentEndCheckpoint = null;
- try {
- currentEndCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getEndCheckpoint());
- } catch (Exception e) {
- // ignore. No end checkpoint yet for IN_PROGRESS segment
- }
- if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing/committed segment
- String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
- if (endingSequenceNumber != null) { // shard has ended
- // check if segment has consumed all the messages already
- PartitionGroupConsumer partitionGroupConsumer =
- _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
-
- MessageBatch messageBatch;
- try {
- messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs);
- } finally {
- partitionGroupConsumer.close();
- }
- if (messageBatch.isEndOfPartitionGroup()) {
- // shard has ended. Skip it from results
- continue;
- }
+
+ Map<String, Shard> shardIdToShardMap =
+ _kinesisConnectionHandler.getShards().stream().collect(Collectors.toMap(Shard::shardId, s -> s));
+ Set<String> shardsInCurrent = new HashSet<>();
+ Set<String> shardsEnded = new HashSet<>();
+
+ // TODO: Once we start supporting multiple shards in a PartitionGroup,
+ // we need to iterate over all shards to check if any of them have reached end
+
+ // Process existing shards. Add them to new list if still consuming from them
+ for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+ KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) currentPartitionGroupMetadata.getStartCheckpoint();
+ String shardId = kinesisStartCheckpoint.getShardToStartSequenceMap().keySet().iterator().next();
+ Shard shard = shardIdToShardMap.get(shardId);
+ shardsInCurrent.add(shardId);
+
+ Checkpoint newStartCheckpoint;
+ Checkpoint currentEndCheckpoint = currentPartitionGroupMetadata.getEndCheckpoint();
+ if (currentEndCheckpoint != null) { // Segment DONE (committing/committed)
+ String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+ if (endingSequenceNumber != null) { // Shard has ended, check if we're also done consuming it
+ if (consumedEndOfShard(currentEndCheckpoint, currentPartitionGroupMetadata)) {
+ shardsEnded.add(shardId);
+ continue; // Shard ended and we're done consuming it. Skip
}
- newStartCheckpoint = currentEndCheckpoint;
- } else {
- newStartCheckpoint = new KinesisCheckpoint(currentPartitionGroupMetadata.getStartCheckpoint());
}
- } else { // new shard
+ newStartCheckpoint = currentEndCheckpoint;
+ } else { // Segment IN_PROGRESS
+ newStartCheckpoint = currentPartitionGroupMetadata.getStartCheckpoint();
+ }
+ newPartitionGroupInfos.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(), newStartCheckpoint));
+ }
+
+ // Add new shards. A new shard is picked up only if its parent is null (new table case, very first shards) OR its parent has reached end-of-shard and been completely consumed.
+ for (Map.Entry<String, Shard> entry : shardIdToShardMap.entrySet()) {
+ String newShardId = entry.getKey();
+ if (shardsInCurrent.contains(newShardId)) {
+ continue;
+ }
+ Checkpoint newStartCheckpoint;
+ Shard newShard = entry.getValue();
+ String parentShardId = newShard.parentShardId();
+
+ if (parentShardId == null || shardsEnded.contains(parentShardId)) {
Map<String, String> shardToSequenceNumberMap = new HashMap<>();
- shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
+ shardToSequenceNumberMap.put(newShardId, newShard.sequenceNumberRange().startingSequenceNumber());
newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
+ int partitionGroupId = getPartitionGroupIdFromShardId(newShardId);
+ newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint));
}
-
- newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
}
return newPartitionGroupInfos;
}
@@ -138,6 +144,20 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
return shardIdNum.isEmpty() ? 0 : Integer.parseInt(shardIdNum);
}
+ private boolean consumedEndOfShard(Checkpoint startCheckpoint, PartitionGroupMetadata partitionGroupMetadata)
+ throws IOException, TimeoutException {
+ PartitionGroupConsumer partitionGroupConsumer =
+ _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, partitionGroupMetadata);
+
+ MessageBatch messageBatch;
+ try {
+ messageBatch = partitionGroupConsumer.fetchMessages(startCheckpoint, null, _fetchTimeoutMs);
+ } finally {
+ partitionGroupConsumer.close();
+ }
+ return messageBatch.isEndOfPartitionGroup();
+ }
+
@Override
public void close() {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
index 758953d..b06e878 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -27,9 +27,9 @@ package org.apache.pinot.spi.stream;
public class PartitionGroupInfo {
private final int _partitionGroupId;
- private final String _startCheckpoint;
+ private final Checkpoint _startCheckpoint;
- public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
+ public PartitionGroupInfo(int partitionGroupId, Checkpoint startCheckpoint) {
_partitionGroupId = partitionGroupId;
_startCheckpoint = startCheckpoint;
}
@@ -38,7 +38,7 @@ public class PartitionGroupInfo {
return _partitionGroupId;
}
- public String getStartCheckpoint() {
+ public Checkpoint getStartCheckpoint() {
return _startCheckpoint;
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index a99a82b..1ac12fb 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -22,12 +22,12 @@ public class PartitionGroupMetadata {
private final int _partitionGroupId;
private int _sequenceNumber;
- private String _startCheckpoint;
- private String _endCheckpoint;
+ private Checkpoint _startCheckpoint;
+ private Checkpoint _endCheckpoint;
private String _status;
- public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, String startCheckpoint,
- String endCheckpoint, String status) {
+ public PartitionGroupMetadata(int partitionGroupId, int sequenceNumber, Checkpoint startCheckpoint,
+ Checkpoint endCheckpoint, String status) {
_partitionGroupId = partitionGroupId;
_sequenceNumber = sequenceNumber;
_startCheckpoint = startCheckpoint;
@@ -47,19 +47,19 @@ public class PartitionGroupMetadata {
_sequenceNumber = sequenceNumber;
}
- public String getStartCheckpoint() {
+ public Checkpoint getStartCheckpoint() {
return _startCheckpoint;
}
- public void setStartCheckpoint(String startCheckpoint) {
+ public void setStartCheckpoint(Checkpoint startCheckpoint) {
_startCheckpoint = startCheckpoint;
}
- public String getEndCheckpoint() {
+ public Checkpoint getEndCheckpoint() {
return _endCheckpoint;
}
- public void setEndCheckpoint(String endCheckpoint) {
+ public void setEndCheckpoint(Checkpoint endCheckpoint) {
_endCheckpoint = endCheckpoint;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index cecc708..4b2751c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -82,7 +82,7 @@ public interface StreamMetadataProvider extends Closeable {
streamConsumerFactory.createPartitionMetadataProvider(clientId, i);
StreamPartitionMsgOffset streamPartitionMsgOffset =
partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(), timeoutMillis);
- newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset.toString()));
+ newPartitionGroupInfoList.add(new PartitionGroupInfo(i, streamPartitionMsgOffset));
}
return newPartitionGroupInfoList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org