You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/02/27 17:27:45 UTC
[camel] 06/13: CAMEL-14520 - Create an AWS-Kinesis component based
on SDK v2, fixed CS
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3c4c288e0bae52b8161e90df4e412c48e3a108d2
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Thu Feb 27 17:14:33 2020 +0100
CAMEL-14520 - Create an AWS-Kinesis component based on SDK v2, fixed CS
---
.../aws2/firehose/KinesisFirehose2Component.java | 14 ++++----
.../firehose/KinesisFirehose2Configuration.java | 10 +++---
.../aws2/firehose/KinesisFirehose2Endpoint.java | 19 ++++++-----
.../aws2/firehose/KinesisFirehose2Producer.java | 5 ++-
.../component/aws2/kinesis/Kinesis2Component.java | 14 ++++----
.../aws2/kinesis/Kinesis2Configuration.java | 16 +++++-----
.../component/aws2/kinesis/Kinesis2Constants.java | 3 +-
.../component/aws2/kinesis/Kinesis2Consumer.java | 26 +++++++--------
.../component/aws2/kinesis/Kinesis2Endpoint.java | 14 ++++----
.../component/aws2/kinesis/Kinesis2Producer.java | 4 +--
.../kinesis/Kinesis2ShardClosedStrategyEnum.java | 4 +--
.../KinesisFirehoseComponentConfigurationTest.java | 24 +++++++-------
.../aws2/firehose/KinesisFirehoseEndpointTest.java | 12 +++----
.../KinesisFirehoseComponentIntegrationTest.java | 6 ++--
.../kinesis/KinesisComponentConfigurationTest.java | 21 ++++++------
.../KinesisConsumerClosedShardWithFailTest.java | 1 -
.../KinesisConsumerClosedShardWithSilentTest.java | 37 +++++-----------------
.../aws2/kinesis/KinesisEndpointTest.java | 29 ++++-------------
.../aws2/kinesis/RecordStringConverterTest.java | 4 +--
19 files changed, 109 insertions(+), 154 deletions(-)
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
index a9a25b9..61e716c 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Component.java
@@ -38,16 +38,16 @@ public class KinesisFirehose2Component extends DefaultComponent {
private String secretKey;
@Metadata
private String region;
- @Metadata(label = "advanced")
+ @Metadata(label = "advanced")
private KinesisFirehose2Configuration configuration;
-
+
public KinesisFirehose2Component() {
this(null);
}
public KinesisFirehose2Component(CamelContext context) {
super(context);
-
+
registerExtension(new KinesisFirehose2ComponentVerifierExtension());
}
@@ -66,7 +66,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
}
return endpoint;
}
-
+
public KinesisFirehose2Configuration getConfiguration() {
return configuration;
}
@@ -77,7 +77,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
public void setConfiguration(KinesisFirehose2Configuration configuration) {
this.configuration = configuration;
}
-
+
public String getAccessKey() {
return accessKey;
}
@@ -99,7 +99,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
-
+
public String getRegion() {
return region;
}
@@ -110,7 +110,7 @@ public class KinesisFirehose2Component extends DefaultComponent {
public void setRegion(String region) {
this.region = region;
}
-
+
private void checkAndSetRegistryClient(KinesisFirehose2Configuration configuration) {
Set<FirehoseClient> clients = getCamelContext().getRegistry().findByType(FirehoseClient.class);
if (clients.size() == 1) {
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
index 82cab1f..67724ba 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Configuration.java
@@ -36,8 +36,8 @@ public class KinesisFirehose2Configuration implements Cloneable {
private String accessKey;
@UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
private String secretKey;
- @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
- + "You'll need to use the name Regions.EU_WEST_1.name()")
+ @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
+ + "You'll need to use the name Regions.EU_WEST_1.name()")
private String region;
@UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
private FirehoseClient amazonKinesisFirehoseClient;
@@ -47,7 +47,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
private String proxyHost;
@UriParam(description = "To define a proxy port when instantiating the Kinesis Firehose client")
private Integer proxyPort;
-
+
public void setAmazonKinesisFirehoseClient(FirehoseClient client) {
this.amazonKinesisFirehoseClient = client;
}
@@ -94,7 +94,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
public void setProxyProtocol(Protocol proxyProtocol) {
this.proxyProtocol = proxyProtocol;
- }
+ }
public String getProxyHost() {
return proxyHost;
@@ -111,7 +111,7 @@ public class KinesisFirehose2Configuration implements Cloneable {
public void setProxyPort(Integer proxyPort) {
this.proxyPort = proxyPort;
}
-
+
// *************************************************
//
// *************************************************
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
index 8a85366..7fd05f7 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Endpoint.java
@@ -37,15 +37,15 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
/**
- * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
+ * The aws-kinesis-firehose component is used for producing Amazon's Kinesis
+ * Firehose streams.
*/
-@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName",
- producerOnly = true, label = "cloud,messaging")
+@UriEndpoint(firstVersion = "3.2.0", scheme = "aws2-kinesis-firehose", title = "AWS 2 Kinesis Firehose", syntax = "aws2-kinesis-firehose:streamName", producerOnly = true, label = "cloud,messaging")
public class KinesisFirehose2Endpoint extends DefaultEndpoint {
@UriParam
private KinesisFirehose2Configuration configuration;
-
+
private FirehoseClient kinesisFirehoseClient;
public KinesisFirehose2Endpoint(String uri, KinesisFirehose2Configuration configuration, KinesisFirehose2Component component) {
@@ -62,15 +62,14 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint {
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
}
-
+
@Override
protected void doStart() throws Exception {
super.doStart();
- kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient()
- : createKinesisFirehoseClient();
-
+ kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient() : createKinesisFirehoseClient();
+
}
-
+
@Override
public void doStop() throws Exception {
if (ObjectHelper.isEmpty(configuration.getAmazonKinesisFirehoseClient())) {
@@ -80,7 +79,7 @@ public class KinesisFirehose2Endpoint extends DefaultEndpoint {
}
super.doStop();
}
-
+
FirehoseClient createKinesisFirehoseClient() {
FirehoseClient client = null;
FirehoseClientBuilder clientBuilder = FirehoseClient.builder();
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 7f26ec8..0fe76b1 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.aws2.firehose;
import java.nio.ByteBuffer;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultProducer;
@@ -40,7 +39,7 @@ public class KinesisFirehose2Producer extends DefaultProducer {
@Override
public KinesisFirehose2Endpoint getEndpoint() {
- return (KinesisFirehose2Endpoint) super.getEndpoint();
+ return (KinesisFirehose2Endpoint)super.getEndpoint();
}
@Override
@@ -63,7 +62,7 @@ public class KinesisFirehose2Producer extends DefaultProducer {
putRecordRequest.record(record.build());
return putRecordRequest.build();
}
-
+
public static Message getMessageForResponse(final Exchange exchange) {
return exchange.getMessage();
}
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index 36dee4f..a4ec5da 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -36,7 +36,7 @@ public class Kinesis2Component extends DefaultComponent {
private String secretKey;
@Metadata
private String region;
- @Metadata(label = "advanced")
+ @Metadata(label = "advanced")
private Kinesis2Configuration configuration;
public Kinesis2Component() {
@@ -45,7 +45,7 @@ public class Kinesis2Component extends DefaultComponent {
public Kinesis2Component(CamelContext context) {
super(context);
-
+
registerExtension(new Kinesis2ComponentVerifierExtension());
}
@@ -61,10 +61,10 @@ public class Kinesis2Component extends DefaultComponent {
checkAndSetRegistryClient(configuration);
if (configuration.getAmazonKinesisClient() == null && (configuration.getAccessKey() == null || configuration.getSecretKey() == null)) {
throw new IllegalArgumentException("amazonKinesisClient or accessKey and secretKey must be specified");
- }
+ }
return endpoint;
}
-
+
public Kinesis2Configuration getConfiguration() {
return configuration;
}
@@ -75,7 +75,7 @@ public class Kinesis2Component extends DefaultComponent {
public void setConfiguration(Kinesis2Configuration configuration) {
this.configuration = configuration;
}
-
+
public String getAccessKey() {
return accessKey;
}
@@ -97,7 +97,7 @@ public class Kinesis2Component extends DefaultComponent {
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
-
+
public String getRegion() {
return region;
}
@@ -108,7 +108,7 @@ public class Kinesis2Component extends DefaultComponent {
public void setRegion(String region) {
this.region = region;
}
-
+
private void checkAndSetRegistryClient(Kinesis2Configuration configuration) {
Set<KinesisClient> clients = getCamelContext().getRegistry().findByType(KinesisClient.class);
if (clients.size() == 1) {
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index 575f057..0dca1a2 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -28,7 +28,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
@UriParams
public class Kinesis2Configuration implements Cloneable {
-
+
@UriPath(description = "Name of the stream")
@Metadata(required = true)
private String streamName;
@@ -36,8 +36,8 @@ public class Kinesis2Configuration implements Cloneable {
private String accessKey;
@UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
private String secretKey;
- @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
- + "You'll need to use the name Regions.EU_WEST_1.name()")
+ @UriParam(description = "The region in which Kinesis client needs to work. When using this parameter, the configuration will expect the capitalized name of the region (for example AP_EAST_1)"
+ + "You'll need to use the name Regions.EU_WEST_1.name()")
private String region;
@UriParam(description = "Amazon Kinesis client to use for all requests for this endpoint")
private KinesisClient amazonKinesisClient;
@@ -116,7 +116,7 @@ public class Kinesis2Configuration implements Cloneable {
public void setShardClosed(Kinesis2ShardClosedStrategyEnum shardClosed) {
this.shardClosed = shardClosed;
}
-
+
public String getAccessKey() {
return accessKey;
}
@@ -140,14 +140,14 @@ public class Kinesis2Configuration implements Cloneable {
public void setRegion(String region) {
this.region = region;
}
-
+
public Protocol getProxyProtocol() {
return proxyProtocol;
}
public void setProxyProtocol(Protocol proxyProtocol) {
this.proxyProtocol = proxyProtocol;
- }
+ }
public String getProxyHost() {
return proxyHost;
@@ -163,8 +163,8 @@ public class Kinesis2Configuration implements Cloneable {
public void setProxyPort(Integer proxyPort) {
this.proxyPort = proxyPort;
- }
-
+ }
+
// *************************************************
//
// *************************************************
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
index b09aac9..d6e4f56 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Constants.java
@@ -23,7 +23,8 @@ public interface Kinesis2Constants {
String PARTITION_KEY = "CamelAwsKinesisPartitionKey";
/**
- * in a Kinesis Record object, the shard ID is used on writes to indicate where the data was stored
+ * in a Kinesis Record object, the shard ID is used on writes to indicate
+ * where the data was stored
*/
String SHARD_ID = "CamelAwsKinesisShardId";
}
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e080083..0ad92fa 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -67,16 +67,16 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
currentShardIterator = result.nextShardIterator();
if (isShardClosed) {
switch (getEndpoint().getConfiguration().getShardClosed()) {
- case ignore:
- LOG.warn("The shard {} is in closed state", currentShardIterator);
- break;
- case silent:
- break;
- case fail:
- LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
- throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
- default:
- throw new IllegalArgumentException("Unsupported shard closed strategy");
+ case ignore:
+ LOG.warn("The shard {} is in closed state", currentShardIterator);
+ break;
+ case silent:
+ break;
+ case fail:
+ LOG.info("Shard Iterator reaches CLOSE status:{} {}", getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
+ throw new ReachedClosedStatusException(getEndpoint().getConfiguration().getStreamName(), getEndpoint().getConfiguration().getShardId());
+ default:
+ throw new IllegalArgumentException("Unsupported shard closed strategy");
}
}
@@ -136,7 +136,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
LOG.debug("ShardId is: {}", shardId);
GetShardIteratorRequest.Builder req = GetShardIteratorRequest.builder().streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
- .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
+ .shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
if (hasSequenceNumber()) {
req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
@@ -159,7 +159,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
private boolean hasSequenceNumber() {
return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
- && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
- || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+ && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
}
}
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index 187efbd..e9cc6e3 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -46,9 +46,9 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
@UriParam
private Kinesis2Configuration configuration;
-
+
private KinesisClient kinesisClient;
-
+
public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
super(uri, component);
this.configuration = configuration;
@@ -57,16 +57,14 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
@Override
protected void doStart() throws Exception {
super.doStart();
- kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient()
- : createKinesisClient();
-
-
+ kinesisClient = configuration.getAmazonKinesisClient() != null ? configuration.getAmazonKinesisClient() : createKinesisClient();
+
if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
&& configuration.getSequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
}
}
-
+
@Override
public void doStop() throws Exception {
if (ObjectHelper.isEmpty(configuration.getAmazonKinesisClient())) {
@@ -106,7 +104,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
public Kinesis2Configuration getConfiguration() {
return configuration;
}
-
+
KinesisClient createKinesisClient() {
KinesisClient client = null;
KinesisClientBuilder clientBuilder = KinesisClient.builder();
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index 56a3d96..5c10228 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -34,7 +34,7 @@ public class Kinesis2Producer extends DefaultProducer {
@Override
public Kinesis2Endpoint getEndpoint() {
- return (Kinesis2Endpoint) super.getEndpoint();
+ return (Kinesis2Endpoint)super.getEndpoint();
}
@Override
@@ -60,7 +60,7 @@ public class Kinesis2Producer extends DefaultProducer {
}
return putRecordRequest.build();
}
-
+
public static Message getMessageForResponse(final Exchange exchange) {
return exchange.getMessage();
}
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
index 166334e..c98a7e0 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2ShardClosedStrategyEnum.java
@@ -18,7 +18,5 @@ package org.apache.camel.component.aws2.kinesis;
public enum Kinesis2ShardClosedStrategyEnum {
- ignore,
- fail,
- silent
+ ignore, fail, silent
}
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
index 7d4b2df..80fa545 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseComponentConfigurationTest.java
@@ -25,51 +25,53 @@ import software.amazon.awssdk.core.Protocol;
import software.amazon.awssdk.regions.Region;
public class KinesisFirehoseComponentConfigurationTest extends CamelTestSupport {
-
+
@Test
public void createEndpointWithAccessAndSecretKey() throws Exception {
KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxx&secretKey=yyyyy");
-
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
- assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+ assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
}
-
+
@Test
public void createEndpointWithComponentElements() throws Exception {
KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
component.setAccessKey("XXX");
component.setSecretKey("YYY");
KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name");
-
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
}
-
+
@Test
public void createEndpointWithComponentAndEndpointElements() throws Exception {
KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
component.setAccessKey("XXX");
component.setSecretKey("YYY");
component.setRegion(Region.US_WEST_1.toString());
- KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
-
+ KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component
+ .createEndpoint("aws2-kinesis-firehose://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
}
-
+
@Test
public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
KinesisFirehose2Component component = context.getComponent("aws2-kinesis-firehose", KinesisFirehose2Component.class);
component.setAccessKey("XXX");
component.setSecretKey("YYY");
component.setRegion(Region.US_WEST_1.toString());
- KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component.createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
-
+ KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)component
+ .createEndpoint("aws2-kinesis-firehose://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+
assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
index c3afb12..d3c6477 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/KinesisFirehoseEndpointTest.java
@@ -49,20 +49,18 @@ public class KinesisFirehoseEndpointTest {
@Test
public void allEndpointParams() throws Exception {
- KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name"
- + "?amazonKinesisFirehoseClient=#firehoseClient"
- );
+ KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext
+ .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?amazonKinesisFirehoseClient=#firehoseClient");
endpoint.start();
assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
}
-
+
@Test
public void allClientCreationParams() throws Exception {
- KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint) camelContext.getEndpoint("aws2-kinesis-firehose://some_stream_name"
- + "?accessKey=xxx&secretKey=yyy&region=us-east-1"
- );
+ KinesisFirehose2Endpoint endpoint = (KinesisFirehose2Endpoint)camelContext
+ .getEndpoint("aws2-kinesis-firehose://some_stream_name" + "?accessKey=xxx&secretKey=yyy&region=us-east-1");
assertThat(endpoint.getConfiguration().getRegion(), is(Region.US_EAST_1.id()));
assertThat(endpoint.getConfiguration().getAccessKey(), is("xxx"));
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
index 1240902..3bac64a9 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/firehose/integration/KinesisFirehoseComponentIntegrationTest.java
@@ -33,7 +33,7 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport {
@BindToRegistry("FirehoseClient")
FirehoseClient client = FirehoseClient.builder().build();
-
+
@Test
public void testFirehoseRouting() throws Exception {
Exchange exchange = template.send("direct:start", ExchangePattern.InOnly, new Processor() {
@@ -49,10 +49,8 @@ public class KinesisFirehoseComponentIntegrationTest extends CamelTestSupport {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start")
- .to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient");
+ from("direct:start").to("aws2-kinesis-firehose://mystream?amazonKinesisFirehoseClient=#FirehoseClient");
}
};
}
}
-
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
index 25dc454..b6e47c4 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisComponentConfigurationTest.java
@@ -25,29 +25,29 @@ import software.amazon.awssdk.core.Protocol;
import software.amazon.awssdk.regions.Region;
public class KinesisComponentConfigurationTest extends CamelTestSupport {
-
+
@Test
public void createEndpointWithAccessAndSecretKey() throws Exception {
Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxx&secretKey=yyyyy");
-
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("xxxxx", endpoint.getConfiguration().getAccessKey());
- assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+ assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
}
-
+
@Test
public void createEndpointWithComponentElements() throws Exception {
Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
component.setAccessKey("XXX");
component.setSecretKey("YYY");
Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name");
-
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
}
-
+
@Test
public void createEndpointWithComponentAndEndpointElements() throws Exception {
Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
@@ -55,21 +55,22 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport {
component.setSecretKey("YYY");
component.setRegion(Region.US_WEST_1.toString());
Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
-
+
assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
}
-
+
@Test
public void createEndpointWithComponentEndpointElementsAndProxy() throws Exception {
Kinesis2Component component = context.getComponent("aws2-kinesis", Kinesis2Component.class);
component.setAccessKey("XXX");
component.setSecretKey("YYY");
component.setRegion(Region.US_WEST_1.toString());
- Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component.createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
-
+ Kinesis2Endpoint endpoint = (Kinesis2Endpoint)component
+ .createEndpoint("aws2-kinesis://label?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1&proxyHost=localhost&proxyPort=9000&proxyProtocol=HTTP");
+
assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 38e6450..c713e91 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -75,7 +75,6 @@ public class KinesisConsumerClosedShardWithFailTest {
Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
endpoint.start();
undertest = new Kinesis2Consumer(endpoint, processor);
-
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index bccdd37..24c75d6 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -81,27 +81,16 @@ public class KinesisConsumerClosedShardWithSilentTest {
Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
endpoint.start();
undertest = new Kinesis2Consumer(endpoint, processor);
-
+
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
-
- when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("nextShardIterator").build()
- );
+ when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
when(kinesisClient.describeStream(any(DescribeStreamRequest.class)))
- .thenReturn(DescribeStreamResponse.builder()
- .streamDescription(StreamDescription.builder()
- .shards(shardList).build()
- ).build()
- );
- when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
- .thenReturn(GetShardIteratorResponse.builder()
- .shardIterator("shardIterator").build()
- );
+ .thenReturn(DescribeStreamResponse.builder().streamDescription(StreamDescription.builder().shards(shardList).build()).build());
+ when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
}
@Test
@@ -181,11 +170,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
@Test
public void recordsAreSentToTheProcessor() throws Exception {
- when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("nextShardIterator")
- .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build()
- );
+ when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator")
+ .records(Record.builder().sequenceNumber("1").build(), Record.builder().sequenceNumber("2").build()).build());
int messageCount = undertest.poll();
@@ -201,15 +187,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
public void exchangePropertiesAreSet() throws Exception {
String partitionKey = "partitionKey";
String sequenceNumber = "1";
- when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
- .thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("nextShardIterator")
- .records(Record.builder()
- .sequenceNumber(sequenceNumber)
- .approximateArrivalTimestamp(Instant.now())
- .partitionKey(partitionKey).build()
- ).build()
- );
+ when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator")
+ .records(Record.builder().sequenceNumber(sequenceNumber).approximateArrivalTimestamp(Instant.now()).partitionKey(partitionKey).build()).build());
undertest.poll();
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
index 413c107..3b58993 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisEndpointTest.java
@@ -49,13 +49,8 @@ public class KinesisEndpointTest {
@Test
public void allTheEndpointParams() throws Exception {
- Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
- + "?amazonKinesisClient=#kinesisClient"
- + "&maxResultsPerRequest=101"
- + "&iteratorType=latest"
- + "&shardId=abc"
- + "&sequenceNumber=123"
- );
+ Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient"
+ + "&maxResultsPerRequest=101" + "&iteratorType=latest" + "&shardId=abc" + "&sequenceNumber=123");
assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -67,9 +62,7 @@ public class KinesisEndpointTest {
@Test
public void onlyRequiredEndpointParams() throws Exception {
- Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
- + "?amazonKinesisClient=#kinesisClient"
- );
+ Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient");
assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -79,12 +72,8 @@ public class KinesisEndpointTest {
@Test
public void afterSequenceNumberRequiresSequenceNumber() throws Exception {
- Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
- + "?amazonKinesisClient=#kinesisClient"
- + "&iteratorType=AFTER_SEQUENCE_NUMBER"
- + "&shardId=abc"
- + "&sequenceNumber=123"
- );
+ Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext.getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient"
+ + "&iteratorType=AFTER_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123");
assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
@@ -95,12 +84,8 @@ public class KinesisEndpointTest {
@Test
public void atSequenceNumberRequiresSequenceNumber() throws Exception {
- Kinesis2Endpoint endpoint = (Kinesis2Endpoint) camelContext.getEndpoint("aws2-kinesis://some_stream_name"
- + "?amazonKinesisClient=#kinesisClient"
- + "&iteratorType=AT_SEQUENCE_NUMBER"
- + "&shardId=abc"
- + "&sequenceNumber=123"
- );
+ Kinesis2Endpoint endpoint = (Kinesis2Endpoint)camelContext
+ .getEndpoint("aws2-kinesis://some_stream_name" + "?amazonKinesisClient=#kinesisClient" + "&iteratorType=AT_SEQUENCE_NUMBER" + "&shardId=abc" + "&sequenceNumber=123");
assertThat(endpoint.getConfiguration().getAmazonKinesisClient(), is(amazonKinesisClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
diff --git a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
index e368f87..6515b9d 100644
--- a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
+++ b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/RecordStringConverterTest.java
@@ -32,9 +32,7 @@ public class RecordStringConverterTest {
@Test
public void convertRecordToString() throws Exception {
- Record record = Record.builder()
- .sequenceNumber("1")
- .data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build();
+ Record record = Record.builder().sequenceNumber("1").data(SdkBytes.fromByteBuffer(ByteBuffer.wrap("this is a String".getBytes(Charset.forName("UTF-8"))))).build();
String result = RecordStringConverter.toString(record);
assertThat(result, is("this is a String"));