You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/01 12:09:40 UTC
[camel] 01/05: CAMEL-15471 - Kinesis-Firehose: Add more operation
to producer side
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit bc913593e200712d259d7ae4e018df7bbf51a427
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Sep 1 12:19:48 2020 +0200
CAMEL-15471 - Kinesis-Firehose: Add more operation to producer side
---
.../aws2/firehose/KinesisFirehose2Operations.java | 1 +
.../aws2/firehose/KinesisFirehose2Producer.java | 28 ++++++++++++++++++++++
2 files changed, 29 insertions(+)
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
index 365f8a6..01aa840 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
@@ -21,5 +21,6 @@ public enum KinesisFirehose2Operations {
sendBatchRecord,
createDeliveryStream,
deleteDeliveryStream,
+ describeDeliveryStream,
updateDestination
}
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 8b3b13c..5ab81a5 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -31,6 +31,8 @@ import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamReques
import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
@@ -71,6 +73,9 @@ public class KinesisFirehose2Producer extends DefaultProducer {
case updateDestination:
updateDestination(getClient(), exchange);
break;
+ case describeDeliveryStream:
+ describeDeliveryStream(getClient(), exchange);
+ break;
default:
throw new IllegalArgumentException("Unsupported operation");
}
@@ -123,6 +128,29 @@ public class KinesisFirehose2Producer extends DefaultProducer {
"The updateDestination operation expects an UpdateDestinationRequest instance as body");
}
}
+
+ private void describeDeliveryStream(FirehoseClient client, Exchange exchange) { // describes a delivery stream using the body request or, failing that, the stream-name header
+ if (exchange.getIn().getBody() instanceof DescribeDeliveryStreamRequest) {
+ DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class);
+ DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req);
+ Message message = getMessageForResponse(exchange);
+ message.setBody(result);
+ } else { // body is not a ready-made request: fall back to the stream-name header
+ if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) {
+ DescribeDeliveryStreamRequest req
+ = DescribeDeliveryStreamRequest.builder()
+ .deliveryStreamName(exchange.getIn()
+ .getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class))
+ .build();
+ DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req);
+ Message message = getMessageForResponse(exchange);
+ message.setBody(result);
+ } else { // neither a request body nor a stream-name header was supplied
+ throw new IllegalArgumentException(
+ "The describeDeliveryStream operation expects at least a delivery stream name header or a DescribeDeliveryStreamRequest instance");
+ }
+ }
+ }
private void sendBatchRecord(FirehoseClient client, Exchange exchange) {
if (exchange.getIn().getBody() instanceof Iterable) {