You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 20:41:09 UTC

[camel] branch main updated: integrate multi-shard consumer with async client (#10733)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ff2caf3c1a7 integrate multi-shard consumer with async client (#10733)
ff2caf3c1a7 is described below

commit ff2caf3c1a79e2cd978fa6c864311ba16d282a98
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 22:41:03 2023 +0200

    integrate multi-shard consumer with async client (#10733)
    
    * integrate multi-shard consumer with async client
    
    * integrate multi-shard consumer with async client
    
    * integrate multi-shard consumer with async client
    
    * integrate multi-shard consumer with async client
    
    ---------
    
    Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
 components/camel-aws/camel-aws2-kinesis/pom.xml    |  6 +-
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 87 ++++++++++++++--------
 .../KinesisConsumerClosedShardWithSilentTest.java  |  6 +-
 3 files changed, 61 insertions(+), 38 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml
index 07811f6045a..b85766e980a 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -43,9 +43,9 @@
             <version>${aws-java-sdk2-version}</version>
         </dependency>
         <dependency>
-            <groupId>software.amazon.kinesis</groupId>
-            <artifactId>amazon-kinesis-client</artifactId>
-            <version>${amazon-kinesis-common-version}</version>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>netty-nio-client</artifactId>
+            <version>${aws-java-sdk2-version}</version>
         </dependency>
         <dependency>
             <groupId>software.amazon.awssdk</groupId>
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index b00bdfb4e89..323b9c961d4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -16,12 +16,6 @@
  */
 package org.apache.camel.component.aws2.kinesis;
 
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -35,6 +29,14 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -46,10 +48,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
-
     private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
-
-    private String currentShardIterator;
     private boolean isShardClosed;
     private ResumeStrategy resumeStrategy;
 
@@ -88,7 +87,20 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                                     .getConfiguration()
                                     .getMaxResultsPerRequest())
                             .build();
-                    GetRecordsResponse result = getClient().getRecords(req);
+
+                    GetRecordsResponse result = null;
+                    if (getEndpoint().getConfiguration().isAsyncClient()) {
+                        try {
+                            result = getAsyncClient()
+                                    .getRecords(req)
+                                    .get();
+                        } catch (ExecutionException | InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    } else {
+                        result = getClient().getRecords(req);
+                    }
+
 
                     try {
                         Queue<Exchange> exchanges = createExchanges(result.records());
@@ -102,7 +114,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                     // we left off, however, I don't know what happens to subsequent
                     // exchanges when an earlier exchange fails.
 
-                    currentShardIterator = result.nextShardIterator();
+                    var currentShardIterator = result.nextShardIterator();
                     if (isShardClosed) {
                         switch (getEndpoint().getConfiguration().getShardClosed()) {
                             case ignore:
@@ -145,36 +157,33 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
         return getEndpoint().getClient();
     }
 
+    private KinesisAsyncClient getAsyncClient() {
+        return getEndpoint().getAsyncClient();
+    }
+
     @Override
     public Kinesis2Endpoint getEndpoint() {
         return (Kinesis2Endpoint) super.getEndpoint();
     }
 
     private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
-        // either return a cached one or get a new one via a GetShardIterator
-        // request.
-        if (currentShardIterator == null) {
-            var shardId = shard.shardId();
 
-            isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+        var shardId = shard.shardId();
+        isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+        LOG.debug("ShardId is: {}", shardId);
 
-            LOG.debug("ShardId is: {}", shardId);
+        GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
+                .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
+                .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
 
-            GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
-                    .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
-                    .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
-
-            if (hasSequenceNumber()) {
-                request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
-            }
-
-            resume(request);
-            GetShardIteratorResponse result = getClient().getShardIterator(request.build());
-            currentShardIterator = result.shardIterator();
+        if (hasSequenceNumber()) {
+            request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
         }
 
-        LOG.debug("Shard Iterator is: {}", currentShardIterator);
-        return currentShardIterator;
+        resume(request);
+        GetShardIteratorResponse result = getClient().getShardIterator(request.build());
+
+        return result.shardIterator();
     }
 
     private void resume(GetShardIteratorRequest.Builder req) {
@@ -228,7 +237,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
     private boolean hasSequenceNumber() {
         return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
                 && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                        || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+                || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
     }
 
     @Override
@@ -261,7 +270,21 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
                 .streamName(getEndpoint().getConfiguration().getStreamName())
                 .build();
 
-        return getClient().listShards(request).shards();
+        List<Shard> shardList = null;
+        if (getEndpoint().getConfiguration().isAsyncClient()) {
+            try {
+                shardList = getAsyncClient()
+                        .listShards(request)
+                        .get()
+                        .shards();
+            } catch (ExecutionException | InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            shardList = getClient().listShards(request).shards();
+        }
+
+        return shardList;
 
     }
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 76a5599e8f1..b7899b95efd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -79,7 +79,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
         shardList.add(shard);
 
         when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
-                .nextShardIterator("nextShardIterator")
+                .nextShardIterator("shardIterator")
                 .records(
                         Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
                                 .build(),
@@ -160,9 +160,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
         final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
 
-        verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
+        verify(kinesisClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class));
         verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
         assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator"));
-        assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("nextShardIterator"));
+        assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("shardIterator"));
     }
 }