You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") whose target is not preserved in this plain text copy.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/27 17:27:45 UTC

[camel] 06/13: CAMEL-14520 - Create an AWS-Kinesis component based on SDK v2, fixed CS

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3c4c288e0bae52b8161e90df4e412c48e3a108d2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Feb 27 17:14:33 2020 +0100

    CAMEL-14520 - Create an AWS-Kinesis component based on SDK v2, fixed CS
---
 .../aws2/firehose/KinesisFirehose2Component.java   | 14 ++++----
 .../firehose/KinesisFirehose2Configuration.java    | 10 +++---
 .../aws2/firehose/KinesisFirehose2Endpoint.java    | 19 ++++++-----
 .../aws2/firehose/KinesisFirehose2Producer.java    |  5 ++-
 .../component/aws2/kinesis/Kinesis2Component.java  | 14 ++++----
 .../aws2/kinesis/Kinesis2Configuration.java        | 16 +++++-----
 .../component/aws2/kinesis/Kinesis2Constants.java  |  3 +-
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 26 +++++++--------
 .../component/aws2/kinesis/Kinesis2Endpoint.java   | 14 ++++----
 .../component/aws2/kinesis/Kinesis2Producer.java   |  4 +--
 .../kinesis/Kinesis2ShardClosedStrategyEnum.java   |  4 +--
 .../KinesisFirehoseComponentConfigurationTest.java | 24 +++++++-------
 .../aws2/firehose/KinesisFirehoseEndpointTest.java | 12 +++----
 .../KinesisFirehoseComponentIntegrationTest.java   |  6 ++--
 .../kinesis/KinesisComponentConfigurationTest.java | 21 ++++++------
 .../KinesisConsumerClosedShardWithFailTest.java    |  1 -
 .../KinesisConsumerClosedShardWithSilentTest.java  | 37 +++++-----------------
 .../aws2/kinesis/KinesisEndpointTest.java          | 29 ++++-------------
 .../aws2/kinesis/RecordStringConverterTest.java    |  4 +--
 19 files changed, 109 insertions(+), 154 deletions(-)

diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
index a9a25b9..61e716c 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
@@ -38,16 +38,16 @@ public class KinesisFirehose2Component extends DefaultComponent {
     private String secretKey;
     @Metadata
     private String region;
-    @Metadata(label = "advanced")    
+    @Metadata(label = "advanced")
     private KinesisFirehose2Configuration configuration;
-    
+
     public KinesisFirehose2Component() {
         this(null);
     }
 
     public KinesisFirehose2Component(CamelContext context) {
         super(context);
-        
+
         registerExtension(new KinesisFirehose2ComponentVerifierExtension());
     }
 
@@ -66,7 +66,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
         }
         return endpoint;
     }
-    
+
     public KinesisFirehose2Configuration getConfiguration() {
         return configuration;
     }
@@ -77,7 +77,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
     public void setConfiguration(KinesisFirehose2Configuration configuration) {
         this.configuration = configuration;
     }
-    
+
     public String getAccessKey() {
         return accessKey;
     }
@@ -99,7 +99,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
     public void setSecretKey(String secretKey) {
         this.secretKey = secretKey;
     }
-    
+
     public String getRegion() {
         return region;
     }
@@ -110,7 +110,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
     public void setRegion(String region) {
         this.region = region;
     }
-    
+
     private void checkAndSetRegistryClient(KinesisFirehose2Configuration configuration) {
         Set<FirehoseClient> clients = getCamelContext().getRegistry().findByType(FirehoseClient.class);
         if (clients.size() == 1) {
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
index 82cab1f..67724ba 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
@@ -36,8 +36,8 @@ public class KinesisFirehose2Configuration implements Cloneable {
     private String accessKey;
     @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
     private String secretKey;
-    @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" 
-              + "You'll need to use the name Regions.EU_WEST_1.name()")
+    @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
+                            + "You'll need to use the name Regions.EU_WEST_1.name()")
     private String region;
     @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
     private FirehoseClient amazonKinesisFirehoseClient;
@@ -47,7 +47,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
     private String proxyHost;
     @UriParam(description = "To define a proxy port when instantiating the Kinesis Firehose client")
     private Integer proxyPort;
-    
+
     public void setAmazonKinesisFirehoseClient(FirehoseClient client) {
         this.amazonKinesisFirehoseClient = client;
     }
@@ -94,7 +94,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
 
     public void setProxyProtocol(Protocol proxyProtocol) {
         this.proxyProtocol = proxyProtocol;
-    }    
+    }
 
     public String getProxyHost() {
         return proxyHost;
@@ -111,7 +111,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
     public void setProxyPort(Integer proxyPort) {
         this.proxyPort = proxyPort;
     }
-    
+
     // *************************************************
     //
     // *************************************************
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
index 8a85366..7fd05f7 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
@@ -37,15 +37,15 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
 
 /**
- * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
+ * The aws-kinesis-firehose component is used for producing Amazon's Kinesis
+ * Firehose streams.
  */
-@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName",
-    producerOnly = true, label = "cloud,messaging")
+@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName", producerOnly = true, label = "cloud,messaging")
 public class KinesisFirehose2Endpoint extends DefaultEndpoint {
 
     @UriParam
     private KinesisFirehose2Configuration configuration;
-    
+
     private FirehoseClient kinesisFirehoseClient;
 
     public KinesisFirehose2Endpoint(String uri, KinesisFirehose2Configuration configuration, KinesisFirehose2Component component) {
@@ -62,15 +62,14 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
     }
-    
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient()
-            : createKinesisFirehoseClient();
-               
+        kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() : createKinesisFirehoseClient();
+
     }
-    
+
     @Override
     public void doStop() throws Exception {
         if (ObjectHelper.isEmpty(configuration.getAmazonKinesisFirehoseClient())) {
@@ -80,7 +79,7 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint {
         }
         super.doStop();
     }
-    
+
     FirehoseClient createKinesisFirehoseClient() {
         FirehoseClient client = null;
         FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 7f26ec8..0fe76b1 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.aws2.firehose;
 
 import java.nio.ByteBuffer;
 
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.support.DefaultProducer;
@@ -40,7 +39,7 @@ public class KinesisFirehose2Producer extends DefaultProducer {
 
     @Override
     public KinesisFirehose2Endpoint getEndpoint() {
-        return (KinesisFirehose2Endpoint) super.getEndpoint();
+        return (KinesisFirehose2Endpoint)super.getEndpoint();
     }
 
     @Override
@@ -63,7 +62,7 @@ public class KinesisFirehose2Producer extends DefaultProducer {
         putRecordRequest.record(record.build());
         return putRecordRequest.build();
     }
-    
+
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index 36dee4f..a4ec5da 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -36,7 +36,7 @@ public class Kinesis2Component extends DefaultComponent {
     private String secretKey;
     @Metadata
     private String region;
-    @Metadata(label = "advanced")    
+    @Metadata(label = "advanced")
     private Kinesis2Configuration configuration;
 
     public Kinesis2Component() {
@@ -45,7 +45,7 @@ public class Kinesis2Component extends DefaultComponent {
 
     public Kinesis2Component(CamelContext context) {
         super(context);
-        
+
         registerExtension(new Kinesis2ComponentVerifierExtension());
     }
 
@@ -61,10 +61,10 @@ public class Kinesis2Component extends DefaultComponent {
         checkAndSetRegistryClient(configuration);
         if (configuration.getAmazonKinesisClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
             throw new IllegalArgumentException("amazonKinesisClient or accessKey and secretKey must be specified");
-        }        
+        }
         return endpoint;
     }
-    
+
     public Kinesis2Configuration getConfiguration() {
         return configuration;
     }
@@ -75,7 +75,7 @@ public class Kinesis2Component extends DefaultComponent {
     public void setConfiguration(Kinesis2Configuration configuration) {
         this.configuration = configuration;
     }
-    
+
     public String getAccessKey() {
         return accessKey;
     }
@@ -97,7 +97,7 @@ public class Kinesis2Component extends DefaultComponent {
     public void setSecretKey(String secretKey) {
         this.secretKey = secretKey;
     }
-    
+
     public String getRegion() {
         return region;
     }
@@ -108,7 +108,7 @@ public class Kinesis2Component extends DefaultComponent {
     public void setRegion(String region) {
         this.region = region;
     }
-    
+
     private void checkAndSetRegistryClient(Kinesis2Configuration configuration) {
         Set<KinesisClient> clients = getCamelContext().getRegistry().findByType(KinesisClient.class);
         if (clients.size() == 1) {
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 575f057..0dca1a2 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -28,7 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 @UriParams
 public class Kinesis2Configuration implements Cloneable {
-    
+
     @UriPath(description = "Name of the stream")
     @Metadata(required = true)
     private String streamName;
@@ -36,8 +36,8 @@ public class Kinesis2Configuration implements Cloneable {
     private String accessKey;
     @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
     private String secretKey;
-    @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)" 
-              + "You'll need to use the name Regions.EU_WEST_1.name()")
+    @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
+                            + "You'll need to use the name Regions.EU_WEST_1.name()")
     private String region;
     @UriParam(description = "Amazon Kinesis client to use for all requests for this endpoint")
     private KinesisClient amazonKinesisClient;
@@ -116,7 +116,7 @@ public class Kinesis2Configuration implements Cloneable {
     public void setShardClosed(Kinesis2ShardClosedStrategyEnum shardClosed) {
         this.shardClosed = shardClosed;
     }
-    
+
     public String getAccessKey() {
         return accessKey;
     }
@@ -140,14 +140,14 @@ public class Kinesis2Configuration implements Cloneable {
     public void setRegion(String region) {
         this.region = region;
     }
-    
+
     public Protocol getProxyProtocol() {
         return proxyProtocol;
     }
 
     public void setProxyProtocol(Protocol proxyProtocol) {
         this.proxyProtocol = proxyProtocol;
-    }    
+    }
 
     public String getProxyHost() {
         return proxyHost;
@@ -163,8 +163,8 @@ public class Kinesis2Configuration implements Cloneable {
 
     public void setProxyPort(Integer proxyPort) {
         this.proxyPort = proxyPort;
-    }   
-    
+    }
+
     // *************************************************
     //
     // *************************************************
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
index b09aac9..d6e4f56 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
@@ -23,7 +23,8 @@ public interface Kinesis2Constants {
     String PARTITION_KEY = "CamelAwsKinesisPartitionKey";
 
     /**
-     * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored
+     * in a Kinesis Record object, the shard ID is used on writes to indicate
+     * where the data was stored
      */
     String SHARD_ID = "CamelAwsKinesisShardId";
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e080083..0ad92fa 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -67,16 +67,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
         currentShardIterator = result.nextShardIterator();
         if (isShardClosed) {
             switch (getEndpoint().getConfiguration().getShardClosed()) {
-                case ignore:
-                    LOG.warn("The shard {} is in closed state", currentShardIterator);
-                    break;
-                case silent:
-                    break;
-                case fail:
-                    LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
-                    throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
-                default:
-                    throw new IllegalArgumentException("Unsupported shard closed strategy");
+            case ignore:
+                LOG.warn("The shard {} is in closed state", currentShardIterator);
+                break;
+            case silent:
+                break;
+            case fail:
+                LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
+                throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
+            default:
+                throw new IllegalArgumentException("Unsupported shard closed strategy");
             }
         }
 
@@ -136,7 +136,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
             LOG.debug("ShardId is: {}", shardId);
 
             GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
-                    .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
+                .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
 
             if (hasSequenceNumber()) {
                 req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
@@ -159,7 +159,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
 
     private boolean hasSequenceNumber() {
         return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
-                && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+               && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                   || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
     }
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index 187efbd..e9cc6e3 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -46,9 +46,9 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
 
     @UriParam
     private Kinesis2Configuration configuration;
-    
+
     private KinesisClient kinesisClient;
-    
+
     public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
         super(uri, component);
         this.configuration = configuration;
@@ -57,16 +57,14 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient()
-            : createKinesisClient();
-       
-        
+        kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient() : createKinesisClient();
+
         if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
             && configuration.getSequenceNumber().isEmpty()) {
             throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
         }
     }
-    
+
     @Override
     public void doStop() throws Exception {
         if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
@@ -106,7 +104,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
     public Kinesis2Configuration getConfiguration() {
         return configuration;
     }
-    
+
     KinesisClient createKinesisClient() {
         KinesisClient client = null;
         KinesisClientBuilder clientBuilder = KinesisClient.builder();
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index 56a3d96..5c10228 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -34,7 +34,7 @@ public class Kinesis2Producer extends DefaultProducer {
 
     @Override
     public Kinesis2Endpoint getEndpoint() {
-        return (Kinesis2Endpoint) super.getEndpoint();
+        return (Kinesis2Endpoint)super.getEndpoint();
     }
 
     @Override
@@ -60,7 +60,7 @@ public class Kinesis2Producer extends DefaultProducer {
         }
         return putRecordRequest.build();
     }
-    
+
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
index 166334e..c98a7e0 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
@@ -18,7 +18,5 @@ package org.apache.camel.component.aws2.kinesis;
 
 public enum Kinesis2ShardClosedStrategyEnum {
 
-    ignore,
-    fail,
-    silent
+    ignore, fail, silent
 }
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
index 7d4b2df..80fa545 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
@@ -25,51 +25,53 @@ import software.amazon.awssdk.core.Protocol;
 import software.amazon.awssdk.regions.Region;
 
 public class KinesisFirehoseComponentConfigurationTest extends CamelTestSupport {
-    
+
     @Test
     public void createEndpointWithAccessAndSecretKey() throws Exception {
         KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
         KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxx&secretKey=yyyyy");
-        
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
-        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());    
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
     }
-    
+
     @Test
     public void createEndpointWithComponentElements() throws Exception {
         KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
         component.setAccessKey("XXX");
         component.setSecretKey("YYY");
         KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name");
-        
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
         assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
     }
-    
+
     @Test
     public void createEndpointWithComponentAndEndpointElements() throws Exception {
         KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
         component.setAccessKey("XXX");
         component.setSecretKey("YYY");
         component.setRegion(Region.US_WEST_1.toString());
-        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
-        
+        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component
+            .createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
         assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
         assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
     }
-    
+
     @Test
     public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
         KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
         component.setAccessKey("XXX");
         component.setSecretKey("YYY");
         component.setRegion(Region.US_WEST_1.toString());
-        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
-        
+        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component
+            .createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+
         assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
         assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
         assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
index c3afb12..d3c6477 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
@@ -49,20 +49,18 @@ public class KinesisFirehoseEndpointTest {
 
     @Test
     public void allEndpointParams() throws Exception {
-        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name"
-                + "?amazonKinesisFirehoseClient=#firehoseClient"
-        );
+        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext
+            .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?amazonKinesisFirehoseClient=#firehoseClient");
         endpoint.start();
 
         assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
     }
-    
+
     @Test
     public void allClientCreationParams() throws Exception {
-        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name"
-                + "?accessKey=xxx&secretKey=yyy&region=us-east-1"
-        );
+        KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext
+            .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?accessKey=xxx&secretKey=yyy&region=us-east-1");
 
         assertThat(endpoint.getConfiguration().getRegion(), is(Region.US_EAST_1.id()));
         assertThat(endpoint.getConfiguration().getAccessKey(), is("xxx"));
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
index 1240902..3bac64a9 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
@@ -33,7 +33,7 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport {
 
     @BindToRegistry("FirehoseClient")
     FirehoseClient client = FirehoseClient.builder().build();
-    
+
     @Test
     public void testFirehoseRouting() throws Exception {
         Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
@@ -49,10 +49,8 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start")
-                        .to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient");
+                from("direct:start").to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient");
             }
         };
     }
 }
-
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
index 25dc454..b6e47c4 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
@@ -25,29 +25,29 @@ import software.amazon.awssdk.core.Protocol;
 import software.amazon.awssdk.regions.Region;
 
 public class KinesisComponentConfigurationTest extends CamelTestSupport {
-    
+
     @Test
     public void createEndpointWithAccessAndSecretKey() throws Exception {
         Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
         Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxx&secretKey=yyyyy");
-        
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
-        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());    
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
     }
-    
+
     @Test
     public void createEndpointWithComponentElements() throws Exception {
         Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
         component.setAccessKey("XXX");
         component.setSecretKey("YYY");
         Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name");
-        
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
         assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
     }
-    
+
     @Test
     public void createEndpointWithComponentAndEndpointElements() throws Exception {
         Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
@@ -55,21 +55,22 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport {
         component.setSecretKey("YYY");
         component.setRegion(Region.US_WEST_1.toString());
         Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
-        
+
         assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
         assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
         assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
         assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
     }
-    
+
     @Test
     public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
         Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
         component.setAccessKey("XXX");
         component.setSecretKey("YYY");
         component.setRegion(Region.US_WEST_1.toString());
-        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
-        
+        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component
+            .createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+
         assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
         assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
         assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 38e6450..c713e91 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -75,7 +75,6 @@ public class KinesisConsumerClosedShardWithFailTest {
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
         endpoint.start();
         undertest = new Kinesis2Consumer(endpoint, processor);
-        
 
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index bccdd37..24c75d6 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -81,27 +81,16 @@ public class KinesisConsumerClosedShardWithSilentTest {
         Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
         endpoint.start();
         undertest = new Kinesis2Consumer(endpoint, processor);
-        
+
         SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
         Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
         ArrayList<Shard> shardList = new ArrayList<>();
         shardList.add(shard);
-       
 
-        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
-            .thenReturn(GetRecordsResponse.builder()
-                .nextShardIterator("nextShardIterator").build()
-            );
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
         when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
-            .thenReturn(DescribeStreamResponse.builder()
-                .streamDescription(StreamDescription.builder()
-                    .shards(shardList).build()
-                ).build()
-            );
-        when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
-            .thenReturn(GetShardIteratorResponse.builder()
-                .shardIterator("shardIterator").build()
-            );
+            .thenReturn(DescribeStreamResponse.builder().streamDescription(StreamDescription.builder().shards(shardList).build()).build());
+        when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
     }
 
     @Test
@@ -181,11 +170,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
 
     @Test
     public void recordsAreSentToTheProcessor() throws Exception {
-        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
-            .thenReturn(GetRecordsResponse.builder()
-                .nextShardIterator("nextShardIterator")
-                .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build()
-            );
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator")
+            .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build());
 
         int messageCount = undertest.poll();
 
@@ -201,15 +187,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
     public void exchangePropertiesAreSet() throws Exception {
         String partitionKey = "partitionKey";
         String sequenceNumber = "1";
-        when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
-            .thenReturn(GetRecordsResponse.builder()
-                .nextShardIterator("nextShardIterator")
-                .records(Record.builder()
-                    .sequenceNumber(sequenceNumber)
-                    .approximateArrivalTimestamp(Instant.now())
-                    .partitionKey(partitionKey).build()
-                ).build()
-            );
+        when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator")
+            .records(Record.builder().sequenceNumber(sequenceNumber).approximateArrivalTimestamp(Instant.now()).partitionKey(partitionKey).build()).build());
 
         undertest.poll();
 
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
index 413c107..3b58993 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
@@ -49,13 +49,8 @@ public class KinesisEndpointTest {
 
     @Test
     public void allTheEndpointParams() throws Exception {
-        Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
-                + "?amazonKinesisClient=#kinesisClient"
-                + "&maxResultsPerRequest=101"
-                + "&iteratorType=latest"
-                + "&shardId=abc"
-                + "&sequenceNumber=123"
-        );
+        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient"
+                                                                               + "&maxResultsPerRequest=101" + "&iteratorType=latest" + "&shardId=abc" + "&sequenceNumber=123");
 
         assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -67,9 +62,7 @@ public class KinesisEndpointTest {
 
     @Test
     public void onlyRequiredEndpointParams() throws Exception {
-        Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
-                + "?amazonKinesisClient=#kinesisClient"
-        );
+        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient");
 
         assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -79,12 +72,8 @@ public class KinesisEndpointTest {
 
     @Test
     public void afterSequenceNumberRequiresSequenceNumber() throws Exception {
-        Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
-                + "?amazonKinesisClient=#kinesisClient"
-                + "&iteratorType=AFTER_SEQUENCE_NUMBER"
-                + "&shardId=abc"
-                + "&sequenceNumber=123"
-        );
+        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient"
+                                                                               + "&iteratorType=AFTER_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123");
 
         assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -95,12 +84,8 @@ public class KinesisEndpointTest {
 
     @Test
     public void atSequenceNumberRequiresSequenceNumber() throws Exception {
-        Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
-                + "?amazonKinesisClient=#kinesisClient"
-                + "&iteratorType=AT_SEQUENCE_NUMBER"
-                + "&shardId=abc"
-                + "&sequenceNumber=123"
-        );
+        Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext
+            .getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&iteratorType=AT_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123");
 
         assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
index e368f87..6515b9d 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
@@ -32,9 +32,7 @@ public class RecordStringConverterTest {
 
     @Test
     public void convertRecordToString() throws Exception {
-        Record record = Record.builder()
-                .sequenceNumber("1")
-                .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build();
+        Record record = Record.builder().sequenceNumber("1").data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build();
 
         String result = RecordStringConverter.toString(record);
         assertThat(result, is("this is a String"));