You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2021/02/16 17:35:51 UTC
[camel-quarkus] branch master updated: Test AWS 2 Firehose
This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/master by this push:
new 83e5c76 Test AWS 2 Firehose
83e5c76 is described below
commit 83e5c768b6196de7a7fb04990c3a40e2a22d24f7
Author: Peter Palaga <pp...@redhat.com>
AuthorDate: Tue Feb 16 11:02:14 2021 +0100
Test AWS 2 Firehose
---
integration-tests-aws2/aws2-kinesis/pom.xml | 10 +
.../kinesis/it/Aws2KinesisFirehoseResource.java | 65 ++++++
.../src/main/resources/application.properties | 4 +
.../component/aws2/kinesis/it/Aws2KinesisTest.java | 92 ++++++++-
.../kinesis/it/Aws2KinesisTestEnvCustomizer.java | 222 ++++++++++++++++++++-
.../test/support/aws2/Aws2TestEnvContext.java | 57 ++++--
.../test/support/aws2/Aws2TestEnvCustomizer.java | 7 +
.../test/support/aws2/Aws2TestResource.java | 11 +-
integration-tests/aws2-grouped/pom.xml | 5 +
pom.xml | 2 +-
.../integration-test-pom.xml | 2 +-
11 files changed, 446 insertions(+), 31 deletions(-)
diff --git a/integration-tests-aws2/aws2-kinesis/pom.xml b/integration-tests-aws2/aws2-kinesis/pom.xml
index e850bed..d05c8be 100644
--- a/integration-tests-aws2/aws2-kinesis/pom.xml
+++ b/integration-tests-aws2/aws2-kinesis/pom.xml
@@ -83,6 +83,16 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>s3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>iam</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
diff --git a/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java
new file mode 100644
index 0000000..6376226
--- /dev/null
+++ b/integration-tests-aws2/aws2-kinesis/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.kinesis.it;
+
+import java.net.URI;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@Path("/aws2-kinesis-firehose")
+@ApplicationScoped
+public class Aws2KinesisFirehoseResource {
+
+ @ConfigProperty(name = "aws-kinesis-firehose.delivery-stream-name")
+ String deliveryStreamName;
+
+ @Inject
+ ProducerTemplate producerTemplate;
+
+ @Path("/send")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response send(String message) throws Exception {
+ final String response = producerTemplate.requestBodyAndHeader(
+ componentUri(),
+ message,
+ Kinesis2Constants.PARTITION_KEY,
+ "foo-partition-key",
+ String.class);
+ return Response
+ .created(new URI("https://camel.apache.org/"))
+ .entity(response)
+ .build();
+ }
+
+ private String componentUri() {
+ return "aws2-kinesis-firehose://" + deliveryStreamName;
+ }
+
+}
diff --git a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
index 926eb0c..e7a4c2d 100644
--- a/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
+++ b/integration-tests-aws2/aws2-kinesis/src/main/resources/application.properties
@@ -18,3 +18,7 @@
camel.component.aws2-kinesis.access-key=${AWS_ACCESS_KEY}
camel.component.aws2-kinesis.secret-key=${AWS_SECRET_KEY}
camel.component.aws2-kinesis.region=${AWS_REGION:us-east-1}
+
+camel.component.aws2-kinesis-firehose.access-key=${AWS_ACCESS_KEY}
+camel.component.aws2-kinesis-firehose.secret-key=${AWS_SECRET_KEY}
+camel.component.aws2-kinesis-firehose.region=${AWS_REGION:us-east-1}
diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
index 7efffec..d093e10 100644
--- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
+++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTest.java
@@ -16,20 +16,43 @@
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
import org.hamcrest.Matchers;
+import org.jboss.logging.Logger;
import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
@QuarkusTest
@QuarkusTestResource(Aws2TestResource.class)
class Aws2KinesisTest {
+ private static final Logger LOG = Logger.getLogger(Aws2KinesisTest.class);
+
@Test
- public void test() {
+ public void kinesis() {
final String msg = "kinesis-" + java.util.UUID.randomUUID().toString().replace("-", "");
RestAssured.given() //
.contentType(ContentType.TEXT)
@@ -44,4 +67,71 @@ class Aws2KinesisTest {
.body(Matchers.is(msg));
}
+ @Test
+ public void firehose() {
+ final String msg = RandomStringUtils.randomAlphanumeric(32 * 1024);
+ final String msgPrefix = msg.substring(0, 32);
+ final long maxDataBytes = Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB * 1024 * 1024;
+ long bytesSent = 0;
+ LOG.info("Sending " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose using chunk "
+ + msgPrefix + "...");
+ final long deadline = System.currentTimeMillis() + (Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000);
+ while (bytesSent < maxDataBytes && System.currentTimeMillis() < deadline) {
+ /* Send at least 1MB of data but do not spend more than a minute by doing it.
+ * This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer */
+ RestAssured.given() //
+ .contentType(ContentType.TEXT)
+ .body(msg)
+ .post("/aws2-kinesis-firehose/send") //
+ .then()
+ .statusCode(201);
+ bytesSent += msg.length();
+ LOG.info("Sent " + bytesSent + "/" + maxDataBytes + " bytes of data");
+ }
+ LOG.info("Sent " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose");
+
+ final Config config = ConfigProvider.getConfig();
+
+ S3ClientBuilder builder = S3Client.builder()
+ .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
+ config.getValue("camel.component.aws2-kinesis.access-key", String.class),
+ config.getValue("camel.component.aws2-kinesis.secret-key", String.class))))
+ .region(Region.of(config.getValue("camel.component.aws2-kinesis.region", String.class)));
+
+ config.getOptionalValue("camel.component.aws2-kinesis.uri-endpoint-override",
+ String.class).ifPresent(endpointOverride -> builder.endpointOverride(URI.create(endpointOverride)));
+ try (S3Client client = builder.build()) {
+
+ final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class);
+
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+ () -> {
+ final ListObjectsResponse objects = client
+ .listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
+ final List<S3Object> objs = objects.contents();
+ LOG.info("There are " + objs.size() + " objects in bucket " + bucketName);
+ for (S3Object obj : objs) {
+ LOG.info("Checking object " + obj.key() + " of size " + obj.size());
+ try (ResponseInputStream<GetObjectResponse> o = client
+ .getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) {
+ final StringBuilder sb = new StringBuilder(msg.length());
+ final byte[] buf = new byte[1024];
+ int len;
+ while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) {
+ sb.append(new String(buf, 0, len, StandardCharsets.UTF_8));
+ }
+ final String foundContent = sb.toString();
+ if (foundContent.startsWith(msgPrefix)) {
+ /* Yes, this is what we have sent */
+ LOG.info("Found the expected content in object " + obj.key());
+ return true;
+ }
+ }
+ }
+ return false;
+ });
+ }
+
+ }
+
}
diff --git a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
index 189465d..4486af8 100644
--- a/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
+++ b/integration-tests-aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisTestEnvCustomizer.java
@@ -16,33 +16,73 @@
*/
package org.apache.camel.quarkus.component.aws2.kinesis.it;
+import java.util.List;
import java.util.Locale;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
import org.apache.commons.lang3.RandomStringUtils;
+import org.awaitility.Awaitility;
+import org.jboss.logging.Logger;
import org.testcontainers.containers.localstack.LocalStackContainer.Service;
+import software.amazon.awssdk.services.firehose.FirehoseClient;
+import software.amazon.awssdk.services.firehose.model.BufferingHints;
+import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamStatus;
+import software.amazon.awssdk.services.firehose.model.DeliveryStreamType;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.InvalidArgumentException;
+import software.amazon.awssdk.services.firehose.model.S3DestinationConfiguration;
+import software.amazon.awssdk.services.iam.IamClient;
+import software.amazon.awssdk.services.iam.model.AttachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreatePolicyRequest;
+import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
+import software.amazon.awssdk.services.iam.model.DeletePolicyRequest;
+import software.amazon.awssdk.services.iam.model.DeleteRoleRequest;
+import software.amazon.awssdk.services.iam.model.DetachRolePolicyRequest;
+import software.amazon.awssdk.services.iam.model.GetPolicyRequest;
+import software.amazon.awssdk.services.iam.model.GetRoleRequest;
+import software.amazon.awssdk.services.iam.waiters.IamWaiter;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.waiters.KinesisWaiter;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.waiters.S3Waiter;
public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer {
+ public static final int BUFFERING_SIZE_MB = 1;
+ public static final int BUFFERING_TIME_SEC = 60;
+ private static final Logger LOG = Logger.getLogger(Aws2KinesisTestEnvCustomizer.class);
@Override
public Service[] localstackServices() {
- return new Service[] { Service.KINESIS };
+ return new Service[] { Service.KINESIS, Service.FIREHOSE, Service.S3, Service.IAM };
+ }
+
+ @Override
+ public Service[] exportCredentialsForLocalstackServices() {
+ return new Service[] { Service.KINESIS, Service.FIREHOSE };
}
@Override
public void customize(Aws2TestEnvContext envContext) {
final String streamName = "camel-quarkus-" + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT);
- envContext.property("aws-kinesis.stream-name", streamName);
-
- final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder);
+ final String streamArn;
{
+ envContext.property("aws-kinesis.stream-name", streamName);
+ final KinesisClient client = envContext.client(Service.KINESIS, KinesisClient::builder);
client.createStream(
CreateStreamRequest.builder()
.shardCount(1)
@@ -50,13 +90,183 @@ public class Aws2KinesisTestEnvCustomizer implements Aws2TestEnvCustomizer {
.build());
try (KinesisWaiter waiter = client.waiter()) {
- waiter.waitUntilStreamExists(DescribeStreamRequest.builder()
+ streamArn = waiter.waitUntilStreamExists(DescribeStreamRequest.builder()
.streamName(streamName)
- .build());
+ .build())
+ .matched().response().get().streamDescription().streamARN();
}
envContext.closeable(() -> client.deleteStream(DeleteStreamRequest.builder().streamName(streamName).build()));
}
+ {
+ final S3Client s3Client = envContext.client(Service.S3, S3Client::builder);
+
+ final String bucketName = "camel-quarkus-firehose-"
+ + RandomStringUtils.randomAlphanumeric(32).toLowerCase(Locale.ROOT);
+ final String bucketArn = "arn:aws:s3:::" + bucketName;
+ envContext.property("aws-kinesis.s3-bucket-name", bucketName);
+ s3Client.createBucket(CreateBucketRequest.builder().bucket(bucketName).build());
+ envContext.closeable(() -> s3Client.deleteBucket(DeleteBucketRequest.builder().bucket(bucketName).build()));
+ envContext.closeable(() -> {
+ final ListObjectsResponse objects = s3Client.listObjects(
+ ListObjectsRequest.builder()
+ .bucket(bucketName)
+ .build());
+ final List<S3Object> objs = objects.contents();
+ LOG.info("Deleting " + objs.size() + " objects in bucket " + bucketName);
+ for (S3Object obj : objs) {
+ LOG.info("Deleting object " + obj.key());
+ s3Client.deleteObject(DeleteObjectRequest.builder().bucket(bucketName).key(obj.key()).build());
+ }
+ });
+
+ try (S3Waiter w = s3Client.waiter()) {
+ w.waitUntilBucketExists(HeadBucketRequest.builder().bucket(bucketName).build());
+ }
+
+ final String deliveryStreamName = "camel-quarkus-firehose-delstr-"
+ + RandomStringUtils.randomAlphanumeric(16).toLowerCase(Locale.ROOT);
+ envContext.property("aws-kinesis-firehose.delivery-stream-name", deliveryStreamName);
+
+ final String roleName = "s3-" + deliveryStreamName;
+
+ final IamClient iamClient = envContext.client(Service.IAM, IamClient::builder);
+ final String roleArn = iamClient.createRole(
+ CreateRoleRequest.builder()
+ .roleName(roleName)
+ .path("/service-role/")
+ .assumeRolePolicyDocument("{\n"
+ + " \"Version\": \"2012-10-17\",\n"
+ + " \"Statement\": [\n"
+ + " {\n"
+ + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+ + " \"Effect\": \"Allow\",\n"
+ + " \"Principal\": {\n"
+ + " \"Service\": \"firehose.amazonaws.com\"\n"
+ + " },\n"
+ + " \"Action\": \"sts:AssumeRole\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}")
+ .build())
+ .role().arn();
+ envContext.closeable(() -> iamClient.deleteRole(DeleteRoleRequest.builder().roleName(roleName).build()));
+
+ try (IamWaiter w = iamClient.waiter()) {
+ w.waitUntilRoleExists(GetRoleRequest.builder().roleName(roleName).build());
+ }
+
+ final String policyName = "firehose-s3-policy-" + deliveryStreamName;
+
+ final String policy = "{\n"
+ + " \"Version\": \"2012-10-17\",\n"
+ + " \"Statement\":\n"
+ + " [\n"
+ + " {\n"
+ + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+ + " \"Effect\": \"Allow\",\n"
+ + " \"Action\": [\n"
+ + " \"s3:AbortMultipartUpload\",\n"
+ + " \"s3:GetBucketLocation\",\n"
+ + " \"s3:GetObject\",\n"
+ + " \"s3:ListBucket\",\n"
+ + " \"s3:ListBucketMultipartUploads\",\n"
+ + " \"s3:PutObject\"\n"
+ + " ], \n"
+ + " \"Resource\": [\n"
+ + " \"arn:aws:s3:::" + bucketName + "\",\n"
+ + " \"arn:aws:s3:::" + bucketName + "/*\"\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"Sid\": \"sid" + RandomStringUtils.randomAlphanumeric(16) + "\",\n"
+ + " \"Effect\": \"Allow\",\n"
+ + " \"Action\": [\n"
+ + " \"kinesis:DescribeStream\",\n"
+ + " \"kinesis:GetShardIterator\",\n"
+ + " \"kinesis:GetRecords\",\n"
+ + " \"kinesis:ListShards\"\n"
+ + " ],\n"
+ + " \"Resource\": \"" + streamArn + "\"\n"
+ + " }\n"
+ + " ]\n"
+ + "}";
+ final String policyArn = iamClient.createPolicy(
+ CreatePolicyRequest.builder()
+ .policyName(policyName)
+ .policyDocument(policy)
+ .build())
+ .policy().arn();
+ envContext.closeable(() -> iamClient.deletePolicy(DeletePolicyRequest.builder().policyArn(policyArn).build()));
+
+ try (IamWaiter w = iamClient.waiter()) {
+ w.waitUntilPolicyExists(GetPolicyRequest.builder().policyArn(policyArn).build());
+ }
+
+ iamClient.attachRolePolicy(
+ AttachRolePolicyRequest.builder()
+ .policyArn(policyArn)
+ .roleName(roleName)
+ .build());
+ envContext.closeable(() -> iamClient.detachRolePolicy(
+ DetachRolePolicyRequest.builder()
+ .roleName(roleName)
+ .policyArn(policyArn)
+ .build()));
+
+ final FirehoseClient fhClient = envContext.client(Service.FIREHOSE, FirehoseClient::builder);
+
+ /*
+ * Some of the dependency resources above needs some time to get visible for the firehose service
+ * So we need to retry creation of the delivery stream until it succeeds
+ */
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+ () -> {
+ try {
+ fhClient.createDeliveryStream(
+ CreateDeliveryStreamRequest.builder()
+ .deliveryStreamName(deliveryStreamName)
+ .s3DestinationConfiguration(
+ S3DestinationConfiguration.builder()
+ .bucketARN(bucketArn)
+ .roleARN(roleArn)
+ .bufferingHints(
+ BufferingHints.builder()
+ .intervalInSeconds(BUFFERING_TIME_SEC)
+ .sizeInMBs(BUFFERING_SIZE_MB)
+ .build())
+ .build())
+ .deliveryStreamType(DeliveryStreamType.DIRECT_PUT)
+ .build());
+ LOG.info("Firehose delivery stream " + deliveryStreamName + " finally created");
+ return true;
+ } catch (InvalidArgumentException e) {
+ LOG.info("Retrying the creation of delivery stream " + deliveryStreamName + " because "
+ + e.getMessage());
+ return false;
+ }
+ });
+
+ /*
+ * There is no waiter for FirehoseClient so we are polling the state of the stream until the state is ACTIVE
+ * Feel free to improve if you see a more elegant way to do this
+ */
+ Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until(
+ () -> {
+ DeliveryStreamStatus status = fhClient.describeDeliveryStream(
+ DescribeDeliveryStreamRequest.builder()
+ .deliveryStreamName(deliveryStreamName)
+ .build())
+ .deliveryStreamDescription().deliveryStreamStatus();
+ LOG.info("Delivery stream " + deliveryStreamName + " status: " + status);
+ return status == DeliveryStreamStatus.ACTIVE;
+ });
+
+ envContext.closeable(() -> fhClient.deleteDeliveryStream(
+ DeleteDeliveryStreamRequest.builder().deliveryStreamName(deliveryStreamName).build()));
+
+ }
+
}
}
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
index 7246bbb..0b66ab2 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.test.support.aws2;
+import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -47,31 +48,33 @@ public class Aws2TestEnvContext {
private final Optional<LocalStackContainer> localstack;
public Aws2TestEnvContext(String accessKey, String secretKey, String region, Optional<LocalStackContainer> localstack,
- Service... services) {
+ Service[] exportCredentialsServices) {
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region;
this.localstack = localstack;
localstack.ifPresent(ls -> {
- for (Service service : services) {
+ for (Service service : exportCredentialsServices) {
String s = camelServiceAcronym(service);
- properties.put("camel.component.aws2-" + s + ".access-key", accessKey);
- properties.put("camel.component.aws2-" + s + ".secret-key", secretKey);
- properties.put("camel.component.aws2-" + s + ".region", region);
+ if (s != null) {
+ properties.put("camel.component.aws2-" + s + ".access-key", accessKey);
+ properties.put("camel.component.aws2-" + s + ".secret-key", secretKey);
+ properties.put("camel.component.aws2-" + s + ".region", region);
- switch (service) {
- case SQS:
- case SNS:
- case DYNAMODB:
- case DYNAMODB_STREAMS:
- // TODO https://github.com/apache/camel-quarkus/issues/2216
- break;
- default:
- properties.put("camel.component.aws2-" + s + ".override-endpoint", "true");
- properties.put("camel.component.aws2-" + s + ".uri-endpoint-override",
- ls.getEndpointOverride(service).toString());
- break;
+ switch (service) {
+ case SQS:
+ case SNS:
+ case DYNAMODB:
+ case DYNAMODB_STREAMS:
+ // TODO https://github.com/apache/camel-quarkus/issues/2216
+ break;
+ default:
+ properties.put("camel.component.aws2-" + s + ".override-endpoint", "true");
+ properties.put("camel.component.aws2-" + s + ".uri-endpoint-override",
+ ls.getEndpointOverride(service).toString());
+ break;
+ }
}
}
});
@@ -136,11 +139,19 @@ public class Aws2TestEnvContext {
Supplier<B> builderSupplier) {
B builder = builderSupplier.get()
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(
- accessKey, secretKey)))
- .region(Region.of(region));
+ accessKey, secretKey)));
+ builder.region(Region.of(region));
+
if (localstack.isPresent()) {
- builder.endpointOverride(localstack.get().getEndpointOverride(service));
+ builder
+ .endpointOverride(localstack.get().getEndpointOverride(service))
+ .region(Region.of(region));
+ } else if (service == Service.IAM) {
+ /* Avoid UnknownHostException: iam.eu-central-1.amazonaws.com */
+ builder.endpointOverride(URI.create("https://iam.amazonaws.com"));
+ builder.region(Region.of("us-east-1"));
}
+
final C client = builder.build();
closeables.add(client);
return client;
@@ -152,8 +163,14 @@ public class Aws2TestEnvContext {
return "ddb";
case DYNAMODB_STREAMS:
return "ddbstream";
+ case FIREHOSE:
+ return "kinesis-firehose";
default:
return service.name().toLowerCase(Locale.ROOT);
}
}
+
+ public String getRegion() {
+ return region;
+ }
}
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
index c479f00..bf073f8 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestEnvCustomizer.java
@@ -30,6 +30,13 @@ public interface Aws2TestEnvCustomizer {
Service[] localstackServices();
/**
+ * @return an array of Localstack services for which {@link Aws2TestEnvContext} should export credentials properties
+ */
+ default Service[] exportCredentialsForLocalstackServices() {
+ return localstackServices();
+ }
+
+ /**
* Customize the given {@link Aws2TestEnvContext}
*
* @param envContext the {@link Aws2TestEnvContext} to customize
diff --git a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
index 4e4e48a..f1be06f 100644
--- a/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
+++ b/integration-tests-support/aws2/src/main/java/org/apache/camel/quarkus/test/support/aws2/Aws2TestResource.java
@@ -58,6 +58,13 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
final Service[] services = customizers.stream()
.map(Aws2TestEnvCustomizer::localstackServices)
.flatMap((Service[] ss) -> Stream.of(ss))
+ .distinct()
+ .toArray(Service[]::new);
+
+ final Service[] exportCredentialsServices = customizers.stream()
+ .map(Aws2TestEnvCustomizer::exportCredentialsForLocalstackServices)
+ .flatMap((Service[] ss) -> Stream.of(ss))
+ .distinct()
.toArray(Service[]::new);
LocalStackContainer localstack = new LocalStackContainer(DockerImageName.parse("localstack/localstack:0.12.6"))
@@ -65,7 +72,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
localstack.start();
envContext = new Aws2TestEnvContext(localstack.getAccessKey(), localstack.getSecretKey(), localstack.getRegion(),
- Optional.of(localstack), services);
+ Optional.of(localstack), exportCredentialsServices);
} else {
if (!startMockBackend && !realCredentialsProvided) {
@@ -73,7 +80,7 @@ public final class Aws2TestResource implements ContainerResourceLifecycleManager
"Set AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_REGION env vars if you set CAMEL_QUARKUS_START_MOCK_BACKEND=false");
}
MockBackendUtils.logRealBackendUsed();
- envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty());
+ envContext = new Aws2TestEnvContext(realKey, realSecret, realRegion, Optional.empty(), new Service[0]);
}
customizers.forEach(customizer -> customizer.customize(envContext));
diff --git a/integration-tests/aws2-grouped/pom.xml b/integration-tests/aws2-grouped/pom.xml
index c88fe59..7708feb 100644
--- a/integration-tests/aws2-grouped/pom.xml
+++ b/integration-tests/aws2-grouped/pom.xml
@@ -95,6 +95,11 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>iam</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
diff --git a/pom.xml b/pom.xml
index 0c9a9fd..c59d912 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,7 +145,7 @@
<zt-exec.version>1.11</zt-exec.version>
<!-- Maven plugin versions (keep sorted alphabetically) -->
- <cq-plugin.version>0.30.0</cq-plugin.version>
+ <cq-plugin.version>0.31.0</cq-plugin.version>
<build-helper-maven-plugin.version>3.1.0</build-helper-maven-plugin.version>
<exec-maven-plugin.version>3.0.0</exec-maven-plugin.version>
<formatter-maven-plugin.version>2.11.0</formatter-maven-plugin.version>
diff --git a/tooling/create-extension-templates/integration-test-pom.xml b/tooling/create-extension-templates/integration-test-pom.xml
index 51d3e9a..c5eefb4 100644
--- a/tooling/create-extension-templates/integration-test-pom.xml
+++ b/tooling/create-extension-templates/integration-test-pom.xml
@@ -23,7 +23,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-[#if nativeSupported]integration-tests[#else]build-parent-it[/#if]</artifactId>
+ <artifactId>camel-quarkus-[#if nativeSupported][=itestParentArtifactId][#else]build-parent-it[/#if]</artifactId>
<version>[=version]</version>
<relativePath>[#if nativeSupported]../pom.xml[#else]../../../poms/build-parent-it/pom.xml[/#if]</relativePath>
</parent>