You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/19 10:00:54 UTC

[camel] branch master updated: CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new f73e533  CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components
f73e533 is described below

commit f73e533d4fb90409441d8f0b53329099c85d3521
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Jan 19 11:00:28 2018 +0100

    CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components
---
 .../aws/firehose/KinesisFirehoseComponent.java     |  6 ++-
 ...oint.java => KinesisFirehoseConfiguration.java} | 46 ++++++----------------
 .../aws/firehose/KinesisFirehoseEndpoint.java      | 22 ++++-------
 .../aws/firehose/KinesisFirehoseProducer.java      |  2 +-
 .../aws/kinesis/KinesisConfiguration.java          |  7 ++--
 .../component/aws/kinesis/KinesisConsumer.java     |  3 +-
 .../component/aws/kinesis/KinesisEndpoint.java     |  9 ++---
 .../aws/firehose/KinesisFirehoseEndpointTest.java  |  2 +-
 .../aws/firehose/KinesisFirehoseProducerTest.java  |  5 ++-
 9 files changed, 37 insertions(+), 65 deletions(-)

diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
index d640d56..369c695 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
@@ -34,8 +34,10 @@ public class KinesisFirehoseComponent extends DefaultComponent {
 
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
-        KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, remaining, this);
-        setProperties(endpoint, parameters);
+        KinesisFirehoseConfiguration configuration = new KinesisFirehoseConfiguration();
+        configuration.setStreamName(remaining);
+        setProperties(configuration, parameters);
+        KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, configuration, this);
         return endpoint;
     }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
similarity index 57%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
index 97512a9..7c6201f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
@@ -17,22 +17,14 @@
 package org.apache.camel.component.aws.firehose;
 
 import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.component.aws.kinesis.KinesisConsumer;
-import org.apache.camel.impl.DefaultEndpoint;
+
 import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 
-/**
- * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
- */
-@UriEndpoint(firstVersion = "2.19.0", scheme = "aws-kinesis-firehose", title = "AWS Kinesis Firehose", syntax = "aws-kinesis-firehose:streamName",
-    producerOnly = true, label = "cloud,messaging")
-public class KinesisFirehoseEndpoint extends DefaultEndpoint {
+@UriParams
+public class KinesisFirehoseConfiguration {
 
     @UriPath(description = "Name of the stream")
     @Metadata(required = "true")
@@ -40,36 +32,20 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
     @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
     @Metadata(required = "true")
     private AmazonKinesisFirehose amazonKinesisFirehoseClient;
-
-    public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) {
-        super(uri, component);
-        this.streamName = streamName;
-    }
-
-    @Override
-    public Producer createProducer() throws Exception {
-        return new KinesisFirehoseProducer(this);
-    }
-
-    @Override
-    public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
-    }
-
-    @Override
-    public boolean isSingleton() {
-        return true;
-    }
-
+    
     public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
         this.amazonKinesisFirehoseClient = client;
     }
 
-    public AmazonKinesisFirehose getClient() {
+    public AmazonKinesisFirehose getAmazonKinesisFirehoseClient() {
         return amazonKinesisFirehoseClient;
     }
 
+    public void setStreamName(String streamName) {
+        this.streamName = streamName;
+    }
+
     public String getStreamName() {
         return streamName;
     }
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
index 97512a9..7a79f9b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
@@ -34,16 +34,12 @@ import org.apache.camel.spi.UriPath;
     producerOnly = true, label = "cloud,messaging")
 public class KinesisFirehoseEndpoint extends DefaultEndpoint {
 
-    @UriPath(description = "Name of the stream")
-    @Metadata(required = "true")
-    private String streamName;
-    @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
-    @Metadata(required = "true")
-    private AmazonKinesisFirehose amazonKinesisFirehoseClient;
+    @UriParam
+    private KinesisFirehoseConfiguration configuration;
 
-    public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) {
+    public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) {
         super(uri, component);
-        this.streamName = streamName;
+        this.configuration = configuration;
     }
 
     @Override
@@ -61,15 +57,11 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
         return true;
     }
 
-    public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
-        this.amazonKinesisFirehoseClient = client;
-    }
-
     public AmazonKinesisFirehose getClient() {
-        return amazonKinesisFirehoseClient;
+        return configuration.getAmazonKinesisFirehoseClient();
     }
 
-    public String getStreamName() {
-        return streamName;
+    public KinesisFirehoseConfiguration getConfiguration() {
+        return configuration;
     }
 }
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
index ce24736..650d01d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
@@ -54,7 +54,7 @@ public class KinesisFirehoseProducer extends DefaultProducer {
         record.setData(body);
 
         PutRecordRequest putRecordRequest = new PutRecordRequest();
-        putRecordRequest.setDeliveryStreamName(getEndpoint().getStreamName());
+        putRecordRequest.setDeliveryStreamName(getEndpoint().getConfiguration().getStreamName());
         putRecordRequest.setRecord(record);
         return putRecordRequest;
     }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
index 241c36d..c42e5ef 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
@@ -16,14 +16,13 @@
  */
 package org.apache.camel.component.aws.kinesis;
 
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
-
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
 @UriParams
 public class KinesisConfiguration {
     
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index 04fdbd4..52bd17a 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -169,6 +169,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
 
     private boolean hasSequenceNumber() {
         return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
-               && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+               && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                   || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
     }
 }
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 12bc8d3..1a7e79b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -25,10 +25,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.ScheduledPollEndpoint;
-import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
 
 /**
  * The aws-kinesis component is for consuming and producing records from Amazon
@@ -39,7 +37,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @UriParam
     private KinesisConfiguration configuration;
-    
+
     public KinesisEndpoint(String uri, KinesisConfiguration configuration, KinesisComponent component) {
         super(uri, component);
         this.configuration = configuration;
@@ -47,7 +45,8 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
 
     @Override
     protected void doStart() throws Exception {
-        if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) {
+        if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
+            && configuration.getSequenceNumber().isEmpty()) {
             throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
         }
         super.doStart();
@@ -83,7 +82,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
     public AmazonKinesis getClient() {
         return configuration.getAmazonKinesisClient();
     }
-    
+
     public KinesisConfiguration getConfiguration() {
         return configuration;
     }
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
index 67864b3..ac8cdf1 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
@@ -51,6 +51,6 @@ public class KinesisFirehoseEndpointTest {
         );
 
         assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
-        assertThat(endpoint.getStreamName(), is("some_stream_name"));
+        assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
     }
 }
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
index 612e768..38111a5 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
@@ -48,6 +48,8 @@ public class KinesisFirehoseProducerTest {
     @Mock
     private KinesisFirehoseEndpoint kinesisFirehoseEndpoint;
     @Mock
+    private KinesisFirehoseConfiguration kinesisFirehoseConfiguration;
+    @Mock
     private Message inMessage;
     @Mock
     private Message outMessage;
@@ -61,7 +63,8 @@ public class KinesisFirehoseProducerTest {
     @Before
     public void setup() throws Exception {
         when(kinesisFirehoseEndpoint.getClient()).thenReturn(kinesisFirehoseClient);
-        when(kinesisFirehoseEndpoint.getStreamName()).thenReturn(STREAM_NAME);
+        when(kinesisFirehoseEndpoint.getConfiguration()).thenReturn(kinesisFirehoseConfiguration);
+        when(kinesisFirehoseEndpoint.getConfiguration().getStreamName()).thenReturn(STREAM_NAME);
         when(exchange.getOut()).thenReturn(outMessage);
         when(exchange.getIn()).thenReturn(inMessage);
         when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);

-- 
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.