You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/02 02:43:32 UTC
[incubator-pinot] 42/47: Cleanup, javadocs, comments
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit ce1a6462084dfa05d9b8c2b57a23a9c8274725e4
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Fri Jan 8 18:28:04 2021 -0800
Cleanup, javadocs, comments
---
.../protocols/SegmentCompletionProtocol.java | 1 -
.../helix/core/PinotTableIdealStateBuilder.java | 8 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 76 ++++++++++---------
.../PinotLLCRealtimeSegmentManagerTest.java | 21 ++----
.../realtime/LLRealtimeSegmentDataManager.java | 21 +++---
.../plugin/stream/kinesis/KinesisCheckpoint.java | 10 ++-
.../pinot/plugin/stream/kinesis/KinesisConfig.java | 3 +
.../stream/kinesis/KinesisConnectionHandler.java | 17 +++--
.../plugin/stream/kinesis/KinesisConsumer.java | 20 +++--
.../stream/kinesis/KinesisConsumerFactory.java | 8 +-
.../stream/kinesis/KinesisMsgOffsetFactory.java | 4 +
.../plugin/stream/kinesis/KinesisRecordsBatch.java | 6 +-
.../kinesis/KinesisStreamMetadataProvider.java | 27 +++----
.../org/apache/pinot/spi/stream/Checkpoint.java | 5 ++
.../stream/PartitionGroupCheckpointFactory.java | 12 +--
.../pinot/spi/stream/PartitionGroupConsumer.java | 16 +++-
.../pinot/spi/stream/PartitionGroupInfo.java | 13 ++--
.../spi/stream/PartitionGroupInfoFetcher.java | 2 +-
.../pinot/spi/stream/PartitionGroupMetadata.java | 4 -
.../pinot/spi/stream/PartitionLevelConsumer.java | 6 +-
.../pinot/spi/stream/PartitionOffsetFetcher.java | 88 ----------------------
.../pinot/spi/stream/StreamConsumerFactory.java | 10 ++-
.../pinot/spi/stream/StreamMetadataProvider.java | 9 +--
23 files changed, 170 insertions(+), 217 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
index 74614df..dd1330d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
@@ -138,7 +138,6 @@ public class SegmentCompletionProtocol {
public static final String REASON_ROW_LIMIT = "rowLimit"; // Stop reason sent by server as max num rows reached
public static final String REASON_TIME_LIMIT = "timeLimit"; // Stop reason sent by server as max time reached
- public static final String REASON_END_OF_PARTITION_GROUP = "endOfPartitionGroup"; // Stop reason sent by server as end of partitionGroup reached
// Canned responses
public static final Response RESP_NOT_LEADER =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 68bcf57..98fbd5d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -117,6 +117,12 @@ public class PinotTableIdealStateBuilder {
pinotLLCRealtimeSegmentManager.setUpNewTable(realtimeTableConfig, idealState);
}
+ /**
+ * Fetches the list of {@link PartitionGroupInfo} for the stream, with the help of the current partitionGroups metadata
+ * This call will only skip partitions which have reached end of life and all messages from that partition have been consumed.
+ * The current partition group metadata is used to determine the offsets that have been consumed for a partition.
+ * The current partition group metadata is also used to know about existing partition groupings which should not be disturbed
+ */
public static List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
PartitionGroupInfoFetcher partitionGroupInfoFetcher =
@@ -126,7 +132,7 @@ public class PinotTableIdealStateBuilder {
return partitionGroupInfoFetcher.getPartitionGroupInfoList();
} catch (Exception e) {
Exception fetcherException = partitionGroupInfoFetcher.getException();
- LOGGER.error("Could not get partition count for {}", streamConfig.getTopicName(), fetcherException);
+ LOGGER.error("Could not get partition group info for {}", streamConfig.getTopicName(), fetcherException);
throw new RuntimeException(fetcherException);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9a0786b..27d487b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -24,7 +24,6 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,11 +76,11 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.spi.stream.PartitionOffsetFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -164,17 +163,18 @@ public class PinotLLCRealtimeSegmentManager {
/**
- * Using the ideal state and segment metadata, return a list of the current partition groups
+ * Using the ideal state and segment metadata, return a list of {@link PartitionGroupMetadata}
+ * for latest segment of each partition group
*/
public List<PartitionGroupMetadata> getCurrentPartitionGroupMetadataList(IdealState idealState) {
List<PartitionGroupMetadata> partitionGroupMetadataList = new ArrayList<>();
- // from all segment names in the ideal state, find unique groups
- Map<Integer, LLCSegmentName> groupIdToLatestSegment = new HashMap<>();
+ // From all segment names in the ideal state, find unique partition group ids and their latest segment
+ Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegment = new HashMap<>();
for (String segment : idealState.getPartitionSet()) {
LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
- groupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
+ partitionGroupIdToLatestSegment.compute(partitionGroupId, (k, latestSegment) -> {
if (latestSegment == null) {
return llcSegmentName;
} else {
@@ -184,8 +184,8 @@ public class PinotLLCRealtimeSegmentManager {
});
}
- // create a PartitionGroupMetadata for each latest segment
- for (Map.Entry<Integer, LLCSegmentName> entry : groupIdToLatestSegment.entrySet()) {
+ // Create a PartitionGroupMetadata for each latest segment
+ for (Map.Entry<Integer, LLCSegmentName> entry : partitionGroupIdToLatestSegment.entrySet()) {
int partitionGroupId = entry.getKey();
LLCSegmentName llcSegmentName = entry.getValue();
RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = ZKMetadataProvider
@@ -258,10 +258,8 @@ public class PinotLLCRealtimeSegmentManager {
PartitionLevelStreamConfig streamConfig =
new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig));
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
- // get new partition groups and their metadata
List<PartitionGroupInfo> newPartitionGroupInfoList = getPartitionGroupInfoList(streamConfig, Collections.emptyList());
int numPartitionGroups = newPartitionGroupInfoList.size();
-
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixManager, tableConfig);
@@ -699,27 +697,16 @@ public class PinotLLCRealtimeSegmentManager {
return commitTimeoutMS;
}
+ /**
+ * Fetches the latest state of the PartitionGroups for the stream
+ * If any partition has reached end of life, and all messages of that partition have been consumed by the segment, it will be skipped from the result
+ */
@VisibleForTesting
List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
return PinotTableIdealStateBuilder.getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
}
- @VisibleForTesting
- StreamPartitionMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria,
- int partitionGroupId) {
- PartitionOffsetFetcher partitionOffsetFetcher =
- new PartitionOffsetFetcher(offsetCriteria, partitionGroupId, streamConfig);
- try {
- RetryPolicies.fixedDelayRetryPolicy(3, 1000L).attempt(partitionOffsetFetcher);
- return partitionOffsetFetcher.getOffset();
- } catch (Exception e) {
- throw new IllegalStateException(String
- .format("Failed to fetch the offset for topic: %s, partition: %s with criteria: %s",
- streamConfig.getTopicName(), partitionGroupId, offsetCriteria), e);
- }
- }
-
/**
* An instance is reporting that it has stopped consuming a topic due to some error.
* If the segment is in CONSUMING state, mark the state of the segment to be OFFLINE in idealstate.
@@ -1052,26 +1039,26 @@ public class PinotLLCRealtimeSegmentManager {
// Create a new segment to re-consume from the previous start offset
LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
- StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+ Checkpoint startCheckpoint = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+ Checkpoint partitionGroupStartCheckpoint = getPartitionGroupStartCheckpoint(streamConfig, partitionGroupId);
+
// Start offset must be higher than the start offset of the stream
- StreamPartitionMsgOffset partitionStartOffset =
- getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, partitionGroupId);
- if (partitionStartOffset.compareTo(startOffset) > 0) {
- LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
- partitionStartOffset, partitionGroupId, realtimeTableName);
+ if (partitionGroupStartCheckpoint.compareTo(startCheckpoint) > 0) {
+ LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startCheckpoint,
+ partitionGroupStartCheckpoint, partitionGroupId, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
- startOffset = partitionStartOffset;
+ startCheckpoint = partitionGroupStartCheckpoint;
}
CommittingSegmentDescriptor committingSegmentDescriptor =
- new CommittingSegmentDescriptor(latestSegmentName, startOffset.toString(), 0);
+ new CommittingSegmentDescriptor(latestSegmentName, startCheckpoint.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs,
committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
} else {
- if (!newPartitionGroupSet.contains(partitionGroupId)) {
+ if (newPartitionGroupSet.contains(partitionGroupId)) {
// If we get here, that means in IdealState, the latest segment has no CONSUMING replicas, but has replicas
// not OFFLINE. That is an unexpected state which cannot be fixed by the validation manager currently. In
// that case, we need to either extend this part to handle the state, or prevent segments from getting into
@@ -1134,6 +1121,27 @@ public class PinotLLCRealtimeSegmentManager {
return idealState;
}
+ private StreamPartitionMsgOffset getPartitionGroupStartCheckpoint(StreamConfig streamConfig, int partitionGroupId) {
+ Map<String, String> streamConfigMapWithSmallestOffsetCriteria = new HashMap<>(streamConfig.getStreamConfigsMap());
+ streamConfigMapWithSmallestOffsetCriteria.put(StreamConfigProperties
+ .constructStreamProperty(streamConfig.getType(), StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA),
+ OffsetCriteria.SMALLEST_OFFSET_CRITERIA.getOffsetString());
+ StreamConfig smallestOffsetCriteriaStreamConfig =
+ new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria);
+ List<PartitionGroupInfo> smallestOffsetCriteriaPartitionGroupInfo =
+ getPartitionGroupInfoList(smallestOffsetCriteriaStreamConfig, Collections.emptyList());
+ StreamPartitionMsgOffset partitionStartOffset = null;
+ for (PartitionGroupInfo info : smallestOffsetCriteriaPartitionGroupInfo) {
+ if (info.getPartitionGroupId() == partitionGroupId) {
+ StreamPartitionMsgOffsetFactory factory =
+ StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+ partitionStartOffset = factory.create(info.getStartCheckpoint());
+ break;
+ }
+ }
+ return partitionStartOffset;
+ }
+
private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
return new LLCSegmentName(lastLLCSegmentName.getTableName(), lastLLCSegmentName.getPartitionGroupId(),
lastLLCSegmentName.getSequenceNumber() + 1, creationTimeMs);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 0f33556..c19a845 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -908,32 +908,23 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName,
- List<String> newSegmentNames, SegmentAssignment segmentAssignment,
+ String newSegmentName, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName,
- null, segmentAssignment, instancePartitionsMap);
- for (String segmentName : newSegmentNames) {
- updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null,
- segmentName, segmentAssignment, instancePartitionsMap);
- }
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null,
+ segmentAssignment, instancePartitionsMap);
+ updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName,
+ segmentAssignment, instancePartitionsMap);
}
@Override
List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
- getPartitionOffset(streamConfig, OffsetCriteria.SMALLEST_OFFSET_CRITERIA, i).toString()))
+ PARTITION_OFFSET.toString()))
.collect(Collectors.toList());
}
@Override
- LongMsgOffset getPartitionOffset(StreamConfig streamConfig, OffsetCriteria offsetCriteria, int partitionGroupId) {
- // The criteria for this test should always be SMALLEST (for default streaming config and new added partitions)
- assertTrue(offsetCriteria.isSmallest());
- return PARTITION_OFFSET;
- }
-
- @Override
boolean isExceededMaxSegmentCompletionTime(String realtimeTableName, String segmentName, long currentTimeMs) {
return _exceededMaxSegmentCompletionTime;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bc49830..e6e1402 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -34,6 +34,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -240,7 +241,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
private Checkpoint _finalOffset; // Used when we want to catch up to this one
- private boolean _endOfPartitionGroup = false;
private volatile boolean _shouldStop = false;
// It takes 30s to locate controller leader, and more if there are multiple controller failures.
@@ -307,12 +307,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
_stopReason = SegmentCompletionProtocol.REASON_ROW_LIMIT;
return true;
- } else if (_endOfPartitionGroup) {
- // FIXME: handle numDocsIndexed == 0 case
- segmentLogger.info("Stopping consumption due to end of partitionGroup reached nRows={} numRowsIndexed={}, numRowsConsumed={}",
- _numRowsIndexed, _numRowsConsumed, _segmentMaxRowCount);
- _stopReason = SegmentCompletionProtocol.REASON_END_OF_PARTITION_GROUP;
- return true;
}
return false;
@@ -391,8 +385,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
try {
messageBatch = _partitionGroupConsumer
.fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
- _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
consecutiveErrorCount = 0;
+ } catch (TimeoutException e) {
+ handleTransientStreamErrors(e);
+ continue;
} catch (TransientConsumerException e) {
handleTransientStreamErrors(e);
continue;
@@ -1253,7 +1249,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// long as the partition function is not changed.
int numPartitions = columnPartitionConfig.getNumPartitions();
try {
- int numStreamPartitions = _streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs=*/5000L);
+ // TODO: currentPartitionGroupMetadata should be fetched from idealState + segmentZkMetadata, so that we get back accurate partitionGroups info
+ // However this is not an issue for Kafka, since partitionGroups never expire and every partitionGroup has a single partition
+ // Fix this before opening support for partitioning in Kinesis
+ int numStreamPartitions = _streamMetadataProvider
+ .getPartitionGroupInfoList(_clientId, _partitionLevelStreamConfig,
+ Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();
if (numStreamPartitions != numPartitions) {
segmentLogger.warn(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, using number of stream partitions",
@@ -1335,7 +1336,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
closeStreamMetadataProvider();
}
segmentLogger.info("Creating new stream metadata provider, reason: {}", reason);
- _streamMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
+ _streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(_clientId);
}
// This should be done during commit? We may not always commit when we build a segment....
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 517f8c0..e1f8b05 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -22,12 +22,17 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import java.io.IOException;
import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.JsonUtils;
+/**
+ * A {@link Checkpoint} implementation for the Kinesis partition group consumption
+ * A partition group consists of 1 or more shards. The KinesisCheckpoint maintains a Map of shards to the sequenceNumber
+ */
public class KinesisCheckpoint implements StreamPartitionMsgOffset {
- private Map<String, String> _shardToStartSequenceMap;
+ private final Map<String, String> _shardToStartSequenceMap;
public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
_shardToStartSequenceMap = shardToStartSequenceMap;
@@ -68,6 +73,7 @@ public class KinesisCheckpoint implements StreamPartitionMsgOffset {
@Override
public int compareTo(Object o) {
- return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
+ return this._shardToStartSequenceMap.values().iterator().next()
+ .compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 529f34f..fbe369f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -23,6 +23,9 @@ import org.apache.pinot.spi.stream.StreamConfig;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+/**
+ * Kinesis stream specific config
+ */
public class KinesisConfig {
public static final String STREAM = "stream";
public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 4d968f6..61d065e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -27,14 +27,13 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
+/**
+ * Manages the Kinesis stream connection, given the stream name and aws region
+ */
public class KinesisConnectionHandler {
KinesisClient _kinesisClient;
- private String _stream;
- private String _awsRegion;
-
- public KinesisConnectionHandler() {
-
- }
+ private final String _stream;
+ private final String _awsRegion;
public KinesisConnectionHandler(String stream, String awsRegion) {
_stream = stream;
@@ -42,12 +41,18 @@ public class KinesisConnectionHandler {
createConnection();
}
+ /**
+ * Lists all shards of the stream
+ */
public List<Shard> getShards() {
ListShardsResponse listShardsResponse =
_kinesisClient.listShards(ListShardsRequest.builder().streamName(_stream).build());
return listShardsResponse.shards();
}
+ /**
+ * Creates a Kinesis client for the stream
+ */
public void createConnection() {
if (_kinesisClient == null) {
_kinesisClient =
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 5cbd7e6..9c56f95 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -43,6 +43,9 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+/**
+ * A {@link PartitionGroupConsumer} implementation for the Kinesis stream
+ */
public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
String _stream;
@@ -58,16 +61,19 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
_executorService = Executors.newSingleThreadExecutor();
}
+ /**
+ * Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
+ */
@Override
- public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeoutMs) {
+ public KinesisRecordsBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs) {
List<Record> recordList = new ArrayList<>();
Future<KinesisRecordsBatch> kinesisFetchResultFuture =
- _executorService.submit(() -> getResult(start, end, recordList));
+ _executorService.submit(() -> getResult(startCheckpoint, endCheckpoint, recordList));
try {
return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
- return handleException((KinesisCheckpoint) start, recordList);
+ return handleException((KinesisCheckpoint) startCheckpoint, recordList);
}
}
@@ -81,6 +87,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
}
//TODO: iterate upon all the shardIds in the map
+ // Okay for now, since we have assumed that every partition group contains a single shard
Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
String shardIterator = getShardIterator(next.getKey(), next.getValue());
@@ -156,14 +163,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
-
- return new KinesisRecordsBatch(recordList, shardId, false);
- } else {
- return new KinesisRecordsBatch(recordList, shardId, false);
}
+ return new KinesisRecordsBatch(recordList, shardId, false);
}
- public String getShardIterator(String shardId, String sequenceNumber) {
+ private String getShardIterator(String shardId, String sequenceNumber) {
GetShardIteratorRequest.Builder requestBuilder =
GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index fc9c4af..6792fb9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -28,6 +28,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+/**
+ * {@link StreamConsumerFactory} implementation for the Kinesis stream
+ */
public class KinesisConsumerFactory extends StreamConsumerFactory {
@Override
@@ -43,7 +46,7 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
@Override
public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
@@ -52,7 +55,8 @@ public class KinesisConsumerFactory extends StreamConsumerFactory {
}
@Override
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupMetadata partitionGroupMetadata) {
return new KinesisConsumer(new KinesisConfig(_streamConfig));
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
index f234bae..8f6b932 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisMsgOffsetFactory.java
@@ -1,11 +1,15 @@
package org.apache.pinot.plugin.stream.kinesis;
import java.io.IOException;
+import org.apache.pinot.spi.stream.PartitionGroupCheckpointFactory;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+/**
+ * An implementation of the {@link PartitionGroupCheckpointFactory} for Kinesis stream
+ */
public class KinesisMsgOffsetFactory implements StreamPartitionMsgOffsetFactory {
KinesisConfig _kinesisConfig;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index b3eb626..83228ec 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -22,13 +22,14 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import javax.annotation.Nullable;
import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import software.amazon.awssdk.services.kinesis.model.Record;
+/**
+ * A {@link MessageBatch} for collecting records from the Kinesis stream
+ */
public class KinesisRecordsBatch implements MessageBatch<byte[]> {
private final List<Record> _recordList;
private final String _shardId;
@@ -49,6 +50,7 @@ public class KinesisRecordsBatch implements MessageBatch<byte[]> {
public byte[] getMessageAtIndex(int index) {
return _recordList.get(index).data().asByteArray();
}
+
@Override
public int getMessageOffsetAtIndex(int index) {
return ByteBuffer.wrap(_recordList.get(index).data().asByteArray()).arrayOffset();
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index 8968b56..1083969 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -1,21 +1,14 @@
package org.apache.pinot.plugin.stream.kinesis;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -28,6 +21,9 @@ import org.apache.pinot.spi.stream.StreamMetadataProvider;
import software.amazon.awssdk.services.kinesis.model.Shard;
+/**
+ * A {@link StreamMetadataProvider} implementation for the Kinesis stream
+ */
public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
private final KinesisConnectionHandler _kinesisConnectionHandler;
private final StreamConsumerFactory _kinesisStreamConsumerFactory;
@@ -52,17 +48,23 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
throw new UnsupportedOperationException();
}
+ /**
+ * This call returns all active shards, taking into account the consumption status for those shards.
+ * PartitionGroupInfo is returned for a shard if:
+ * 1. It is a brand new shard i.e. no partitionGroupMetadata was found for it in the current list
+ * 2. It is still being actively consumed from i.e. the consuming partition has not reached the end of the shard
+ */
@Override
public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
throws IOException, TimeoutException {
- Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap =
- currentPartitionGroupsMetadata.stream().collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
+ Map<Integer, PartitionGroupMetadata> currentPartitionGroupMap = currentPartitionGroupsMetadata.stream()
+ .collect(Collectors.toMap(PartitionGroupMetadata::getPartitionGroupId, p -> p));
List<PartitionGroupInfo> newPartitionGroupInfos = new ArrayList<>();
List<Shard> shards = _kinesisConnectionHandler.getShards();
- for (Shard shard : shards) { // go over all shards
+ for (Shard shard : shards) {
KinesisCheckpoint newStartCheckpoint;
String shardId = shard.shardId();
@@ -76,7 +78,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
} catch (Exception e) {
// ignore. No end checkpoint yet for IN_PROGRESS segment
}
- if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing segment
+ if (currentEndCheckpoint != null) { // end checkpoint available i.e. committing/committed segment
String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
if (endingSequenceNumber != null) { // shard has ended
// check if segment has consumed all the messages already
@@ -104,8 +106,7 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
newStartCheckpoint = new KinesisCheckpoint(shardToSequenceNumberMap);
}
- newPartitionGroupInfos
- .add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
+ newPartitionGroupInfos.add(new PartitionGroupInfo(partitionGroupId, newStartCheckpoint.serialize()));
}
return newPartitionGroupInfos;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
index bae8832..b7a9dba 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/Checkpoint.java
@@ -18,7 +18,12 @@
*/
package org.apache.pinot.spi.stream;
+/**
+ * Keeps track of the consumption for a PartitionGroup
+ */
public interface Checkpoint extends Comparable {
+
String serialize();
+
Checkpoint deserialize(String checkpointStr);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
index 14d2f39..4bd7839 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupCheckpointFactory.java
@@ -18,32 +18,22 @@
*/
package org.apache.pinot.spi.stream;
-import org.apache.pinot.spi.annotations.InterfaceStability;
-
-
/**
* An interface to be implemented by streams that are consumed using Pinot LLC consumption.
*/
-@InterfaceStability.Evolving
public interface PartitionGroupCheckpointFactory {
/**
* Initialization, called once when the factory is created.
- * @param streamConfig
*/
void init(StreamConfig streamConfig);
/**
- * Construct an offset from the string provided.
- * @param offsetStr
- * @return StreamPartitionMsgOffset
+ * Construct a checkpoint from the string provided.
*/
Checkpoint create(String offsetStr);
/**
* Construct an offset from another one provided, of the same type.
- *
- * @param other
- * @return
*/
Checkpoint create(Checkpoint other);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
index b421268..72b59d7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java
@@ -22,7 +22,21 @@ import java.io.Closeable;
import java.util.concurrent.TimeoutException;
+/**
+ * Consumer interface for consuming from a partition group of a stream
+ */
public interface PartitionGroupConsumer extends Closeable {
- MessageBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout)
+
+ /**
+ * Fetch messages and offsets from the stream partition group
+ *
+ * @param startCheckpoint The offset of the first message desired, inclusive
+ * @param endCheckpoint The offset of the last message desired, exclusive, or null
+ * @param timeoutMs Timeout in milliseconds
+ * @throws java.util.concurrent.TimeoutException If the operation could not be completed within {@code timeoutMs}
+ * milliseconds
+ * @return An iterable containing messages fetched from the stream partition and their offsets
+ */
+ MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs)
throws TimeoutException;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
index 438e148..758953d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfo.java
@@ -18,21 +18,22 @@
*/
package org.apache.pinot.spi.stream;
+/**
+ * A PartitionGroup is a group of partitions/shards that the same consumer should consume from.
+ * This class is a container for the metadata of a partition group. It consists of
+ * 1. A unique partition group id for this partition group
+ * 2. The start checkpoint to begin consumption for this partition group
+ */
public class PartitionGroupInfo {
- // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
private final int _partitionGroupId;
- private String _startCheckpoint;
+ private final String _startCheckpoint;
public PartitionGroupInfo(int partitionGroupId, String startCheckpoint) {
_partitionGroupId = partitionGroupId;
_startCheckpoint = startCheckpoint;
}
- public void setStartCheckpoint(String startCheckpoint) {
- _startCheckpoint = startCheckpoint;
- }
-
public int getPartitionGroupId() {
return _partitionGroupId;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
index f2d3f17..9c746e8 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupInfoFetcher.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
/**
- * Fetches the partition count of a stream using the {@link StreamMetadataProvider}
+ * Creates a list of PartitionGroupInfo for all partition groups of the stream using the {@link StreamMetadataProvider}
*/
public class PartitionGroupInfoFetcher implements Callable<Boolean> {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index aaf20b6..a99a82b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,12 +18,8 @@
*/
package org.apache.pinot.spi.stream;
-import java.util.List;
-
-
public class PartitionGroupMetadata {
- // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
private final int _partitionGroupId;
private int _sequenceNumber;
private String _startCheckpoint;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
index 3bedc8a..3f5b230 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
@@ -63,9 +63,9 @@ public interface PartitionLevelConsumer extends Closeable, PartitionGroupConsume
return fetchMessages(startOffsetLong, endOffsetLong, timeoutMillis);
}
- default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMillis)
+ default MessageBatch fetchMessages(Checkpoint startCheckpoint, Checkpoint endCheckpoint, int timeoutMs)
throws java.util.concurrent.TimeoutException {
- // TODO Issue 5359 remove this default implementation once all kafka consumers have migrated to use this API
- return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint, timeoutMillis);
+ return fetchMessages((StreamPartitionMsgOffset) startCheckpoint, (StreamPartitionMsgOffset) endCheckpoint,
+ timeoutMs);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
deleted file mode 100644
index b92f04d..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionOffsetFetcher.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream;
-
-import java.util.concurrent.Callable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Fetches the partition offset for a stream given the offset criteria, using the {@link StreamMetadataProvider}
- */
-public class PartitionOffsetFetcher implements Callable<Boolean> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(PartitionOffsetFetcher.class);
- private static final int STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS = 10000;
-
- private final String _topicName;
- private final OffsetCriteria _offsetCriteria;
- private final int _partitionGroupId;
-
- private Exception _exception = null;
- private StreamPartitionMsgOffset _offset;
- private StreamConsumerFactory _streamConsumerFactory;
- StreamConfig _streamConfig;
-
- public PartitionOffsetFetcher(final OffsetCriteria offsetCriteria, int partitionGroupId, StreamConfig streamConfig) {
- _offsetCriteria = offsetCriteria;
- _partitionGroupId = partitionGroupId;
- _streamConfig = streamConfig;
- _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
- _topicName = streamConfig.getTopicName();
- }
-
- public StreamPartitionMsgOffset getOffset() {
- return _offset;
- }
-
- public Exception getException() {
- return _exception;
- }
-
- /**
- * Callable to fetch the offset of the partition given the stream metadata and offset criteria
- * @return
- * @throws Exception
- */
- @Override
- public Boolean call()
- throws Exception {
- String clientId = PartitionOffsetFetcher.class.getSimpleName() + "-" + _topicName + "-" + _partitionGroupId;
- try (StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory
- .createPartitionMetadataProvider(clientId, _partitionGroupId)) {
- _offset =
- streamMetadataProvider.fetchStreamPartitionOffset(_offsetCriteria, STREAM_PARTITION_OFFSET_FETCH_TIMEOUT_MILLIS);
- if (_exception != null) {
- LOGGER.info("Successfully retrieved offset({}) for stream topic {} partition {}", _offset, _topicName,
- _partitionGroupId);
- }
- return Boolean.TRUE;
- } catch (TransientConsumerException e) {
- LOGGER.warn("Temporary exception when fetching offset for topic {} partition {}:{}", _topicName,
- _partitionGroupId,
- e.getMessage());
- _exception = e;
- return Boolean.FALSE;
- } catch (Exception e) {
- _exception = e;
- throw e;
- }
- }
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index f993fed..ac928c5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -41,7 +41,6 @@ public abstract class StreamConsumerFactory {
* @param partition the partition id of the partition for which this consumer is being created
* @return
*/
- @Deprecated
public abstract PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition);
/**
@@ -74,8 +73,11 @@ public abstract class StreamConsumerFactory {
return new LongMsgOffsetFactory();
}
- // creates a consumer which consumes from a partition group
- public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
- return createPartitionLevelConsumer(clientId, metadata.getPartitionGroupId());
+ /**
+ * Creates a partition group consumer, which can fetch messages from a partition group
+ */
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId,
+ PartitionGroupMetadata partitionGroupMetadata) {
+ return createPartitionLevelConsumer(clientId, partitionGroupMetadata.getPartitionGroupId());
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index be2e819..cecc708 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -60,7 +60,7 @@ public interface StreamMetadataProvider extends Closeable {
}
/**
- * Fetch the partitionGroupMetadata list.
+ * Fetch the list of partition group info for the latest state of the stream
* @param currentPartitionGroupsMetadata In case of Kafka, each partition group contains a single partition.
*/
default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
@@ -69,14 +69,13 @@ public interface StreamMetadataProvider extends Closeable {
int partitionCount = fetchPartitionCount(timeoutMillis);
List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
- // add a PartitionGroupInfo into the list foreach partition already present in current.
- // the end checkpoint is set as checkpoint
+ // Add a PartitionGroupInfo into the list for each partition already present in current.
for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
currentPartitionGroupMetadata.getEndCheckpoint()));
}
- // add PartitiongroupInfo for new partitions
- // use offset criteria from stream config
+ // Add PartitionGroupInfo for new partitions
+ // Use offset criteria from stream config
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
for (int i = currentPartitionGroupsMetadata.size(); i < partitionCount; i++) {
StreamMetadataProvider partitionMetadataProvider =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org