You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/03 03:50:26 UTC
[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Implementation fixes
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new a341b28 Implementation fixes
a341b28 is described below
commit a341b285016b1b478824de2a721e19d38186089f
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Sat Jan 2 19:49:34 2021 -0800
Implementation fixes
---
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 2 +-
.../plugin/stream/kinesis/KinesisCheckpoint.java | 47 +++++++----
.../plugin/stream/kinesis/KinesisConsumer.java | 46 +++++------
.../stream/kinesis/KinesisConsumerFactory.java | 39 +++++----
.../plugin/stream/kinesis/KinesisFetchResult.java | 44 ----------
.../kinesis/KinesisPartitionGroupMetadataMap.java | 93 ----------------------
.../stream/kinesis/KinesisShardMetadata.java | 71 -----------------
.../kinesis/KinesisStreamMetadataProvider.java | 53 ++++++++++++
.../plugin/stream/kinesis/KinesisConsumerTest.java | 18 +++--
.../pinot/spi/stream/PartitionGroupMetadata.java | 3 +
.../org/apache/pinot/spi/stream/v2/Checkpoint.java | 25 ------
.../org/apache/pinot/spi/stream/v2/ConsumerV2.java | 24 ------
.../apache/pinot/spi/stream/v2/FetchResult.java | 29 -------
.../spi/stream/v2/PartitionGroupMetadata.java | 34 --------
.../spi/stream/v2/PartitionGroupMetadataMap.java | 30 -------
.../pinot/spi/stream/v2/SegmentNameGenerator.java | 25 ------
.../spi/stream/v2/StreamConsumerFactoryV2.java | 37 ---------
pinot-tools/pom.xml | 5 ++
pom.xml | 2 +-
19 files changed, 148 insertions(+), 479 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 4fce169..38d4f73 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -109,4 +109,4 @@
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index f3a7a49..1b8f86e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -18,38 +18,51 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.utils.JsonUtils;
public class KinesisCheckpoint implements Checkpoint {
- String _sequenceNumber;
- Boolean _isEndOfPartition = false;
+ private Map<String, String> _shardToStartSequenceMap;
- public KinesisCheckpoint(String sequenceNumber) {
- _sequenceNumber = sequenceNumber;
+ public KinesisCheckpoint(Map<String, String> shardToStartSequenceMap) {
+ _shardToStartSequenceMap = shardToStartSequenceMap;
}
- public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) {
- _sequenceNumber = sequenceNumber;
- _isEndOfPartition = isEndOfPartition;
+ public KinesisCheckpoint(String checkpointStr)
+ throws IOException {
+ _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference<Map<String, String>>() {
+ });
}
- @Override
- public boolean isEndOfPartition() {
- return _isEndOfPartition;
+ public Map<String, String> getShardToStartSequenceMap() {
+ return _shardToStartSequenceMap;
}
- public String getSequenceNumber() {
- return _sequenceNumber;
+ @Override
+ public String serialize() {
+ try {
+ return JsonUtils.objectToString(_shardToStartSequenceMap);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException();
+ }
}
@Override
- public byte[] serialize() {
- return _sequenceNumber.getBytes();
+ public KinesisCheckpoint deserialize(String blob) {
+ try {
+ return new KinesisCheckpoint(blob);
+ } catch (IOException e) {
+ throw new IllegalStateException();
+ }
}
@Override
- public KinesisCheckpoint deserialize(byte[] blob) {
- return new KinesisCheckpoint(new String(blob));
+ public int compareTo(Object o) {
+ return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next());
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fb414f0..8a24208 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -19,14 +19,16 @@
package org.apache.pinot.plugin.stream.kinesis;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.Checkpoint;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
@@ -41,28 +43,25 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer {
private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
String _stream;
Integer _maxRecords;
- String _shardId;
ExecutorService _executorService;
ShardIteratorType _shardIteratorType;
- public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
+ public KinesisConsumer(KinesisConfig kinesisConfig) {
super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
_stream = kinesisConfig.getStream();
_maxRecords = kinesisConfig.maxRecordsToFetch();
- KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
- _shardId = kinesisShardMetadata.getShardId();
_shardIteratorType = kinesisConfig.getShardIteratorType();
_executorService = Executors.newSingleThreadExecutor();
}
@Override
- public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+ public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) {
List<Record> recordList = new ArrayList<>();
- Future<KinesisFetchResult> kinesisFetchResultFuture =
+ Future<KinesisRecordsBatch> kinesisFetchResultFuture =
_executorService.submit(() -> getResult(start, end, recordList));
try {
@@ -72,7 +71,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
}
- private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, List<Record> recordList) {
+ private KinesisRecordsBatch getResult(Checkpoint start, Checkpoint end, List<Record> recordList) {
KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
try {
@@ -81,13 +80,14 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
createConnection();
}
- String shardIterator = getShardIterator(kinesisStartCheckpoint.getSequenceNumber());
+ Map.Entry<String, String> next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next();
+ String shardIterator = getShardIterator(next.getKey(), next.getValue());
String kinesisEndSequenceNumber = null;
if (end != null) {
KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
- kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+ kinesisEndSequenceNumber = kinesisEndCheckpoint.getShardToStartSequenceMap().values().iterator().next();
}
String nextStartSequenceNumber = null;
@@ -125,10 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
}
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard);
- KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
-
- return kinesisFetchResult;
+ return new KinesisRecordsBatch(recordList);
} catch (ProvisionedThroughputExceededException e) {
LOG.warn("The request rate for the stream is too high", e);
return handleException(kinesisStartCheckpoint, recordList);
@@ -149,21 +146,22 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
}
- private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
+ private KinesisRecordsBatch handleException(KinesisCheckpoint start, List<Record> recordList) {
if (recordList.size() > 0) {
String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
- return new KinesisFetchResult(kinesisCheckpoint, recordList);
+ Map<String, String> newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap());
+ newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber);
+ return new KinesisRecordsBatch(recordList);
} else {
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
- return new KinesisFetchResult(kinesisCheckpoint, recordList);
+ return new KinesisRecordsBatch(recordList);
+
}
}
- public String getShardIterator(String sequenceNumber) {
+ public String getShardIterator(String shardId, String sequenceNumber) {
GetShardIteratorRequest.Builder requestBuilder =
- GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType);
+ GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType);
if (sequenceNumber != null) {
requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 9bb4d0c..aa90812 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,36 +18,41 @@
*/
package org.apache.pinot.plugin.stream.kinesis;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
-import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+import java.util.Set;
+import org.apache.pinot.spi.stream.PartitionGroupConsumer;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.PartitionLevelConsumer;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
-public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
- private KinesisConfig _kinesisConfig;
+public class KinesisConsumerFactory extends StreamConsumerFactory {
@Override
- public void init(StreamConfig streamConfig) {
- _kinesisConfig = new KinesisConfig(streamConfig);
+ public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
+ throw new UnsupportedOperationException();
}
@Override
- public PartitionGroupMetadataMap getPartitionGroupsMetadata(
- PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
- return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(),
- currentPartitionGroupsMetadata);
+ public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set<String> fieldsToRead,
+ String groupId) {
+ throw new UnsupportedOperationException();
}
@Override
- public SegmentNameGenerator getSegmentNameGenerator() {
+ public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) {
return null;
}
@Override
- public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
- return new KinesisConsumer(_kinesisConfig, metadata);
+ public StreamMetadataProvider createStreamMetadataProvider(String clientId) {
+ return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig));
}
+
+ @Override
+ public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) {
+ return new KinesisConsumer(new KinesisConfig(_streamConfig));
+ }
+
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
deleted file mode 100644
index 8da3d2e..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import java.util.List;
-import org.apache.pinot.spi.stream.v2.FetchResult;
-import software.amazon.awssdk.services.kinesis.model.Record;
-
-
-public class KinesisFetchResult implements FetchResult<byte[]> {
- private final KinesisCheckpoint _kinesisCheckpoint;
- private final List<Record> _recordList;
-
- public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List<Record> recordList) {
- _kinesisCheckpoint = kinesisCheckpoint;
- _recordList = recordList;
- }
-
- @Override
- public KinesisCheckpoint getLastCheckpoint() {
- return _kinesisCheckpoint;
- }
-
- @Override
- public KinesisRecordsBatch getMessages() {
- return new KinesisRecordsBatch(_recordList);
- }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
deleted file mode 100644
index f96533f..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import software.amazon.awssdk.services.kinesis.model.Shard;
-
-
-public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
- private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
-
- public KinesisPartitionGroupMetadataMap(String stream, String awsRegion,
- PartitionGroupMetadataMap currentPartitionGroupMetadataMap) {
- //TODO: Handle child shards. Do not consume data from child shard unless parent is finished.
- //Return metadata only for shards in current metadata
- super(stream, awsRegion);
- KinesisPartitionGroupMetadataMap currentPartitionMeta =
- (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap;
- List<PartitionGroupMetadata> currentMetaList = currentPartitionMeta.getMetadataList();
-
- List<Shard> shardList = getShards();
-
- Map<String, PartitionGroupMetadata> currentMetadataMap = new HashMap<>();
- for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) {
- KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
- currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata);
- }
-
- for (Shard shard : shardList) {
- if (currentMetadataMap.containsKey(shard.shardId())) {
- //Return existing shard metadata
- _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
- } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
- KinesisShardMetadata kinesisShardMetadata =
- (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
- if (isProcessingFinished(kinesisShardMetadata)) {
- //Add child shards for processing since parent has finished
- appendShardMetadata(stream, awsRegion, shard);
- } else {
- //Do not process this shard unless the parent shard is finished or expired
- }
- } else {
- //This is a new shard with no parents. We can start processing this shard.
- appendShardMetadata(stream, awsRegion, shard);
- }
- }
- }
-
- private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) {
- return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata
- .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber());
- }
-
- private void appendShardMetadata(String stream, String awsRegion, Shard shard) {
- String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber();
- String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
- KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion);
- shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber));
- shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
- _stringPartitionGroupMetadataIndex.add(shardMetadata);
- }
-
- @Override
- public List<PartitionGroupMetadata> getMetadataList() {
- return _stringPartitionGroupMetadataIndex;
- }
-
- @Override
- public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
- return _stringPartitionGroupMetadataIndex.get(index);
- }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
deleted file mode 100644
index e24121b..0000000
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.plugin.stream.kinesis;
-
-import org.apache.pinot.spi.stream.v2.Checkpoint;
-import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-
-
-//TODO: Implement shardId as Array and have unique id
-public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
- String _shardId;
- KinesisCheckpoint _startCheckpoint;
- KinesisCheckpoint _endCheckpoint;
-
- public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
- super(streamName, awsRegion);
- _startCheckpoint = null;
- _endCheckpoint = null;
- _shardId = shardId;
- }
-
- public String getShardId() {
- return _shardId;
- }
-
- @Override
- public KinesisCheckpoint getStartCheckpoint() {
- return _startCheckpoint;
- }
-
- @Override
- public void setStartCheckpoint(Checkpoint startCheckpoint) {
- _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
- }
-
- @Override
- public KinesisCheckpoint getEndCheckpoint() {
- return _endCheckpoint;
- }
-
- @Override
- public void setEndCheckpoint(Checkpoint endCheckpoint) {
- _endCheckpoint = (KinesisCheckpoint) endCheckpoint;
- }
-
- @Override
- public byte[] serialize() {
- return new byte[0];
- }
-
- @Override
- public KinesisShardMetadata deserialize(byte[] blob) {
- return null;
- }
-}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
new file mode 100644
index 0000000..ba9d2b6
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -0,0 +1,53 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupInfo;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
+ private final KinesisConfig _kinesisConfig;
+ private KinesisConnectionHandler _kinesisConnectionHandler;
+
+ public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) {
+ _kinesisConfig = kinesisConfig;
+ _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+ }
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ return 0;
+ }
+
+ @Override
+ public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis)
+ throws TimeoutException {
+ return 0;
+ }
+
+ @Override
+ public List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+ List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+ throws TimeoutException {
+ List<PartitionGroupInfo> partitionGroupInfos = new ArrayList<>();
+ List<Shard> shards = _kinesisConnectionHandler.getShards();
+ for (Shard shard : shards) {
+ partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber()));
+ }
+ return partitionGroupInfos;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index f853875..57baae9 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -17,9 +17,11 @@ package org.apache.pinot.plugin.stream.kinesis; /**
* under the License.
*/
+import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -29,7 +31,8 @@ public class KinesisConsumerTest {
private static final String STREAM_NAME = "kinesis-test";
private static final String AWS_REGION = "us-west-2";
- public static void main(String[] args) {
+ public static void main(String[] args)
+ throws IOException {
Map<String, String> props = new HashMap<>();
props.put(KinesisConfig.STREAM, STREAM_NAME);
props.put(KinesisConfig.AWS_REGION, AWS_REGION);
@@ -42,18 +45,19 @@ public class KinesisConsumerTest {
System.out.println("SHARD: " + shard.shardId());
KinesisConsumer kinesisConsumer =
- new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION));
+ new KinesisConsumer(kinesisConfig);
System.out.println(
"Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
.sequenceNumberRange().endingSequenceNumber() + " >");
- KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
- KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L);
- KinesisRecordsBatch list = fetchResult.getMessages();
- int n = list.getMessageCount();
+ Map<String, String> shardIdToSeqNumMap = new HashMap<>();
+ shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber());
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap);
+ KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000);
+ int n = kinesisRecordsBatch.getMessageCount();
System.out.println("Found " + n + " messages ");
for (int i = 0; i < n; i++) {
- System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
+ System.out.println("SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch.getMessageAtIndex(i));
}
kinesisConsumer.close();
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index f662d99..7c4e3ef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.List;
+
+
public class PartitionGroupMetadata {
// fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
deleted file mode 100644
index 0195684..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface Checkpoint {
- boolean isEndOfPartition();
- byte[] serialize();
- Checkpoint deserialize(byte[] blob);
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
deleted file mode 100644
index 48b387d..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface ConsumerV2 {
- FetchResult fetch(Checkpoint start, Checkpoint end, long timeout);
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
deleted file mode 100644
index 2188ac9..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.List;
-import org.apache.pinot.spi.stream.MessageBatch;
-
-
-public interface FetchResult<T> {
- Checkpoint getLastCheckpoint();
- MessageBatch<T> getMessages();
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
deleted file mode 100644
index d7c44d7..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface PartitionGroupMetadata {
- Checkpoint getStartCheckpoint(); // similar to getStartOffset
-
- Checkpoint getEndCheckpoint(); // similar to getEndOffset
-
- void setStartCheckpoint(Checkpoint startCheckpoint);
-
- void setEndCheckpoint(Checkpoint endCheckpoint);
-
- byte[] serialize();
-
- PartitionGroupMetadata deserialize(byte[] blob);
-}
-
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
deleted file mode 100644
index ba37767..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.List;
-
-
-public interface PartitionGroupMetadataMap {
-
- List<PartitionGroupMetadata> getMetadataList();
-
- PartitionGroupMetadata getPartitionGroupMetadata(int index);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
deleted file mode 100644
index 6e65b25..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-public interface SegmentNameGenerator {
- // generates a unique name for a partition group based on the metadata
- String generateSegmentName(PartitionGroupMetadata metadata);
-
-}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
deleted file mode 100644
index 9e671aa..0000000
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.spi.stream.v2;
-
-import java.util.Map;
-import org.apache.pinot.spi.stream.StreamConfig;
-
-
-public interface StreamConsumerFactoryV2 {
- void init(StreamConfig streamConfig);
-
- // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state
- PartitionGroupMetadataMap getPartitionGroupsMetadata(PartitionGroupMetadataMap currentPartitionGroupsMetadata);
-
- // creates a name generator which generates segment name for a partition group
- SegmentNameGenerator getSegmentNameGenerator();
-
- // creates a consumer which consumes from a partition group
- ConsumerV2 createConsumer(PartitionGroupMetadata metadata);
-
-}
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index cf3fe70..e3c21ef 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -87,6 +87,11 @@
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-kinesis</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
<artifactId>pinot-kafka-${kafka.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
diff --git a/pom.xml b/pom.xml
index 79dabf7..237b5c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
<parquet.version>1.8.0</parquet.version>
<helix.version>0.9.8</helix.version>
<zkclient.version>0.7</zkclient.version>
- <jackson.version>2.9.8</jackson.version>
+ <jackson.version>2.12.0</jackson.version>
<async-http-client.version>1.9.21</async-http-client.version>
<jersey.version>2.28</jersey.version>
<grizzly.version>2.4.4</grizzly.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org