You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/22 10:54:44 UTC

[camel] 03/03: CAMEL-12175 - Camel-AWS Kinesis Firehose: Expose options to avoid a required client in the registry

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4a16e066b846fc2bbfbb964e425703cb9c8501f8
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jan 22 11:53:26 2018 +0100

    CAMEL-12175 - Camel-AWS Kinesis Firehose: Expose options to avoid a required client in the registry
---
 .../aws/firehose/KinesisFirehoseConfiguration.java | 51 ++++++++++++++++++++-
 .../aws/firehose/KinesisFirehoseEndpoint.java      | 53 +++++++++++++++++++++-
 .../aws/firehose/KinesisFirehoseEndpointTest.java  | 14 ++++++
 3 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
index 7c6201f..699a2ca 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
@@ -29,9 +29,18 @@ public class KinesisFirehoseConfiguration {
     @UriPath(description = "Name of the stream")
     @Metadata(required = "true")
     private String streamName;
+    @UriParam(label = "security", secret = true, description = "Amazon AWS Access Key")
+    private String accessKey;
+    @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
+    private String secretKey;
+    @UriParam(description = "The region in which Kinesis client needs to work")
+    private String region;
     @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
-    @Metadata(required = "true")
     private AmazonKinesisFirehose amazonKinesisFirehoseClient;
+    @UriParam(description = "To define a proxy host when instantiating the Kinesis Firehose client")
+    private String proxyHost;
+    @UriParam(description = "To define a proxy port when instantiating the Kinesis Firehose client")
+    private Integer proxyPort;
     
     public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
         this.amazonKinesisFirehoseClient = client;
@@ -48,4 +57,44 @@ public class KinesisFirehoseConfiguration {
     public String getStreamName() {
         return streamName;
     }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public void setRegion(String region) {
+        this.region = region;
+    }
+
+    public String getProxyHost() {
+        return proxyHost;
+    }
+
+    public void setProxyHost(String proxyHost) {
+        this.proxyHost = proxyHost;
+    }
+
+    public Integer getProxyPort() {
+        return proxyPort;
+    }
+
+    public void setProxyPort(Integer proxyPort) {
+        this.proxyPort = proxyPort;
+    }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
index cdeb4d4..3f5fafe 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
@@ -16,13 +16,21 @@
  */
 package org.apache.camel.component.aws.firehose;
 
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
+
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
@@ -33,6 +41,8 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
 
     @UriParam
     private KinesisFirehoseConfiguration configuration;
+    
+    private AmazonKinesisFirehose kinesisFirehoseClient;
 
     public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) {
         super(uri, component);
@@ -48,14 +58,55 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
     public Consumer createConsumer(Processor processor) throws Exception {
         throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
     }
+    
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // Prefer the client supplied through the configuration; otherwise build one from the endpoint options.
+        AmazonKinesisFirehose configuredClient = configuration.getAmazonKinesisFirehoseClient();
+        kinesisFirehoseClient = configuredClient != null ? configuredClient : createKinesisFirehoseClient();
+    }
 
     @Override
     public boolean isSingleton() {
         return true;
     }
+    
+    /**
+     * Creates the Kinesis Firehose client used when none is supplied in the configuration.
+     * <p>
+     * Each group of options is applied to the builder independently:
+     * proxy settings only when both proxyHost and proxyPort are set,
+     * static credentials only when both accessKey and secretKey are set,
+     * and the region only when it is not empty.
+     * Options that are not set are never passed to the builder.
+     *
+     * @return the client produced by AmazonKinesisFirehoseClientBuilder
+     */
+    AmazonKinesisFirehose createKinesisFirehoseClient() {
+        AmazonKinesisFirehoseClientBuilder clientBuilder = AmazonKinesisFirehoseClientBuilder.standard();
+        // Proxy settings must be honoured whether or not explicit credentials are configured.
+        if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+            ClientConfiguration clientConfiguration = new ClientConfiguration();
+            clientConfiguration.setProxyHost(configuration.getProxyHost());
+            clientConfiguration.setProxyPort(configuration.getProxyPort());
+            clientBuilder = clientBuilder.withClientConfiguration(clientConfiguration);
+        }
+        // Static credentials are used only when both keys are present; otherwise none are set on the builder.
+        if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+            AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+            AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+            clientBuilder = clientBuilder.withCredentials(credentialsProvider);
+        }
+        // The region is optional; when it is empty the builder is left untouched.
+        if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+            clientBuilder = clientBuilder.withRegion(configuration.getRegion());
+        }
+        return clientBuilder.build();
+    }
 
     public AmazonKinesisFirehose getClient() {
-        return configuration.getAmazonKinesisFirehoseClient();
+        return kinesisFirehoseClient;
     }
 
     public KinesisFirehoseConfiguration getConfiguration() {
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
index ac8cdf1..391bb5d 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws.firehose;
 
+import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
 import org.apache.camel.CamelContext;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -49,8 +50,21 @@ public class KinesisFirehoseEndpointTest {
         KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name"
                 + "?amazonKinesisFirehoseClient=#firehoseClient"
         );
+        endpoint.start();
 
         assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
         assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
     }
+    
+    @Test
+    public void allClientCreationParams() throws Exception {
+        String uri = "aws-kinesis-firehose://some_stream_name"
+                + "?accessKey=xxx&secretKey=yyy&region=us-east-1";
+        KinesisFirehoseConfiguration config = ((KinesisFirehoseEndpoint) camelContext.getEndpoint(uri)).getConfiguration();
+
+        assertThat(config.getRegion(), is(Regions.US_EAST_1.getName()));
+        assertThat(config.getAccessKey(), is("xxx"));
+        assertThat(config.getSecretKey(), is("yyy"));
+        assertThat(config.getStreamName(), is("some_stream_name"));
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
acosentino@apache.org.