You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 09:42:00 UTC

[camel] branch main updated: CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new f8bad1bbf26 CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)
f8bad1bbf26 is described below

commit f8bad1bbf26876f8f62a043dcb16cf5392a97344
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 11:41:54 2023 +0200

    CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)
    
    * CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis
    
    * CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis
    
    ---------
    
    Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 188 +++++++++------------
 .../KinesisConsumerClosedShardWithFailTest.java    |  27 ++-
 .../KinesisConsumerClosedShardWithSilentTest.java  |  42 ++---
 3 files changed, 109 insertions(+), 148 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 196032c9560..b00bdfb4e89 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -20,6 +20,7 @@ import java.util.ArrayDeque;
 import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -34,14 +35,12 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -63,58 +62,69 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
     @Override
     protected int poll() throws Exception {
-        String shardIterator = getShardIterator();
 
-        if (shardIterator == null) {
-            // probably closed. Returning 0 as nothing was processed
+        var processedExchangeCount = new AtomicInteger(0);
 
-            return 0;
-        }
+        getShardList()
+                .parallelStream()
+                .forEach(shard -> {
 
-        GetRecordsRequest req = GetRecordsRequest
-                .builder()
-                .shardIterator(shardIterator)
-                .limit(getEndpoint()
-                        .getConfiguration()
-                        .getMaxResultsPerRequest())
-                .build();
+                    String shardIterator = null;
+                    try {
+                        shardIterator = getShardIterator(shard);
+                    } catch (ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
 
-        GetRecordsResponse result = null;
-        if (getEndpoint().getConfiguration().isAsyncClient()) {
-            result = getAsyncClient()
-                    .getRecords(req)
-                    .get();
-        } else {
-            result = getClient().getRecords(req);
-        }
+                    if (shardIterator == null) {
+                        // probably closed. Returning 0 as nothing was processed
+                        processedExchangeCount.set(0);
+                    }
 
-        Queue<Exchange> exchanges = createExchanges(result.records());
-        int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
-
-        // May cache the last successful sequence number, and pass it to the
-        // getRecords request. That way, on the next poll, we start from where
-        // we left off, however, I don't know what happens to subsequent
-        // exchanges when an earlier exchange fails.
-
-        currentShardIterator = result.nextShardIterator();
-        if (isShardClosed) {
-            switch (getEndpoint().getConfiguration().getShardClosed()) {
-                case ignore:
-                    LOG.warn("The shard {} is in closed state", currentShardIterator);
-                    break;
-                case silent:
-                    break;
-                case fail:
-                    LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(),
-                            getEndpoint().getConfiguration().getShardId());
-                    throw new ReachedClosedStatusException(
-                            getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
-                default:
-                    throw new IllegalArgumentException("Unsupported shard closed strategy");
-            }
-        }
+                    GetRecordsRequest req = GetRecordsRequest
+                            .builder()
+                            .shardIterator(shardIterator)
+                            .limit(getEndpoint()
+                                    .getConfiguration()
+                                    .getMaxResultsPerRequest())
+                            .build();
+                    GetRecordsResponse result = getClient().getRecords(req);
+
+                    try {
+                        Queue<Exchange> exchanges = createExchanges(result.records());
+                        processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges)));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    // May cache the last successful sequence number, and pass it to the
+                    // getRecords request. That way, on the next poll, we start from where
+                    // we left off, however, I don't know what happens to subsequent
+                    // exchanges when an earlier exchange fails.
+
+                    currentShardIterator = result.nextShardIterator();
+                    if (isShardClosed) {
+                        switch (getEndpoint().getConfiguration().getShardClosed()) {
+                            case ignore:
+                                LOG.warn("The shard {} is in closed state", currentShardIterator);
+                                break;
+                            case silent:
+                                break;
+                            case fail:
+                                LOG.info("Shard Iterator reaches CLOSE status:{} {}",
+                                        getEndpoint().getConfiguration().getStreamName(),
+                                        getEndpoint().getConfiguration().getShardId());
+                                throw new IllegalStateException(
+                                        new ReachedClosedStatusException(
+                                                getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
+                            default:
+                                throw new IllegalArgumentException("Unsupported shard closed strategy");
+                        }
+                    }
 
-        return processedExchangeCount;
+                });
+
+        return processedExchangeCount.get();
     }
 
     @Override
@@ -135,82 +145,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         return getEndpoint().getClient();
     }
 
-    private KinesisAsyncClient getAsyncClient() {
-        return getEndpoint().getAsyncClient();
-    }
-
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
     }
 
-    private String getShardIterator() throws ExecutionException, InterruptedException {
+    private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
         // either return a cached one or get a new one via a GetShardIterator
         // request.
         if (currentShardIterator == null) {
-            String shardId;
-
-            // If ShardId supplied use it, else choose first one
-            if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
-                shardId = getEndpoint().getConfiguration().getShardId();
-                DescribeStreamRequest request
-                        = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-
-                DescribeStreamResponse response = null;
-                if (getEndpoint().getConfiguration().isAsyncClient()) {
-                    response = getAsyncClient()
-                            .describeStream(request)
-                            .get();
-                } else {
-                    response = getClient().describeStream(request);
-                }
-                for (Shard shard : response.streamDescription().shards()) {
-                    if (shard.shardId().equalsIgnoreCase(getEndpoint().getConfiguration().getShardId())) {
-                        isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
-                    }
-                }
-
-            } else {
-                DescribeStreamRequest request
-                        = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-
-                DescribeStreamResponse response = null;
-                if (getEndpoint().getConfiguration().isAsyncClient()) {
-                    response = getAsyncClient()
-                            .describeStream(request)
-                            .get();
-                } else {
-                    response = getClient().describeStream(request);
-                }
-                List<Shard> shards = response.streamDescription().shards();
-
-                if (shards.isEmpty()) {
-                    LOG.warn("There are no shards in the stream");
-                    return null;
-                }
-
-                shardId = shards.get(0).shardId();
-                isShardClosed = shards.get(0).sequenceNumberRange().endingSequenceNumber() != null;
-            }
+            var shardId = shard.shardId();
+
+            isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+
             LOG.debug("ShardId is: {}", shardId);
 
-            GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder()
+            GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
                     .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
                     .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
 
             if (hasSequenceNumber()) {
-                req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
-            }
-
-            resume(req);
-
-            GetShardIteratorResponse result = null;
-            if (getEndpoint().getConfiguration().isAsyncClient()) {
-                result = getAsyncClient().getShardIterator(req.build()).get();
-            } else {
-                result = getClient().getShardIterator(req.build());
+                request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
             }
 
+            resume(request);
+            GetShardIteratorResponse result = getClient().getShardIterator(request.build());
             currentShardIterator = result.shardIterator();
         }
 
@@ -294,4 +253,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     protected Kinesis2Configuration getConfiguration() {
         return getEndpoint().getConfiguration();
     }
+
+    private List<Shard> getShardList() {
+
+        var request = ListShardsRequest
+                .builder()
+                .streamName(getEndpoint().getConfiguration().getStreamName())
+                .build();
+
+        return getClient().listShards(request).shards();
+
+    }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 55b718f299d..48e739e423f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -28,16 +28,15 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -57,7 +56,7 @@ public class KinesisConsumerClosedShardWithFailTest {
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
 
-    private Kinesis2Consumer undertest;
+    private Kinesis2Consumer underTest;
 
     @BeforeEach
     public void setup() {
@@ -68,7 +67,7 @@ public class KinesisConsumerClosedShardWithFailTest {
         configuration.setStreamName("streamName");
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
         endpoint.start();
-        undertest = new Kinesis2Consumer(endpoint, processor);
+        underTest = new Kinesis2Consumer(endpoint, processor);
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
@@ -77,29 +76,29 @@ public class KinesisConsumerClosedShardWithFailTest {
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
                 .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
-        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
-                .thenReturn(DescribeStreamResponse.builder()
-                        .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
         when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
                 .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+        when(kinesisClient.listShards(any(ListShardsRequest.class)))
+                .thenReturn(ListShardsResponse.builder().shards(shardList).build());
     }
 
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() {
-        assertThrows(ReachedClosedStatusException.class, () -> {
-            undertest.poll();
+        assertThrows(IllegalStateException.class, () -> {
+            underTest.poll();
         });
 
-        final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
                 = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
-
-        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
-        assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
+        final ArgumentCaptor<ListShardsRequest> getListShardsCap
+                = ArgumentCaptor.forClass(ListShardsRequest.class);
 
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
         assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
         assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
         assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
+
+        verify(kinesisClient).listShards(getListShardsCap.capture());
+        assertThat(getListShardsCap.getValue().streamName(), is("streamName"));
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 41afc449035..76a5599e8f1 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -31,16 +31,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -60,7 +60,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
     private final CamelContext context = new DefaultCamelContext();
     private final Kinesis2Component component = new Kinesis2Component(context);
 
-    private Kinesis2Consumer undertest;
+    private Kinesis2Consumer underTest;
 
     @BeforeEach
     public void setup() {
@@ -71,7 +71,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
         configuration.setStreamName("streamName");
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
         endpoint.start();
-        undertest = new Kinesis2Consumer(endpoint, processor);
+        underTest = new Kinesis2Consumer(endpoint, processor);
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
@@ -86,27 +86,23 @@ public class KinesisConsumerClosedShardWithSilentTest {
                         Record.builder().sequenceNumber("2").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
                                 .build())
                 .build());
-        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
-                .thenReturn(DescribeStreamResponse.builder()
-                        .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
         when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
                 .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+        when(kinesisClient.listShards(any(ListShardsRequest.class)))
+                .thenReturn(ListShardsResponse.builder().shards(shardList).build());
 
         context.start();
-        undertest.start();
+        underTest.start();
     }
 
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
-        undertest.poll();
+        underTest.poll();
 
         final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
                 = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
-        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
-        assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
-
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
         assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
         assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
@@ -115,33 +111,30 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @Test
     public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
-        undertest.getEndpoint().getConfiguration().setShardId("shardIdPassedAsUrlParam");
+        underTest.getEndpoint().getConfiguration().setShardId("shardId");
 
-        undertest.poll();
+        underTest.poll();
 
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
                 = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
         assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
-        assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardIdPassedAsUrlParam"));
+        assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
         assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
     }
 
     @Test
     public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
-        undertest.getEndpoint().getConfiguration().setSequenceNumber("12345");
-        undertest.getEndpoint().getConfiguration().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+        underTest.getEndpoint().getConfiguration().setSequenceNumber("12345");
+        underTest.getEndpoint().getConfiguration().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
 
-        undertest.poll();
+        underTest.poll();
 
         final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
         final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
                 = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
 
-        verify(kinesisClient).describeStream(describeStreamReqCap.capture());
-        assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
-
         verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
         assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
         assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
@@ -152,7 +145,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @Test
     public void itUsesTheShardIteratorOnPolls() throws Exception {
-        undertest.poll();
+        underTest.poll();
 
         final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
         verify(kinesisClient).getRecords(getRecordsReqCap.capture());
@@ -162,12 +155,11 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @Test
     public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
-        undertest.poll();
-        undertest.poll();
+        underTest.poll();
+        underTest.poll();
 
         final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
 
-        verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class));
         verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
         verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
         assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator"));