You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/22 08:59:52 UTC

[camel] branch master updated: CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3701f2d  CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level
3701f2d is described below

commit 3701f2d21052799a5d907dfe3776e1c014062ff0
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jan 22 09:57:03 2018 +0100

    CAMEL-12172 - Camel-AWS Kinesis: Add the ability to specify credentials and region at component level
---
 .../src/main/docs/aws-kinesis-component.adoc       |  14 +-
 .../component/aws/kinesis/KinesisComponent.java    |  69 ++++++++-
 .../aws/kinesis/KinesisConfiguration.java          |  16 +-
 .../kinesis/KinesisComponentConfigurationTest.java |  28 ++++
 .../springboot/KinesisComponentConfiguration.java  | 165 +++++++++++++++++++++
 5 files changed, 289 insertions(+), 3 deletions(-)

diff --git a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
index 559a89c..a570cb2 100644
--- a/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-kinesis-component.adoc
@@ -26,7 +26,19 @@ The stream needs to be created prior to it being used. +
 
 
 // component options: START
-The AWS Kinesis component has no options.
+The AWS Kinesis component supports 5 options which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (advanced) | The AWS S3 default configuration |  | KinesisConfiguration
+| *accessKey* (common) | Amazon AWS Access Key |  | String
+| *secretKey* (common) | Amazon AWS Secret Key |  | String
+| *region* (common) | Amazon AWS Region |  | String
+| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
+|===
 // component options: END
 
 
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
index 34789af..e1a3385 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisComponent.java
@@ -21,24 +21,91 @@ import java.util.Map;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.util.ObjectHelper;
 
 public class KinesisComponent extends DefaultComponent {
 
+    @Metadata
+    private String accessKey;
+    @Metadata
+    private String secretKey;
+    @Metadata
+    private String region;
+    @Metadata(label = "advanced")    
+    private KinesisConfiguration configuration;
+
     public KinesisComponent() {
         this(null);
     }
 
     public KinesisComponent(CamelContext context) {
         super(context);
+        
+        this.configuration = new KinesisConfiguration();
     }
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        KinesisConfiguration configuration = new KinesisConfiguration();
+        KinesisConfiguration configuration = this.configuration.copy();
         configuration.setStreamName(remaining);
         setProperties(configuration, parameters);
         
+        if (ObjectHelper.isEmpty(configuration.getAccessKey())) {
+            setAccessKey(accessKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getSecretKey())) {
+            setSecretKey(secretKey);
+        }
+        if (ObjectHelper.isEmpty(configuration.getRegion())) {
+            setRegion(region);
+        }
+        
         KinesisEndpoint endpoint = new KinesisEndpoint(uri, configuration, this);
         return endpoint;
     }
+    
+    public KinesisConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The AWS S3 default configuration
+     */
+    public void setConfiguration(KinesisConfiguration configuration) {
+        this.configuration = configuration;
+    }
+    
+    public String getAccessKey() {
+        return configuration.getAccessKey();
+    }
+
+    /**
+     * Amazon AWS Access Key
+     */
+    public void setAccessKey(String accessKey) {
+        configuration.setAccessKey(accessKey);
+    }
+
+    public String getSecretKey() {
+        return configuration.getSecretKey();
+    }
+
+    /**
+     * Amazon AWS Secret Key
+     */
+    public void setSecretKey(String secretKey) {
+        configuration.setSecretKey(secretKey);
+    }
+    
+    public String getRegion() {
+        return configuration.getRegion();
+    }
+
+    /**
+     * Amazon AWS Region
+     */
+    public void setRegion(String region) {
+        configuration.setRegion(region);
+    }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
index a6d38e3..820492b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
@@ -19,12 +19,14 @@ package org.apache.camel.component.aws.kinesis;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ShardIteratorType;
 
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
+
 @UriParams
-public class KinesisConfiguration {
+public class KinesisConfiguration implements Cloneable {
     
     @UriPath(description = "Name of the stream")
     @Metadata(required = "true")
@@ -150,4 +152,16 @@ public class KinesisConfiguration {
     public void setProxyPort(Integer proxyPort) {
         this.proxyPort = proxyPort;
     }   
+    
+    // *************************************************
+    //
+    // *************************************************
+
+    public KinesisConfiguration copy() {
+        try {
+            return (KinesisConfiguration)super.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
 }
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
index 0e45169..20bc912 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/kinesis/KinesisComponentConfigurationTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import com.amazonaws.regions.Regions;
+
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
@@ -31,4 +33,30 @@ public class KinesisComponentConfigurationTest extends CamelTestSupport {
         assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());    
     }
     
+    @Test
+    public void createEndpointWithComponentElements() throws Exception {
+        KinesisComponent component = new KinesisComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name");
+        
+        assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
+        assertEquals("XXX", endpoint.getConfiguration().getAccessKey());
+        assertEquals("YYY", endpoint.getConfiguration().getSecretKey());
+    }
+    
+    @Test
+    public void createEndpointWithComponentAndEndpointElements() throws Exception {
+        KinesisComponent component = new KinesisComponent(context);
+        component.setAccessKey("XXX");
+        component.setSecretKey("YYY");
+        component.setRegion(Regions.US_WEST_1.toString());
+        KinesisEndpoint endpoint = (KinesisEndpoint)component.createEndpoint("aws-kinesis://some_stream_name?accessKey=xxxxxx&secretKey=yyyyy&region=US_EAST_1");
+        
+        assertEquals("some_stream_name", endpoint.getConfiguration().getStreamName());
+        assertEquals("xxxxxx", endpoint.getConfiguration().getAccessKey());
+        assertEquals("yyyyy", endpoint.getConfiguration().getSecretKey());
+        assertEquals("US_EAST_1", endpoint.getConfiguration().getRegion());
+    }
+    
 }
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
index 464ab9f..197461b 100644
--- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/kinesis/springboot/KinesisComponentConfiguration.java
@@ -17,6 +17,10 @@
 package org.apache.camel.component.aws.kinesis.springboot;
 
 import javax.annotation.Generated;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+import org.apache.camel.component.aws.kinesis.KinesisComponent;
+import org.apache.camel.component.aws.kinesis.KinesisShardClosedStrategyEnum;
 import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
@@ -33,12 +37,61 @@ public class KinesisComponentConfiguration
             ComponentConfigurationPropertiesCommon {
 
     /**
+     * The AWS S3 default configuration
+     */
+    private KinesisConfigurationNestedConfiguration configuration;
+    /**
+     * Amazon AWS Access Key
+     */
+    private String accessKey;
+    /**
+     * Amazon AWS Secret Key
+     */
+    private String secretKey;
+    /**
+     * Amazon AWS Region
+     */
+    private String region;
+    /**
      * Whether the component should resolve property placeholders on itself when
      * starting. Only properties which are of String type can use property
      * placeholders.
      */
     private Boolean resolvePropertyPlaceholders = true;
 
+    public KinesisConfigurationNestedConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(
+            KinesisConfigurationNestedConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }
@@ -47,4 +100,116 @@ public class KinesisComponentConfiguration
             Boolean resolvePropertyPlaceholders) {
         this.resolvePropertyPlaceholders = resolvePropertyPlaceholders;
     }
+
+    public static class KinesisConfigurationNestedConfiguration {
+        public static final Class CAMEL_NESTED_CLASS = org.apache.camel.component.aws.kinesis.KinesisConfiguration.class;
+        private AmazonKinesis amazonKinesisClient;
+        private Integer maxResultsPerRequest = 1;
+        private String streamName;
+        private ShardIteratorType iteratorType = ShardIteratorType.TRIM_HORIZON;
+        private String shardId;
+        private String sequenceNumber;
+        private KinesisShardClosedStrategyEnum shardClosed = KinesisShardClosedStrategyEnum.ignore;
+        private String accessKey;
+        private String secretKey;
+        private String region;
+        private String proxyHost;
+        private Integer proxyPort;
+
+        public AmazonKinesis getAmazonKinesisClient() {
+            return amazonKinesisClient;
+        }
+
+        public void setAmazonKinesisClient(AmazonKinesis amazonKinesisClient) {
+            this.amazonKinesisClient = amazonKinesisClient;
+        }
+
+        public Integer getMaxResultsPerRequest() {
+            return maxResultsPerRequest;
+        }
+
+        public void setMaxResultsPerRequest(Integer maxResultsPerRequest) {
+            this.maxResultsPerRequest = maxResultsPerRequest;
+        }
+
+        public String getStreamName() {
+            return streamName;
+        }
+
+        public void setStreamName(String streamName) {
+            this.streamName = streamName;
+        }
+
+        public ShardIteratorType getIteratorType() {
+            return iteratorType;
+        }
+
+        public void setIteratorType(ShardIteratorType iteratorType) {
+            this.iteratorType = iteratorType;
+        }
+
+        public String getShardId() {
+            return shardId;
+        }
+
+        public void setShardId(String shardId) {
+            this.shardId = shardId;
+        }
+
+        public String getSequenceNumber() {
+            return sequenceNumber;
+        }
+
+        public void setSequenceNumber(String sequenceNumber) {
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        public KinesisShardClosedStrategyEnum getShardClosed() {
+            return shardClosed;
+        }
+
+        public void setShardClosed(KinesisShardClosedStrategyEnum shardClosed) {
+            this.shardClosed = shardClosed;
+        }
+
+        public String getAccessKey() {
+            return accessKey;
+        }
+
+        public void setAccessKey(String accessKey) {
+            this.accessKey = accessKey;
+        }
+
+        public String getSecretKey() {
+            return secretKey;
+        }
+
+        public void setSecretKey(String secretKey) {
+            this.secretKey = secretKey;
+        }
+
+        public String getRegion() {
+            return region;
+        }
+
+        public void setRegion(String region) {
+            this.region = region;
+        }
+
+        public String getProxyHost() {
+            return proxyHost;
+        }
+
+        public void setProxyHost(String proxyHost) {
+            this.proxyHost = proxyHost;
+        }
+
+        public Integer getProxyPort() {
+            return proxyPort;
+        }
+
+        public void setProxyPort(Integer proxyPort) {
+            this.proxyPort = proxyPort;
+        }
+    }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
acosentino@apache.org.