You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/08/02 08:05:45 UTC
[camel] branch camel-3.4.x updated: CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065) (#4066)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.4.x by this push:
new a1baa1a CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065) (#4066)
a1baa1a is described below
commit a1baa1aabba7d3b737a0559a508c660a39793686
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Sun Aug 2 10:05:27 2020 +0200
CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065) (#4066)
---
.../component/aws/kinesis/KinesisConsumer.java | 34 ++++++++++++++++++----
1 file changed, 29 insertions(+), 5 deletions(-)
diff --git a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index adbf237..7c30c34 100644
--- a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -52,7 +52,20 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
@Override
protected int poll() throws Exception {
- GetRecordsRequest req = new GetRecordsRequest().withShardIterator(getShardItertor()).withLimit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
+ String shardIterator = getShardIterator();
+
+ if (shardIterator == null) {
+ // probably closed. Returning 0 as nothing was processed
+
+ return 0;
+ }
+
+ GetRecordsRequest req = new GetRecordsRequest()
+ .withShardIterator(shardIterator)
+ .withLimit(getEndpoint()
+ .getConfiguration()
+ .getMaxResultsPerRequest());
+
GetRecordsResult result = getClient().getRecords(req);
Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -61,7 +74,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
// May cache the last successful sequence number, and pass it to the
// getRecords request. That way, on the next poll, we start from where
// we left off, however, I don't know what happens to subsequent
- // exchanges when an earlier echangee fails.
+ // exchanges when an earlier exchange fails.
currentShardIterator = result.getNextShardIterator();
if (isShardClosed) {
@@ -109,7 +122,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
return (KinesisEndpoint)super.getEndpoint();
}
- private String getShardItertor() {
+ private String getShardIterator() {
// either return a cached one or get a new one via a GetShardIterator
// request.
if (currentShardIterator == null) {
@@ -129,11 +142,22 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
} else {
DescribeStreamRequest req1 = new DescribeStreamRequest().withStreamName(getEndpoint().getConfiguration().getStreamName());
DescribeStreamResult res1 = getClient().describeStream(req1);
- shardId = res1.getStreamDescription().getShards().get(0).getShardId();
- isShardClosed = res1.getStreamDescription().getShards().get(0).getSequenceNumberRange().getEndingSequenceNumber() != null;
+
+ List<Shard> shards = res1.getStreamDescription().getShards();
+
+ if (shards.size() == 0) {
+ LOG.warn("There are no shards in the stream");
+ // NOTE: Should we also set isShardClosed to true?
+ return null;
+ }
+
+ shardId = shards.get(0).getShardId();
+ isShardClosed = shards.get(0).getSequenceNumberRange().getEndingSequenceNumber() != null;
}
+
LOG.debug("ShardId is: {}", shardId);
+
GetShardIteratorRequest req = new GetShardIteratorRequest().withStreamName(getEndpoint().getConfiguration().getStreamName()).withShardId(shardId)
.withShardIteratorType(getEndpoint().getConfiguration().getIteratorType());