You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/21 08:49:16 UTC
[camel] branch main updated: add the capability to consume only a single shard or more (#10753)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 4afac70fa11 add the capability to consume only a single shard or more (#10753)
4afac70fa11 is described below
commit 4afac70fa1108042dc1a21790aee95a48cdab1be
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Fri Jul 21 10:49:09 2023 +0200
add the capability to consume only a single shard or more (#10753)
* add the capability to consume only a single shard or more
* add the capability to consume only a single shard or more
---------
Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
.../component/aws2/kinesis/Kinesis2Consumer.java | 178 +++++++++++++--------
.../KinesisConsumerClosedShardWithSilentTest.java | 16 +-
2 files changed, 124 insertions(+), 70 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 696b56e165a..6a9638641a2 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -37,6 +37,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -63,80 +65,118 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
var processedExchangeCount = new AtomicInteger(0);
- getShardList()
- .parallelStream()
- .forEach(shard -> {
-
- String shardIterator = null;
- try {
- shardIterator = getShardIterator(shard);
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
-
- if (shardIterator == null) {
- // probably closed. Returning 0 as nothing was processed
- processedExchangeCount.set(0);
- }
-
- GetRecordsRequest req = GetRecordsRequest
- .builder()
- .shardIterator(shardIterator)
- .limit(getEndpoint()
+ if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
+ var request = DescribeStreamRequest
+ .builder()
+ .streamName(getEndpoint().getConfiguration().getStreamName())
+ .build();
+ DescribeStreamResponse response = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ try {
+ response = getAsyncClient()
+ .describeStream(request)
+ .get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ response = getClient().describeStream(request);
+ }
+
+ var shard = response
+ .streamDescription()
+ .shards()
+ .stream()
+ .filter(shardItem -> shardItem
+ .shardId()
+ .equalsIgnoreCase(getEndpoint()
.getConfiguration()
- .getMaxResultsPerRequest())
- .build();
-
- GetRecordsResponse result = null;
- if (getEndpoint().getConfiguration().isAsyncClient()) {
- try {
- result = getAsyncClient()
- .getRecords(req)
- .get();
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- result = getClient().getRecords(req);
- }
-
- try {
- Queue<Exchange> exchanges = createExchanges(result.records());
- processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges)));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- // May cache the last successful sequence number, and pass it to the
- // getRecords request. That way, on the next poll, we start from where
- // we left off, however, I don't know what happens to subsequent
- // exchanges when an earlier exchange fails.
-
- var currentShardIterator = result.nextShardIterator();
- if (isShardClosed) {
- switch (getEndpoint().getConfiguration().getShardClosed()) {
- case ignore:
- LOG.warn("The shard {} is in closed state", currentShardIterator);
- break;
- case silent:
- break;
- case fail:
- LOG.info("Shard Iterator reaches CLOSE status:{} {}",
- getEndpoint().getConfiguration().getStreamName(),
- getEndpoint().getConfiguration().getShardId());
- throw new IllegalStateException(
- new ReachedClosedStatusException(
- getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
- default:
- throw new IllegalArgumentException("Unsupported shard closed strategy");
- }
- }
-
- });
+ .getShardId()))
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("The shard can't be found"));
+
+ fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+
+ } else {
+ getShardList()
+ .parallelStream()
+ .forEach(shard -> {
+ fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+ });
+ }
return processedExchangeCount.get();
}
+ private void fetchAndPrepareRecordsForCamel(
+ final Shard shard,
+ AtomicInteger processedExchangeCount) {
+ String shardIterator = null;
+ try {
+ shardIterator = getShardIterator(shard);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ if (shardIterator == null) {
+ // probably closed. Returning 0 as nothing was processed
+ processedExchangeCount.set(0);
+ }
+
+ GetRecordsRequest req = GetRecordsRequest
+ .builder()
+ .shardIterator(shardIterator)
+ .limit(getEndpoint()
+ .getConfiguration()
+ .getMaxResultsPerRequest())
+ .build();
+
+ GetRecordsResponse result = null;
+ if (getEndpoint().getConfiguration().isAsyncClient()) {
+ try {
+ result = getAsyncClient()
+ .getRecords(req)
+ .get();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ result = getClient().getRecords(req);
+ }
+
+ try {
+ Queue<Exchange> exchanges = createExchanges(result.records());
+ processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges)));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // May cache the last successful sequence number, and pass it to the
+ // getRecords request. That way, on the next poll, we start from where
+ // we left off, however, I don't know what happens to subsequent
+ // exchanges when an earlier exchange fails.
+
+ var currentShardIterator = result.nextShardIterator();
+ if (isShardClosed) {
+ switch (getEndpoint().getConfiguration().getShardClosed()) {
+ case ignore:
+ LOG.warn("The shard {} is in closed state", currentShardIterator);
+ break;
+ case silent:
+ break;
+ case fail:
+ LOG.info("Shard Iterator reaches CLOSE status:{} {}",
+ getEndpoint().getConfiguration().getStreamName(),
+ getEndpoint().getConfiguration().getShardId());
+ throw new IllegalStateException(
+ new ReachedClosedStatusException(
+ getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
+ default:
+ throw new IllegalArgumentException("Unsupported shard closed strategy");
+ }
+ }
+ }
+
@Override
public int processBatch(Queue<Object> exchanges) throws Exception {
int processedExchanges = 0;
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index b7899b95efd..2b34ca315ed 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -28,9 +28,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -41,6 +44,7 @@ import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,8 +54,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
public class KinesisConsumerClosedShardWithSilentTest {
-
@Mock
private KinesisClient kinesisClient;
@Mock
@@ -111,6 +115,16 @@ public class KinesisConsumerClosedShardWithSilentTest {
@Test
public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
+
+ SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
+ Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
+ ArrayList<Shard> shardList = new ArrayList<>();
+ shardList.add(shard);
+
+ when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
+ .thenReturn(DescribeStreamResponse.builder()
+ .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
+
underTest.getEndpoint().getConfiguration().setShardId("shardId");
underTest.poll();