You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/06/01 15:18:23 UTC

[camel] branch main updated (49cacf09875 -> b14097992a6)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 49cacf09875 CAMEL-18151: camel-jbang - Export command for quarkus
     new 74d4edc43f7 (chores) camel-aws2-kinesis: removes unused exception
     new b14097992a6 CAMEL-18127: added integration test for Resume with AWS Kinesis

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kinesis/integration/KinesisConsumerIT.java     |  2 +-
 ...onsumerIT.java => KinesisConsumerResumeIT.java} | 98 +++++++++++++++++-----
 .../test/infra/aws2/clients/KinesisUtils.java      | 41 +++++----
 3 files changed, 107 insertions(+), 34 deletions(-)
 copy components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/{KinesisConsumerIT.java => KinesisConsumerResumeIT.java} (56%)


[camel] 01/02: (chores) camel-aws2-kinesis: removes unused exception

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 74d4edc43f7138bd488d3b3bcfa66d5abe8b8c76
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Jun 1 14:19:05 2022 +0200

    (chores) camel-aws2-kinesis: removes unused exception
---
 .../camel/component/aws2/kinesis/integration/KinesisConsumerIT.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index 197cb446773..4d5acc64957 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -91,7 +91,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
                 fromF(kinesisEndpointUri, streamName)
                         .process(new Processor() {
                             @Override
-                            public void process(Exchange exchange) throws Exception {
+                            public void process(Exchange exchange) {
                                 KinesisData data = new KinesisData();
 
                                 final Message message = exchange.getMessage();


[camel] 02/02: CAMEL-18127: added integration test for Resume with AWS Kinesis

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b14097992a6ecba4555078bd53f45b19d4fc807d
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Jun 1 15:50:58 2022 +0200

    CAMEL-18127: added integration test for Resume with AWS Kinesis
---
 .../integration/KinesisConsumerResumeIT.java       | 200 +++++++++++++++++++++
 .../test/infra/aws2/clients/KinesisUtils.java      |  41 +++--
 2 files changed, 227 insertions(+), 14 deletions(-)

diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
new file mode 100644
index 00000000000..2cfa0cafd3d
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.integration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.resume.TransientResumeStrategy;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.apache.camel.test.infra.common.TestUtils;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream;
+import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.deleteStream;
+import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.putRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+/*
+ * This test simulates resuming consumption from AWS Kinesis - consuming right at the middle of the batch of messages
+ * sent.
+ */
+public class KinesisConsumerResumeIT extends CamelTestSupport {
+    /* Partition key and body captured from one record consumed by the route. */
+    private static final class KinesisData {
+        private String body;
+        private String partition;
+
+        /* Renders as KinesisData{partition='...', body='...'}; unset fields print as null. */
+        @Override
+        public String toString() {
+            final String template = "KinesisData{partition='%s', body='%s'}";
+            return String.format(template, partition, body);
+        }
+    }
+
+    /* Resume adapter that points the shard iterator at a record put into the stream by the test. */
+    private static final class TestKinesisResumeAdapter implements KinesisResumeAdapter {
+        private volatile List<PutRecordsResponse> previousRecords; // written via the setter, polled below
+        private final int expectedCount;
+
+        private TestKinesisResumeAdapter(int expectedCount) {
+            this.expectedCount = expectedCount;
+        }
+
+        /* Waits up to 1 minute for the put responses, then starts at record index expectedCount. */
+        @Override
+        public void setRequestBuilder(GetShardIteratorRequest.Builder builder) {
+            LOG.debug("Waiting for data");
+            // The field is null until setPreviousRecords runs: guard it so the wait polls instead of failing with a NPE
+            Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> previousRecords != null && !previousRecords.isEmpty());
+            final PutRecordsResultEntry putRecordsResultEntry = previousRecords.get(0).records().get(expectedCount);
+
+            LOG.info("Setting sequence number to: {}", putRecordsResultEntry.sequenceNumber());
+            builder.startingSequenceNumber(putRecordsResultEntry.sequenceNumber());
+            builder.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+        }
+
+        @Override
+        public void resume() {
+            // Intentionally empty: the starting position is applied in setRequestBuilder
+        }
+
+        public void setPreviousRecords(List<PutRecordsResponse> previousRecords) {
+            this.previousRecords = previousRecords;
+        }
+    }
+
+    @RegisterExtension
+    public static AWSService awsService = AWSServiceFactory.createKinesisService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisConsumerResumeIT.class);
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    private KinesisClient client;
+    private String streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-cons-" + TestUtils.randomWithRange(0, 100);
+    private final int messageCount = 20;
+    private final int expectedCount = messageCount / 2;
+    private List<KinesisData> receivedMessages = new ArrayList<>();
+    private List<PutRecordsResponse> previousRecords;
+    private TestKinesisResumeAdapter adapter = new TestKinesisResumeAdapter(expectedCount);
+
+    /* Creates the Kinesis client, binds it to the registry and builds the resumable consumer route. */
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        client = AWSSDKClientUtils.newKinesisClient();
+        context.getRegistry().bind("amazonKinesisClient", client);
+
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // The strategy wraps the test adapter, which picks where consumption starts
+                bindToRegistry("testResumeStrategy", new TransientResumeStrategy(adapter));
+
+                // %s is replaced by the stream name; #amazonKinesisClient names the client bound above
+                fromF("aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient", streamName)
+                        .process(exchange -> {
+                            final Message incoming = exchange.getMessage();
+                            final KinesisData received = new KinesisData();
+
+                            // Keep body and partition key so the test can cross-check them later
+                            if (incoming != null) {
+                                received.body = incoming.getBody(String.class);
+                                received.partition = incoming.getHeader(Kinesis2Constants.PARTITION_KEY, String.class);
+                            }
+
+                            receivedMessages.add(received);
+                        })
+                        .resumable("testResumeStrategy")
+                        .to("mock:result");
+            }
+        };
+    }
+
+    /* Creates the stream, publishes the test records and hands the put responses to the adapter. */
+    @BeforeEach
+    public void prepareEnvironment() {
+        createStream(client, streamName);
+
+        previousRecords = putRecords(client, streamName, messageCount);
+
+        // Trace the sequence number reported for every record that was put
+        for (PutRecordsResponse putResponse : previousRecords) {
+            putResponse.records()
+                    .forEach(putEntry -> LOG.debug("Sequence {}", putEntry.sequenceNumber()));
+        }
+
+        adapter.setPreviousRecords(previousRecords);
+    }
+
+    @AfterEach
+    public void cleanUp() {
+        deleteStream(client, streamName); // drops the stream that prepareEnvironment created for this test
+    }
+
+    @DisplayName("Tests that the component can resume messages from AWS Kinesis")
+    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    @Test
+    void testProduceMessages() {
+        // Only the records from the resume point onwards should reach the mock endpoint
+        result.expectedMessageCount(expectedCount);
+
+        await().atMost(1, TimeUnit.MINUTES)
+                .untilAsserted(() -> result.assertIsSatisfied());
+
+        assertEquals(expectedCount, receivedMessages.size());
+        for (KinesisData data : receivedMessages) {
+            // A JUnit assertion is used because a plain Java assert is skipped when the JVM runs without -ea
+            assertNotNull(data, "The received data should not be null");
+            LOG.info("Received: {}", data.body);
+            assertNotNull(data.body, "The body should not be null");
+            assertNotNull(data.partition, "The partition should not be null");
+            /* In this test scenario message "1" is sent to partition-1, message "2" is sent to partition-2,
+               and so on. This is just testing that the code is not mixing things up. */
+            assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data);
+        }
+    }
+}
diff --git a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
index 7ddd7b212e9..75f4ce1b9f3 100644
--- a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
+++ b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
@@ -20,12 +20,14 @@ package org.apache.camel.test.infra.aws2.clients;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import org.apache.camel.test.infra.common.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
@@ -82,7 +84,7 @@ public final class KinesisUtils {
             LOG.info("Kinesis stream check result: {}", status);
         } catch (KinesisException e) {
             if (LOG.isTraceEnabled()) {
-                LOG.info("The stream does not exist, auto creating it: {}", e.getMessage(), e);
+                LOG.trace("The stream does not exist, auto creating it: {}", e.getMessage(), e);
             } else {
                 LOG.info("The stream does not exist, auto creating it: {}", e.getMessage());
             }
@@ -100,6 +102,8 @@ public final class KinesisUtils {
                     return false;
                 }
             });
+        } catch (SdkClientException e) {
+            LOG.info("SDK Error when getting the stream: {}", e.getMessage());
         }
     }
 
@@ -149,7 +153,13 @@ public final class KinesisUtils {
         }
     }
 
-    public static void putRecords(KinesisClient kinesisClient, String streamName, int count) {
+    public static List<PutRecordsResponse> putRecords(KinesisClient kinesisClient, String streamName, int count) {
+        return putRecords(kinesisClient, streamName, count, null);
+    }
+
+    public static List<PutRecordsResponse> putRecords(
+            KinesisClient kinesisClient, String streamName, int count,
+            Consumer<PutRecordsRequest.Builder> customizer) {
         List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
 
         LOG.debug("Adding data to the Kinesis stream");
@@ -167,23 +177,24 @@ public final class KinesisUtils {
 
         LOG.debug("Done creating the data records");
 
-        PutRecordsRequest putRecordsRequest = PutRecordsRequest
-                .builder()
+        final PutRecordsRequest.Builder requestBuilder = PutRecordsRequest
+                .builder();
+
+        requestBuilder
                 .streamName(streamName)
-                .records(putRecordsRequestEntryList)
-                .build();
+                .records(putRecordsRequestEntryList);
+
+        if (customizer != null) {
+            customizer.accept(requestBuilder);
+        }
+
+        PutRecordsRequest putRecordsRequest = requestBuilder.build();
+        List<PutRecordsResponse> replies = new ArrayList<>(count);
 
         int retries = 5;
         do {
             try {
-                PutRecordsResponse response = kinesisClient.putRecords(putRecordsRequest);
-
-                if (response.sdkHttpResponse().isSuccessful()) {
-                    LOG.debug("Done putting the data records into the stream");
-                } else {
-                    fail("Unable to put all the records into the stream");
-                }
-
+                replies.add(kinesisClient.putRecords(putRecordsRequest));
                 break;
             } catch (AwsServiceException e) {
                 retries--;
@@ -207,6 +218,8 @@ public final class KinesisUtils {
                 }
             }
         } while (retries > 0);
+
+        return replies;
     }
 
     private static boolean hasShards(KinesisClient kinesisClient, DescribeStreamRequest describeStreamRequest) {