You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/01/19 10:00:54 UTC
[camel] branch master updated: CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new f73e533 CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components
f73e533 is described below
commit f73e533d4fb90409441d8f0b53329099c85d3521
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Fri Jan 19 11:00:28 2018 +0100
CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the options like the other AWS components
---
.../aws/firehose/KinesisFirehoseComponent.java | 6 ++-
...oint.java => KinesisFirehoseConfiguration.java} | 46 ++++++----------------
.../aws/firehose/KinesisFirehoseEndpoint.java | 22 ++++-------
.../aws/firehose/KinesisFirehoseProducer.java | 2 +-
.../aws/kinesis/KinesisConfiguration.java | 7 ++--
.../component/aws/kinesis/KinesisConsumer.java | 3 +-
.../component/aws/kinesis/KinesisEndpoint.java | 9 ++---
.../aws/firehose/KinesisFirehoseEndpointTest.java | 2 +-
.../aws/firehose/KinesisFirehoseProducerTest.java | 5 ++-
9 files changed, 37 insertions(+), 65 deletions(-)
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
index d640d56..369c695 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseComponent.java
@@ -34,8 +34,10 @@ public class KinesisFirehoseComponent extends DefaultComponent {
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
- KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, remaining, this);
- setProperties(endpoint, parameters);
+ KinesisFirehoseConfiguration configuration = new KinesisFirehoseConfiguration();
+ configuration.setStreamName(remaining);
+ setProperties(configuration, parameters);
+ KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, configuration, this);
return endpoint;
}
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
similarity index 57%
copy from components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
copy to components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
index 97512a9..7c6201f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseConfiguration.java
@@ -17,22 +17,14 @@
package org.apache.camel.component.aws.firehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
-import org.apache.camel.Consumer;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
-import org.apache.camel.component.aws.kinesis.KinesisConsumer;
-import org.apache.camel.impl.DefaultEndpoint;
+
import org.apache.camel.spi.Metadata;
-import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
-/**
- * The aws-kinesis-firehose component is used for producing Amazon's Kinesis Firehose streams.
- */
-@UriEndpoint(firstVersion = "2.19.0", scheme = "aws-kinesis-firehose", title = "AWS Kinesis Firehose", syntax = "aws-kinesis-firehose:streamName",
- producerOnly = true, label = "cloud,messaging")
-public class KinesisFirehoseEndpoint extends DefaultEndpoint {
+@UriParams
+public class KinesisFirehoseConfiguration {
@UriPath(description = "Name of the stream")
@Metadata(required = "true")
@@ -40,36 +32,20 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
@UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
@Metadata(required = "true")
private AmazonKinesisFirehose amazonKinesisFirehoseClient;
-
- public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) {
- super(uri, component);
- this.streamName = streamName;
- }
-
- @Override
- public Producer createProducer() throws Exception {
- return new KinesisFirehoseProducer(this);
- }
-
- @Override
- public Consumer createConsumer(Processor processor) throws Exception {
- throw new UnsupportedOperationException("You cannot consume messages from this endpoint");
- }
-
- @Override
- public boolean isSingleton() {
- return true;
- }
-
+
public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
this.amazonKinesisFirehoseClient = client;
}
- public AmazonKinesisFirehose getClient() {
+ public AmazonKinesisFirehose getAmazonKinesisFirehoseClient() {
return amazonKinesisFirehoseClient;
}
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
public String getStreamName() {
return streamName;
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
index 97512a9..7a79f9b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpoint.java
@@ -34,16 +34,12 @@ import org.apache.camel.spi.UriPath;
producerOnly = true, label = "cloud,messaging")
public class KinesisFirehoseEndpoint extends DefaultEndpoint {
- @UriPath(description = "Name of the stream")
- @Metadata(required = "true")
- private String streamName;
- @UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
- @Metadata(required = "true")
- private AmazonKinesisFirehose amazonKinesisFirehoseClient;
+ @UriParam
+ private KinesisFirehoseConfiguration configuration;
- public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) {
+ public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) {
super(uri, component);
- this.streamName = streamName;
+ this.configuration = configuration;
}
@Override
@@ -61,15 +57,11 @@ public class KinesisFirehoseEndpoint extends DefaultEndpoint {
return true;
}
- public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
- this.amazonKinesisFirehoseClient = client;
- }
-
public AmazonKinesisFirehose getClient() {
- return amazonKinesisFirehoseClient;
+ return configuration.getAmazonKinesisFirehoseClient();
}
- public String getStreamName() {
- return streamName;
+ public KinesisFirehoseConfiguration getConfiguration() {
+ return configuration;
}
}
\ No newline at end of file
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
index ce24736..650d01d 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducer.java
@@ -54,7 +54,7 @@ public class KinesisFirehoseProducer extends DefaultProducer {
record.setData(body);
PutRecordRequest putRecordRequest = new PutRecordRequest();
- putRecordRequest.setDeliveryStreamName(getEndpoint().getStreamName());
+ putRecordRequest.setDeliveryStreamName(getEndpoint().getConfiguration().getStreamName());
putRecordRequest.setRecord(record);
return putRecordRequest;
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
index 241c36d..c42e5ef 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConfiguration.java
@@ -16,14 +16,13 @@
*/
package org.apache.camel.component.aws.kinesis;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.ShardIteratorType;
+
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;
-
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.model.ShardIteratorType;
-
@UriParams
public class KinesisConfiguration {
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index 04fdbd4..52bd17a 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -169,6 +169,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
private boolean hasSequenceNumber() {
return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
- && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+ && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+ || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
}
}
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
index 12bc8d3..1a7e79b 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/kinesis/KinesisEndpoint.java
@@ -25,10 +25,8 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.ScheduledPollEndpoint;
-import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
/**
* The aws-kinesis component is for consuming and producing records from Amazon
@@ -39,7 +37,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
@UriParam
private KinesisConfiguration configuration;
-
+
public KinesisEndpoint(String uri, KinesisConfiguration configuration, KinesisComponent component) {
super(uri, component);
this.configuration = configuration;
@@ -47,7 +45,8 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
@Override
protected void doStart() throws Exception {
- if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) {
+ if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
+ && configuration.getSequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
}
super.doStart();
@@ -83,7 +82,7 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {
public AmazonKinesis getClient() {
return configuration.getAmazonKinesisClient();
}
-
+
public KinesisConfiguration getConfiguration() {
return configuration;
}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
index 67864b3..ac8cdf1 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseEndpointTest.java
@@ -51,6 +51,6 @@ public class KinesisFirehoseEndpointTest {
);
assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
- assertThat(endpoint.getStreamName(), is("some_stream_name"));
+ assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
}
}
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
index 612e768..38111a5 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/firehose/KinesisFirehoseProducerTest.java
@@ -48,6 +48,8 @@ public class KinesisFirehoseProducerTest {
@Mock
private KinesisFirehoseEndpoint kinesisFirehoseEndpoint;
@Mock
+ private KinesisFirehoseConfiguration kinesisFirehoseConfiguration;
+ @Mock
private Message inMessage;
@Mock
private Message outMessage;
@@ -61,7 +63,8 @@ public class KinesisFirehoseProducerTest {
@Before
public void setup() throws Exception {
when(kinesisFirehoseEndpoint.getClient()).thenReturn(kinesisFirehoseClient);
- when(kinesisFirehoseEndpoint.getStreamName()).thenReturn(STREAM_NAME);
+ when(kinesisFirehoseEndpoint.getConfiguration()).thenReturn(kinesisFirehoseConfiguration);
+ when(kinesisFirehoseEndpoint.getConfiguration().getStreamName()).thenReturn(STREAM_NAME);
when(exchange.getOut()).thenReturn(outMessage);
when(exchange.getIn()).thenReturn(inMessage);
when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);
--
To stop receiving notification emails like this one, please contact
"commits@camel.apache.org" <co...@camel.apache.org>.