You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by kl...@apache.org on 2023/08/25 15:32:11 UTC

[camel] branch main updated: CAMEL-19789: Fix handling of shardIterator so that all records are returned. (#11206)

This is an automated email from the ASF dual-hosted git repository.

klease pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e87f51c4a1 CAMEL-19789: Fix handling of shardIterator so that all records are returned. (#11206)
4e87f51c4a1 is described below

commit 4e87f51c4a1c93455c4062b6f65c34730c65d433
Author: klease <38...@users.noreply.github.com>
AuthorDate: Fri Aug 25 17:32:04 2023 +0200

    CAMEL-19789: Fix handling of shardIterator so that all records are returned. (#11206)
    
    According to the AWS Kinesis client documentation, "use a GetShardIterator request
    to get the first shard iterator for use in your first GetRecords request and for subsequent reads
    use the shard iterator returned by the GetRecords request in NextShardIterator".
    Adapt the unit tests for this case and modify the KinesisConsumerIT test to
    verify that different records are returned.
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 73 ++++++++++++----------
 .../KinesisConsumerClosedShardWithFailTest.java    |  2 +-
 .../KinesisConsumerClosedShardWithSilentTest.java  |  8 +--
 .../kinesis/integration/KinesisConsumerIT.java     |  2 +
 4 files changed, 48 insertions(+), 37 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index f70551aa53f..8e7a8692a35 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -48,9 +48,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
 
     private KinesisConnection connection;
-    private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
+    private String currentShardIterator;
+
     public Kinesis2Consumer(Kinesis2Endpoint endpoint,
                             Processor processor) {
         super(endpoint, processor);
@@ -126,8 +127,9 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         }
 
         if (shardIterator == null) {
-            // probably closed. Returning 0 as nothing was processed
+            // Unable to get an interator so shard must be closed
             processedExchangeCount.set(0);
+            return;
         }
 
         GetRecordsRequest req = GetRecordsRequest
@@ -166,18 +168,19 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         // we left off, however, I don't know what happens to subsequent
         // exchanges when an earlier exchange fails.
 
-        var currentShardIterator = result.nextShardIterator();
-        if (isShardClosed) {
+        currentShardIterator = result.nextShardIterator();
+        if (currentShardIterator == null) {
+            // This indicates that the shard is closed and no more data is available
             switch (getEndpoint().getConfiguration().getShardClosed()) {
                 case ignore:
-                    LOG.warn("The shard {} is in closed state", currentShardIterator);
+                    LOG.warn("The shard with id={} on stream {} reached CLOSE status",
+                            shard.shardId(), getEndpoint().getConfiguration().getStreamName());
                     break;
                 case silent:
                     break;
                 case fail:
-                    LOG.info("Shard Iterator reaches CLOSE status:{} {}",
-                            getEndpoint().getConfiguration().getStreamName(),
-                            getEndpoint().getConfiguration().getShardId());
+                    LOG.info("The shard with id={} on stream {} reached CLOSE status",
+                            shard.shardId(), getEndpoint().getConfiguration().getStreamName());
                     throw new IllegalStateException(
                             new ReachedClosedStatusException(
                                     getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
@@ -210,38 +213,43 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             final Shard shard,
             final KinesisConnection kinesisConnection)
             throws ExecutionException, InterruptedException {
+        // either return a cached one or get a new one via a GetShardIterator
+        // request.
+        if (currentShardIterator == null) {
+            var shardId = shard.shardId();
 
-        var shardId = shard.shardId();
-        isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
-        LOG.debug("ShardId is: {}", shardId);
-
-        GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
-                .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
-                .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
+            GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
+                    .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
+                    .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
 
-        if (hasSequenceNumber()) {
-            request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
-        }
+            if (hasSequenceNumber()) {
+                request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
+            }
 
-        resume(request);
+            resume(request);
 
-        GetShardIteratorResponse result;
-        if (getEndpoint().getConfiguration().isAsyncClient()) {
-            try {
+            GetShardIteratorResponse result;
+            if (getEndpoint().getConfiguration().isAsyncClient()) {
+                try {
+                    result = kinesisConnection
+                            .getAsyncClient(getEndpoint())
+                            .getShardIterator(request.build())
+                            .get();
+                } catch (ExecutionException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
                 result = kinesisConnection
-                        .getAsyncClient(getEndpoint())
-                        .getShardIterator(request.build())
-                        .get();
-            } catch (ExecutionException | InterruptedException e) {
-                throw new RuntimeException(e);
+                        .getClient(getEndpoint())
+                        .getShardIterator(request.build());
             }
-        } else {
-            result = kinesisConnection
-                    .getClient(getEndpoint())
-                    .getShardIterator(request.build());
+
+            currentShardIterator = result.shardIterator();
+            LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", currentShardIterator, shardId,
+                    getEndpoint().getConfiguration().getStreamName());
         }
 
-        return result.shardIterator();
+        return currentShardIterator;
     }
 
     private void resume(GetShardIteratorRequest.Builder req) {
@@ -270,6 +278,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     }
 
     protected Exchange createExchange(Record dataRecord) {
+        LOG.debug("Received Kinesis record with partition_key={}", dataRecord.partitionKey());
         Exchange exchange = createExchange(true);
         exchange.getIn().setBody(dataRecord.data().asInputStream());
         exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, dataRecord.approximateArrivalTimestamp());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 495942d298e..ea9b3882511 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -81,7 +81,7 @@ public class KinesisConsumerClosedShardWithFailTest {
 
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class)))
-                .thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
+                .thenReturn(GetRecordsResponse.builder().nextShardIterator(null).build());
         when(kinesisClient
                 .getShardIterator(any(GetShardIteratorRequest.class)))
                 .thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index d14e5ebb332..034af13ed1b 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -89,7 +89,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
         when(kinesisClient
                 .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
-                        .nextShardIterator("shardIterator")
+                        .nextShardIterator("nextShardIterator")
                         .records(
                                 Record.builder().sequenceNumber("1")
                                         .data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
@@ -185,10 +185,10 @@ public class KinesisConsumerClosedShardWithSilentTest {
         underTest.poll();
 
         final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
-
-        verify(kinesisClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class));
+        // On second call it uses the one returned from the first call
+        verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
         verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
         assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator"));
-        assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("shardIterator"));
+        assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("nextShardIterator"));
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index 754862b44c1..b9614b1129a 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -122,6 +122,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
                 .untilAsserted(() -> result.assertIsSatisfied());
 
         assertEquals(messageCount, receivedMessages.size());
+        int messageCount = 0;
         for (KinesisData data : receivedMessages) {
             ObjectHelper.notNull(data, "data");
             assertNotNull(data.body, "The body should not be null");
@@ -131,6 +132,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
              and so on. This is just testing that the code is not mixing things up.
              */
             assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data);
+            assertEquals(messageCount++, Integer.valueOf(data.partition.substring(data.partition.lastIndexOf('-') + 1)));
         }
     }
 }