You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:10 UTC

[incubator-pinot] 04/23: Add kinesis code to handle offsets

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 76cfcf1816d3d36974841081346edb8c95bf73fc
Author: KKcorps <kh...@gmail.com>
AuthorDate: Fri Dec 11 13:57:25 2020 +0530

    Add kinesis code to handle offsets
---
 .../plugin/stream/kinesis/KinesisCheckpoint.java   | 13 ++++---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 42 +++++++++++++++++++---
 .../stream/kinesis/KinesisConsumerFactory.java     | 36 +++++++++++++++++++
 .../plugin/stream/kinesis/KinesisFetchResult.java  | 11 +++---
 .../kinesis/KinesisPartitionGroupMetadataMap.java  | 31 ++++++++++++++++
 .../stream/kinesis/KinesisShardMetadata.java       |  5 ++-
 6 files changed, 121 insertions(+), 17 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index a330e78..77f790b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -1,23 +1,22 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 
 
 public class KinesisCheckpoint implements Checkpoint {
-  String _shardIterator;
+  String _sequenceNumber;
 
-  public KinesisCheckpoint(String shardIterator){
-    _shardIterator = shardIterator;
+  public KinesisCheckpoint(String sequenceNumber){
+    _sequenceNumber = sequenceNumber;
   }
 
-  public String getShardIterator() {
-    return _shardIterator;
+  public String getSequenceNumber() {
+    return _sequenceNumber;
   }
 
   @Override
   public byte[] serialize() {
-    return _shardIterator.getBytes();
+    return _sequenceNumber.getBytes();
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 251d831..dc44079 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -1,19 +1,26 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+  String _stream;
 
   //TODO: Fetch AWS region from  Stream Config.
-  public KinesisConsumer(String awsRegion) {
+  public KinesisConsumer(String stream, String awsRegion) {
     super(awsRegion);
+    _stream = stream;
   }
 
   @Override
@@ -21,18 +28,43 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
     KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
 
-    String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator();
+    String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
+    String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
 
-    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build();
+    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType(
+        ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build());
+
+    String shardIterator = getShardIteratorResponse.shardIterator();
+    GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
     GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
     String kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
 
+    //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception.
     if(!getRecordsResponse.hasRecords()){
-      return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList());
+      return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList());
+    }
+
+    List<Record> recordList = new ArrayList<>();
+    recordList.addAll(getRecordsResponse.records());
+
+    String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+    while(kinesisNextShardIterator != null){
+      getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build();
+      getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+      if(getRecordsResponse.hasRecords()){
+        recordList.addAll(getRecordsResponse.records());
+        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      }
+
+      if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) {
+        nextStartSequenceNumber = kinesisEndSequenceNumber;
+        break;
+      }
+      kinesisNextShardIterator = getRecordsResponse.nextShardIterator();
     }
 
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator,
+    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber,
         getRecordsResponse.records());
 
     return kinesisFetchResult;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
new file mode 100644
index 0000000..6bd1e3a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -0,0 +1,36 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.Map;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.v2.ConsumerV2;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import org.apache.pinot.spi.stream.v2.SegmentNameGenerator;
+import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
+
+
+public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
+  private StreamConfig _streamConfig;
+  private final String AWS_REGION = "aws-region";
+
+  @Override
+  public void init(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  @Override
+  public PartitionGroupMetadataMap getPartitionGroupsMetadata(
+      PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
+    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+
+  @Override
+  public SegmentNameGenerator getSegmentNameGenerator() {
+    return null;
+  }
+
+  @Override
+  public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
+    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"));
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 5ef4e30..dc8e764 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -1,16 +1,19 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
 
-public class KinesisFetchResult implements FetchResult {
-  private String _nextShardIterator;
+public class KinesisFetchResult implements FetchResult<Record> {
+  private final String _nextShardIterator;
+  private final List<Record> _recordList;
 
   public KinesisFetchResult(String nextShardIterator, List<Record> recordList){
      _nextShardIterator = nextShardIterator;
+     _recordList = recordList;
   }
 
   @Override
@@ -19,7 +22,7 @@ public class KinesisFetchResult implements FetchResult {
   }
 
   @Override
-  public byte[] getMessages() {
-    return new byte[0];
+  public List<Record> getMessages() {
+    return _recordList;
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
new file mode 100644
index 0000000..bc3fef2
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -0,0 +1,31 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+
+public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
+  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>();
+
+  public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
+    super(awsRegion);
+    ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build());
+    List<Shard> shardList = listShardsResponse.shards();
+    for(Shard shard : shardList){
+      String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
+      KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
+      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+    }
+  }
+
+  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
+      return _stringPartitionGroupMetadataMap;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 07ede73..d50d821 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -11,8 +12,10 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   Checkpoint _endCheckpoint;
 
   public KinesisShardMetadata(String shardId, String streamName) {
-    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build());
+    GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType(
+        ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
+    _endCheckpoint = null;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org