You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/19 09:42:00 UTC
[camel] branch main updated: CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new f8bad1bbf26 CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)
f8bad1bbf26 is described below
commit f8bad1bbf26876f8f62a043dcb16cf5392a97344
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Wed Jul 19 11:41:54 2023 +0200
CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis (#10709)
* CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis
* CAMEL-19583 : multiple shards consumption of Camel aws2 Kinesis
---------
Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
.../component/aws2/kinesis/Kinesis2Consumer.java | 188 +++++++++------------
.../KinesisConsumerClosedShardWithFailTest.java | 27 ++-
.../KinesisConsumerClosedShardWithSilentTest.java | 42 ++---
3 files changed, 109 insertions(+), 148 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 196032c9560..b00bdfb4e89 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -20,6 +20,7 @@ import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -34,14 +35,12 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@@ -63,58 +62,69 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
@Override
protected int poll() throws Exception {
- String shardIterator = getShardIterator();
- if (shardIterator == null) {
- // probably closed. Returning 0 as nothing was processed
+ var processedExchangeCount = new AtomicInteger(0);
- return 0;
- }
+ getShardList()
+ .parallelStream()
+ .forEach(shard -> {
- GetRecordsRequest req = GetRecordsRequest
- .builder()
- .shardIterator(shardIterator)
- .limit(getEndpoint()
- .getConfiguration()
- .getMaxResultsPerRequest())
- .build();
+ String shardIterator = null;
+ try {
+ shardIterator = getShardIterator(shard);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
- GetRecordsResponse result = null;
- if (getEndpoint().getConfiguration().isAsyncClient()) {
- result = getAsyncClient()
- .getRecords(req)
- .get();
- } else {
- result = getClient().getRecords(req);
- }
+ if (shardIterator == null) {
+ // probably closed. Returning 0 as nothing was processed
+ processedExchangeCount.set(0);
+ }
- Queue<Exchange> exchanges = createExchanges(result.records());
- int processedExchangeCount = processBatch(CastUtils.cast(exchanges));
-
- // May cache the last successful sequence number, and pass it to the
- // getRecords request. That way, on the next poll, we start from where
- // we left off, however, I don't know what happens to subsequent
- // exchanges when an earlier exchange fails.
-
- currentShardIterator = result.nextShardIterator();
- if (isShardClosed) {
- switch (getEndpoint().getConfiguration().getShardClosed()) {
- case ignore:
- LOG.warn("The shard {} is in closed state", currentShardIterator);
- break;
- case silent:
- break;
- case fail:
- LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(),
- getEndpoint().getConfiguration().getShardId());
- throw new ReachedClosedStatusException(
- getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
- default:
- throw new IllegalArgumentException("Unsupported shard closed strategy");
- }
- }
+ GetRecordsRequest req = GetRecordsRequest
+ .builder()
+ .shardIterator(shardIterator)
+ .limit(getEndpoint()
+ .getConfiguration()
+ .getMaxResultsPerRequest())
+ .build();
+ GetRecordsResponse result = getClient().getRecords(req);
+
+ try {
+ Queue<Exchange> exchanges = createExchanges(result.records());
+ processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // May cache the last successful sequence number, and pass it to the
+ // getRecords request. That way, on the next poll, we start from where
+ // we left off, however, I don't know what happens to subsequent
+ // exchanges when an earlier exchange fails.
+
+ currentShardIterator = result.nextShardIterator();
+ if (isShardClosed) {
+ switch (getEndpoint().getConfiguration().getShardClosed()) {
+ case ignore:
+ LOG.warn("The shard {} is in closed state", currentShardIterator);
+ break;
+ case silent:
+ break;
+ case fail:
+ LOG.info("Shard Iterator reaches CLOSE status:{} {}",
+ getEndpoint().getConfiguration().getStreamName(),
+ getEndpoint().getConfiguration().getShardId());
+ throw new IllegalStateException(
+ new ReachedClosedStatusException(
+ getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
+ default:
+ throw new IllegalArgumentException("Unsupported shard closed strategy");
+ }
+ }
- return processedExchangeCount;
+ });
+
+ return processedExchangeCount.get();
}
@Override
@@ -135,82 +145,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
return getEndpoint().getClient();
}
- private KinesisAsyncClient getAsyncClient() {
- return getEndpoint().getAsyncClient();
- }
-
@Override
public Kinesis2Endpoint getEndpoint() {
return (Kinesis2Endpoint) super.getEndpoint();
}
- private String getShardIterator() throws ExecutionException, InterruptedException {
+ private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
// either return a cached one or get a new one via a GetShardIterator
// request.
if (currentShardIterator == null) {
- String shardId;
-
- // If ShardId supplied use it, else choose first one
- if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
- shardId = getEndpoint().getConfiguration().getShardId();
- DescribeStreamRequest request
- = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-
- DescribeStreamResponse response = null;
- if (getEndpoint().getConfiguration().isAsyncClient()) {
- response = getAsyncClient()
- .describeStream(request)
- .get();
- } else {
- response = getClient().describeStream(request);
- }
- for (Shard shard : response.streamDescription().shards()) {
- if (shard.shardId().equalsIgnoreCase(getEndpoint().getConfiguration().getShardId())) {
- isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
- }
- }
-
- } else {
- DescribeStreamRequest request
- = DescribeStreamRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).build();
-
- DescribeStreamResponse response = null;
- if (getEndpoint().getConfiguration().isAsyncClient()) {
- response = getAsyncClient()
- .describeStream(request)
- .get();
- } else {
- response = getClient().describeStream(request);
- }
- List<Shard> shards = response.streamDescription().shards();
-
- if (shards.isEmpty()) {
- LOG.warn("There are no shards in the stream");
- return null;
- }
-
- shardId = shards.get(0).shardId();
- isShardClosed = shards.get(0).sequenceNumberRange().endingSequenceNumber() != null;
- }
+ var shardId = shard.shardId();
+
+ isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
+
LOG.debug("ShardId is: {}", shardId);
- GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder()
+ GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
.streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
.shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
if (hasSequenceNumber()) {
- req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
- }
-
- resume(req);
-
- GetShardIteratorResponse result = null;
- if (getEndpoint().getConfiguration().isAsyncClient()) {
- result = getAsyncClient().getShardIterator(req.build()).get();
- } else {
- result = getClient().getShardIterator(req.build());
+ request.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
}
+ resume(request);
+ GetShardIteratorResponse result = getClient().getShardIterator(request.build());
currentShardIterator = result.shardIterator();
}
@@ -294,4 +253,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
protected Kinesis2Configuration getConfiguration() {
return getEndpoint().getConfiguration();
}
+
+ private List<Shard> getShardList() {
+
+ var request = ListShardsRequest
+ .builder()
+ .streamName(getEndpoint().getConfiguration().getStreamName())
+ .build();
+
+ return getClient().listShards(request).shards();
+
+ }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 55b718f299d..48e739e423f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -28,16 +28,15 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -57,7 +56,7 @@ public class KinesisConsumerClosedShardWithFailTest {
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
- private Kinesis2Consumer undertest;
+ private Kinesis2Consumer underTest;
@BeforeEach
public void setup() {
@@ -68,7 +67,7 @@ public class KinesisConsumerClosedShardWithFailTest {
configuration.setStreamName("streamName");
Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
endpoint.start();
- undertest = new Kinesis2Consumer(endpoint, processor);
+ underTest = new Kinesis2Consumer(endpoint, processor);
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
@@ -77,29 +76,29 @@ public class KinesisConsumerClosedShardWithFailTest {
when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
.thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
- when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
- .thenReturn(DescribeStreamResponse.builder()
- .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+ when(kinesisClient.listShards(any(ListShardsRequest.class)))
+ .thenReturn(ListShardsResponse.builder().shards(shardList).build());
}
@Test
public void itObtainsAShardIteratorOnFirstPoll() {
- assertThrows(ReachedClosedStatusException.class, () -> {
- undertest.poll();
+ assertThrows(IllegalStateException.class, () -> {
+ underTest.poll();
});
- final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
= ArgumentCaptor.forClass(GetShardIteratorRequest.class);
-
- verify(kinesisClient).describeStream(describeStreamReqCap.capture());
- assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
+ final ArgumentCaptor<ListShardsRequest> getListShardsCap
+ = ArgumentCaptor.forClass(ListShardsRequest.class);
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
+
+ verify(kinesisClient).listShards(getListShardsCap.capture());
+ assertThat(getListShardsCap.getValue().streamName(), is("streamName"));
}
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 41afc449035..76a5599e8f1 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -31,16 +31,16 @@ import org.mockito.junit.jupiter.MockitoExtension;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -60,7 +60,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
- private Kinesis2Consumer undertest;
+ private Kinesis2Consumer underTest;
@BeforeEach
public void setup() {
@@ -71,7 +71,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
configuration.setStreamName("streamName");
Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
endpoint.start();
- undertest = new Kinesis2Consumer(endpoint, processor);
+ underTest = new Kinesis2Consumer(endpoint, processor);
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
@@ -86,27 +86,23 @@ public class KinesisConsumerClosedShardWithSilentTest {
Record.builder().sequenceNumber("2").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
.build())
.build());
- when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
- .thenReturn(DescribeStreamResponse.builder()
- .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+ when(kinesisClient.listShards(any(ListShardsRequest.class)))
+ .thenReturn(ListShardsResponse.builder().shards(shardList).build());
context.start();
- undertest.start();
+ underTest.start();
}
@Test
public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
- undertest.poll();
+ underTest.poll();
final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
= ArgumentCaptor.forClass(GetShardIteratorRequest.class);
- verify(kinesisClient).describeStream(describeStreamReqCap.capture());
- assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
-
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
@@ -115,33 +111,30 @@ public class KinesisConsumerClosedShardWithSilentTest {
@Test
public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
- undertest.getEndpoint().getConfiguration().setShardId("shardIdPassedAsUrlParam");
+ underTest.getEndpoint().getConfiguration().setShardId("shardId");
- undertest.poll();
+ underTest.poll();
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
= ArgumentCaptor.forClass(GetShardIteratorRequest.class);
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
- assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardIdPassedAsUrlParam"));
+ assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
}
@Test
public void itObtainsAShardIteratorOnFirstPollForSequenceNumber() throws Exception {
- undertest.getEndpoint().getConfiguration().setSequenceNumber("12345");
- undertest.getEndpoint().getConfiguration().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ underTest.getEndpoint().getConfiguration().setSequenceNumber("12345");
+ underTest.getEndpoint().getConfiguration().setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
- undertest.poll();
+ underTest.poll();
final ArgumentCaptor<DescribeStreamRequest> describeStreamReqCap = ArgumentCaptor.forClass(DescribeStreamRequest.class);
final ArgumentCaptor<GetShardIteratorRequest> getShardIteratorReqCap
= ArgumentCaptor.forClass(GetShardIteratorRequest.class);
- verify(kinesisClient).describeStream(describeStreamReqCap.capture());
- assertThat(describeStreamReqCap.getValue().streamName(), is("streamName"));
-
verify(kinesisClient).getShardIterator(getShardIteratorReqCap.capture());
assertThat(getShardIteratorReqCap.getValue().streamName(), is("streamName"));
assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
@@ -152,7 +145,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
@Test
public void itUsesTheShardIteratorOnPolls() throws Exception {
- undertest.poll();
+ underTest.poll();
final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
verify(kinesisClient).getRecords(getRecordsReqCap.capture());
@@ -162,12 +155,11 @@ public class KinesisConsumerClosedShardWithSilentTest {
@Test
public void itUsesTheShardIteratorOnSubsiquentPolls() throws Exception {
- undertest.poll();
- undertest.poll();
+ underTest.poll();
+ underTest.poll();
final ArgumentCaptor<GetRecordsRequest> getRecordsReqCap = ArgumentCaptor.forClass(GetRecordsRequest.class);
- verify(kinesisClient, times(1)).describeStream(any(DescribeStreamRequest.class));
verify(kinesisClient, times(1)).getShardIterator(any(GetShardIteratorRequest.class));
verify(kinesisClient, times(2)).getRecords(getRecordsReqCap.capture());
assertThat(getRecordsReqCap.getAllValues().get(0).shardIterator(), is("shardIterator"));