You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 20:41:09 UTC
[camel] branch main updated: integrate multi-shard consumer with async client (#10733)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new ff2caf3c1a7 integrate multi-shard consumer with async client (#10733)
ff2caf3c1a7 is described below
commit ff2caf3c1a79e2cd978fa6c864311ba16d282a98
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 22:41:03 2023 +0200
integrate multi-shard consumer with async client (#10733)
* integrate multi-shard consumer with async client
* integrate multi-shard consumer with async client
* integrate multi-shard consumer with async client
* integrate multi-shard consumer with async client
---------
Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
components/camel-aws/camel-aws2-kinesis/pom.xml | 6 +-
.../component/aws2/kinesis/Kinesis2Consumer.java | 87 ++++++++++++++--------
.../KinesisConsumerClosedShardWithSilentTest.java | 6 +-
3 files changed, 61 insertions(+), 38 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/pom.xml b/components/camel-aws/camel-aws2-kinesis/pom.xml
index 07811f6045a..b85766e980a 100644
--- a/components/camel-aws/camel-aws2-kinesis/pom.xml
+++ b/components/camel-aws/camel-aws2-kinesis/pom.xml
@@ -43,9 +43,9 @@
<version>${aws-java-sdk2-version}</version>
</dependency>
<dependency>
- <groupId>software.amazon.kinesis</groupId>
- <artifactId>amazon-kinesis-client</artifactId>
- <version>${amazon-kinesis-common-version}</version>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>netty-nio-client</artifactId>
+ <version>${aws-java-sdk2-version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index b00bdfb4e89..323b9c961d4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -16,12 +16,6 @@
*/
package org.apache.camel.component.aws2.kinesis;
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -35,6 +29,14 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -46,10 +48,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
-
private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
-
- private String currentShardIterator;
private boolean isShardClosed;
private ResumeStrategy resumeStrategy;
@@ -88,7 +87,20 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.getConfiguration()
.getMaxResultsPerRequest())
.build();
- GetRecordsResponse result = getClient().getRecords(req);
+
+ GetRecordsResponse result = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ try {
+ result = getAsyncClient()
+ .getRecords(req)
+ .get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ result = getClient().getRecords(req);
+ }
+
try {
Queue<Exchange> exchanges = createExchanges(result.records());
@@ -102,7 +114,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
// we left off, however, I don't know what happens to subsequent
// exchanges when an earlier exchange fails.
- currentShardIterator = result.nextShardIterator();
+ var currentShardIterator = result.nextShardIterator();
if (isShardClosed) {
switch (getEndpoint().getConfiguration().getShardClosed()) {
case ignore:
@@ -145,36 +157,33 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
return getEndpoint().getClient();
}
+ private KinesisAsyncClient getAsyncClient() {
+ return getEndpoint().getAsyncClient();
+ }
+
@Override
public Kinesis2Endpoint getEndpoint() {
return (Kinesis2Endpoint) super.getEndpoint();
}
private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
- // either return a cached one or get a new one via a GetShardIterator
- // request.
- if (currentShardIterator == null) {
- var shardId = shard.shardId();
- isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+ var shardId = shard.shardId();
+ isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+ LOG.debug("ShardId is: {}", shardId);
- LOG.debug("ShardId is: {}", shardId);
+ GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
+ .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
+ .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
- GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
- .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
- .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
-
- if (hasSequenceNumber()) {
- request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
- }
-
- resume(request);
- GetShardIteratorResponse result = getClient().getShardIterator(request.build());
- currentShardIterator = result.shardIterator();
+ if (hasSequenceNumber()) {
+ request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
}
- LOG.debug("Shard Iterator is: {}", currentShardIterator);
- return currentShardIterator;
+ resume(request);
+ GetShardIteratorResponse result = getClient().getShardIterator(request.build());
+
+ return result.shardIterator();
}
private void resume(GetShardIteratorRequest.Builder req) {
@@ -228,7 +237,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
private boolean hasSequenceNumber() {
return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
&& (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
- || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+ || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
}
@Override
@@ -261,7 +270,21 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.streamName(getEndpoint().getConfiguration().getStreamName())
.build();
- return getClient().listShards(request).shards();
+ List<Shard> shardList = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ try {
+ shardList = getAsyncClient()
+ .listShards(request)
+ .get()
+ .shards();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ shardList = getClient().listShards(request).shards();
+ }
+
+ return shardList;
}
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 76a5599e8f1..b7899b95efd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -79,7 +79,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
shardList.add(shard);
when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("nextShardIterator")
+ .nextShardIterator("shardIterator")
.records(
Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
.build(),
@@ -160,9 +160,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
- verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
+ verify(kinesisClient, times(2)).getShardIterator(any(GetShardIteratorRequest.class));
verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator"));
- assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("nextShardIterator"));
+ assertThat(getRecordsReqCap.getAllValues().get(1).shardIterator(), is("shardIterator"));
}
}