You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/08/05 13:02:54 UTC
[camel] branch master updated: CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4070)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 20c5aaf CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4070)
20c5aaf is described below
commit 20c5aaf4d5abaaa18a3c981d2e23cd7cbb8abd71
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Wed Aug 5 15:02:39 2020 +0200
CAMEL-15369: avoid an IndexOutOfBoundsException if there are no shards AWS SDK v2 (#4070)
---
.../component/aws2/kinesis/Kinesis2Consumer.java | 30 +++++++++++++++++++---
1 file changed, 26 insertions(+), 4 deletions(-)
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index c1483bd..74d0c38 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -52,7 +52,21 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
@Override
protected int poll() throws Exception {
- GetRecordsRequest req = GetRecordsRequest.builder().shardIterator(getShardItertor()).limit(getEndpoint().getConfiguration().getMaxResultsPerRequest()).build();
+ String shardIterator = getShardIterator();
+
+ if (shardIterator == null) {
+ // probably closed. Returning 0 as nothing was processed
+
+ return 0;
+ }
+
+ GetRecordsRequest req = GetRecordsRequest
+ .builder()
+ .shardIterator(shardIterator)
+ .limit(getEndpoint()
+ .getConfiguration()
+ .getMaxResultsPerRequest())
+ .build();
GetRecordsResponse result = getClient().getRecords(req);
Queue<Exchange> exchanges = createExchanges(result.records());
@@ -109,7 +123,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
return (Kinesis2Endpoint)super.getEndpoint();
}
- private String getShardItertor() {
+ private String getShardIterator() {
// either return a cached one or get a new one via a GetShardIterator
// request.
if (currentShardIterator == null) {
@@ -129,8 +143,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
} else {
DescribeStreamRequest req1 = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
DescribeStreamResponse res1 = getClient().describeStream(req1);
- shardId = res1.streamDescription().shards().get(0).shardId();
- isShardClosed = res1.streamDescription().shards().get(0).sequenceNumberRange().endingSequenceNumber() != null;
+
+ List<Shard> shards = res1.streamDescription().shards();
+
+ if (shards.size() == 0) {
+ LOG.warn("There are no shards in the stream");
+ return null;
+ }
+
+ shardId = shards.get(0).shardId();
+ isShardClosed = shards.get(0).sequenceNumberRange().endingSequenceNumber() != null;
}
LOG.debug("ShardId is: {}", shardId);