You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2017/12/21 06:53:33 UTC

[camel] 01/01: CAMEL-12089 - Camel-AWS: Kinesis consumer starts consuming data from the beginning even though the shard is in Closed state

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-12089
in repository https://gitbox.apache.org/repos/asf/camel.git

commit fa740ff4f0c11aaa1b4b8e02881cba72ef7bd1a4
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Dec 18 14:09:42 2017 +0100

    CAMEL-12089 - Camel-AWS: Kinesis consumer starts consuming data from the beginning even though the shard is in Closed state
---
 .../src/main/docs/aws-kinesis-component.adoc       |  3 +-
 .../component/aws/kinesis/KinesisConsumer.java     | 60 ++++++++++----
 .../component/aws/kinesis/KinesisEndpoint.java     | 19 ++++-
 .../kinesis/KinesisShardClosedStrategyEnum.java    | 24 ++++++
 .../aws/kinesis/ReachedClosedStatusException.java  | 29 +++++++
 .../KinesisConsumerClosedShardWithFailTest.java    | 96 ++++++++++++++++++++++
 ... KinesisConsumerClosedShardWithSilentTest.java} | 17 ++--
 .../component/aws/kinesis/KinesisConsumerTest.java | 13 ++-
 8 files changed, 235 insertions(+), 26 deletions(-)

diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
index 39006e5..5ec4b7d 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
@@ -52,7 +52,7 @@ with the following path and query parameters:
 | *streamName* | *Required* Name of the stream |  | String
 |===
 
-==== Query Parameters (24 parameters):
+==== Query Parameters (25 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -63,6 +63,7 @@ with the following path and query parameters:
 | *maxResultsPerRequest* (consumer) | Maximum number of records that will be fetched in each poll | 1 | int
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files you can enable this option to send an empty message (no body) instead. | false | boolean
 | *sequenceNumber* (consumer) | The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER |  | String
+| *shardClosed* (consumer) | Define what will be the behavior in case of shard closed. Possible value are ignore silent and fail.In case of ignore a message will be logged and the consumer will restart from the beginningin case of silent there will be no logging and the consumer will start from the beginningin case of fail a ReachedClosedStateException will be raised | ignore | KinesisShardClosed StrategyEnum
 | *shardId* (consumer) | Defines which shardId in the Kinesis stream to get records from |  | String
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index 1addcdf..ec480e4 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws.kinesis;
 
 import java.util.ArrayDeque;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 
@@ -28,7 +29,9 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -42,6 +45,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
     private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
     private String currentShardIterator;
+    private boolean isShardClosed;
 
     public KinesisConsumer(KinesisEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -49,9 +53,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     protected int poll() throws Exception {
-        GetRecordsRequest req = new GetRecordsRequest()
-                .withShardIterator(getShardItertor())
-                .withLimit(getEndpoint().getMaxResultsPerRequest());
+        GetRecordsRequest req = new GetRecordsRequest().withShardIterator(getShardItertor()).withLimit(getEndpoint().getMaxResultsPerRequest());
         GetRecordsResult result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -63,6 +65,20 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
         // exchanges when an earlier echangee fails.
 
         currentShardIterator = result.getNextShardIterator();
+        if (isShardClosed) {
+            switch (getEndpoint().getShardClosed()) {
+            case ignore:
+                LOG.warn("The shard {} is in closed state");
+                break;
+            case silent:
+                break;
+            case fail:
+                LOG.info("Shard Iterator reaches CLOSE status:", getEndpoint().getStreamName(), getEndpoint().getShardId());
+                throw new ReachedClosedStatusException(getEndpoint().getStreamName(), getEndpoint().getShardId());
+            default:
+                throw new IllegalArgumentException("Unsupported shard closed strategy");
+            }
+        }
 
         return processedExchangeCount;
     }
@@ -91,29 +107,46 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     public KinesisEndpoint getEndpoint() {
-        return (KinesisEndpoint) super.getEndpoint();
+        return (KinesisEndpoint)super.getEndpoint();
     }
 
     private String getShardItertor() {
-        // either return a cached one or get a new one via a GetShardIterator request.
+        // either return a cached one or get a new one via a GetShardIterator
+        // request.
         if (currentShardIterator == null) {
             String shardId;
 
-            //If ShardId supplied use it, else choose first one
+            // If ShardId supplied use it, else choose first one
             if (!getEndpoint().getShardId().isEmpty()) {
                 shardId = getEndpoint().getShardId();
+                DescribeStreamRequest req1 = new DescribeStreamRequest().withStreamName(getEndpoint().getStreamName());
+                DescribeStreamResult res1 = getClient().describeStream(req1);
+                Iterator it = res1.getStreamDescription().getShards().iterator();
+                while (it.hasNext()) {
+                    Shard shard = (Shard)it.next();
+                    if (shard.getShardId().equalsIgnoreCase(getEndpoint().getShardId())) {
+                        if (shard.getSequenceNumberRange().getEndingSequenceNumber() == null) {
+                            isShardClosed = false;
+                        } else {
+                            isShardClosed = true;
+                        }
+                    }
+                }
+
             } else {
-                DescribeStreamRequest req1 = new DescribeStreamRequest()
-                        .withStreamName(getEndpoint().getStreamName());
+                DescribeStreamRequest req1 = new DescribeStreamRequest().withStreamName(getEndpoint().getStreamName());
                 DescribeStreamResult res1 = getClient().describeStream(req1);
                 shardId = res1.getStreamDescription().getShards().get(0).getShardId();
+                if (res1.getStreamDescription().getShards().get(0).getSequenceNumberRange().getEndingSequenceNumber() == null) {
+                    isShardClosed = false;
+                } else {
+                    isShardClosed = true;
+                }
             }
             LOG.debug("ShardId is: {}", shardId);
 
-            GetShardIteratorRequest req = new GetShardIteratorRequest()
-                    .withStreamName(getEndpoint().getStreamName())
-                    .withShardId(shardId)
-                    .withShardIteratorType(getEndpoint().getIteratorType());
+            GetShardIteratorRequest req = new GetShardIteratorRequest().withStreamName(getEndpoint().getStreamName()).withShardId(shardId)
+                .withShardIteratorType(getEndpoint().getIteratorType());
 
             if (hasSequenceNumber()) {
                 req.withStartingSequenceNumber(getEndpoint().getSequenceNumber());
@@ -136,7 +169,6 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
 
     private boolean hasSequenceNumber() {
         return !getEndpoint().getSequenceNumber().isEmpty()
-                && (getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                    || getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+               && (getEndpoint().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || getEndpoint().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
     }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 3bd06e5..9515889 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.aws.kinesis;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,10 +31,10 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
 /**
- * The aws-kinesis component is for consuming and producing records from Amazon Kinesis Streams.
+ * The aws-kinesis component is for consuming and producing records from Amazon
+ * Kinesis Streams.
  */
-@UriEndpoint(firstVersion = "2.17.0", scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName",
-    consumerClass = KinesisConsumer.class, label = "cloud,messaging")
+@UriEndpoint(firstVersion = "2.17.0", scheme = "aws-kinesis", title = "AWS Kinesis", syntax = "aws-kinesis:streamName", consumerClass = KinesisConsumer.class, label = "cloud,messaging")
 public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @UriPath(description = "Name of the stream")
@@ -50,6 +51,11 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     private String shardId = "";
     @UriParam(label = "consumer", description = "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER")
     private String sequenceNumber = "";
+    @UriParam(label = "consumer", defaultValue = "ignore", description = "Define what will be the behavior in case of shard closed. Possible value are ignore, silent and fail."
+                                                                         + "In case of ignore a message will be logged and the consumer will restart from the beginning,"
+                                                                         + "in case of silent there will be no logging and the consumer will start from the beginning,"
+                                                                         + "in case of fail a ReachedClosedStateException will be raised")
+    private KinesisShardClosedStrategyEnum shardClosed;
 
     public KinesisEndpoint(String uri, String streamName, KinesisComponent component) {
         super(uri, component);
@@ -144,4 +150,11 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
         this.sequenceNumber = sequenceNumber;
     }
 
+    public KinesisShardClosedStrategyEnum getShardClosed() {
+        return shardClosed;
+    }
+
+    public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) {
+        this.shardClosed = shardClosed;
+    }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java
new file mode 100644
index 0000000..c4db5e3
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisShardClosedStrategyEnum.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+public enum KinesisShardClosedStrategyEnum {
+
+    ignore,
+    fail,
+    silent
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java
new file mode 100644
index 0000000..8376511
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/ReachedClosedStatusException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+public class ReachedClosedStatusException extends Exception {
+
+    private final String streamName;
+    private final String shardId;
+
+    public ReachedClosedStatusException(String streamName, String shardId) {
+        super();
+        this.streamName = streamName;
+        this.shardId = shardId;
+    }
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
new file mode 100644
index 0000000..3f7e7ea
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.kinesis;
+
+import java.util.ArrayList;
+
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.GetRecordsRequest;
+import com.amazonaws.services.kinesis.model.GetRecordsResult;
+import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
+import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import com.amazonaws.services.kinesis.model.StreamDescription;
+
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KinesisConsumerClosedShardWithFailTest {
+
+    @Mock
+    private AmazonKinesis kinesisClient;
+    @Mock
+    private AsyncProcessor processor;
+
+    private final CamelContext context = new DefaultCamelContext();
+    private final KinesisComponent component = new KinesisComponent(context);
+
+    private KinesisConsumer undertest;
+
+    @Before
+    public void setup() throws Exception {
+        KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
+        endpoint.setAmazonKinesisClient(kinesisClient);
+        endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.fail);
+        undertest = new KinesisConsumer(endpoint, processor);
+
+        SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber("20");
+        Shard shard = new Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(new GetRecordsResult().withNextShardIterator("nextShardIterator"));
+        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
+            .thenReturn(new DescribeStreamResult().withStreamDescription(new StreamDescription().withShards(shardList)));
+        when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(new GetShardIteratorResult().withShardIterator("shardIterator"));
+    }
+
+    @Test(expected = ReachedClosedStatusException.class)
+    public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
+        undertest.poll();
+
+        final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
+        final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+
+        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
+        assertThat(describeStreamReqCap.getValue().getStreamName(), is("streamName"));
+
+        verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
+        assertThat(getShardIteratorReqCap.getValue().getStreamName(), is("streamName"));
+        assertThat(getShardIteratorReqCap.getValue().getShardId(), is("shardId"));
+        assertThat(getShardIteratorReqCap.getValue().getShardIteratorType(), is("LATEST"));
+    }
+}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
similarity index 94%
copy from components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
copy to components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 3fb29fe..f5daa8f 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import java.util.ArrayList;
 import java.util.Date;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -26,6 +27,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
@@ -43,6 +45,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -50,7 +53,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
-public class KinesisConsumerTest {
+public class KinesisConsumerClosedShardWithSilentTest {
 
     @Mock
     private AmazonKinesis kinesisClient;
@@ -67,7 +70,14 @@ public class KinesisConsumerTest {
         KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
         endpoint.setAmazonKinesisClient(kinesisClient);
         endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         undertest = new KinesisConsumer(endpoint, processor);
+        
+        SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber("20");
+        Shard shard = new Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+       
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
             .thenReturn(new GetRecordsResult()
@@ -76,7 +86,7 @@ public class KinesisConsumerTest {
         when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
             .thenReturn(new DescribeStreamResult()
                 .withStreamDescription(new StreamDescription()
-                    .withShards(new Shard().withShardId("shardId"))
+                    .withShards(shardList)
                 )
             );
         when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
@@ -107,8 +117,6 @@ public class KinesisConsumerTest {
 
         undertest.poll();
 
-        verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class));
-
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
@@ -203,5 +211,4 @@ public class KinesisConsumerTest {
         assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.PARTITION_KEY, String.class), is(partitionKey));
         assertThat(exchangeCaptor.getValue().getIn().getHeader(KinesisConstants.SEQUENCE_NUMBER, String.class), is(sequenceNumber));
     }
-
 }
\ No newline at end of file
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
index 3fb29fe..e43461a 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import java.util.ArrayList;
 import java.util.Date;
 
 import com.amazonaws.services.kinesis.AmazonKinesis;
@@ -26,6 +27,7 @@ import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
+import com.amazonaws.services.kinesis.model.SequenceNumberRange;
 import com.amazonaws.services.kinesis.model.Shard;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 import com.amazonaws.services.kinesis.model.StreamDescription;
@@ -67,7 +69,14 @@ public class KinesisConsumerTest {
         KinesisEndpoint endpoint = new KinesisEndpoint(null, "streamName", component);
         endpoint.setAmazonKinesisClient(kinesisClient);
         endpoint.setIteratorType(ShardIteratorType.LATEST);
+        endpoint.setShardClosed(KinesisShardClosedStrategyEnum.silent);
         undertest = new KinesisConsumer(endpoint, processor);
+        
+        SequenceNumberRange range = new SequenceNumberRange().withEndingSequenceNumber(null);
+        Shard shard = new Shard().withShardId("shardId").withSequenceNumberRange(range);
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+       
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
             .thenReturn(new GetRecordsResult()
@@ -76,7 +85,7 @@ public class KinesisConsumerTest {
         when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
             .thenReturn(new DescribeStreamResult()
                 .withStreamDescription(new StreamDescription()
-                    .withShards(new Shard().withShardId("shardId"))
+                    .withShards(shardList)
                 )
             );
         when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
@@ -107,8 +116,6 @@ public class KinesisConsumerTest {
 
         undertest.poll();
 
-        verify(kinesisClient, never()).describeStream(any(DescribeStreamRequest.class));
-
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.