You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/03/19 11:33:37 UTC

[camel-kafka-connector] branch master updated: Added support for remote AWS testing

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 658da77  Added support for remote AWS testing
     new 703f37c  Merge pull request #107 from orpiske/decouple-aws
658da77 is described below

commit 658da770547698d4cb2da8230b2e2675776d1d6d
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sat Feb 29 15:16:13 2020 +0100

    Added support for remote AWS testing
    
    Includes:
    - Added remote testing support for AWS SNS sink
    - Added remote testing support for AWS SQS source/sink
    - Added remote testing support for AWS S3 source
    - Added remote testing support for AWS Kinesis source
    - Updated documentation for remote testing
---
 README.adoc                                        |  30 +++-
 .../apache/camel/kafkaconnector/ContainerUtil.java |  53 -------
 .../{ => clients/aws}/AWSConfigs.java              |   4 +-
 .../clients/aws/sqs/AWSSQSClient.java              |  11 +-
 .../aws/sqs/TestAWSCredentialsProvider.java        |  69 +++++++++
 .../services/aws/AWSClientUtils.java               | 168 +++++++++++++++++++++
 .../aws/AWSKinesisLocalContainerService.java       |  57 +++++++
 .../services/aws/AWSLocalContainerService.java     | 104 +++++++++++++
 .../services/aws/AWSRemoteService.java             |  80 ++++++++++
 .../services/aws/AWSS3LocalContainerService.java   |  57 +++++++
 .../services/aws/AWSSNSLocalContainerService.java  |  56 +++++++
 .../services/aws/AWSSQSLocalContainerService.java  |  54 +++++++
 .../kafkaconnector/services/aws/AWSService.java    |  55 +++++++
 .../services/aws/AWSServiceFactory.java            |  97 ++++++++++++
 .../sink/aws/sns/CamelAWSSNSPropertyFactory.java   |   2 +-
 .../sink/aws/sns/CamelSinkAWSSNSITCase.java        |  33 ++--
 .../sink/aws/sns/TestSNSConfiguration.java         |  70 +--------
 .../sink/aws/sqs/CamelAWSSQSPropertyFactory.java   |  27 +++-
 .../sink/aws/sqs/CamelSinkAWSSQSITCase.java        |  25 ++-
 .../kinesis/CamelAWSKinesisPropertyFactory.java    |   2 +-
 .../aws/kinesis/CamelSourceAWSKinesisITCase.java   |  38 +----
 .../aws/kinesis/TestKinesisConfiguration.java      |  52 +------
 .../source/aws/s3/CamelAWSS3PropertyFactory.java   |   2 +-
 .../source/aws/s3/CamelSourceAWSS3ITCase.java      |  37 +----
 .../source/aws/s3/TestS3Configuration.java         |  53 +------
 .../source/aws/sqs/CamelAWSSQSPropertyFactory.java |  25 ++-
 .../source/aws/sqs/CamelSourceAWSSQSITCase.java    |  27 ++--
 27 files changed, 927 insertions(+), 361 deletions(-)

diff --git a/README.adoc b/README.adoc
index d9298ca..97693db 100644
--- a/README.adoc
+++ b/README.adoc
@@ -34,12 +34,40 @@ To run the integration tests it is required to:
 mvn -DskipIntegrationTests=false clean verify package
 ----
 
-It is also possible to point the tests to use an external Kafka broker. To do so, run the tests using:
+It is also possible to point the tests to use an external services. To do so, you must set
+properties for the services that you want to run. This causes the tests to not launch the local
+container and use existing remote instances. At the moment, the following properties can be set
+for remote testing:
+
+* kafka.instance.type
+** kafka.bootstrap.servers
+* aws-service.instance.type
+** access.key: AWS access key (mandatory for remote testing)
+** secret.key: AWS secret key (mandatory for remote testing)
+** aws.region: AWS region (optional)
+** aws.host: AWS host (optional)
+* aws-service.kinesis.instance.type
+** access.key: AWS access key (mandatory for remote testing)
+** secret.key: AWS secret key (mandatory for remote testing)
+** aws.region: AWS region (optional)
+** aws.host: AWS host (optional)
+* elasticsearch.instance.type
+** elasticsearch.host
+** elasticsearch.port
+* cassandra.instance.type
+** cassandra.host
+** cassandra.cql3.port
+* jms-service.instance.type
+** jms.broker.address
+
 
 ----
 mvn -Dkafka.bootstrap.servers=host1:port -Dkafka.instance.type=remote -DskipIntegrationTests=false clean verify package
 ----
 
+It's possible to use a properties file to set these properties. To do so use `-Dtest.properties=/path/to/file.properties`.
+
+
 === Try it out locally
 
 You can use Camel Kafka connectors with local Apache Kafka installation.
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java b/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java
deleted file mode 100644
index ee5e84e..0000000
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/ContainerUtil.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector;
-
-import java.util.Properties;
-
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.regions.Regions;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-
-
-public final class ContainerUtil {
-
-    private ContainerUtil() {
-    }
-
-
-    public static Properties setupAWSConfigs(LocalStackContainer container, int service) {
-        Properties properties = new Properties();
-
-        final String amazonAWSHost = "localhost:" + container.getMappedPort(service);
-        properties.put(AWSConfigs.AMAZON_AWS_HOST,  amazonAWSHost);
-        System.setProperty(AWSConfigs.AMAZON_AWS_HOST, amazonAWSHost);
-
-        AWSCredentials credentials = container.getDefaultCredentialsProvider().getCredentials();
-
-        properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId());
-        System.setProperty(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId());
-
-        properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey());
-        System.setProperty(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey());
-
-        properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name());
-        System.setProperty(AWSConfigs.REGION, Regions.US_EAST_1.name());
-
-        return properties;
-    }
-}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java
similarity index 91%
rename from tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java
rename to tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java
index f5da049..5038542 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/AWSConfigs.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/AWSConfigs.java
@@ -14,8 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package org.apache.camel.kafkaconnector;
+package org.apache.camel.kafkaconnector.clients.aws;
 
 
 public final class AWSConfigs {
@@ -24,6 +23,7 @@ public final class AWSConfigs {
     public static final String REGION = "aws.region";
     public static final String AMAZON_AWS_HOST = "aws.host";
     public static final String AMAZON_AWS_SNS_2_SQS_QUEUE_URL = "aws.sns.2.sqs.queue.url";
+    public static final String PROTOCOL = "aws.protocol";
 
     private AWSConfigs() {
     }
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java
index 161fc3e..cb4539d 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/AWSSQSClient.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.function.Predicate;
 
 import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
 import com.amazonaws.services.sqs.model.CreateQueueRequest;
 import com.amazonaws.services.sqs.model.Message;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
@@ -31,7 +30,6 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 import com.amazonaws.services.sqs.model.SendMessageRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
 
 public class AWSSQSClient {
     private static final Logger LOG = LoggerFactory.getLogger(AWSSQSClient.class);
@@ -40,13 +38,8 @@ public class AWSSQSClient {
     private int maxWaitTime = 10;
     private int maxNumberOfMessages = 1;
 
-    public AWSSQSClient(LocalStackContainer localStackContainer) {
-        sqs = AmazonSQSClientBuilder
-                .standard()
-                .withEndpointConfiguration(localStackContainer
-                        .getEndpointConfiguration(LocalStackContainer.Service.SQS))
-                .withCredentials(localStackContainer.getDefaultCredentialsProvider())
-                .build();
+    public AWSSQSClient(AmazonSQS sqs) {
+        this.sqs = sqs;
     }
 
     public String getQueue(String queue) {
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java
new file mode 100644
index 0000000..e348f40
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/clients/aws/sqs/TestAWSCredentialsProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.clients.aws.sqs;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
+
+public class TestAWSCredentialsProvider implements AWSCredentialsProvider {
+    private static class TestAWSCredentials implements AWSCredentials {
+        private final String accessKey;
+        private final String secretKey;
+
+
+        public TestAWSCredentials() {
+            this(System.getProperty(AWSConfigs.ACCESS_KEY), System.getProperty(AWSConfigs.SECRET_KEY));
+        }
+
+        public TestAWSCredentials(String accessKey, String secretKey) {
+            this.accessKey = accessKey;
+            this.secretKey = secretKey;
+        }
+
+        @Override
+        public String getAWSAccessKeyId() {
+            return accessKey;
+        }
+
+        @Override
+        public String getAWSSecretKey() {
+            return secretKey;
+        }
+    };
+
+    private AWSCredentials credentials;
+
+    public TestAWSCredentialsProvider() {
+        credentials = new TestAWSCredentials();
+    }
+
+
+    public TestAWSCredentialsProvider(String accessKey, String secretKey) {
+        credentials = new TestAWSCredentials(accessKey, secretKey);
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+        return credentials;
+    }
+
+    @Override
+    public void refresh() {
+
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java
new file mode 100644
index 0000000..625bc04
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSClientUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.sns.AmazonSNS;
+import com.amazonaws.services.sns.AmazonSNSClientBuilder;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
+import org.apache.camel.kafkaconnector.clients.aws.sqs.TestAWSCredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AWSClientUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(AWSClientUtils.class);
+
+    private AWSClientUtils() {
+    }
+
+    private static String getRegion() {
+        String regionStr = System.getProperty(AWSConfigs.REGION);
+        String region;
+
+        if (regionStr != null && !regionStr.isEmpty()) {
+            region = Regions.valueOf(regionStr).getName();
+        } else {
+            region = Regions.US_EAST_1.getName();
+        }
+
+        return region;
+    }
+
+
+    public static AmazonSNS newSNSClient() {
+        LOG.debug("Creating a custom SNS client for running a AWS SNS test");
+        AmazonSNSClientBuilder clientBuilder = AmazonSNSClientBuilder
+                .standard();
+
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        String region = getRegion();
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProtocol(Protocol.HTTP);
+
+            clientBuilder
+                    .withClientConfiguration(clientConfiguration)
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
+                    .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey"));
+        } else {
+            clientBuilder
+                    .withRegion(region)
+                    .withCredentials(new TestAWSCredentialsProvider());
+        }
+
+        return clientBuilder.build();
+    }
+
+    public static AmazonSQS newSQSClient() {
+        LOG.debug("Creating a custom SQS client for running a AWS SNS test");
+        AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder
+                .standard();
+
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        String region = getRegion();
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProtocol(Protocol.HTTP);
+
+            clientBuilder
+                    .withClientConfiguration(clientConfiguration)
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
+                    .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey"));
+        } else {
+            clientBuilder
+                    .withRegion(region)
+                    .withCredentials(new TestAWSCredentialsProvider());
+        }
+
+
+
+        return clientBuilder.build();
+    }
+
+    public static AmazonS3 newS3Client() {
+        LOG.debug("Creating a new S3 client");
+        AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard();
+
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        String region = getRegion();
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProtocol(Protocol.HTTP);
+
+            clientBuilder
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
+                    .withClientConfiguration(clientConfiguration)
+                    .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey"));
+        } else {
+            clientBuilder
+                    .withRegion(region)
+                    .withCredentials(new TestAWSCredentialsProvider());
+        }
+
+        clientBuilder
+                .withPathStyleAccessEnabled(true);
+
+        return clientBuilder.build();
+    }
+
+    public static AmazonKinesis newKinesisClient() {
+        LOG.debug("Creating a new AWS Kinesis client");
+        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
+
+        String awsInstanceType = System.getProperty("aws-service.kinesis.instance.type");
+        String region = getRegion();
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            String amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+
+            LOG.debug("Creating a new AWS Kinesis client to access {}", amazonHost);
+
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProtocol(Protocol.HTTP);
+
+            clientBuilder
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
+                    .withClientConfiguration(clientConfiguration)
+                    .withCredentials(new TestAWSCredentialsProvider("accesskey", "secretkey"));
+        } else {
+            clientBuilder
+                .withRegion(region)
+                .withCredentials(new TestAWSCredentialsProvider());
+        }
+
+        return clientBuilder.build();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java
new file mode 100644
index 0000000..6a5353b
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSKinesisLocalContainerService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+public class AWSKinesisLocalContainerService extends AWSLocalContainerService<AmazonKinesis> {
+
+    public AWSKinesisLocalContainerService() {
+        super(LocalStackContainer.Service.KINESIS);
+    }
+
+    @Override
+    public String getServiceEndpoint() {
+        return super.getServiceEndpoint(LocalStackContainer.Service.KINESIS);
+    }
+
+    @Override
+    public String getAmazonHost() {
+        final int kinesisPort = 4568;
+
+        return "localhost:" + getContainer().getMappedPort(kinesisPort);
+    }
+
+
+    @Override
+    public AmazonKinesis getClient() {
+        ClientConfiguration clientConfiguration = new ClientConfiguration();
+        clientConfiguration.setProtocol(Protocol.HTTP);
+
+        return AmazonKinesisClientBuilder
+                .standard()
+                .withEndpointConfiguration(getContainer().getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
+                .withCredentials(getContainer().getDefaultCredentialsProvider())
+                .withClientConfiguration(clientConfiguration)
+                .build();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java
new file mode 100644
index 0000000..596585b
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSLocalContainerService.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import java.util.Properties;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Regions;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+abstract class AWSLocalContainerService<T> implements AWSService<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(AWSLocalContainerService.class);
+    private final LocalStackContainer container;
+
+    public AWSLocalContainerService(LocalStackContainer.Service...services) {
+        this.container = new LocalStackContainer().withServices(services);
+
+        container.start();
+    }
+
+
+    protected abstract String getAmazonHost();
+
+    protected abstract String getServiceEndpoint();
+
+    @Override
+    public void initialize() {
+        LOG.info("AWS service running at address {}", getServiceEndpoint());
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Stopping local AWS service");
+        container.stop();
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+        return container.getDefaultCredentialsProvider().getCredentials();
+    }
+
+
+    @Override
+    public Properties getConnectionProperties() {
+        Properties properties = new Properties();
+
+        AWSCredentials credentials = getCredentials();
+
+        properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId());
+
+        properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey());
+
+        properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name());
+
+        properties.put(AWSConfigs.AMAZON_AWS_HOST,  getAmazonHost());
+
+        /**
+         * We need to set this one. For some sets, when they instantiate the clients within
+         * Camel, they need to know what is the Amazon host being used (ie.: when creating them
+         * using the withEndpointConfiguration()). Because this happens within Camel, there's
+         * no way to pass that information easily. Therefore, the host is set as a property
+         * and read by whatever class/method creates the clients to pass to Camel.
+         *
+         * Do not unset.
+         */
+        System.setProperty(AWSConfigs.AMAZON_AWS_HOST, getAmazonHost());
+
+        properties.put(AWSConfigs.PROTOCOL, "http");
+
+        return properties;
+    }
+
+    protected LocalStackContainer getContainer() {
+        return container;
+    }
+
+    protected String getAmazonHost(int port) {
+        return "localhost:" + container.getMappedPort(port);
+    }
+
+    protected String getServiceEndpoint(LocalStackContainer.Service service) {
+        return container
+                .getEndpointConfiguration(service)
+                .getServiceEndpoint();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java
new file mode 100644
index 0000000..2a4c23a
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSRemoteService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sqs.AmazonSQS;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
+import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
+import org.apache.camel.kafkaconnector.clients.aws.sqs.TestAWSCredentialsProvider;
+
+public class AWSRemoteService<T> implements AWSService<T> {
+    private static final AWSCredentialsProvider CREDENTIALS_PROVIDER = new TestAWSCredentialsProvider();
+    private Supplier<T> remoteClientSupplier;
+
+    public AWSRemoteService(Supplier<T> remoteClientSupplier) {
+        this.remoteClientSupplier = remoteClientSupplier;
+    }
+
+
+    @Override
+    public T getClient() {
+        return remoteClientSupplier.get();
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+        return CREDENTIALS_PROVIDER.getCredentials();
+    }
+
+    @Override
+    public Properties getConnectionProperties() {
+        Properties properties = new Properties();
+
+        AWSCredentials credentials = getCredentials();
+
+        properties.put(AWSConfigs.ACCESS_KEY, credentials.getAWSAccessKeyId());
+        properties.put(AWSConfigs.SECRET_KEY, credentials.getAWSSecretKey());
+        properties.put(AWSConfigs.REGION, Regions.US_EAST_1.name());
+
+        return properties;
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+
+    public static AWSSQSClient newSQSClient() {
+        AmazonSQS sqs = AWSClientUtils.newSQSClient();
+
+        return new AWSSQSClient(sqs);
+    }
+
+
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java
new file mode 100644
index 0000000..ab7679b
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSS3LocalContainerService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+public class AWSS3LocalContainerService extends AWSLocalContainerService<AmazonS3> {
+
+    public AWSS3LocalContainerService() {
+        super(LocalStackContainer.Service.S3);
+    }
+
+
+    @Override
+    public String getServiceEndpoint() {
+        return super.getServiceEndpoint(LocalStackContainer.Service.S3);
+    }
+
+    @Override
+    public String getAmazonHost() {
+        final int s3Port = 4572;
+
+        return "localhost:" + getContainer().getMappedPort(s3Port);
+    }
+
+    @Override
+    public AmazonS3 getClient() {
+        ClientConfiguration clientConfiguration = new ClientConfiguration();
+        clientConfiguration.setProtocol(Protocol.HTTP);
+
+        return AmazonS3ClientBuilder
+                .standard()
+                .withEndpointConfiguration(getContainer().getEndpointConfiguration(LocalStackContainer.Service.S3))
+                .withCredentials(getContainer().getDefaultCredentialsProvider())
+                .withClientConfiguration(clientConfiguration)
+                .build();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java
new file mode 100644
index 0000000..8f1b8e0
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSNSLocalContainerService.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+public class AWSSNSLocalContainerService extends AWSLocalContainerService<AWSSQSClient> {
+
+    public AWSSNSLocalContainerService() {
+        super(LocalStackContainer.Service.SQS,
+                LocalStackContainer.Service.SNS);
+    }
+
+    @Override
+    public String getServiceEndpoint() {
+        return super.getServiceEndpoint(LocalStackContainer.Service.SNS);
+    }
+
+    @Override
+    public String getAmazonHost() {
+        final int snsPort = 4575;
+
+        return super.getAmazonHost(snsPort);
+    }
+
+
+    @Override
+    public AWSSQSClient getClient() {
+        AmazonSQS sqs = AmazonSQSClientBuilder
+                .standard()
+                .withEndpointConfiguration(getContainer()
+                        .getEndpointConfiguration(LocalStackContainer.Service.SQS))
+                .withCredentials(getContainer().getDefaultCredentialsProvider())
+                .build();
+
+        return new AWSSQSClient(sqs);
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java
new file mode 100644
index 0000000..1e7ffd9
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSSQSLocalContainerService.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+
+public class AWSSQSLocalContainerService extends AWSLocalContainerService<AWSSQSClient> {
+
+    public AWSSQSLocalContainerService() {
+        super(LocalStackContainer.Service.SQS);
+    }
+
+    @Override
+    public String getServiceEndpoint() {
+        return super.getServiceEndpoint(LocalStackContainer.Service.SQS);
+    }
+
+    @Override
+    public String getAmazonHost() {
+        final int sqsPort = 4576;
+
+        return super.getAmazonHost(sqsPort);
+    }
+
+    @Override
+    public AWSSQSClient getClient() {
+        AmazonSQS sqs = AmazonSQSClientBuilder
+                .standard()
+                .withEndpointConfiguration(getContainer()
+                        .getEndpointConfiguration(LocalStackContainer.Service.SQS))
+                .withCredentials(getContainer().getDefaultCredentialsProvider())
+                .build();
+
+        return new AWSSQSClient(sqs);
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java
new file mode 100644
index 0000000..ffff052
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSService.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import java.util.Properties;
+
+import com.amazonaws.auth.AWSCredentials;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface AWSService<T> extends BeforeAllCallback, AfterAllCallback {
+
+    T getClient();
+
+    AWSCredentials getCredentials();
+
+    Properties getConnectionProperties();
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java
new file mode 100644
index 0000000..ea12594
--- /dev/null
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/services/aws/AWSServiceFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.services.aws;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class AWSServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(AWSServiceFactory.class);
+
+    private AWSServiceFactory() {
+    }
+
+    public static AWSService createSQSService() {
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        LOG.info("Creating a {} AWS SQS instance", awsInstanceType);
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            return new AWSSQSLocalContainerService();
+        }
+
+        if (awsInstanceType.equals("remote")) {
+            return new AWSRemoteService(AWSRemoteService::newSQSClient);
+        }
+
+        LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'",
+                awsInstanceType);
+        throw new UnsupportedOperationException("Invalid AWS instance type");
+    }
+
+    public static AWSService createSNSService() {
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        LOG.info("Creating a {} AWS SNS instance", awsInstanceType);
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            return new AWSSNSLocalContainerService();
+        }
+
+        if (awsInstanceType.equals("remote")) {
+            return new AWSRemoteService(AWSRemoteService::newSQSClient);
+        }
+
+        LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'",
+                awsInstanceType);
+        throw new UnsupportedOperationException("Invalid AWS instance type");
+
+    }
+
+    public static AWSService createKinesisService() {
+        String awsInstanceType = System.getProperty("aws-service.kinesis.instance.type");
+        LOG.info("Creating a {} AWS kinesis instance", awsInstanceType);
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            return new AWSKinesisLocalContainerService();
+        }
+
+        if (awsInstanceType.equals("remote")) {
+            return new AWSRemoteService(AWSClientUtils::newKinesisClient);
+        }
+
+        LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'",
+                awsInstanceType);
+        throw new UnsupportedOperationException("Invalid AWS instance type");
+    }
+
+    public static AWSService createS3Service() {
+        String awsInstanceType = System.getProperty("aws-service.instance.type");
+        LOG.info("Creating a {} AWS S3 instance", awsInstanceType);
+
+        if (awsInstanceType == null || awsInstanceType.equals("local-aws-container")) {
+            return new AWSS3LocalContainerService();
+        }
+
+        if (awsInstanceType.equals("remote")) {
+            return new AWSRemoteService(AWSClientUtils::newS3Client);
+        }
+
+        LOG.error("Invalid AWS instance type: {}. Must be either 'remote' or 'local-aws-container'",
+                awsInstanceType);
+        throw new UnsupportedOperationException("Invalid AWS instance type");
+    }
+}
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java
index 389ad98..d801d7d 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelAWSSNSPropertyFactory.java
@@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.sink.aws.sns;
 
 import java.util.Properties;
 
-import org.apache.camel.kafkaconnector.AWSConfigs;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 /**
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java
index 9495d96..d486c75 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/CamelSinkAWSSNSITCase.java
@@ -25,20 +25,20 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.services.sqs.model.Message;
-import org.apache.camel.kafkaconnector.AWSConfigs;
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.ContainerUtil;
 import org.apache.camel.kafkaconnector.TestCommon;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.services.aws.AWSService;
+import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,12 +46,10 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @Testcontainers
 public class CamelSinkAWSSNSITCase extends AbstractKafkaTest  {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);
-    private static final int SNS_PORT = 4575;
+    @RegisterExtension
+    public static AWSService<AWSSQSClient> service = AWSServiceFactory.createSNSService();
 
-    @Container
-    public LocalStackContainer localStackContainer = new LocalStackContainer()
-            .withServices(LocalStackContainer.Service.SQS, LocalStackContainer.Service.SNS);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class);
 
     private AWSSQSClient awsSqsClient;
 
@@ -60,20 +58,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest  {
 
     @BeforeEach
     public void setUp() {
-        final String sqsInstance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.SQS)
-                .getServiceEndpoint();
-
-        LOG.info("SQS instance running at {}", sqsInstance);
-
-        awsSqsClient = new AWSSQSClient(localStackContainer);
-
-
-        final String snsInstance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.SNS)
-                .getServiceEndpoint();
-
-        LOG.info("SNS instance running at {}", snsInstance);
+        awsSqsClient = service.getClient();
     }
 
     private boolean checkMessages(List<Message> messages) {
@@ -110,7 +95,7 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest  {
             final String sqsQueue = awsSqsClient.getQueue(TestCommon.DEFAULT_SQS_QUEUE_FOR_SNS);
             LOG.info("Created SQS queue {}", sqsQueue);
 
-            Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SNS_PORT);
+            Properties properties = service.getConnectionProperties();
             properties.put(AWSConfigs.AMAZON_AWS_SNS_2_SQS_QUEUE_URL, sqsQueue);
 
             ConnectorPropertyFactory testProperties = new CamelAWSSNSPropertyFactory(1,
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java
index dc2fa49..11ded50 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sns/TestSNSConfiguration.java
@@ -17,84 +17,20 @@
 
 package org.apache.camel.kafkaconnector.sink.aws.sns;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNS;
-import com.amazonaws.services.sns.AmazonSNSClientBuilder;
 import com.amazonaws.services.sqs.AmazonSQS;
-import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
 import org.apache.camel.component.aws.sns.SnsConfiguration;
-import org.apache.camel.kafkaconnector.AWSConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils;
 
 public class TestSNSConfiguration extends SnsConfiguration {
-    private static final Logger LOG = LoggerFactory.getLogger(TestSNSConfiguration.class);
-    private final String amazonHost;
-    private final String region;
-
-    private class TestAWSCredentialsProvider implements AWSCredentialsProvider {
-        @Override
-        public AWSCredentials getCredentials() {
-            return new AWSCredentials() {
-                @Override
-                public String getAWSAccessKeyId() {
-                    return System.getProperty(AWSConfigs.ACCESS_KEY);
-                }
-
-                @Override
-                public String getAWSSecretKey() {
-                    return System.getProperty(AWSConfigs.SECRET_KEY);
-                }
-            };
-        }
-
-        @Override
-        public void refresh() {
-
-        }
-    }
-
-    public TestSNSConfiguration() {
-        amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
-        region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName();
-    }
 
     @Override
     public AmazonSNS getAmazonSNSClient() {
-        LOG.debug("Creating a custom SNS client for running a AWS SNS test");
-        AmazonSNSClientBuilder clientBuilder = AmazonSNSClientBuilder
-                .standard();
-
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        clientBuilder
-                .withClientConfiguration(clientConfiguration)
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
-                .withCredentials(new TestAWSCredentialsProvider());
-
-        return clientBuilder.build();
+        return AWSClientUtils.newSNSClient();
     }
 
     @Override
     public AmazonSQS getAmazonSQSClient() {
-        LOG.debug("Creating a custom SQS client for running a AWS SNS test");
-        AmazonSQSClientBuilder clientBuilder = AmazonSQSClientBuilder
-                .standard();
-
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        clientBuilder
-                .withClientConfiguration(clientConfiguration)
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
-                .withCredentials(new TestAWSCredentialsProvider());
-
-        return clientBuilder.build();
+        return AWSClientUtils.newSQSClient();
     }
 }
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java
index cad1fcf..d4b52c5 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelAWSSQSPropertyFactory.java
@@ -19,8 +19,9 @@ package org.apache.camel.kafkaconnector.sink.aws.sqs;
 
 import java.util.Properties;
 
-import org.apache.camel.kafkaconnector.AWSConfigs;
+import com.amazonaws.regions.Regions;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 /**
@@ -43,6 +44,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
     @Override
     public Properties getProperties() {
         Properties connectorProps = new Properties();
+
         connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSinkConnector");
         connectorProps.put("tasks.max", String.valueOf(tasksMax));
 
@@ -50,8 +52,24 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
         connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
         connectorProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
 
-        String queueUrl = "aws-sqs://" + queue + "?autoCreateQueue=true&accessKey=accesskey&secretKey=secretKey&region=EU_WEST_1&protocol=http&amazonAWSHost="
-                + amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "localhost");
+        String accessKey = amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "");
+        String secretKey = amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "");
+
+        String region = amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name());
+
+        String queueUrl = String.format("aws-sqs://%s?autoCreateQueue=true&accessKey=%s&secretKey=%s&region=%s",
+                    queue, accessKey, secretKey, region);
+
+        String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL);
+        if (protocol != null && !protocol.isEmpty()) {
+            queueUrl = String.format("%s&protocol=%s", queueUrl, protocol);
+        }
+
+        String amazonAWSHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+        if (amazonAWSHost != null && !amazonAWSHost.isEmpty()) {
+            queueUrl = String.format("%s&amazonAWSHost=%s", queueUrl, amazonAWSHost);
+        }
+
 
         connectorProps.put("camel.sink.url", queueUrl);
         connectorProps.put("topics", topic);
@@ -61,8 +79,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
         connectorProps.put("camel.component.aws-sqs.configuration.secret-key",
                 amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, ""));
 
-        connectorProps.put("camel.component.aws-sqs.configuration.region",
-            amazonConfigs.getProperty(AWSConfigs.REGION, ""));
+        connectorProps.put("camel.component.aws-sqs.configuration.region", region);
 
         return connectorProps;
     }
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
index 314690e..2a4946d 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/sink/aws/sqs/CamelSinkAWSSQSITCase.java
@@ -27,17 +27,18 @@ import java.util.concurrent.TimeUnit;
 import com.amazonaws.services.sqs.model.Message;
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.ContainerUtil;
 import org.apache.camel.kafkaconnector.TestCommon;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.services.aws.AWSService;
+import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.fail;
@@ -45,12 +46,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Testcontainers
 public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
-    private static final int SQS_PORT = 4576;
+    @RegisterExtension
+    public static AWSService<AWSSQSClient> awsService = AWSServiceFactory.createSQSService();
 
-    @Container
-    public LocalStackContainer localStackContainer = new LocalStackContainer()
-            .withServices(LocalStackContainer.Service.SQS);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
 
     private AWSSQSClient awssqsClient;
 
@@ -59,13 +58,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        final String sqsInstance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.SQS)
-                .getServiceEndpoint();
-
-        LOG.info("SQS instance running at {}", sqsInstance);
-
-        awssqsClient = new AWSSQSClient(localStackContainer);
+        awssqsClient = awsService.getClient();
     }
 
     private boolean checkMessages(List<Message> messages) {
@@ -111,7 +104,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
     @Timeout(value = 120)
     public void testBasicSendReceive() {
         try {
-            Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SQS_PORT);
+            Properties properties = awsService.getConnectionProperties();
 
             ConnectorPropertyFactory testProperties = new CamelAWSSQSPropertyFactory(1,
                     TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_SQS_QUEUE, properties);
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java
index df7559b..bea1d09 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelAWSKinesisPropertyFactory.java
@@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.source.aws.kinesis;
 
 import java.util.Properties;
 
-import org.apache.camel.kafkaconnector.AWSConfigs;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java
index 61e5a9b..e75acb4 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/CamelSourceAWSKinesisITCase.java
@@ -25,27 +25,24 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
 import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 import com.amazonaws.services.kinesis.model.CreateStreamResult;
 import com.amazonaws.services.kinesis.model.PutRecordsRequest;
 import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
 import com.amazonaws.services.kinesis.model.PutRecordsResult;
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.ContainerUtil;
 import org.apache.camel.kafkaconnector.TestCommon;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.services.aws.AWSService;
+import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.Assert.fail;
@@ -53,12 +50,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Testcontainers
 public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static AWSService<AmazonKinesis> service = AWSServiceFactory.createKinesisService();
+    
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
-    private static final int KINESIS_PORT = 4568;
-
-    @Container
-    public LocalStackContainer localStackContainer = new LocalStackContainer()
-            .withServices(LocalStackContainer.Service.KINESIS);
 
     private AmazonKinesis awsKinesisClient;
 
@@ -68,24 +63,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        if (!localStackContainer.isRunning()) {
-            LOG.info("Kinesis is not running");
-        }
-        final String kinesisInstance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
-                .getServiceEndpoint();
-
-        LOG.info("Kinesis instance running at {}", kinesisInstance);
-
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        awsKinesisClient = AmazonKinesisClientBuilder
-                .standard()
-                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.KINESIS))
-                .withCredentials(localStackContainer.getDefaultCredentialsProvider())
-                .withClientConfiguration(clientConfiguration)
-                .build();
+        awsKinesisClient = service.getClient();
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
@@ -102,7 +80,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     @Test
     @Timeout(120)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
-        Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, KINESIS_PORT);
+        Properties properties = service.getConnectionProperties();
 
         ConnectorPropertyFactory testProperties = new CamelAWSKinesisPropertyFactory(1,
                 TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_KINESIS_STREAM, properties);
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java
index 26939c8..b2a97be 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/kinesis/TestKinesisConfiguration.java
@@ -17,64 +17,16 @@
 
 package org.apache.camel.kafkaconnector.source.aws.kinesis;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 import org.apache.camel.component.aws.kinesis.KinesisConfiguration;
-import org.apache.camel.kafkaconnector.AWSConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils;
 
 public class TestKinesisConfiguration extends KinesisConfiguration {
-    private static final Logger LOG = LoggerFactory.getLogger(TestKinesisConfiguration.class);
-    private final String amazonHost;
-    private final String region;
-
     private AmazonKinesis amazonKinesis;
 
-    private class TestAWSCredentialsProvider implements AWSCredentialsProvider {
-        @Override
-        public AWSCredentials getCredentials() {
-            return new AWSCredentials() {
-                @Override
-                public String getAWSAccessKeyId() {
-                    return System.getProperty(AWSConfigs.ACCESS_KEY);
-                }
-
-                @Override
-                public String getAWSSecretKey() {
-                    return System.getProperty(AWSConfigs.SECRET_KEY);
-                }
-            };
-        }
-
-        @Override
-        public void refresh() {
-
-        }
-    }
-
-    public TestKinesisConfiguration() {
-        amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
-        region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName();
-    }
 
     private AmazonKinesis buildClient() {
-        LOG.debug("Creating a new AWS Kinesis client");
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        return AmazonKinesisClientBuilder
-                .standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
-                .withCredentials(new TestAWSCredentialsProvider())
-                .withClientConfiguration(clientConfiguration)
-                .build();
+        return AWSClientUtils.newKinesisClient();
     }
 
     @Override
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
index 28fda93..c1ecc67 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelAWSS3PropertyFactory.java
@@ -19,8 +19,8 @@ package org.apache.camel.kafkaconnector.source.aws.s3;
 
 import java.util.Properties;
 
-import org.apache.camel.kafkaconnector.AWSConfigs;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java
index 74164b9..afdaee5 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/CamelSourceAWSS3ITCase.java
@@ -21,35 +21,30 @@ import java.io.File;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.ContainerUtil;
 import org.apache.camel.kafkaconnector.TestCommon;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.services.aws.AWSService;
+import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Testcontainers
 public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
-    private static final int S3_PORT = 4572;
+    @RegisterExtension
+    public static AWSService<AmazonS3> service = AWSServiceFactory.createS3Service();
 
-    @Container
-    public LocalStackContainer localStackContainer = new LocalStackContainer()
-            .withServices(LocalStackContainer.Service.S3);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class);
 
     private AmazonS3 awsS3Client;
 
@@ -58,23 +53,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        final String s3Instance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.S3)
-                .getServiceEndpoint();
-
-        LOG.info("S3 instance running at {}", s3Instance);
-
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        awsS3Client = AmazonS3ClientBuilder
-                .standard()
-                .withEndpointConfiguration(localStackContainer.getEndpointConfiguration(LocalStackContainer.Service.S3))
-                .withCredentials(localStackContainer.getDefaultCredentialsProvider())
-                .withClientConfiguration(clientConfiguration)
-
-                .build();
-
+        awsS3Client = service.getClient();
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
@@ -91,7 +70,7 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
     @Test
     @Timeout(180)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
-        Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, S3_PORT);
+        Properties properties = service.getConnectionProperties();
 
         ConnectorPropertyFactory testProperties = new CamelAWSS3PropertyFactory(1,
                 TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_S3_BUCKET, properties);
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java
index 05779ff..e2e78bb 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/s3/TestS3Configuration.java
@@ -17,65 +17,16 @@
 
 package org.apache.camel.kafkaconnector.source.aws.s3;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import org.apache.camel.component.aws.s3.S3Configuration;
-import org.apache.camel.kafkaconnector.AWSConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.camel.kafkaconnector.services.aws.AWSClientUtils;
 
 public class TestS3Configuration extends S3Configuration {
-    private static final Logger LOG = LoggerFactory.getLogger(TestS3Configuration.class);
-    private final String amazonHost;
-    private final String region;
-
     private AmazonS3 amazonS3;
 
-    private class TestAWSCredentialsProvider implements AWSCredentialsProvider {
-        @Override
-        public AWSCredentials getCredentials() {
-            return new AWSCredentials() {
-                @Override
-                public String getAWSAccessKeyId() {
-                    return System.getProperty(AWSConfigs.ACCESS_KEY);
-                }
-
-                @Override
-                public String getAWSSecretKey() {
-                    return System.getProperty(AWSConfigs.SECRET_KEY);
-                }
-            };
-        }
-
-        @Override
-        public void refresh() {
-
-        }
-    }
-
-    public TestS3Configuration() {
-        amazonHost = System.getProperty(AWSConfigs.AMAZON_AWS_HOST);
-        region = Regions.valueOf(System.getProperty(AWSConfigs.REGION)).getName();
-    }
 
     private AmazonS3 buildClient() {
-        LOG.debug("Creating a new S3 client");
-        ClientConfiguration clientConfiguration = new ClientConfiguration();
-        clientConfiguration.setProtocol(Protocol.HTTP);
-
-        return AmazonS3ClientBuilder
-                .standard()
-                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(amazonHost, region))
-                .withCredentials(new TestAWSCredentialsProvider())
-                .withClientConfiguration(clientConfiguration)
-                .withPathStyleAccessEnabled(true)
-                .build();
+        return AWSClientUtils.newS3Client();
     }
 
     @Override
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java
index 9b374a1..5cb5b0f 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelAWSSQSPropertyFactory.java
@@ -19,8 +19,9 @@ package org.apache.camel.kafkaconnector.source.aws.sqs;
 
 import java.util.Properties;
 
-import org.apache.camel.kafkaconnector.AWSConfigs;
+import com.amazonaws.regions.Regions;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 
@@ -45,6 +46,7 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
     public Properties getProperties() {
         Properties connectorProps = new Properties();
         connectorProps.put(ConnectorConfig.NAME_CONFIG, "CamelAWSSQSSourceConnector");
+        connectorProps.put("tasks.max", String.valueOf(tasksMax));
 
         connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "org.apache.camel.kafkaconnector.CamelSourceConnector");
         connectorProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
@@ -52,10 +54,25 @@ class CamelAWSSQSPropertyFactory implements ConnectorPropertyFactory {
 
         connectorProps.put("camel.source.kafka.topic", topic);
 
-        String queueUrl = "aws-sqs://" + queue + "?autoCreateQueue=true&accessKey=accesskey&secretKey=secretKey&region=EU_WEST_1&protocol=http&amazonAWSHost="
-                + amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, "localhost");
-        connectorProps.put("camel.source.url", queueUrl);
+        String accessKey = amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "");
+        String secretKey = amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "");
+
+        String region = amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name());
+
+        String queueUrl = String.format("aws-sqs://%s?autoCreateQueue=true&accessKey=%s&secretKey=%s&region=%s",
+                queue, accessKey, secretKey, region);
 
+        String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL);
+        if (protocol != null && !protocol.isEmpty()) {
+            queueUrl = String.format("%s&protocol=%s", queueUrl, protocol);
+        }
+
+        String amazonAWSHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST);
+        if (amazonAWSHost != null && !amazonAWSHost.isEmpty()) {
+            queueUrl = String.format("%s&amazonAWSHost=%s", queueUrl, amazonAWSHost);
+        }
+
+        connectorProps.put("camel.source.url", queueUrl);
 
         connectorProps.put("camel.component.aws-sqs.configuration.access-key",
                 amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, ""));
diff --git a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java
index 9d22cc5..c8e6772 100644
--- a/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java
+++ b/tests/src/test/java/org/apache/camel/kafkaconnector/source/aws/sqs/CamelSourceAWSSQSITCase.java
@@ -22,30 +22,29 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.ContainerUtil;
 import org.apache.camel.kafkaconnector.TestCommon;
+import org.apache.camel.kafkaconnector.clients.aws.AWSConfigs;
 import org.apache.camel.kafkaconnector.clients.aws.sqs.AWSSQSClient;
 import org.apache.camel.kafkaconnector.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.services.aws.AWSService;
+import org.apache.camel.kafkaconnector.services.aws.AWSServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.localstack.LocalStackContainer;
-import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Testcontainers
 public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
-    private static final int SQS_PORT = 4576;
+    @RegisterExtension
+    public static AWSService<AWSSQSClient> service = AWSServiceFactory.createSQSService();
 
-    @Container
-    public LocalStackContainer localStackContainer = new LocalStackContainer()
-            .withServices(LocalStackContainer.Service.SQS);
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
 
     private AWSSQSClient awssqsClient;
 
@@ -54,13 +53,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
     @BeforeEach
     public void setUp() {
-        final String sqsInstance = localStackContainer
-                .getEndpointConfiguration(LocalStackContainer.Service.SQS)
-                .getServiceEndpoint();
-
-        LOG.info("SQS instance running at {}", sqsInstance);
-
-        awssqsClient = new AWSSQSClient(localStackContainer);
+        awssqsClient = service.getClient();
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
@@ -77,7 +70,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
     @Test
     @Timeout(90)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
-        Properties properties = ContainerUtil.setupAWSConfigs(localStackContainer, SQS_PORT);
+        Properties properties = service.getConnectionProperties();
 
         ConnectorPropertyFactory testProperties = new CamelAWSSQSPropertyFactory(1,
                 TestCommon.getDefaultTestTopic(this.getClass()), TestCommon.DEFAULT_SQS_QUEUE,
@@ -87,7 +80,7 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
         LOG.debug("Sending SQS messages");
         for (int i = 0; i < expect; i++) {
-            awssqsClient.send(TestCommon.DEFAULT_SQS_QUEUE, "Test message " + i);
+            awssqsClient.send(TestCommon.DEFAULT_SQS_QUEUE, "Source test message " + i);
         }
         LOG.debug("Done sending SQS messages");