You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:20 UTC

[incubator-pinot] 14/23: Handle exceptions

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b0a2468f27fd6638be796657e30f65bc52fc24ed
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 23:41:06 2020 +0530

    Handle exceptions
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 59 +++++++++++++++++-----
 1 file changed, 47 insertions(+), 12 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 910b9ee..dfd6cda 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -32,12 +32,18 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
 import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 //TODO: Handle exceptions and timeout
@@ -46,6 +52,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
+  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -58,13 +65,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
-    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
-      @Override
-      public KinesisFetchResult call()
-          throws Exception {
-        return getResult(start, end);
-      }
-    });
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -74,13 +75,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
+    List<Record> recordList = new ArrayList<>();
+    KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+
     try {
-      KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
-      List<Record> recordList = new ArrayList<>();
-
       String kinesisEndSequenceNumber = null;
 
       if (end != null) {
@@ -119,8 +120,42 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
-    }catch (KinesisException e){
-      return null;
+    }catch (ProvisionedThroughputExceededException e) {
+      LOG.warn(
+          "The request rate for the stream is too high"
+      , e);
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (ExpiredIteratorException e) {
+      LOG.warn(
+          "ShardIterator expired while trying to fetch records",e
+      );
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (ResourceNotFoundException | InvalidArgumentException e) {
+      // AWS errors: resource not found or invalid argument
+      LOG.error("Encountered AWS error while attempting to fetch records", e);
+      return handleException(kinesisStartCheckpoint, recordList);
+    }
+    catch (KinesisException e) {
+      LOG.warn("Encountered unknown unrecoverable AWS exception", e);
+      throw new RuntimeException(e);
+    }
+    catch (Throwable e) {
+      // non-transient errors
+      LOG.error("Unknown fetchRecords exception", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
+    if(recordList.size() > 0){
+      String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+      return new KinesisFetchResult(kinesisCheckpoint, recordList);
+    }else{
+      KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
+      return new KinesisFetchResult(kinesisCheckpoint, recordList);
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org