You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/22 10:54:44 UTC
[camel] 03/03: CAMEL-12175 - Camel-AWS Kinesis Firehose: Expose
options to avoid a required client in the registry
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4a16e066b846fc2bbfbb964e425703cb9c8501f8
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Mon Jan 22 11:53:26 2018 +0100
CAMEL-12175 - Camel-AWS Kinesis Firehose: Expose options to avoid a required client in the registry
---
.../aws/firehose/KinesisFirehoseConfiguration.java | 51 ++++++++++++++++++++-
.../aws/firehose/KinesisFirehoseEndpoint.java | 53 +++++++++++++++++++++-
.../aws/firehose/KinesisFirehoseEndpointTest.java | 14 ++++++
3 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
index 7c6201f..699a2ca 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
@@ -29,9 +29,18 @@ public class KinesisFirehoseConfiguration {
@UriPath(description = "Name of the stream")
@Metadata(required = "true")
private String streamName;
+ @UriParam(label = "security", secret = true, description = "Amazon AWS Access Key")
+ private String accessKey;
+ @UriParam(label = "security", secret = true, description = "Amazon AWS Secret Key")
+ private String secretKey;
+ @UriParam(description = "The region in which Kinesis client needs to work")
+ private String region;
@UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
- @Metadata(required = "true")
private AmazonKinesisFirehose amazonKinesisFirehoseClient;
+ @UriParam(description = "To define a proxy host when instantiating the DDBStreams client")
+ private String proxyHost;
+ @UriParam(description = "To define a proxy port when instantiating the DDBStreams client")
+ private Integer proxyPort;
public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
this.amazonKinesisFirehoseClient = client;
@@ -48,4 +57,44 @@ public class KinesisFirehoseConfiguration {
public String getStreamName() {
return streamName;
}
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getProxyHost() {
+ return proxyHost;
+ }
+
+ public void setProxyHost(String proxyHost) {
+ this.proxyHost = proxyHost;
+ }
+
+ public Integer getProxyPort() {
+ return proxyPort;
+ }
+
+ public void setProxyPort(Integer proxyPort) {
+ this.proxyPort = proxyPort;
+ }
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
index cdeb4d4..3f5fafe 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
@@ -16,13 +16,21 @@
*/
package org.apache.camel.component.aws.firehose;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
+import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder;
+
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
+import org.apache.camel.util.ObjectHelper;
/**
* The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
@@ -33,6 +41,8 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
@UriParam
private KinesisFirehoseConfiguration configuration;
+
+ private AmazonKinesisFirehose kinesisFirehoseClient;
public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) {
super(uri, component);
@@ -48,14 +58,55 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
}
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ kinesisFirehoseClient = configuration.getAmazonKinesisFirehoseClient() != null ? configuration.getAmazonKinesisFirehoseClient()
+ : createKinesisFirehoseClient();
+
+ }
@Override
public boolean isSingleton() {
return true;
}
+
+ AmazonKinesisFirehose createKinesisFirehoseClient() {
+ AmazonKinesisFirehose client = null;
+ ClientConfiguration clientConfiguration = null;
+ AmazonKinesisFirehoseClientBuilder clientBuilder = null;
+ boolean isClientConfigFound = false;
+ if (ObjectHelper.isNotEmpty(configuration.getProxyHost()) && ObjectHelper.isNotEmpty(configuration.getProxyPort())) {
+ clientConfiguration = new ClientConfiguration();
+ clientConfiguration.setProxyHost(configuration.getProxyHost());
+ clientConfiguration.setProxyPort(configuration.getProxyPort());
+ isClientConfigFound = true;
+ }
+ if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {
+ AWSCredentials credentials = new BasicAWSCredentials(configuration.getAccessKey(), configuration.getSecretKey());
+ AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(credentials);
+ if (isClientConfigFound) {
+ clientBuilder = AmazonKinesisFirehoseClientBuilder.standard().withClientConfiguration(clientConfiguration).withCredentials(credentialsProvider);
+ } else {
+ clientBuilder = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(credentialsProvider);
+ }
+ } else {
+ if (isClientConfigFound) {
+ clientBuilder = AmazonKinesisFirehoseClientBuilder.standard();
+ } else {
+ clientBuilder = AmazonKinesisFirehoseClientBuilder.standard().withClientConfiguration(clientConfiguration);
+ }
+ }
+ if (ObjectHelper.isNotEmpty(configuration.getRegion())) {
+ clientBuilder = clientBuilder.withRegion(configuration.getRegion());
+ }
+ client = clientBuilder.build();
+ return client;
+ }
public AmazonKinesisFirehose getClient() {
- return configuration.getAmazonKinesisFirehoseClient();
+ return kinesisFirehoseClient;
}
public KinesisFirehoseConfiguration getConfiguration() {
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
index ac8cdf1..391bb5d 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.aws.firehose;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
@@ -49,8 +50,21 @@ public class KinesisFirehoseEndpointTest {
KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name"
+ "?amazonKinesisFirehoseClient=#firehoseClient"
);
+ endpoint.start();
assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
}
+
+ @Test
+ public void allClientCreationParams() throws Exception {
+ KinesisFirehoseEndpoint endpoint = (KinesisFirehoseEndpoint) camelContext.getEndpoint("aws-kinesis-firehose://some_stream_name"
+ + "?accessKey=xxx&secretKey=yyy®ion=us-east-1"
+ );
+
+ assertThat(endpoint.getConfiguration().getRegion(), is(Regions.US_EAST_1.getName()));
+ assertThat(endpoint.getConfiguration().getAccessKey(), is("xxx"));
+ assertThat(endpoint.getConfiguration().getSecretKey(), is("yyy"));
+ assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
+ }
}
--
To stop receiving notification emails like this one, please contact
acosentino@apache.org.