You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/11/09 15:46:53 UTC

[camel] branch master updated: CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fd3518  CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582)
3fd3518 is described below

commit 3fd35185e74ea5b0a2104182b4e8d1a0956c8d14
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Mon Nov 9 16:46:24 2020 +0100

    CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582)
---
 .../camel/component/aws2/sqs/Sqs2Endpoint.java     | 43 ++++++++++++++++++++--
 .../component/aws2/sqs/AmazonSQSClientMock.java    | 14 +++++++
 2 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 8ceef1b..1282b4b 100644
--- a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++ b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -49,7 +49,9 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
 import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
 import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
 import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
 import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.SqsException;
 
 /**
  * Sending and receive messages to/from AWS SQS service using AWS SDK version 2.x.
@@ -189,8 +191,32 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
         }
     }
 
+    private boolean queueExists(SqsClient client) {
+        LOG.trace("Checking if queue '{}' exists", configuration.getQueueName());
+
+        GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
+                .queueName(configuration.getQueueName())
+                .build();
+        try {
+            queueUrl = client.getQueueUrl(getQueueUrlRequest).queueUrl();
+            LOG.trace("Queue '{}' exists and its URL is '{}'", configuration.getQueueName(),
+                    queueUrl);
+
+            return true;
+
+        } catch (QueueDoesNotExistException e) {
+            LOG.trace("Queue '{}' does not exist", configuration.getQueueName());
+
+            return false;
+        }
+    }
+
     protected void createQueue(SqsClient client) {
-        LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+        if (queueExists(client)) {
+            return;
+        }
+
+        LOG.trace("Creating the a queue named '{}'", configuration.getQueueName());
 
         // creates a new queue, or returns the URL of an existing one
         CreateQueueRequest.Builder request = CreateQueueRequest.builder().queueName(configuration.getQueueName());
@@ -234,11 +260,20 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
                         String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds()));
             }
         }
-        LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+        LOG.trace("Trying to create queue [{}] with request [{}]...", configuration.getQueueName(), request);
         request.attributes(attributes);
 
-        CreateQueueResponse queueResult = client.createQueue(request.build());
-        queueUrl = queueResult.queueUrl();
+        try {
+            CreateQueueResponse queueResult = client.createQueue(request.build());
+            queueUrl = queueResult.queueUrl();
+        } catch (SqsException e) {
+            if (queueExists(client)) {
+                LOG.warn("The queue may have been created since last check and could not be created");
+                LOG.debug("AWS SDK error preventing queue creation: {}", e.getMessage(), e);
+            } else {
+                throw e;
+            }
+        }
 
         LOG.trace("Queue created and available at: {}", queueUrl);
     }
diff --git a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index f0c3725..2be4289 100644
--- a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
 import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
@@ -36,11 +38,14 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
 import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
 import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;
 import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
 import software.amazon.awssdk.services.sqs.model.Message;
 import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest;
 import software.amazon.awssdk.services.sqs.model.PurgeQueueResponse;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
 import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
 import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
 import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
@@ -50,6 +55,7 @@ import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
 import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
 import software.amazon.awssdk.services.sqs.model.SetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.SqsException;
 
 public class AmazonSQSClientMock implements SqsClient {
 
@@ -235,4 +241,12 @@ public class AmazonSQSClientMock implements SqsClient {
         // TODO Auto-generated method stub
 
     }
+
+    @Override
+    public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest)
+            throws QueueDoesNotExistException, AwsServiceException, SdkClientException, SqsException {
+        return GetQueueUrlResponse.builder()
+                .queueUrl("https://queue.amazonaws.com/queue/camel-836")
+                .build();
+    }
 }