You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/12/05 15:12:20 UTC

[camel] 01/03: CAMEL-12919 - Camel AWS-SQS: Creating Amazon SQS Queue with Server-Side Encryption

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c8be940f3232cb59229991c804f7017c80d62711
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Dec 5 15:52:42 2018 +0100

    CAMEL-12919 - Camel AWS-SQS: Creating Amazon SQS Queue with Server-Side Encryption
---
 .../camel-aws/src/main/docs/aws-sqs-component.adoc |  5 +-
 .../camel/component/aws/sqs/SqsConfiguration.java  | 40 +++++++++++++++
 .../camel/component/aws/sqs/SqsEndpoint.java       | 16 ++++++
 .../camel/component/aws/sqs/SqsEndpointTest.java   | 59 ++++++++++++++++++++++
 .../sqs/springboot/SqsComponentConfiguration.java  | 42 +++++++++++++++
 5 files changed, 161 insertions(+), 1 deletion(-)

diff --git a/components/camel-aws/src/main/docs/aws-sqs-component.adoc b/components/camel-aws/src/main/docs/aws-sqs-component.adoc
index d5dc6e8..1130007 100644
--- a/components/camel-aws/src/main/docs/aws-sqs-component.adoc
+++ b/components/camel-aws/src/main/docs/aws-sqs-component.adoc
@@ -68,7 +68,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (48 parameters):
+==== Query Parameters (51 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -86,9 +86,12 @@ with the following path and query parameters:
 | *deleteAfterRead* (consumer) | Delete message from SQS after it has been read | true | boolean
 | *deleteIfFiltered* (consumer) | Whether or not to send the DeleteMessage to the SQS queue if an exchange fails to get through a filter. If 'false' and exchange does not make it through a Camel filter upstream in the route, then don't send DeleteMessage. | true | boolean
 | *extendMessageVisibility* (consumer) | If enabled then a scheduled background task will keep extending the message visibility on SQS. This is needed if it takes a long time to process the message. If set to true defaultVisibilityTimeout must be set. See details at Amazon docs. | false | boolean
+| *kmsDataKeyReusePeriod Seconds* (consumer) | The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again. An integer representing seconds, between 60 seconds (1 minute) and 86,400 seconds (24 hours). Default: 300 (5 minutes). |  | Integer
+| *kmsMasterKeyId* (consumer) | The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK. |  | String
 | *maxMessagesPerPoll* (consumer) | Gets the maximum number of messages as a limit to poll at each polling. Is default unlimited, but use 0 or negative number to disable it as unlimited. |  | int
 | *messageAttributeNames* (consumer) | A list of message attribute names to receive when consuming. Multiple names can be separated by comma. |  | String
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
+| *serverSideEncryptionEnabled* (consumer) | Define if Server Side Encryption is enabled or not on the queue | false | boolean
 | *visibilityTimeout* (consumer) | The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by a ReceiveMessage request to set in the com.amazonaws.services.sqs.model.SetQueueAttributesRequest. This only make sense if its different from defaultVisibilityTimeout. It changes the queue visibility timeout attribute permanently. |  | Integer
 | *waitTimeSeconds* (consumer) | Duration in seconds (0 to 20) that the ReceiveMessage action call will wait until a message is in the queue to include in the response. |  | Integer
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index d6ad090..e63a8e7 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -61,6 +61,12 @@ public class SqsConfiguration implements Cloneable {
     private Integer defaultVisibilityTimeout;
     @UriParam(label = "consumer")
     private boolean extendMessageVisibility;
+    @UriParam(label = "consumer")
+    private String kmsMasterKeyId;
+    @UriParam(label = "consumer")
+    private Integer kmsDataKeyReusePeriodSeconds;
+    @UriParam(label = "consumer")
+    private boolean serverSideEncryptionEnabled;
     @UriParam(label = "consumer", defaultValue = "1")
     private int concurrentConsumers = 1;
     @UriParam(label = "advanced")
@@ -396,6 +402,40 @@ public class SqsConfiguration implements Cloneable {
         this.proxyPort = proxyPort;
     }
 
+    public String getKmsMasterKeyId() {
+        return kmsMasterKeyId;
+    }
+
+    /**
+     * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or a custom CMK.
+     */
+    public void setKmsMasterKeyId(String kmsMasterKeyId) {
+        this.kmsMasterKeyId = kmsMasterKeyId;
+    }
+
+    public Integer getKmsDataKeyReusePeriodSeconds() {
+        return kmsDataKeyReusePeriodSeconds;
+    }
+
+    /**
+     * The length of time, in seconds, for which Amazon SQS can reuse a data key to encrypt or decrypt messages before calling AWS KMS again. An integer representing seconds, between 60 seconds (1 minute) 
+     * and 86,400 seconds (24 hours). Default: 300 (5 minutes).
+     */
+    public void setKmsDataKeyReusePeriodSeconds(Integer kmsDataKeyReusePeriodSeconds) {
+        this.kmsDataKeyReusePeriodSeconds = kmsDataKeyReusePeriodSeconds;
+    }
+
+    public boolean isServerSideEncryptionEnabled() {
+        return serverSideEncryptionEnabled;
+    }
+
+    /**
+     * Define if Server Side Encryption is enabled or not on the queue
+     */
+    public void setServerSideEncryptionEnabled(boolean serverSideEncryptionEnabled) {
+        this.serverSideEncryptionEnabled = serverSideEncryptionEnabled;
+    }
+
     /**
      * Only for FIFO queues. Strategy for setting the messageGroupId on the message.
      * Can be one of the following options: *useConstant*, *useExchangeId*, *usePropertyValue*.
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
index de21934..6ae3e77 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
@@ -189,6 +189,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getRedrivePolicy() != null) {
             request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
         }
+        if (getConfiguration().isServerSideEncryptionEnabled()) {
+            if (getConfiguration().getKmsMasterKeyId() != null) {
+                request.getAttributes().put(QueueAttributeName.KmsMasterKeyId.name(), getConfiguration().getKmsMasterKeyId());
+            }
+            if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) {
+                request.getAttributes().put(QueueAttributeName.KmsDataKeyReusePeriodSeconds.name(), String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds()));
+            }
+        }
         LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
 
         CreateQueueResult queueResult = client.createQueue(request);
@@ -221,6 +229,14 @@ public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterSt
         if (getConfiguration().getRedrivePolicy() != null) {
             request.getAttributes().put(QueueAttributeName.RedrivePolicy.name(), getConfiguration().getRedrivePolicy());
         }
+        if (getConfiguration().isServerSideEncryptionEnabled()) {
+            if (getConfiguration().getKmsMasterKeyId() != null) {
+                request.getAttributes().put(QueueAttributeName.KmsMasterKeyId.name(), getConfiguration().getKmsMasterKeyId());
+            }
+            if (getConfiguration().getKmsDataKeyReusePeriodSeconds() != null) {
+                request.getAttributes().put(QueueAttributeName.KmsDataKeyReusePeriodSeconds.name(), String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds()));
+            }
+        }
         if (!request.getAttributes().isEmpty()) {
             LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName());
             client.setQueueAttributes(request);
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
index f62e3d1..728dbaa 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointTest.java
@@ -137,4 +137,63 @@ public class SqsEndpointTest {
         Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest);
         assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue", endpoint.getQueueUrl());
     }
+    
+    @Test
+    public void createQueueWithSSEShouldCreateStandardQueueWithSSESet() {
+        config.setDefaultVisibilityTimeout(1000);
+        config.setMaximumMessageSize(128);
+        config.setMessageRetentionPeriod(1000);
+        config.setPolicy("{\"Version\": \"2012-10-17\"}");
+        config.setReceiveMessageWaitTimeSeconds(5);
+        config.setRedrivePolicy("{ \"deadLetterTargetArn\" : String, \"maxReceiveCount\" : Integer }");
+        config.setKmsMasterKeyId("keyMaster1");
+        config.setKmsDataKeyReusePeriodSeconds(300);
+        config.setServerSideEncryptionEnabled(true);
+
+        CreateQueueRequest expectedCreateQueueRequest = new CreateQueueRequest("test-queue")
+                .addAttributesEntry(QueueAttributeName.VisibilityTimeout.name(), "1000")
+                .addAttributesEntry(QueueAttributeName.MaximumMessageSize.name(), "128")
+                .addAttributesEntry(QueueAttributeName.MessageRetentionPeriod.name(), "1000")
+                .addAttributesEntry(QueueAttributeName.Policy.name(), "{\"Version\": \"2012-10-17\"}")
+                .addAttributesEntry(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), "5")
+                .addAttributesEntry(QueueAttributeName.KmsMasterKeyId.name(), "keyMaster1")
+                .addAttributesEntry(QueueAttributeName.KmsDataKeyReusePeriodSeconds.name(), "300")
+                .addAttributesEntry(QueueAttributeName.RedrivePolicy.name(), "{ \"deadLetterTargetArn\" : String, \"maxReceiveCount\" : Integer }");
+        Mockito.when(amazonSQSClient.createQueue(ArgumentMatchers.any(CreateQueueRequest.class)))
+                .thenReturn(new CreateQueueResult()
+                                .withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue"));
+
+        endpoint.createQueue(amazonSQSClient);
+
+        Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest);
+        assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue", endpoint.getQueueUrl());
+    }
+    
+    @Test
+    public void createQueueWithoutSSEShouldNotCreateStandardQueueWithSSESet() {
+        config.setDefaultVisibilityTimeout(1000);
+        config.setMaximumMessageSize(128);
+        config.setMessageRetentionPeriod(1000);
+        config.setPolicy("{\"Version\": \"2012-10-17\"}");
+        config.setReceiveMessageWaitTimeSeconds(5);
+        config.setRedrivePolicy("{ \"deadLetterTargetArn\" : String, \"maxReceiveCount\" : Integer }");
+        config.setKmsMasterKeyId("keyMaster1");
+        config.setKmsDataKeyReusePeriodSeconds(300);
+
+        CreateQueueRequest expectedCreateQueueRequest = new CreateQueueRequest("test-queue")
+                .addAttributesEntry(QueueAttributeName.VisibilityTimeout.name(), "1000")
+                .addAttributesEntry(QueueAttributeName.MaximumMessageSize.name(), "128")
+                .addAttributesEntry(QueueAttributeName.MessageRetentionPeriod.name(), "1000")
+                .addAttributesEntry(QueueAttributeName.Policy.name(), "{\"Version\": \"2012-10-17\"}")
+                .addAttributesEntry(QueueAttributeName.ReceiveMessageWaitTimeSeconds.name(), "5")
+                .addAttributesEntry(QueueAttributeName.RedrivePolicy.name(), "{ \"deadLetterTargetArn\" : String, \"maxReceiveCount\" : Integer }");
+        Mockito.when(amazonSQSClient.createQueue(ArgumentMatchers.any(CreateQueueRequest.class)))
+                .thenReturn(new CreateQueueResult()
+                                .withQueueUrl("https://sqs.us-east-1.amazonaws.com/111222333/test-queue"));
+
+        endpoint.createQueue(amazonSQSClient);
+
+        Mockito.verify(amazonSQSClient).createQueue(expectedCreateQueueRequest);
+        assertEquals("https://sqs.us-east-1.amazonaws.com/111222333/test-queue", endpoint.getQueueUrl());
+    }
 }
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
index c8ae146..827b57f 100644
--- a/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-aws-starter/src/main/java/org/apache/camel/component/aws/sqs/springboot/SqsComponentConfiguration.java
@@ -237,6 +237,22 @@ public class SqsComponentConfiguration
          * To define a proxy port when instantiating the SQS client
          */
         private Integer proxyPort;
+        /**
+         * The ID of an AWS-managed customer master key (CMK) for Amazon SQS or
+         * a custom CMK.
+         */
+        private String kmsMasterKeyId;
+        /**
+         * The length of time, in seconds, for which Amazon SQS can reuse a data
+         * key to encrypt or decrypt messages before calling AWS KMS again. An
+         * integer representing seconds, between 60 seconds (1 minute) and
+         * 86,400 seconds (24 hours). Default: 300 (5 minutes).
+         */
+        private Integer kmsDataKeyReusePeriodSeconds;
+        /**
+         * Define if Server Side Encryption is enabled or not on the queue
+         */
+        private Boolean serverSideEncryptionEnabled = false;
 
         public String getAmazonAWSHost() {
             return amazonAWSHost;
@@ -446,5 +462,31 @@ public class SqsComponentConfiguration
         public void setProxyPort(Integer proxyPort) {
             this.proxyPort = proxyPort;
         }
+
+        public String getKmsMasterKeyId() {
+            return kmsMasterKeyId;
+        }
+
+        public void setKmsMasterKeyId(String kmsMasterKeyId) {
+            this.kmsMasterKeyId = kmsMasterKeyId;
+        }
+
+        public Integer getKmsDataKeyReusePeriodSeconds() {
+            return kmsDataKeyReusePeriodSeconds;
+        }
+
+        public void setKmsDataKeyReusePeriodSeconds(
+                Integer kmsDataKeyReusePeriodSeconds) {
+            this.kmsDataKeyReusePeriodSeconds = kmsDataKeyReusePeriodSeconds;
+        }
+
+        public Boolean getServerSideEncryptionEnabled() {
+            return serverSideEncryptionEnabled;
+        }
+
+        public void setServerSideEncryptionEnabled(
+                Boolean serverSideEncryptionEnabled) {
+            this.serverSideEncryptionEnabled = serverSideEncryptionEnabled;
+        }
     }
 }
\ No newline at end of file