You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:08 UTC

[incubator-pinot] 02/23: Add initial implementation of Kinesis consumer

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 60195b016b07b9d0305cc4e4e8e5a6e424f5b76f
Author: KKcorps <kh...@gmail.com>
AuthorDate: Thu Dec 10 19:08:41 2020 +0530

    Add initial implementation of Kinesis consumer
---
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 39 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 28 +++++++++++++
 .../stream/kinesis/KinesisConnectionHandler.java   | 25 ++++++++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 40 ++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 25 ++++++++++++
 .../stream/kinesis/KinesisShardMetadata.java       | 47 ++++++++++++++++++++++
 pinot-plugins/pinot-stream-ingestion/pom.xml       |  1 +
 7 files changed, 205 insertions(+)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
new file mode 100644
index 0000000..97e5eef
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>pinot-stream-ingestion</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.7.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>pinot-kinesis</artifactId>
+
+  <properties>
+    <pinot.root>${basedir}/../../..</pinot.root>
+    <phase.prop>package</phase.prop>
+    <aws.version>2.15.42</aws.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kinesis</artifactId>
+      <version>${aws.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-json</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-spi</artifactId>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
new file mode 100644
index 0000000..a330e78
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -0,0 +1,28 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+
+
+/**
+ * {@link Checkpoint} implementation that wraps a Kinesis shard iterator string.
+ */
+public class KinesisCheckpoint implements Checkpoint {
+  String _shardIterator;
+
+  /**
+   * @param shardIterator the shard iterator string this checkpoint wraps
+   */
+  public KinesisCheckpoint(String shardIterator) {
+    _shardIterator = shardIterator;
+  }
+
+  /**
+   * @return the shard iterator string wrapped by this checkpoint
+   */
+  public String getShardIterator() {
+    return _shardIterator;
+  }
+
+  /**
+   * Encodes the shard iterator as UTF-8 bytes.
+   *
+   * <p>The charset is given explicitly so that the produced blob does not depend on the
+   * default charset of the JVM that happens to write it.
+   *
+   * @return the UTF-8 bytes of the shard iterator string
+   */
+  @Override
+  public byte[] serialize() {
+    return _shardIterator.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Builds a new checkpoint from bytes produced by {@link #serialize()}.
+   *
+   * @param blob UTF-8 bytes of a shard iterator string
+   * @return a new {@link KinesisCheckpoint} wrapping the decoded string
+   */
+  @Override
+  public Checkpoint deserialize(byte[] blob) {
+    // Must mirror the charset used in serialize() so the value round-trips unchanged.
+    return new KinesisCheckpoint(new String(blob, java.nio.charset.StandardCharsets.UTF_8));
+  }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
new file mode 100644
index 0000000..7ea24c0
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+
+public class KinesisConnectionHandler {
+  String _awsRegion = "";
+  KinesisClient _kinesisClient;
+
+  public KinesisConnectionHandler(){
+
+  }
+
+  public KinesisConnectionHandler(String awsRegion){
+    _awsRegion = awsRegion;
+    _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build();
+  }
+
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
new file mode 100644
index 0000000..251d831
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -0,0 +1,40 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Collections;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+/**
+ * {@link ConsumerV2} implementation that reads records from Kinesis through the client
+ * built by {@link KinesisConnectionHandler}.
+ */
+public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+
+  //TODO: Fetch AWS region from  Stream Config.
+  /**
+   * @param awsRegion AWS region identifier used to build the Kinesis client
+   */
+  public KinesisConsumer(String awsRegion) {
+    super(awsRegion);
+  }
+
+  /**
+   * Issues a single {@code GetRecords} call starting at the shard iterator held by
+   * {@code start}.
+   *
+   * <p>{@code end} and {@code timeout} are not used by this implementation.
+   *
+   * @param start checkpoint to read from; must be a {@link KinesisCheckpoint}
+   * @param end currently ignored
+   * @param timeout currently ignored
+   * @return a {@link KinesisFetchResult} built from the next shard iterator of the response
+   *     and the returned records (an empty list when the response has none)
+   */
+  @Override
+  public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+    String startShardIterator = ((KinesisCheckpoint) start).getShardIterator();
+
+    GetRecordsRequest request = GetRecordsRequest.builder().shardIterator(startShardIterator).build();
+    GetRecordsResponse response = _kinesisClient.getRecords(request);
+
+    String nextShardIterator = response.nextShardIterator();
+
+    if (!response.hasRecords()) {
+      return new KinesisFetchResult(nextShardIterator, Collections.emptyList());
+    }
+
+    return new KinesisFetchResult(nextShardIterator, response.records());
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
new file mode 100644
index 0000000..5ef4e30
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -0,0 +1,25 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.List;
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.FetchResult;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+
+public class KinesisFetchResult implements FetchResult {
+  private String _nextShardIterator;
+
+  public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
+     _nextShardIterator = nextShardIterator;
+  }
+
+  @Override
+  public Checkpoint getLastCheckpoint() {
+    return new KinesisCheckpoint(_nextShardIterator);
+  }
+
+  @Override
+  public byte[] getMessages() {
+    return new byte[0];
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
new file mode 100644
index 0000000..07ede73
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -0,0 +1,47 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.v2.Checkpoint;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+
+
+/**
+ * {@link PartitionGroupMetadata} for a single Kinesis shard. On construction it asks Kinesis
+ * for a shard iterator and stores it as the start checkpoint.
+ */
+public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
+  Checkpoint _startCheckpoint;
+  Checkpoint _endCheckpoint;
+
+  /**
+   * Kept for source compatibility. This constructor goes through the no-arg
+   * {@link KinesisConnectionHandler} constructor, which does not build a client, so it cannot
+   * obtain a shard iterator and fails with a {@link NullPointerException}.
+   *
+   * @param shardId id of the shard
+   * @param streamName name of the stream the shard belongs to
+   * @deprecated use {@link #KinesisShardMetadata(String, String, String)} so a client exists
+   */
+  @Deprecated
+  public KinesisShardMetadata(String shardId, String streamName) {
+    _startCheckpoint = fetchStartCheckpoint(shardId, streamName);
+  }
+
+  /**
+   * Builds a Kinesis client for {@code awsRegion} and initialises the start checkpoint from a
+   * freshly requested shard iterator.
+   *
+   * @param shardId id of the shard
+   * @param streamName name of the stream the shard belongs to
+   * @param awsRegion AWS region identifier used to build the Kinesis client
+   */
+  public KinesisShardMetadata(String shardId, String streamName, String awsRegion) {
+    super(awsRegion);
+    _startCheckpoint = fetchStartCheckpoint(shardId, streamName);
+  }
+
+  /** Requests a shard iterator for the shard and wraps it in a {@link KinesisCheckpoint}. */
+  private Checkpoint fetchStartCheckpoint(String shardId, String streamName) {
+    // Fail with an explicit message instead of an anonymous NPE on the client call below.
+    java.util.Objects.requireNonNull(_kinesisClient,
+        "Kinesis client is not initialized; construct KinesisShardMetadata with an AWS region");
+
+    // The GetShardIterator API requires a shard iterator type.
+    // NOTE(review): LATEST is chosen here; confirm whether TRIM_HORIZON is the intended start.
+    GetShardIteratorRequest request = GetShardIteratorRequest.builder()
+        .shardId(shardId)
+        .streamName(streamName)
+        .shardIteratorType(software.amazon.awssdk.services.kinesis.model.ShardIteratorType.LATEST)
+        .build();
+    GetShardIteratorResponse response = _kinesisClient.getShardIterator(request);
+    return new KinesisCheckpoint(response.shardIterator());
+  }
+
+  @Override
+  public Checkpoint getStartCheckpoint() {
+    return _startCheckpoint;
+  }
+
+  /**
+   * @return the end checkpoint; {@code null} until {@link #setEndCheckpoint(Checkpoint)} is called
+   */
+  @Override
+  public Checkpoint getEndCheckpoint() {
+    return _endCheckpoint;
+  }
+
+  @Override
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    _startCheckpoint = startCheckpoint;
+  }
+
+  @Override
+  public void setEndCheckpoint(Checkpoint endCheckpoint) {
+    _endCheckpoint = endCheckpoint;
+  }
+
+  /**
+   * @return always a zero-length byte array; no state is serialized yet
+   */
+  @Override
+  public byte[] serialize() {
+    return new byte[0];
+  }
+
+  /**
+   * @return always {@code null}; deserialization is not implemented yet
+   */
+  @Override
+  public PartitionGroupMetadata deserialize(byte[] blob) {
+    return null;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml
index 3a51626..e7b9a46 100644
--- a/pinot-plugins/pinot-stream-ingestion/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pom.xml
@@ -42,6 +42,7 @@
     <module>pinot-kafka-base</module>
     <module>pinot-kafka-0.9</module>
     <module>pinot-kafka-2.0</module>
+    <module>pinot-kinesis</module>
   </modules>
 
 </project>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org