You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/21 08:49:16 UTC

[camel] branch main updated: add the capability to consume only a single shard or more (#10753)

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 4afac70fa11 add the capability to consume only a single shard or more (#10753)
4afac70fa11 is described below

commit 4afac70fa1108042dc1a21790aee95a48cdab1be
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Fri Jul 21 10:49:09 2023 +0200

    add the capability to consume only a single shard or more (#10753)
    
    * add the capability to consume only a single shard or more
    
    * add the capability to consume only a single shard or more
    
    ---------
    
    Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 178 +++++++++++++--------
 .../KinesisConsumerClosedShardWithSilentTest.java  |  16 +-
 2 files changed, 124 insertions(+), 70 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 696b56e165a..6a9638641a2 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -37,6 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -63,80 +65,118 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
 
         var processedExchangeCount = new AtomicInteger(0);
 
-        getShardList()
-                .parallelStream()
-                .forEach(shard -> {
-
-                    String shardIterator = null;
-                    try {
-                        shardIterator = getShardIterator(shard);
-                    } catch (ExecutionException | InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-
-                    if (shardIterator == null) {
-                        // probably closed. Returning 0 as nothing was processed
-                        processedExchangeCount.set(0);
-                    }
-
-                    GetRecordsRequest req = GetRecordsRequest
-                            .builder()
-                            .shardIterator(shardIterator)
-                            .limit(getEndpoint()
+        if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
+            var request = DescribeStreamRequest
+                    .builder()
+                    .streamName(getEndpoint().getConfiguration().getStreamName())
+                    .build();
+            DescribeStreamResponse response = null;
+            if (getEndpoint().getConfiguration().isAsyncClient()) {
+                try {
+                    response = getAsyncClient()
+                            .describeStream(request)
+                            .get();
+                } catch (ExecutionException | InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                response = getClient().describeStream(request);
+            }
+
+            var shard = response
+                    .streamDescription()
+                    .shards()
+                    .stream()
+                    .filter(shardItem -> shardItem
+                            .shardId()
+                            .equalsIgnoreCase(getEndpoint()
                                     .getConfiguration()
-                                    .getMaxResultsPerRequest())
-                            .build();
-
-                    GetRecordsResponse result = null;
-                    if (getEndpoint().getConfiguration().isAsyncClient()) {
-                        try {
-                            result = getAsyncClient()
-                                    .getRecords(req)
-                                    .get();
-                        } catch (ExecutionException | InterruptedException e) {
-                            throw new RuntimeException(e);
-                        }
-                    } else {
-                        result = getClient().getRecords(req);
-                    }
-
-                    try {
-                        Queue<Exchange> exchanges = createExchanges(result.records());
-                        processedExchangeCount.getAndSet(processBatch(CastUtils.cast(exchanges)));
-                    } catch (Exception e) {
-                        throw new RuntimeException(e);
-                    }
-
-                    // May cache the last successful sequence number, and pass it to the
-                    // getRecords request. That way, on the next poll, we start from where
-                    // we left off, however, I don't know what happens to subsequent
-                    // exchanges when an earlier exchange fails.
-
-                    var currentShardIterator = result.nextShardIterator();
-                    if (isShardClosed) {
-                        switch (getEndpoint().getConfiguration().getShardClosed()) {
-                            case ignore:
-                                LOG.warn("The shard {} is in closed state", currentShardIterator);
-                                break;
-                            case silent:
-                                break;
-                            case fail:
-                                LOG.info("Shard Iterator reaches CLOSE status:{} {}",
-                                        getEndpoint().getConfiguration().getStreamName(),
-                                        getEndpoint().getConfiguration().getShardId());
-                                throw new IllegalStateException(
-                                        new ReachedClosedStatusException(
-                                                getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
-                            default:
-                                throw new IllegalArgumentException("Unsupported shard closed strategy");
-                        }
-                    }
-
-                });
+                                    .getShardId()))
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("The shard can't be found"));
+
+            fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+
+        } else {
+            getShardList()
+                    .parallelStream()
+                    .forEach(shard -> {
+                        fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+                    });
+        }
 
         return processedExchangeCount.get();
     }
 
+    /**
+     * Polls a single shard: resolves its iterator, fetches up to maxResultsPerRequest records, hands
+     * them to processBatch and finally applies the configured shard-closed strategy.
+     *
+     * @param shard                  the shard to read from
+     * @param processedExchangeCount running total shared by all shards of this poll (added to, never reset)
+     * @throws IllegalStateException if the shard is closed and the configured strategy is {@code fail}
+     */
+    private void fetchAndPrepareRecordsForCamel(final Shard shard, AtomicInteger processedExchangeCount) {
+        String shardIterator;
+        try {
+            shardIterator = getShardIterator(shard);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the polling thread
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+
+        // No iterator: the shard is probably closed, so skip GetRecords (it requires an iterator).
+        if (shardIterator != null) {
+            GetRecordsRequest req = GetRecordsRequest.builder()
+                    .shardIterator(shardIterator)
+                    .limit(getEndpoint().getConfiguration().getMaxResultsPerRequest())
+                    .build();
+            GetRecordsResponse result;
+            if (getEndpoint().getConfiguration().isAsyncClient()) {
+                try {
+                    result = getAsyncClient().getRecords(req).get();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                } catch (ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                result = getClient().getRecords(req);
+            }
+
+            try {
+                Queue<Exchange> exchanges = createExchanges(result.records());
+                processedExchangeCount.addAndGet(processBatch(CastUtils.cast(exchanges)));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        // May cache the last successful sequence number and pass it to the getRecords request, so the
+        // next poll starts from where we left off; however, it is unknown what happens to subsequent
+        // exchanges when an earlier exchange fails.
+        if (isShardClosed) {
+            switch (getEndpoint().getConfiguration().getShardClosed()) {
+                case ignore:
+                    LOG.warn("The shard {} is in closed state", shard.shardId());
+                    break;
+                case silent:
+                    break;
+                case fail:
+                    LOG.info("Shard Iterator reaches CLOSE status:{} {}",
+                            getEndpoint().getConfiguration().getStreamName(), shard.shardId());
+                    throw new IllegalStateException(
+                            new ReachedClosedStatusException(
+                                    getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
+                default:
+                    throw new IllegalArgumentException("Unsupported shard closed strategy");
+            }
+        }
+    }
+
     @Override
     public int processBatch(Queue<Object> exchanges) throws Exception {
         int processedExchanges = 0;
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index b7899b95efd..2b34ca315ed 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -28,9 +28,12 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
@@ -41,6 +44,7 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,8 +54,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
 public class KinesisConsumerClosedShardWithSilentTest {
-
     @Mock
     private KinesisClient kinesisClient;
     @Mock
@@ -111,6 +115,16 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @Test
     public void itDoesNotMakeADescribeStreamRequestIfShardIdIsSet() throws Exception {
+
+        SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
+        Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
+        ArrayList<Shard> shardList = new ArrayList<>();
+        shardList.add(shard);
+
+        when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
+                .thenReturn(DescribeStreamResponse.builder()
+                        .streamDescription(StreamDescription.builder().shards(shardList).build()).build());
+
         underTest.getEndpoint().getConfiguration().setShardId("shardId");
 
         underTest.poll();