You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/22 08:59:52 UTC
[camel] branch master updated: CAMEL-12172 - Camel-AWS Kinesis: Add
the ability to specify credentials and region at component level
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3701f2d CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level
3701f2d is described below
commit 3701f2d21052799a5d907dfe3776e1c014062ff0
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jan 22 09:57:03 2018 +0100
CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level
---
.../src/main/docs/aws-kinesis-component.adoc | 14 +-
.../component/aws/kinesis/KinesisComponent.java | 69 ++++++++-
.../aws/kinesis/KinesisConfiguration.java | 16 +-
.../kinesis/KinesisComponentConfigurationTest.java | 28 ++++
.../springboot/KinesisComponentConfiguration.java | 165 +++++++++++++++++++++
5 files changed, 289 insertions(+), 3 deletions(-)
diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
index 559a89c..a570cb2 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
@@ -26,7 +26,19 @@ The stream needs to be created prior to it being used. +
// component options: START
-The AWS Kinesis component has no options.
+The AWS Kinesis component supports 5 options which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS Kinesis default configuration | | KinesisConfiguration
+| *accessKey* (common) | Amazon AWS Access Key | | String
+| *secretKey* (common) | Amazon AWS Secret Key | | String
+| *region* (common) | Amazon AWS Region | | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
// component options: END
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
index 34789af..e1a3385 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
@@ -21,24 +21,91 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.util.ObjectHelper;
+/**
+ * Camel component that creates AWS Kinesis endpoints.
+ * <p>
+ * The access key, secret key and region configured on the component serve as
+ * defaults for the endpoints it creates; options given on the endpoint URI
+ * take precedence over them.
+ */
public class KinesisComponent extends DefaultComponent {
+    // Component-level values are remembered in these fields as well as in the
+    // configuration, so they survive a later call to setConfiguration(...).
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")
+    private KinesisConfiguration configuration;
+
     public KinesisComponent() {
         this(null);
     }
     public KinesisComponent(CamelContext context) {
         super(context);
+
+        this.configuration = new KinesisConfiguration();
     }
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        KinesisConfiguration configuration = new KinesisConfiguration();
+        // Start from a copy so endpoint options never leak into the shared component configuration.
+        KinesisConfiguration configuration = this.configuration.copy();
         configuration.setStreamName(remaining);
         setProperties(configuration, parameters);
+        // Anything still missing falls back to the component-level value, applied
+        // to this endpoint's own copy rather than to the component configuration.
+        if (ObjectHelper.isEmpty(configuration.getAccessKey())) {
+            configuration.setAccessKey(accessKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getSecretKey())) {
+            configuration.setSecretKey(secretKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getRegion())) {
+            configuration.setRegion(region);
+        }
+
     KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, this);
     return endpoint;
     }
+
+    public KinesisConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS Kinesis default configuration
+     */
+    public void setConfiguration(KinesisConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return configuration.getAccessKey();
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+        configuration.setAccessKey(accessKey);
+    }
+
+    public String getSecretKey() {
+        return configuration.getSecretKey();
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+        configuration.setSecretKey(secretKey);
+    }
+
+    public String getRegion() {
+        return configuration.getRegion();
+    }
+
+    /**
+     * Amazon AWS Region
+     */
+    public void setRegion(String region) {
+        this.region = region;
+        configuration.setRegion(region);
+    }
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
index a6d38e3..820492b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
@@ -19,12 +19,14 @@ package org.apache.camel.component.aws.kinesis;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
+
@UriParams
-public class KinesisConfiguration {
+public class KinesisConfiguration implements Cloneable {
@UriPath(description = "Name of the stream")
@Metadata(required = "true")
@@ -150,4 +152,16 @@ public class KinesisConfiguration {
public void setProxyPort(Integer proxyPort) {
this.proxyPort = proxyPort;
}
+
+ // *************************************************
+ //
+ // *************************************************
+
+    /**
+     * Returns a field-by-field clone of this configuration, obtained through
+     * {@link Object#clone()}.
+     *
+     * @return the cloned configuration
+     * @throws RuntimeCamelException wrapping the {@link CloneNotSupportedException}
+     *             if cloning is refused
+     */
+    public KinesisConfiguration copy() {
+        final KinesisConfiguration duplicate;
+        try {
+            duplicate = (KinesisConfiguration)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+        return duplicate;
+    }
}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
index 0e45169..20bc912 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.aws.kinesis;
+import com.amazonaws.regions.Regions;
+
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
@@ -31,4 +33,30 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport {
assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
}
+    /**
+     * Credentials set only on the component must show up on an endpoint whose
+     * URI carries no options.
+     */
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        KinesisComponent kinesis = new KinesisComponent(context);
+        kinesis.setAccessKey("XXX");
+        kinesis.setSecretKey("YYY");
+
+        KinesisEndpoint created = (KinesisEndpoint)kinesis.createEndpoint("aws-kinesis://some_stream_name");
+        KinesisConfiguration effective = created.getConfiguration();
+
+        assertEquals("some_stream_name", effective.getStreamName());
+        assertEquals("XXX", effective.getAccessKey());
+        assertEquals("YYY", effective.getSecretKey());
+    }
+
+    /**
+     * Options repeated on the endpoint URI must override the values set on the
+     * component.
+     */
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        KinesisComponent component = new KinesisComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Regions.US_WEST_1.toString());
+        // The query string must use a literal "&region="; the archived copy had it mangled into the (R) entity.
+        KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+
+        assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+
}
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
index 464ab9f..197461b 100644
--- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
@@ -17,6 +17,10 @@
package org.apache.camel.component.aws.kinesis.springboot;
import javax.annotation.Generated;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.component.aws.kinesis.KinesisComponent;
+import org.apache.camel.component.aws.kinesis.KinesisShardClosedStrategyEnum;
import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -33,12 +37,61 @@ public class KinesisComponentConfiguration
ComponentConfigurationPropertiesCommon {
/**
+ * The AWS Kinesis default configuration
+ */
+ private KinesisConfigurationNestedConfiguration configuration;
+ /**
+ * Amazon AWS Access Key
+ */
+ private String accessKey;
+ /**
+ * Amazon AWS Secret Key
+ */
+ private String secretKey;
+ /**
+ * Amazon AWS Region
+ */
+ private String region;
+ /**
* Whether the component should resolve property placeholders on itself when
* starting. Only properties which are of String type can use property
* placeholders.
*/
private Boolean resolvePropertyPlaceholders = true;
+    /** Returns the nested configuration properties, or {@code null} when none were bound. */
+    public KinesisConfigurationNestedConfiguration getConfiguration() {
+        return this.configuration;
+    }
+
+    /** Replaces the nested configuration properties. */
+    public void setConfiguration(
+            KinesisConfigurationNestedConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /** Returns the component-level access key property. */
+    public String getAccessKey() {
+        return this.accessKey;
+    }
+
+    /** Sets the component-level access key property. */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /** Returns the component-level secret key property. */
+    public String getSecretKey() {
+        return this.secretKey;
+    }
+
+    /** Sets the component-level secret key property. */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /** Returns the component-level region property. */
+    public String getRegion() {
+        return this.region;
+    }
+
+    /** Sets the component-level region property. */
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
public Boolean getResolvePropertyPlaceholders() {
return resolvePropertyPlaceholders;
}
@@ -47,4 +100,116 @@ public class KinesisComponentConfiguration
Boolean resolvePropertyPlaceholders) {
this.resolvePropertyPlaceholders = resolvePropertyPlaceholders;
}
+
+    /**
+     * Plain property holder backing the {@code configuration} option of the
+     * enclosing properties class.
+     * NOTE(review): the fields look like a one-to-one mirror of the options on
+     * the Camel KinesisConfiguration class - confirm against the generator.
+     */
+    public static class KinesisConfigurationNestedConfiguration {
+        /** The Camel configuration class this holder is associated with. */
+        public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.aws.kinesis.KinesisConfiguration.class;
+        private AmazonKinesis amazonKinesisClient;
+        /** Defaults to 1. */
+        private Integer maxResultsPerRequest = 1;
+        private String streamName;
+        /** Defaults to {@code TRIM_HORIZON}. */
+        private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+        private String shardId;
+        private String sequenceNumber;
+        /** Defaults to {@code ignore}. */
+        private KinesisShardClosedStrategyEnum shardClosed = KinesisShardClosedStrategyEnum.ignore;
+        private String accessKey;
+        private String secretKey;
+        private String region;
+        private String proxyHost;
+        private Integer proxyPort;
+
+        public AmazonKinesis getAmazonKinesisClient() {
+            return this.amazonKinesisClient;
+        }
+
+        public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) {
+            this.amazonKinesisClient = amazonKinesisClient;
+        }
+
+        public Integer getMaxResultsPerRequest() {
+            return this.maxResultsPerRequest;
+        }
+
+        public void setMaxResultsPerRequest(Integer maxResultsPerRequest) {
+            this.maxResultsPerRequest = maxResultsPerRequest;
+        }
+
+        public String getStreamName() {
+            return this.streamName;
+        }
+
+        public void setStreamName(String streamName) {
+            this.streamName = streamName;
+        }
+
+        public ShardIteratorType getIteratorType() {
+            return this.iteratorType;
+        }
+
+        public void setIteratorType(ShardIteratorType iteratorType) {
+            this.iteratorType = iteratorType;
+        }
+
+        public String getShardId() {
+            return this.shardId;
+        }
+
+        public void setShardId(String shardId) {
+            this.shardId = shardId;
+        }
+
+        public String getSequenceNumber() {
+            return this.sequenceNumber;
+        }
+
+        public void setSequenceNumber(String sequenceNumber) {
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        public KinesisShardClosedStrategyEnum getShardClosed() {
+            return this.shardClosed;
+        }
+
+        public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) {
+            this.shardClosed = shardClosed;
+        }
+
+        public String getAccessKey() {
+            return this.accessKey;
+        }
+
+        public void setAccessKey(String accessKey) {
+            this.accessKey = accessKey;
+        }
+
+        public String getSecretKey() {
+            return this.secretKey;
+        }
+
+        public void setSecretKey(String secretKey) {
+            this.secretKey = secretKey;
+        }
+
+        public String getRegion() {
+            return this.region;
+        }
+
+        public void setRegion(String region) {
+            this.region = region;
+        }
+
+        public String getProxyHost() {
+            return this.proxyHost;
+        }
+
+        public void setProxyHost(String proxyHost) {
+            this.proxyHost = proxyHost;
+        }
+
+        public Integer getProxyPort() {
+            return this.proxyPort;
+        }
+
+        public void setProxyPort(Integer proxyPort) {
+            this.proxyPort = proxyPort;
+        }
+    }
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
acosentino@apache.org.