You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2021/02/16 17:35:51 UTC

[camel-quarkus] branch master updated: Test AWS 2 Firehose

This is an automated email from the ASF dual-hosted git repository.

ppalaga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e5c76  Test AWS 2 Firehose
83e5c76 is described below

commit 83e5c768b6196de7a7fb04990c3a40e2a22d24f7
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Tue Feb 16 11:02:14 2021 +0100

    Test AWS 2 Firehose
---
 integration-tests-aws2/aws2-kinesis/pom.xml        |  10 +
 .../kinesis/it/Aws2KinesisFirehoseResource.java    |  65 ++++++
 .../src/main/resources/application.properties      |   4 +
 .../component/aws2/kinesis/it/Aws2KinesisTest.java |  92 ++++++++-
 .../kinesis/it/Aws2KinesisTestEnvCustomizer.java   | 222 ++++++++++++++++++++-
 .../test/support/aws2/Aws2TestEnvContext.java      |  57 ++++--
 .../test/support/aws2/Aws2TestEnvCustomizer.java   |   7 +
 .../test/support/aws2/Aws2TestResource.java        |  11 +-
 integration-tests/aws2-grouped/pom.xml             |   5 +
 pom.xml                                            |   2 +-
 .../integration-test-pom.xml                       |   2 +-
 11 files changed, 446 insertions(+), 31 deletions(-)

diff --git a/integration-tests-aws2/aws2-kinesis/pom.xml b/integration-tests-aws2/aws2-kinesis/pom.xml
index e850bed..d05c8be 100644
--- a/integration-tests-aws2/aws2-kinesis/pom.xml
+++ b/integration-tests-aws2/aws2-kinesis/pom.xml
@@ -83,6 +83,16 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>iam</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
         <dependency>
diff --git a/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java
new file mode 100644
index 0000000..6376226
--- /dev/null
+++ b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.kinesis.it;
+
+import java.net.URI;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@Path("/aws2-kinesis-firehose")
+@ApplicationScoped
+public class Aws2KinesisFirehoseResource {
+
+    @ConfigProperty(name = "aws-kinesis-firehose.delivery-stream-name")
+    String deliveryStreamName;
+
+    @Inject
+    ProducerTemplate producerTemplate;
+
+    @Path("/send")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    @Produces(MediaType.TEXT_PLAIN)
+    public Response send(String message) throws Exception {
+        final String response = producerTemplate.requestBodyAndHeader(
+                componentUri(),
+                message,
+                Kinesis2Constants.PARTITION_KEY,
+                "foo-partition-key",
+                String.class);
+        return Response
+                .created(new URI("https://camel.apache.org/"))
+                .entity(response)
+                .build();
+    }
+
+    private String componentUri() {
+        return "aws2-kinesis-firehose://" + deliveryStreamName;
+    }
+
+}
diff --git a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
index 926eb0c..e7a4c2d 100644
--- a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
+++ b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
@@ -18,3 +18,7 @@
 camel.component.aws2-kinesis.access-key=${AWS_ACCESS_KEY}
 camel.component.aws2-kinesis.secret-key=${AWS_SECRET_KEY}
 camel.component.aws2-kinesis.region=${AWS_REGION:us-east-1}
+
+camel.component.aws2-kinesis-firehose.access-key=${AWS_ACCESS_KEY}
+camel.component.aws2-kinesis-firehose.secret-key=${AWS_SECRET_KEY}
+camel.component.aws2-kinesis-firehose.region=${AWS_REGION:us-east-1}
diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
index 7efffec..d093e10 100644
--- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
+++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
@@ -16,20 +16,43 @@
  */
 package org.apache.camel.quarkus.component.aws2.kinesis.it;
 
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import io.quarkus.test.common.QuarkusTestResource;
 import io.quarkus.test.junit.QuarkusTest;
 import io.restassured.RestAssured;
 import io.restassured.http.ContentType;
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
 import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
 import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
 
 @QuarkusTest
 @QuarkusTestResource(Aws2TestResource.class)
 class Aws2KinesisTest {
 
+    private static final Logger LOG = Logger.getLogger(Aws2KinesisTest.class);
+
     @Test
-    public void test() {
+    public void kinesis() {
         final String msg = "kinesis-" + java.util.UUID.randomUUID().toString().replace("-", "");
         RestAssured.given() //
                 .contentType(ContentType.TEXT)
@@ -44,4 +67,71 @@ class Aws2KinesisTest {
                 .body(Matchers.is(msg));
     }
 
+    @Test
+    public void firehose() {
+        final String msg = RandomStringUtils.randomAlphanumeric(32 * 1024);
+        final String msgPrefix = msg.substring(0, 32);
+        final long maxDataBytes = Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB * 1024 * 1024;
+        long bytesSent = 0;
+        LOG.info("Sending " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose using chunk "
+                + msgPrefix + "...");
+        final long deadline = System.currentTimeMillis() + (Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000);
+        while (bytesSent < maxDataBytes && System.currentTimeMillis() < deadline) {
+            /* Send at least 1MB of data but do not spend more than a minute by doing it.
+             * This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer */
+            RestAssured.given() //
+                    .contentType(ContentType.TEXT)
+                    .body(msg)
+                    .post("/aws2-kinesis-firehose/send") //
+                    .then()
+                    .statusCode(201);
+            bytesSent += msg.length();
+            LOG.info("Sent " + bytesSent + "/" + maxDataBytes + " bytes of data");
+        }
+        LOG.info("Sent " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose");
+
+        final Config config = ConfigProvider.getConfig();
+
+        S3ClientBuilder builder = S3Client.builder()
+                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
+                        config.getValue("camel.component.aws2-kinesis.access-key", String.class),
+                        config.getValue("camel.component.aws2-kinesis.secret-key", String.class))))
+                .region(Region.of(config.getValue("camel.component.aws2-kinesis.region", String.class)));
+
+        config.getOptionalValue("camel.component.aws2-kinesis.uri-endpoint-override",
+                String.class).ifPresent(endpointOverride -> builder.endpointOverride(URI.create(endpointOverride)));
+        try (S3Client client = builder.build()) {
+
+            final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+                    () -> {
+                        final ListObjectsResponse objects = client
+                                .listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
+                        final List<S3Object> objs = objects.contents();
+                        LOG.info("There are  " + objs.size() + " objects in bucket " + bucketName);
+                        for (S3Object obj : objs) {
+                            LOG.info("Checking object " + obj.key() + " of size " + obj.size());
+                            try (ResponseInputStream<GetObjectResponse> o = client
+                                    .getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) {
+                                final StringBuilder sb = new StringBuilder(msg.length());
+                                final byte[] buf = new byte[1024];
+                                int len;
+                                while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) {
+                                    sb.append(new String(buf, 0, len, StandardCharsets.UTF_8));
+                                }
+                                final String foundContent = sb.toString();
+                                if (foundContent.startsWith(msgPrefix)) {
+                                    /* Yes, this is what we have sent */
+                                    LOG.info("Found the expected content in object " + obj.key());
+                                    return true;
+                                }
+                            }
+                        }
+                        return false;
+                    });
+        }
+
+    }
+
 }
diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
index 189465d..4486af8 100644
--- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
+++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
@@ -16,33 +16,73 @@
  */
 package org.apache.camel.quarkus.component.aws2.kinesis.it;
 
+import java.util.List;
 import java.util.Locale;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.jboss.logging.Logger;
 import org.testcontainers.containers.localstack.LocalStackContainer.Service;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.BufferingHints;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamStatus;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.InvalidArgumentException;
+import software.amazon.awssdk.services.firehose.model.S3DestinationConfiguration;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.iam.model.AttachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreatePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.DeletePolicyRequest;
+import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
+import software.amazon.awssdk.services.iam.model.DetachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.GetPolicyRequest;
+import software.amazon.awssdk.services.iam.model.GetRoleRequest;
+import software.amazon.awssdk.services.iam.waiters.IamWaiter;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3Waiter;
 
 public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer {
+    public static final int BUFFERING_SIZE_MB = 1;
+    public static final int BUFFERING_TIME_SEC = 60;
+    private static final Logger LOG = Logger.getLogger(Aws2KinesisTestEnvCustomizer.class);
 
     @Override
     public Service[] localstackServices() {
-        return new Service[] { Service.KINESIS };
+        return new Service[] { Service.KINESIS, Service.FIREHOSE, Service.S3, Service.IAM };
+    }
+
+    @Override
+    public Service[] exportCredentialsForLocalstackServices() {
+        return new Service[] { Service.KINESIS, Service.FIREHOSE };
     }
 
     @Override
     public void customize(Aws2TestEnvContext envContext) {
 
         final String streamName = "camel-quarkus-" + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT);
-        envContext.property("aws-kinesis.stream-name", streamName);
-
-        final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder);
+        final String streamArn;
         {
+            envContext.property("aws-kinesis.stream-name", streamName);
+            final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder);
             client.createStream(
                     CreateStreamRequest.builder()
                             .shardCount(1)
@@ -50,13 +90,183 @@ public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer {
                             .build());
 
             try (KinesisWaiter waiter = client.waiter()) {
-                waiter.waitUntilStreamExists(DescribeStreamRequest.builder()
+                streamArn = waiter.waitUntilStreamExists(DescribeStreamRequest.builder()
                         .streamName(streamName)
-                        .build());
+                        .build())
+                        .matched().response().get().streamDescription().streamARN();
             }
 
             envContext.closeable(() -> client.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build()));
         }
 
+        {
+            final S3Client s3Client = envContext.client(Service.S3, S3Client::builder);
+
+            final String bucketName = "camel-quarkus-firehose-"
+                    + RandomStringUtils.randomAlphanumeric(32).toLowerCase(Locale.ROOT);
+            final String bucketArn = "arn:aws:s3:::" + bucketName;
+            envContext.property("aws-kinesis.s3-bucket-name", bucketName);
+            s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
+            envContext.closeable(() -> s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()));
+            envContext.closeable(() -> {
+                final ListObjectsResponse objects = s3Client.listObjects(
+                        ListObjectsRequest.builder()
+                                .bucket(bucketName)
+                                .build());
+                final List<S3Object> objs = objects.contents();
+                LOG.info("Deleting " + objs.size() + " objects in bucket " + bucketName);
+                for (S3Object obj : objs) {
+                    LOG.info("Deleting object " + obj.key());
+                    s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(obj.key()).build());
+                }
+            });
+
+            try (S3Waiter w = s3Client.waiter()) {
+                w.waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build());
+            }
+
+            final String deliveryStreamName = "camel-quarkus-firehose-delstr-"
+                    + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT);
+            envContext.property("aws-kinesis-firehose.delivery-stream-name", deliveryStreamName);
+
+            final String roleName = "s3-" + deliveryStreamName;
+
+            final IamClient iamClient = envContext.client(Service.IAM, IamClient::builder);
+            final String roleArn = iamClient.createRole(
+                    CreateRoleRequest.builder()
+                            .roleName(roleName)
+                            .path("/service-role/")
+                            .assumeRolePolicyDocument("{\n"
+                                    + "  \"Version\": \"2012-10-17\",\n"
+                                    + "  \"Statement\": [\n"
+                                    + "    {\n"
+                                    + "      \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+                                    + "      \"Effect\": \"Allow\",\n"
+                                    + "      \"Principal\": {\n"
+                                    + "        \"Service\": \"firehose.amazonaws.com\"\n"
+                                    + "      },\n"
+                                    + "      \"Action\": \"sts:AssumeRole\"\n"
+                                    + "    }\n"
+                                    + "  ]\n"
+                                    + "}")
+                            .build())
+                    .role().arn();
+            envContext.closeable(() -> iamClient.deleteRole(DeleteRoleRequest.builder().roleName(roleName).build()));
+
+            try (IamWaiter w = iamClient.waiter()) {
+                w.waitUntilRoleExists(GetRoleRequest.builder().roleName(roleName).build());
+            }
+
+            final String policyName = "firehose-s3-policy-" + deliveryStreamName;
+
+            final String policy = "{\n"
+                    + "    \"Version\": \"2012-10-17\",\n"
+                    + "    \"Statement\":\n"
+                    + "    [\n"
+                    + "        {\n"
+                    + "            \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+                    + "            \"Effect\": \"Allow\",\n"
+                    + "            \"Action\": [\n"
+                    + "                \"s3:AbortMultipartUpload\",\n"
+                    + "                \"s3:GetBucketLocation\",\n"
+                    + "                \"s3:GetObject\",\n"
+                    + "                \"s3:ListBucket\",\n"
+                    + "                \"s3:ListBucketMultipartUploads\",\n"
+                    + "                \"s3:PutObject\"\n"
+                    + "            ],      \n"
+                    + "            \"Resource\": [\n"
+                    + "                \"arn:aws:s3:::" + bucketName + "\",\n"
+                    + "                \"arn:aws:s3:::" + bucketName + "/*\"\n"
+                    + "            ]\n"
+                    + "        },\n"
+                    + "        {\n"
+                    + "            \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+                    + "            \"Effect\": \"Allow\",\n"
+                    + "            \"Action\": [\n"
+                    + "                \"kinesis:DescribeStream\",\n"
+                    + "                \"kinesis:GetShardIterator\",\n"
+                    + "                \"kinesis:GetRecords\",\n"
+                    + "                \"kinesis:ListShards\"\n"
+                    + "            ],\n"
+                    + "            \"Resource\": \"" + streamArn + "\"\n"
+                    + "        }\n"
+                    + "    ]\n"
+                    + "}";
+            final String policyArn = iamClient.createPolicy(
+                    CreatePolicyRequest.builder()
+                            .policyName(policyName)
+                            .policyDocument(policy)
+                            .build())
+                    .policy().arn();
+            envContext.closeable(() -> iamClient.deletePolicy(DeletePolicyRequest.builder().policyArn(policyArn).build()));
+
+            try (IamWaiter w = iamClient.waiter()) {
+                w.waitUntilPolicyExists(GetPolicyRequest.builder().policyArn(policyArn).build());
+            }
+
+            iamClient.attachRolePolicy(
+                    AttachRolePolicyRequest.builder()
+                            .policyArn(policyArn)
+                            .roleName(roleName)
+                            .build());
+            envContext.closeable(() -> iamClient.detachRolePolicy(
+                    DetachRolePolicyRequest.builder()
+                            .roleName(roleName)
+                            .policyArn(policyArn)
+                            .build()));
+
+            final FirehoseClient fhClient = envContext.client(Service.FIREHOSE, FirehoseClient::builder);
+
+            /*
+             * Some of the dependency resources above needs some time to get visible for the firehose service
+             * So we need to retry creation of the delivery stream until it succeeds
+             */
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+                    () -> {
+                        try {
+                            fhClient.createDeliveryStream(
+                                    CreateDeliveryStreamRequest.builder()
+                                            .deliveryStreamName(deliveryStreamName)
+                                            .s3DestinationConfiguration(
+                                                    S3DestinationConfiguration.builder()
+                                                            .bucketARN(bucketArn)
+                                                            .roleARN(roleArn)
+                                                            .bufferingHints(
+                                                                    BufferingHints.builder()
+                                                                            .intervalInSeconds(BUFFERING_TIME_SEC)
+                                                                            .sizeInMBs(BUFFERING_SIZE_MB)
+                                                                            .build())
+                                                            .build())
+                                            .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+                                            .build());
+                            LOG.info("Firehose delivery stream " + deliveryStreamName + " finally created");
+                            return true;
+                        } catch (InvalidArgumentException e) {
+                            LOG.info("Retrying the creation of delivery stream " + deliveryStreamName + " because "
+                                    + e.getMessage());
+                            return false;
+                        }
+                    });
+
+            /*
+             * There is no waiter for FirehoseClient so we are polling the state of the stream until the state is ACTIVE
+             * Feel free to improve if you see a more elegant way to do this
+             */
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+                    () -> {
+                        DeliveryStreamStatus status = fhClient.describeDeliveryStream(
+                                DescribeDeliveryStreamRequest.builder()
+                                        .deliveryStreamName(deliveryStreamName)
+                                        .build())
+                                .deliveryStreamDescription().deliveryStreamStatus();
+                        LOG.info("Delivery stream " + deliveryStreamName + " status: " + status);
+                        return status == DeliveryStreamStatus.ACTIVE;
+                    });
+
+            envContext.closeable(() -> fhClient.deleteDeliveryStream(
+                    DeleteDeliveryStreamRequest.builder().deliveryStreamName(deliveryStreamName).build()));
+
+        }
+
     }
 }
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
index 7246bbb..0b66ab2 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.quarkus.test.support.aws2;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -47,31 +48,33 @@ public class Aws2TestEnvContext {
     private final Optional<LocalStackContainer> localstack;
 
     public Aws2TestEnvContext(String accessKey, String secretKey, String region, Optional<LocalStackContainer> localstack,
-            Service... services) {
+            Service[] exportCredentialsServices) {
         this.accessKey = accessKey;
         this.secretKey = secretKey;
         this.region = region;
         this.localstack = localstack;
 
         localstack.ifPresent(ls -> {
-            for (Service service : services) {
+            for (Service service : exportCredentialsServices) {
                 String s = camelServiceAcronym(service);
-                properties.put("camel.component.aws2-" + s + ".access-key", accessKey);
-                properties.put("camel.component.aws2-" + s + ".secret-key", secretKey);
-                properties.put("camel.component.aws2-" + s + ".region", region);
+                if (s != null) {
+                    properties.put("camel.component.aws2-" + s + ".access-key", accessKey);
+                    properties.put("camel.component.aws2-" + s + ".secret-key", secretKey);
+                    properties.put("camel.component.aws2-" + s + ".region", region);
 
-                switch (service) {
-                case SQS:
-                case SNS:
-                case DYNAMODB:
-                case DYNAMODB_STREAMS:
-                    // TODO https://github.com/apache/camel-quarkus/issues/2216
-                    break;
-                default:
-                    properties.put("camel.component.aws2-" + s + ".override-endpoint", "true");
-                    properties.put("camel.component.aws2-" + s + ".uri-endpoint-override",
-                            ls.getEndpointOverride(service).toString());
-                    break;
+                    switch (service) {
+                    case SQS:
+                    case SNS:
+                    case DYNAMODB:
+                    case DYNAMODB_STREAMS:
+                        // TODO https://github.com/apache/camel-quarkus/issues/2216
+                        break;
+                    default:
+                        properties.put("camel.component.aws2-" + s + ".override-endpoint", "true");
+                        properties.put("camel.component.aws2-" + s + ".uri-endpoint-override",
+                                ls.getEndpointOverride(service).toString());
+                        break;
+                    }
                 }
             }
         });
@@ -136,11 +139,19 @@ public class Aws2TestEnvContext {
             Supplier<B> builderSupplier) {
         B builder = builderSupplier.get()
                 .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
-                        accessKey, secretKey)))
-                .region(Region.of(region));
+                        accessKey, secretKey)));
+        builder.region(Region.of(region));
+
         if (localstack.isPresent()) {
-            builder.endpointOverride(localstack.get().getEndpointOverride(service));
+            builder
+                    .endpointOverride(localstack.get().getEndpointOverride(service))
+                    .region(Region.of(region));
+        } else if (service == Service.IAM) {
+            /* Avoid UnknownHostException: iam.eu-central-1.amazonaws.com */
+            builder.endpointOverride(URI.create("https://iam.amazonaws.com"));
+            builder.region(Region.of("us-east-1"));
         }
+
         final C client = builder.build();
         closeables.add(client);
         return client;
@@ -152,8 +163,14 @@ public class Aws2TestEnvContext {
             return "ddb";
         case DYNAMODB_STREAMS:
             return "ddbstream";
+        case FIREHOSE:
+            return "kinesis-firehose";
         default:
             return service.name().toLowerCase(Locale.ROOT);
         }
     }
+
+    public String getRegion() {
+        return region;
+    }
 }
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
index c479f00..bf073f8 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
@@ -30,6 +30,13 @@ public interface Aws2TestEnvCustomizer {
     Service[] localstackServices();
 
     /**
+     * @return an array of Localstack services for which {@link Aws2TestEnvContext} should export credentials properties
+     */
+    default Service[] exportCredentialsForLocalstackServices() {
+        return localstackServices();
+    }
+
+    /**
      * Customize the given {@link Aws2TestEnvContext}
      *
      * @param envContext the {@link Aws2TestEnvContext} to customize
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
index 4e4e48a..f1be06f 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
@@ -58,6 +58,13 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
             final Service[] services = customizers.stream()
                     .map(Aws2TestEnvCustomizer::localstackServices)
                     .flatMap((Service[] ss) -> Stream.of(ss))
+                    .distinct()
+                    .toArray(Service[]::new);
+
+            final Service[] exportCredentialsServices = customizers.stream()
+                    .map(Aws2TestEnvCustomizer::exportCredentialsForLocalstackServices)
+                    .flatMap((Service[] ss) -> Stream.of(ss))
+                    .distinct()
                     .toArray(Service[]::new);
 
             LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.6"))
@@ -65,7 +72,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
             localstack.start();
 
             envContext = new Aws2TestEnvContext(localstack.getAccessKey(), localstack.getSecretKey(), localstack.getRegion(),
-                    Optional.of(localstack), services);
+                    Optional.of(localstack), exportCredentialsServices);
 
         } else {
             if (!startMockBackend && !realCredentialsProvided) {
@@ -73,7 +80,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
                         "Set AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_REGION env vars if you set CAMEL_QUARKUS_START_MOCK_BACKEND=false");
             }
             MockBackendUtils.logRealBackendUsed();
-            envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty());
+            envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty(), new Service[0]);
         }
 
         customizers.forEach(customizer -> customizer.customize(envContext));
diff --git a/integration-tests/aws2-grouped/pom.xml b/integration-tests/aws2-grouped/pom.xml
index c88fe59..7708feb 100644
--- a/integration-tests/aws2-grouped/pom.xml
+++ b/integration-tests/aws2-grouped/pom.xml
@@ -95,6 +95,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>iam</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
         <dependency>
diff --git a/pom.xml b/pom.xml
index 0c9a9fd..c59d912 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
         <zt-exec.version>1.11</zt-exec.version>
 
         <!-- Maven plugin versions (keep sorted alphabetically) -->
-        <cq-plugin.version>0.30.0</cq-plugin.version>
+        <cq-plugin.version>0.31.0</cq-plugin.version>
         <build-helper-maven-plugin.version>3.1.0</build-helper-maven-plugin.version>
         <exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
         <formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>
diff --git a/tooling/create-extension-templates/integration-test-pom.xml b/tooling/create-extension-templates/integration-test-pom.xml
index 51d3e9a..c5eefb4 100644
--- a/tooling/create-extension-templates/integration-test-pom.xml
+++ b/tooling/create-extension-templates/integration-test-pom.xml
@@ -23,7 +23,7 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.camel.quarkus</groupId>
-        <artifactId>camel-quarkus-[#if nativeSupported]integration-tests[#else]build-parent-it[/#if]</artifactId>
+        <artifactId>camel-quarkus-[#if nativeSupported][=itestParentArtifactId][#else]build-parent-it[/#if]</artifactId>
         <version>[=version]</version>
         <relativePath>[#if nativeSupported]../pom.xml[#else]../../../poms/build-parent-it/pom.xml[/#if]</relativePath>
     </parent>