You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:08 UTC
[incubator-pinot] 02/23: Add initial implementation of Kinesis
consumer
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 60195b016b07b9d0305cc4e4e8e5a6e424f5b76f
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 10 19:08:41 2020 +0530
Add initial implementation of Kinesis consumer
---
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 39 ++++++++++++++++++
.../plugin/stream/kinesis/KinesisCheckpoint.java | 28 +++++++++++++
.../stream/kinesis/KinesisConnectionHandler.java | 25 ++++++++++++
.../plugin/stream/kinesis/KinesisConsumer.java | 40 ++++++++++++++++++
.../plugin/stream/kinesis/KinesisFetchResult.java | 25 ++++++++++++
.../stream/kinesis/KinesisShardMetadata.java | 47 ++++++++++++++++++++++
pinot-plugins/pinot-stream-ingestion/pom.xml | 1 +
7 files changed, 205 insertions(+)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
new file mode 100644
index 0000000..97e5eef
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>pinot-stream-ingestion</artifactId>
+ <groupId>org.apache.pinot</groupId>
+ <version>0.7.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>pinot-kinesis</artifactId>
+
+ <properties>
+ <pinot.root>${basedir}/../../..</pinot.root>
+ <phase.prop>package</phase.prop>
+ <aws.version>2.15.42</aws.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kinesis</artifactId>
+ <version>${aws.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-json</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pinot</groupId>
+ <artifactId>pinot-spi</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
new file mode 100644
index 0000000..a330e78
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -0,0 +1,28 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+
+
+public class KinesisCheckpoint implements Checkpoint {
+ String _shardIterator;
+
+ public KinesisCheckpoint(String shardIterator){
+ _shardIterator = shardIterator;
+ }
+
+ public String getShardIterator() {
+ return _shardIterator;
+ }
+
+ @Override
+ public byte[] serialize() {
+ return _shardIterator.getBytes();
+ }
+
+ @Override
+ public Checkpoint deserialize(byte[] blob) {
+ return new KinesisCheckpoint(new String(blob));
+ }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
new file mode 100644
index 0000000..7ea24c0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+public class KinesisConnectionHandler {
+ String _awsRegion = "";
+ KinesisClient _kinesisClient;
+
+ public KinesisConnectionHandler(){
+
+ }
+
+ public KinesisConnectionHandler(String awsRegion){
+ _awsRegion = awsRegion;
+ _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+ }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
new file mode 100644
index 0000000..251d831
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -0,0 +1,40 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Collections;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+
+ //TODO: Fetch AWS region from Stream Config.
+ public KinesisConsumer(String awsRegion) {
+ super(awsRegion);
+ }
+
+ @Override
+ public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+ KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+ KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+
+ String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator();
+
+ GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+ GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+
+ String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
+
+ if(!getRecordsResponse.hasRecords()){
+ return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList());
+ }
+
+ KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator,
+ getRecordsResponse.records());
+
+ return kinesisFetchResult;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
new file mode 100644
index 0000000..5ef4e30
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisFetchResult implements FetchResult {
+ private String _nextShardIterator;
+
+ public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
+ _nextShardIterator = nextShardIterator;
+ }
+
+ @Override
+ public Checkpoint getLastCheckpoint() {
+ return new KinesisCheckpoint(_nextShardIterator);
+ }
+
+ @Override
+ public byte[] getMessages() {
+ return new byte[0];
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
new file mode 100644
index 0000000..07ede73
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -0,0 +1,47 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+
+
+public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
+ Checkpoint _startCheckpoint;
+ Checkpoint _endCheckpoint;
+
+ public KinesisShardMetadata(String shardId, String streamName) {
+ GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build());
+ _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
+ }
+
+ @Override
+ public Checkpoint getStartCheckpoint() {
+ return _startCheckpoint;
+ }
+
+ @Override
+ public Checkpoint getEndCheckpoint() {
+ return _endCheckpoint;
+ }
+
+ @Override
+ public void setStartCheckpoint(Checkpoint startCheckpoint) {
+ _startCheckpoint = startCheckpoint;
+ }
+
+ @Override
+ public void setEndCheckpoint(Checkpoint endCheckpoint) {
+ _endCheckpoint = endCheckpoint;
+ }
+
+ @Override
+ public byte[] serialize() {
+ return new byte[0];
+ }
+
+ @Override
+ public PartitionGroupMetadata deserialize(byte[] blob) {
+ return null;
+ }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index 3a51626..e7b9a46 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -42,6 +42,7 @@
<module>pinot-kafka-base</module>
<module>pinot-kafka-0.9</module>
<module>pinot-kafka-2.0</module>
+ <module>pinot-kinesis</module>
</modules>
</project>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org