You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/08/05 13:03:05 UTC

[camel] branch camel-3.4.x updated: CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4071)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.4.x by this push:
     new b63e386   CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4071)
b63e386 is described below

commit b63e386e028e8988fc70cdb7d8ca45ef930407cb
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Aug 5 15:02:50 2020 +0200

     CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4071)
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 30 +++++++++++++++++++---
 1 file changed, 26 insertions(+), 4 deletions(-)

diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index c1483bd..74d0c38 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -52,7 +52,21 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
 
     @Override
     protected int poll() throws Exception {
-        GetRecordsRequest req = GetRecordsRequest.builder().shardIterator(getShardItertor()).limit(getEndpoint().getConfiguration().getMaxResultsPerRequest()).build();
+        String shardIterator = getShardIterator();
+
+        if (shardIterator == null) {
+            // probably closed. Returning 0 as nothing was processed
+
+            return 0;
+        }
+
+        GetRecordsRequest req = GetRecordsRequest
+                .builder()
+                .shardIterator(shardIterator)
+                .limit(getEndpoint()
+                        .getConfiguration()
+                        .getMaxResultsPerRequest())
+                .build();
         GetRecordsResponse result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.records());
@@ -109,7 +123,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
         return (Kinesis2Endpoint)super.getEndpoint();
     }
 
-    private String getShardItertor() {
+    private String getShardIterator() {
         // either return a cached one or get a new one via a GetShardIterator
         // request.
         if (currentShardIterator == null) {
@@ -129,8 +143,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
             } else {
                 DescribeStreamRequest req1 = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
                 DescribeStreamResponse res1 = getClient().describeStream(req1);
-                shardId = res1.streamDescription().shards().get(0).shardId();
-                isShardClosed = res1.streamDescription().shards().get(0).sequenceNumberRange().endingSequenceNumber() != null;
+
+                List<Shard> shards = res1.streamDescription().shards();
+
+                if (shards.size() == 0) {
+                    LOG.warn("There are no shards in the stream");
+                    return null;
+                }
+
+                shardId = shards.get(0).shardId();
+                isShardClosed = shards.get(0).sequenceNumberRange().endingSequenceNumber() != null;
             }
             LOG.debug("ShardId is: {}", shardId);