You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:17 UTC

[incubator-pinot] 11/23: Add Kinesis config wrapper

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0eeec6926d43a7769e57e83b345e5a52a1c1221
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:35:18 2020 +0530

    Add Kinesis config wrapper
---
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 29 ++++++++
 .../plugin/stream/kinesis/KinesisConsumer.java     | 78 ++++++++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     | 10 ++-
 3 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
new file mode 100644
index 0000000..01d666a
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -0,0 +1,29 @@
+package org.apache.pinot.plugin.stream.kinesis;
+
+import org.apache.pinot.spi.stream.StreamConfig;
+
+
+public class KinesisConfig {
+  private final StreamConfig _streamConfig;
+  private static final String AWS_REGION = "aws-region";
+  private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+
+  private static final String DEFAULT_AWS_REGION = "us-central-1";
+  private static final String DEFAULT_MAX_RECORDS = "20";
+
+  public KinesisConfig(StreamConfig streamConfig) {
+    _streamConfig = streamConfig;
+  }
+
+  public String getStream(){
+    return _streamConfig.getTopicName();
+  }
+
+  public String getAwsRegion(){
+    return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
+  }
+
+  public Integer maxRecordsToFetch(){
+    return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7670f06..96241d4 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -30,71 +30,75 @@ import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
-
+//TODO: Handle exceptions and timeout
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
   String _stream;
   Integer _maxRecords;
   String _shardId;
 
-  public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) {
-    super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global"));
-    _stream = stream;
-    _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20"));
+  public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
+    super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
+    _stream = kinesisConfig.getStream();
+    _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
-    KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+    try {
+      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
-    String shardIterator = getShardIterator(kinesisStartCheckpoint);
+      String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-    List<Record> recordList = new ArrayList<>();
+      List<Record> recordList = new ArrayList<>();
 
-    String kinesisEndSequenceNumber = null;
+      String kinesisEndSequenceNumber = null;
 
-    if (end != null) {
-      KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
-      kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
-    }
+      if (end != null) {
+        KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end;
+        kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber();
+      }
 
-    String nextStartSequenceNumber = null;
-    Long startTimestamp = System.currentTimeMillis();
+      String nextStartSequenceNumber = null;
+      Long startTimestamp = System.currentTimeMillis();
 
-    while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
-      GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
-      GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
+      while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+        GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
+        GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
-      if (getRecordsResponse.records().size() > 0) {
-        recordList.addAll(getRecordsResponse.records());
-        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+        if (getRecordsResponse.records().size() > 0) {
+          recordList.addAll(getRecordsResponse.records());
+          nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-        if (kinesisEndSequenceNumber != null
-            && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
-          nextStartSequenceNumber = kinesisEndSequenceNumber;
-          break;
-        }
+          if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+            nextStartSequenceNumber = kinesisEndSequenceNumber;
+            break;
+          }
 
-        if (recordList.size() >= _maxRecords) {
-          break;
+          if (recordList.size() >= _maxRecords) {
+            break;
+          }
         }
-      }
 
-      shardIterator = getRecordsResponse.nextShardIterator();
-    }
+        shardIterator = getRecordsResponse.nextShardIterator();
+      }
 
-    if (nextStartSequenceNumber == null && recordList.size() > 0) {
-      nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
-    }
+      if (nextStartSequenceNumber == null && recordList.size() > 0) {
+        nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      }
 
-    KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
-    KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
-    return kinesisFetchResult;
+      return kinesisFetchResult;
+    }catch (KinesisException e){
+      return null;
+    }
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index 931fa07..da39aab 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -28,19 +28,17 @@ import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2;
 
 
 public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
-  private StreamConfig _streamConfig;
-  private final String AWS_REGION = "aws-region";
+  private KinesisConfig _kinesisConfig;
 
   @Override
   public void init(StreamConfig streamConfig) {
-    _streamConfig = streamConfig;
+    _kinesisConfig = new KinesisConfig(streamConfig);
   }
 
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(),
-        _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "global"));
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion());
   }
 
   @Override
@@ -50,6 +48,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
 
   @Override
   public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) {
-    return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata);
+    return new KinesisConsumer(_kinesisConfig, metadata);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org