You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2020/09/01 12:09:40 UTC

[camel] 01/05: CAMEL-15471 - Kinesis-Firehose: Add more operation to producer side

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit bc913593e200712d259d7ae4e018df7bbf51a427
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Tue Sep 1 12:19:48 2020 +0200

    CAMEL-15471 - Kinesis-Firehose: Add more operation to producer side
---
 .../aws2/firehose/KinesisFirehose2Operations.java  |  1 +
 .../aws2/firehose/KinesisFirehose2Producer.java    | 28 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
index 365f8a6..01aa840 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Operations.java
@@ -21,5 +21,6 @@ public enum KinesisFirehose2Operations {
     sendBatchRecord,
     createDeliveryStream,
     deleteDeliveryStream,
+    describeDeliveryStream, 
     updateDestination
 }
diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
index 8b3b13c..5ab81a5 100644
--- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
+++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/firehose/KinesisFirehose2Producer.java
@@ -31,6 +31,8 @@ import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamReques
 import software.amazon.awssdk.services.firehose.model.CreateDeliveryStreamResponse;
 import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamRequest;
 import software.amazon.awssdk.services.firehose.model.DeleteDeliveryStreamResponse;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamRequest;
+import software.amazon.awssdk.services.firehose.model.DescribeDeliveryStreamResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
 import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
 import software.amazon.awssdk.services.firehose.model.PutRecordRequest;
@@ -71,6 +73,9 @@ public class KinesisFirehose2Producer extends DefaultProducer {
                 case updateDestination:
                     updateDestination(getClient(), exchange);
                     break;
+                case describeDeliveryStream:
+                    describeDeliveryStream(getClient(), exchange);
+                    break;
                 default:
                     throw new IllegalArgumentException("Unsupported operation");
             }
@@ -123,6 +128,29 @@ public class KinesisFirehose2Producer extends DefaultProducer {
                     "The updateDestination operation expects an UpdateDestinationRequest instance as body");
         }
     }
+    
+    private void describeDeliveryStream(FirehoseClient client, Exchange exchange) {
+        if (exchange.getIn().getBody() instanceof DescribeDeliveryStreamRequest) {
+        	DescribeDeliveryStreamRequest req = exchange.getIn().getBody(DescribeDeliveryStreamRequest.class);
+            DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req);
+            Message message = getMessageForResponse(exchange);
+            message.setBody(result);
+        } else {
+            if (ObjectHelper.isNotEmpty(exchange.getIn().getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME))) {
+                DescribeDeliveryStreamRequest req
+                        = DescribeDeliveryStreamRequest.builder()
+                                .deliveryStreamName(exchange.getIn()
+                                        .getHeader(KinesisFirehose2Constants.KINESIS_FIREHOSE_STREAM_NAME, String.class))
+                                .build();
+                DescribeDeliveryStreamResponse result = client.describeDeliveryStream(req);
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            } else {
+                throw new IllegalArgumentException(
+                        "The describeDeliveryStream operation expects at least an delivery stream name header or a DeleteDeliveryStreamRequest instance");
+            }
+        }
+    }
 
     private void sendBatchRecord(FirehoseClient client, Exchange exchange) {
         if (exchange.getIn().getBody() instanceof Iterable) {