You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/30 17:20:29 UTC

[camel] branch camel-4.0.x updated: CAMEL-19811: restore multi-shard handling. (#11243)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.0.x by this push:
     new aa3dd7090b9 CAMEL-19811: restore multi-shard handling. (#11243)
aa3dd7090b9 is described below

commit aa3dd7090b95c0b289a4698cce229d620b520fba
Author: klease <38...@users.noreply.github.com>
AuthorDate: Wed Aug 30 19:19:41 2023 +0200

    CAMEL-19811: restore multi-shard handling. (#11243)
    
    Adapt the handling of closed streams.
    Modify KinesisUtils to be able to create a stream with multiple shards.
    Modify KinesisConsumerIT to test a stream with 2 shards.
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 62 +++++++++++++---------
 .../KinesisConsumerClosedShardWithFailTest.java    |  9 +++-
 .../kinesis/integration/KinesisConsumerIT.java     |  8 +--
 .../test/infra/aws2/clients/KinesisUtils.java      | 10 ++--
 4 files changed, 56 insertions(+), 33 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 8e7a8692a35..87178332e51 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.kinesis;
 
 import java.util.ArrayDeque;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +51,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     private KinesisConnection connection;
     private ResumeStrategy resumeStrategy;
 
-    private String currentShardIterator;
+    private Map<String, String> currentShardIterators = new java.util.HashMap<>();
 
     public Kinesis2Consumer(Kinesis2Endpoint endpoint,
                             Processor processor) {
@@ -167,27 +168,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         // getRecords request. That way, on the next poll, we start from where
         // we left off, however, I don't know what happens to subsequent
         // exchanges when an earlier exchange fails.
+        updateShardIterator(shard, result.nextShardIterator());
+    }
 
-        currentShardIterator = result.nextShardIterator();
-        if (currentShardIterator == null) {
-            // This indicates that the shard is closed and no more data is available
-            switch (getEndpoint().getConfiguration().getShardClosed()) {
-                case ignore:
-                    LOG.warn("The shard with id={} on stream {} reached CLOSE status",
-                            shard.shardId(), getEndpoint().getConfiguration().getStreamName());
-                    break;
-                case silent:
-                    break;
-                case fail:
-                    LOG.info("The shard with id={} on stream {} reached CLOSE status",
-                            shard.shardId(), getEndpoint().getConfiguration().getStreamName());
-                    throw new IllegalStateException(
-                            new ReachedClosedStatusException(
-                                    getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
-                default:
-                    throw new IllegalArgumentException("Unsupported shard closed strategy");
-            }
-        }
+    private void updateShardIterator(Shard shard, String nextShardIterator) {
+        currentShardIterators.put(shard.shardId(), nextShardIterator);
     }
 
     @Override
@@ -215,8 +200,14 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
             throws ExecutionException, InterruptedException {
         // either return a cached one or get a new one via a GetShardIterator
         // request.
-        if (currentShardIterator == null) {
-            var shardId = shard.shardId();
+
+        var shardId = shard.shardId();
+
+        if (currentShardIterators.get(shardId) == null) {
+            if (currentShardIterators.containsKey(shardId)) {
+                // There was previously a shardIterator but shard is now closed
+                handleClosedShard(shardId);
+            }
 
             GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
                     .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
@@ -244,12 +235,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                         .getShardIterator(request.build());
             }
 
-            currentShardIterator = result.shardIterator();
-            LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", currentShardIterator, shardId,
+            currentShardIterators.put(shardId, result.shardIterator());
+            LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", result.shardIterator(), shardId,
                     getEndpoint().getConfiguration().getStreamName());
         }
 
-        return currentShardIterator;
+        return currentShardIterators.get(shardId);
+    }
+
+    private void handleClosedShard(String shardId) {
+        switch (getEndpoint().getConfiguration().getShardClosed()) {
+            case ignore:
+                LOG.warn("The shard with id={} on stream {} reached CLOSE status",
+                        shardId, getEndpoint().getConfiguration().getStreamName());
+                break;
+            case silent:
+                break;
+            case fail:
+                LOG.info("The shard with id={} on stream {} reached CLOSE status",
+                        shardId, getEndpoint().getConfiguration().getStreamName());
+                throw new IllegalStateException(
+                        new ReachedClosedStatusException(
+                                getEndpoint().getConfiguration().getStreamName(), shardId));
+            default:
+                throw new IllegalArgumentException("Unsupported shard closed strategy");
+        }
     }
 
     private void resume(GetShardIteratorRequest.Builder req) {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index ea9b3882511..9f5156ca5a7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -41,7 +41,9 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -92,6 +94,11 @@ public class KinesisConsumerClosedShardWithFailTest {
 
     @Test
     public void itObtainsAShardIteratorOnFirstPoll() {
+        try {
+            underTest.poll();
+        } catch (Exception e) {
+            fail("The first call should not throw an exception");
+        }
         assertThrows(IllegalStateException.class, () -> {
             underTest.poll();
         });
@@ -106,7 +113,7 @@ public class KinesisConsumerClosedShardWithFailTest {
         assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
         assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
 
-        verify(kinesisClient).listShards(getListShardsCap.capture());
+        verify(kinesisClient, times(2)).listShards(getListShardsCap.capture());
         assertThat(getListShardsCap.getValue().streamName(), is("streamName"));
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index b9614b1129a..46a62640dc6 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
 import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream;
 import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.putRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@@ -107,7 +108,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
 
     @BeforeEach
     public void prepareEnvironment() {
-        createStream(client, streamName);
+        createStream(client, streamName, 2);
 
         putRecords(client, streamName, messageCount);
     }
@@ -122,7 +123,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
                 .untilAsserted(() -> result.assertIsSatisfied());
 
         assertEquals(messageCount, receivedMessages.size());
-        int messageCount = 0;
+        String partitionKey = null;
         for (KinesisData data : receivedMessages) {
             ObjectHelper.notNull(data, "data");
             assertNotNull(data.body, "The body should not be null");
@@ -132,7 +133,8 @@ public class KinesisConsumerIT extends CamelTestSupport {
              and so on. This is just testing that the code is not mixing things up.
              */
             assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data);
-            assertEquals(messageCount++, Integer.valueOf(data.partition.substring(data.partition.lastIndexOf('-') + 1)));
+            assertNotEquals(partitionKey, data.partition);
+            partitionKey = data.partition;
         }
     }
 }
diff --git a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
index bec6c6022fa..32f02076245 100644
--- a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
+++ b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
@@ -57,10 +57,10 @@ public final class KinesisUtils {
 
     }
 
-    private static void doCreateStream(KinesisClient kinesisClient, String streamName) {
+    private static void doCreateStream(KinesisClient kinesisClient, String streamName, int shardCount) {
         CreateStreamRequest request = CreateStreamRequest.builder()
                 .streamName(streamName)
-                .shardCount(1)
+                .shardCount(shardCount)
                 .build();
 
         try {
@@ -78,6 +78,10 @@ public final class KinesisUtils {
     }
 
     public static void createStream(KinesisClient kinesisClient, String streamName) {
+        createStream(kinesisClient, streamName, 1);
+    }
+
+    public static void createStream(KinesisClient kinesisClient, String streamName, int shardCount) {
         try {
             LOG.info("Checking whether the stream exists already");
             int status = getStreamStatus(kinesisClient, streamName);
@@ -89,7 +93,7 @@ public final class KinesisUtils {
                 LOG.info("The stream does not exist, auto creating it: {}", e.getMessage());
             }
 
-            doCreateStream(kinesisClient, streamName);
+            doCreateStream(kinesisClient, streamName, shardCount);
             TestUtils.waitFor(() -> {
                 try {
                     GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);