You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:20 UTC
[incubator-pinot] 14/23: Handle exceptions
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit b0a2468f27fd6638be796657e30f65bc52fc24ed
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 23:41:06 2020 +0530
Handle exceptions
---
.../plugin/stream/kinesis/KinesisConsumer.java | 59 +++++++++++++++++-----
1 file changed, 47 insertions(+), 12 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 910b9ee..dfd6cda 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -32,12 +32,18 @@ import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
import org.apache.pinot.spi.stream.v2.FetchResult;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
+import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
//TODO: Handle exceptions and timeout
@@ -46,6 +52,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
Integer _maxRecords;
String _shardId;
ExecutorService _executorService;
+ private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -58,13 +65,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
@Override
public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
- Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
- @Override
- public KinesisFetchResult call()
- throws Exception {
- return getResult(start, end);
- }
- });
+ Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end));
try {
return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
@@ -74,13 +75,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
+ List<Record> recordList = new ArrayList<>();
+ KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
+
try {
- KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
String shardIterator = getShardIterator(kinesisStartCheckpoint);
- List<Record> recordList = new ArrayList<>();
-
String kinesisEndSequenceNumber = null;
if (end != null) {
@@ -119,8 +120,42 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
return kinesisFetchResult;
- }catch (KinesisException e){
- return null;
+ }catch (ProvisionedThroughputExceededException e) {
+ LOG.warn(
+ "The request rate for the stream is too high"
+ , e);
+ return handleException(kinesisStartCheckpoint, recordList);
+ }
+ catch (ExpiredIteratorException e) {
+ LOG.warn(
+ "ShardIterator expired while trying to fetch records",e
+ );
+ return handleException(kinesisStartCheckpoint, recordList);
+ }
+ catch (ResourceNotFoundException | InvalidArgumentException e) {
+ // AWS errors (resource not found / invalid argument): log and return the records fetched so far
+ LOG.error("Encountered AWS error while attempting to fetch records", e);
+ return handleException(kinesisStartCheckpoint, recordList);
+ }
+ catch (KinesisException e) {
+ LOG.warn("Encountered unknown unrecoverable AWS exception", e);
+ throw new RuntimeException(e);
+ }
+ catch (Throwable e) {
+ // Anything else is treated as non-transient: log and rethrow wrapped in a RuntimeException
+ LOG.error("Unknown fetchRecords exception", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
+ if(recordList.size() > 0){
+ String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
+ return new KinesisFetchResult(kinesisCheckpoint, recordList);
+ }else{
+ KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
+ return new KinesisFetchResult(kinesisCheckpoint, recordList);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org