You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:30 UTC
[incubator-pinot] 40/47: Don't create new CONSUMING segment if shard
has reached end of life
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 214c007c2915c8aa149e1e06689e66abaa85b083
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Thu Jan 7 16:07:16 2021 -0800
Don't create new CONSUMING segment if shard has reached end of life
---
.../protocols/SegmentCompletionProtocol.java | 1 +
.../realtime/PinotLLCRealtimeSegmentManager.java | 101 ++++++++++-----------
.../RealtimeSegmentValidationManager.java | 2 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 3 +-
.../realtime/LLRealtimeSegmentDataManager.java | 13 ++-
.../plugin/stream/kinesis/KinesisConsumer.java | 8 +-
.../stream/kinesis/KinesisConsumerFactory.java | 2 +-
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 9 +-
.../kinesis/KinesisStreamMetadataProvider.java | 48 +++++++---
.../org/apache/pinot/spi/stream/MessageBatch.java | 7 ++
10 files changed, 115 insertions(+), 79 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index dd1330d..74614df 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,6 +138,7 @@ public class SegmentCompletionProtocol {
public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached
public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached
+ public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup reached
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index bbd1ef3..9fa6850 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -477,7 +477,6 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
"Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName);
- int numPartitions = getNumPartitionsFromIdealState(idealState);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
/*
@@ -496,18 +495,21 @@ public class PinotLLCRealtimeSegmentManager {
// Step-2
- // Say we currently were consuming from 3 shards A, B, C. Of those, A is the one committing. Also suppose that new partition D has come up
+ // Say we currently were consuming from 2 shards A, B. Of those, A is the one committing.
- // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS], [C - IN_PROGRESS]
+ // get current partition groups - this gives current state of latest segments for each partition [A - DONE], [B - IN_PROGRESS]
List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- // find new partition groups [A],[B],[C],[D]
+ // find new partition groups [A],[B],[C],[D] (assume A split into C D)
+ // If segment has consumed all of A, we will receive B,C,D
+ // If segment is still not reached last msg of A, we will receive A,B,C,D
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+ int numPartitions = newPartitionGroupInfoList.size();
- // create new segment metadata, only if it is not IN_PROGRESS in the current state
+ // create new segment metadata, only if PartitionGroupInfo was returned for it in the newPartitionGroupInfoList
Map<Integer, PartitionGroupMetadata> currentGroupIdToMetadata = currentPartitionGroupMetadataList.stream().collect(
Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -519,36 +521,25 @@ public class PinotLLCRealtimeSegmentManager {
PartitionGroupMetadata currentPartitionGroupMetadata = currentGroupIdToMetadata.get(newPartitionGroupId);
if (currentPartitionGroupMetadata == null) { // not present in current state. New partition found.
// make new segment
- // FIXME: flushThreshold of segment is actually (configured threshold/numPartitions)
- // In Kinesis, with every split/merge, we get new partitions, and an old partition gets deactivated.
- // However, the getPartitionGroupInfo call returns ALL shards, regardless of whether they're active or not.
- // So our numPartitions will forever keep increasing.
- // TODO: can the getPartitionGroupInfo return the active partitions only, based on the checkpoints passed in current?
+ // fixme: letting validation manager do this would be best, otherwise we risk creating multiple CONSUMING segments
String newLLCSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupInfo, newSegmentCreationTimeMs,
instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName);
} else {
- String currentStatus = currentPartitionGroupMetadata.getStatus();
- if (!currentStatus.equals(Status.IN_PROGRESS.toString())) {
- // not IN_PROGRESS anymore in current state. Should be DONE.
- // This should ONLY happen for the committing segment's partition. Need to trigger new consuming segment
- // todo: skip this if the partition doesn't match with the committing segment?
+ LLCSegmentName committingLLCSegment = new LLCSegmentName(committingSegmentName);
+ // Update this only for committing segment. All other partitions should get updated by their own commit call
+ if (newPartitionGroupId == committingLLCSegment.getPartitionGroupId()) {
+ Preconditions.checkState(currentPartitionGroupMetadata.getStatus().equals(Status.DONE.toString()));
LLCSegmentName newLLCSegmentName = new LLCSegmentName(rawTableName, newPartitionGroupId,
currentPartitionGroupMetadata.getSequenceNumber() + 1, newSegmentCreationTimeMs);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, newSegmentCreationTimeMs,
committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
newConsumingSegmentNames.add(newLLCSegmentName.getSegmentName());
-
- // FIXME: a new CONSUMING segment is created even if EOL for this shard has been reached.
- // the logic in getPartitionGroupInfo to prevent returning of EOLed shards isn't working
- // OPTION: Since consumer knows about it, it can pass param in request/committingSegmentDescriptor "isEndOfShard"
- // We can set that in metadata for validation manager to skip these partitions
}
}
}
-
// Step-3
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -840,8 +831,9 @@ public class PinotLLCRealtimeSegmentManager {
if (idealState.isEnabled()) {
List<PartitionGroupMetadata> currentPartitionGroupMetadataList =
getCurrentPartitionGroupMetadataList(idealState);
- int numPartitions = getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList).size();
- return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, numPartitions);
+ List<PartitionGroupInfo> newPartitionGroupInfoList =
+ getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+ return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupInfoList);
} else {
LOGGER.info("Skipping LLC segments validation for disabled table: {}", realtimeTableName);
return idealState;
@@ -988,11 +980,14 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
- IdealState idealState, int numPartitions) {
+ IdealState idealState, List<PartitionGroupInfo> newPartitionGroupInfoList) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ int numPartitions = newPartitionGroupInfoList.size();
+ Set<Integer> newPartitionGroupSet =
+ newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
@@ -1029,7 +1024,7 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName);
if (instanceStateMap != null) {
// Latest segment of metadata is in idealstate.
- if (instanceStateMap.values().contains(SegmentStateModel.CONSUMING)) {
+ if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) {
if (latestSegmentZKMetadata.getStatus() == Status.DONE) {
// step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE)
@@ -1040,15 +1035,23 @@ public class PinotLLCRealtimeSegmentManager {
}
LOGGER.info("Repairing segment: {} which is DONE in segment ZK metadata, but is CONSUMING in IdealState",
latestSegmentName);
-
- LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
- String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
- (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
- segmentAssignment, instancePartitionsMap);
+ if (newPartitionGroupSet.contains(partitionGroupId)) {
+ LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
+ String newSegmentName = newLLCSegmentName.getSegmentName();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(latestSegmentName,
+ (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
+ committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
+ } else { // partition group reached end of life
+ LOGGER.info(
+ "PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. "
+ + "Skipping creation of new ZK metadata and new segment in ideal state",
+ partitionGroupId, latestSegmentName);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment,
+ instancePartitionsMap);
+ }
}
// else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment.
} else { // no replica in CONSUMING state
@@ -1081,11 +1084,14 @@ public class PinotLLCRealtimeSegmentManager {
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
} else {
- // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
- // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
- // that case, we need to either extend this part to handle the state, or prevent segments from getting into
- // such state.
- LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+ if (!newPartitionGroupSet.contains(partitionGroupId)) {
+ // If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
+ // not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
+ // that case, we need to either extend this part to handle the state, or prevent segments from getting into
+ // such state.
+ LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName);
+ }
+ // else, the partition group has reached end of life. This is an acceptable state
}
}
} else {
@@ -1127,10 +1133,7 @@ public class PinotLLCRealtimeSegmentManager {
}
// Set up new partitions if not exist
- List<PartitionGroupMetadata> currentPartitionGroupMetadataList = getCurrentPartitionGroupMetadataList(idealState);
- List<PartitionGroupInfo> partitionGroupInfoList =
- getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
- for (PartitionGroupInfo partitionGroupInfo : partitionGroupInfoList) {
+ for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
int partitionGroupId = partitionGroupInfo.getPartitionGroupId();
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
@@ -1178,18 +1181,6 @@ public class PinotLLCRealtimeSegmentManager {
return System.currentTimeMillis();
}
- // fixme: investigate if this should only return active partitions (i.e. skip a shard if it has reached eol)
- // or return all unique partitions found in ideal state right from the birth of the table
- private int getNumPartitionsFromIdealState(IdealState idealState) {
- Set<String> uniquePartitions = new HashSet<>();
- for (String segmentName : idealState.getRecord().getMapFields().keySet()) {
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- uniquePartitions.add(String.valueOf(new LLCSegmentName(segmentName).getPartitionGroupId()));
- }
- }
- return uniquePartitions.size();
- }
-
private int getNumReplicas(TableConfig tableConfig, InstancePartitions instancePartitions) {
if (instancePartitions.getNumReplicaGroups() == 1) {
// Non-replica-group based
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index d611433..96604dd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
super("RealtimeSegmentValidationManager", config.getRealtimeSegmentValidationFrequencyInSeconds(),
- config.getRealtimeSegmentValidationManagerInitialDelaySeconds(), pinotHelixResourceManager,
+ 6000, pinotHelixResourceManager,
leadControllerManager, controllerMetrics);
_llcRealtimeSegmentManager = llcRealtimeSegmentManager;
_validationMetrics = validationMetrics;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 75c8057..0f33556 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -850,7 +850,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
public void ensureAllPartitionsConsuming() {
- ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, _numPartitions);
+ ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
+ getPartitionGroupInfoList(_streamConfig, Collections.emptyList()));
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 758c656..c889193 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -240,6 +240,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
private Checkpoint _finalOffset; // Used when we want to catch up to this one
+ private boolean _endOfPartitionGroup = false;
private volatile boolean _shouldStop = false;
// It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -306,6 +307,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
+ } else if (_endOfPartitionGroup) {
+ segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
+ _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
+ _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
+ // fixme: what happens if reached endOfPartitionGroup but numDocsIndexed == 0
+ // If we decide to only setupNewPartitions via ValidationManager, we don't need commit on endOfShard
+ return true;
}
return false;
@@ -384,6 +392,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
+ _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
consecutiveErrorCount = 0;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
@@ -1245,9 +1254,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// long as the partition function is not changed.
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
- // fixme: get this from ideal state
- int numStreamPartitions = _streamMetadataProvider
- .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig, Collections.emptyList(), 5000).size();
+ int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 70d2c8a..5cbd7e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -125,8 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
if (nextStartSequenceNumber == null && recordList.size() > 0) {
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
}
-
- return new KinesisRecordsBatch(recordList, next.getKey());
+ return new KinesisRecordsBatch(recordList, next.getKey(), isEndOfShard);
} catch (IllegalStateException e) {
LOG.warn("Illegal state exception, connection is broken", e);
return handleException(kinesisStartCheckpoint, recordList);
@@ -158,10 +157,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
- return new KinesisRecordsBatch(recordList, shardId);
+ return new KinesisRecordsBatch(recordList, shardId, false);
} else {
- return new KinesisRecordsBatch(recordList, shardId);
-
+ return new KinesisRecordsBatch(recordList, shardId, false);
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 631f240..fc9c4af 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -48,7 +48,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
@Override
public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
- return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig));
+ return new KinesisStreamMetadataProvider(clientId, _streamConfig);
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index fdc883b..b3eb626 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -32,10 +32,12 @@ import software.amazon.awssdk.services.kinesis.model.Record;
public class KinesisRecordsBatch implements MessageBatch<byte[]> {
private final List<Record> _recordList;
private final String _shardId;
+ private final boolean _endOfShard;
- public KinesisRecordsBatch(List<Record> recordList, String shardId) {
+ public KinesisRecordsBatch(List<Record> recordList, String shardId, boolean endOfShard) {
_recordList = recordList;
_shardId = shardId;
+ _endOfShard = endOfShard;
}
@Override
@@ -68,4 +70,9 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public boolean isEndOfPartitionGroup() {
+ return _endOfShard;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 6c55a18..8968b56 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,27 +1,45 @@
package org.apache.pinot.plugin.stream.kinesis;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
private final KinesisConnectionHandler _kinesisConnectionHandler;
+ private final StreamConsumerFactory _kinesisStreamConsumerFactory;
+ private final String _clientId;
+ private final int _fetchTimeoutMs;
- public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
+ public KinesisStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
+ KinesisConfig kinesisConfig = new KinesisConfig(streamConfig);
_kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+ _kinesisStreamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ _clientId = clientId;
+ _fetchTimeoutMs = streamConfig.getFetchTimeoutMillis();
}
@Override
@@ -37,7 +55,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
@Override
public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
- throws IOException {
+ throws IOException, TimeoutException {
Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
@@ -45,10 +63,12 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
List<Shard> shards = _kinesisConnectionHandler.getShards();
for (Shard shard : shards) { // go over all shards
+ KinesisCheckpoint newStartCheckpoint;
+
String shardId = shard.shardId();
int partitionGroupId = getPartitionGroupIdFromShardId(shardId);
PartitionGroupMetadata currentPartitionGroupMetadata = currentPartitionGroupMap.get(partitionGroupId);
- KinesisCheckpoint newStartCheckpoint;
+
if (currentPartitionGroupMetadata != null) { // existing shard
KinesisCheckpoint currentEndCheckpoint = null;
try {
@@ -59,15 +79,18 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
if (endingSequenceNumber != null) { // shard has ended
- // FIXME: this logic is not working
- // was expecting sequenceNumOfLastMsgInShard == endSequenceNumOfShard.
- // But it is much lesser than the endSeqNumOfShard
- Map<String, String> shardToSequenceNumberMap = new HashMap<>();
- shardToSequenceNumberMap.put(shardId, endingSequenceNumber);
- KinesisCheckpoint shardEndCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
- if (currentEndCheckpoint.compareTo(shardEndCheckpoint) >= 0) {
- // shard has ended AND we have reached the end checkpoint.
- // skip this partition group in the result
+ // check if segment has consumed all the messages already
+ PartitionGroupConsumer partitionGroupConsumer =
+ _kinesisStreamConsumerFactory.createPartitionGroupConsumer(_clientId, currentPartitionGroupMetadata);
+
+ MessageBatch messageBatch;
+ try {
+ messageBatch = partitionGroupConsumer.fetchMessages(currentEndCheckpoint, null, _fetchTimeoutMs);
+ } finally {
+ partitionGroupConsumer.close();
+ }
+ if (messageBatch.isEndOfPartitionGroup()) {
+ // shard has ended. Skip it from results
continue;
}
}
@@ -80,6 +103,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
shardToSequenceNumberMap.put(shardId, shard.sequenceNumberRange().startingSequenceNumber());
newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
}
+
newPartitionGroupInfos
.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
index 3052b9e..02c721f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
@@ -81,4 +81,11 @@ public interface MessageBatch<T> {
default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index) {
return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
}
+
+ /**
+ * Returns true if end of the consumer detects that no more records can be read from this partition group for good
+ */
+ default boolean isEndOfPartitionGroup() {
+ return false;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org