You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/11/09 15:46:53 UTC
[camel] branch master updated: CAMEL-15833: prevent a concurrent
access error on AWS v2 SQS (#4582)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3fd3518 CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582)
3fd3518 is described below
commit 3fd35185e74ea5b0a2104182b4e8d1a0956c8d14
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Mon Nov 9 16:46:24 2020 +0100
CAMEL-15833: prevent a concurrent access error on AWS v2 SQS (#4582)
---
.../camel/component/aws2/sqs/Sqs2Endpoint.java | 43 ++++++++++++++++++++--
.../component/aws2/sqs/AmazonSQSClientMock.java | 14 +++++++
2 files changed, 53 insertions(+), 4 deletions(-)
diff --git a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 8ceef1b..1282b4b 100644
--- a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++ b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -49,7 +49,9 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.SqsException;
/**
* Sending and receive messages to/from AWS SQS service using AWS SDK version 2.x.
@@ -189,8 +191,32 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
}
}
+ private boolean queueExists(SqsClient client) {
+ LOG.trace("Checking if queue '{}' exists", configuration.getQueueName());
+
+ GetQueueUrlRequest getQueueUrlRequest = GetQueueUrlRequest.builder()
+ .queueName(configuration.getQueueName())
+ .build();
+ try {
+ queueUrl = client.getQueueUrl(getQueueUrlRequest).queueUrl();
+ LOG.trace("Queue '{}' exists and its URL is '{}'", configuration.getQueueName(),
+ queueUrl);
+
+ return true;
+
+ } catch (QueueDoesNotExistException e) {
+ LOG.trace("Queue '{}' does not exist", configuration.getQueueName());
+
+ return false;
+ }
+ }
+
protected void createQueue(SqsClient client) {
- LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+ if (queueExists(client)) {
+ return;
+ }
+
+ LOG.trace("Creating the a queue named '{}'", configuration.getQueueName());
// creates a new queue, or returns the URL of an existing one
CreateQueueRequest.Builder request = CreateQueueRequest.builder().queueName(configuration.getQueueName());
@@ -234,11 +260,20 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
String.valueOf(getConfiguration().getKmsDataKeyReusePeriodSeconds()));
}
}
- LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+ LOG.trace("Trying to create queue [{}] with request [{}]...", configuration.getQueueName(), request);
request.attributes(attributes);
- CreateQueueResponse queueResult = client.createQueue(request.build());
- queueUrl = queueResult.queueUrl();
+ try {
+ CreateQueueResponse queueResult = client.createQueue(request.build());
+ queueUrl = queueResult.queueUrl();
+ } catch (SqsException e) {
+ if (queueExists(client)) {
+ LOG.warn("The queue may have been created since last check and could not be created");
+ LOG.debug("AWS SDK error preventing queue creation: {}", e.getMessage(), e);
+ } else {
+ throw e;
+ }
+ }
LOG.trace("Queue created and available at: {}", queueUrl);
}
diff --git a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index f0c3725..2be4289 100644
--- a/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -28,6 +28,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
@@ -36,11 +38,14 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.ListQueuesRequest;
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.PurgeQueueRequest;
import software.amazon.awssdk.services.sqs.model.PurgeQueueResponse;
+import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
@@ -50,6 +55,7 @@ import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.SetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.SqsException;
public class AmazonSQSClientMock implements SqsClient {
@@ -235,4 +241,12 @@ public class AmazonSQSClientMock implements SqsClient {
// TODO Auto-generated method stub
}
+
+ @Override
+ public GetQueueUrlResponse getQueueUrl(GetQueueUrlRequest getQueueUrlRequest)
+ throws QueueDoesNotExistException, AwsServiceException, SdkClientException, SqsException {
+ return GetQueueUrlResponse.builder()
+ .queueUrl("https://queue.amazonaws.com/queue/camel-836")
+ .build();
+ }
}