You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/01/25 06:38:48 UTC
[camel] branch camel-3.x updated: CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.x by this push:
new 64060a1962b CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.
64060a1962b is described below
commit 64060a1962b6aebe7184fd48f1e3801120f29fd8
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Mon Jan 23 13:57:27 2023 +0100
CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.
---
.../camel/component/aws2/sqs/Sqs2Endpoint.java | 69 ++++++++++++-------
.../component/aws2/sqs/AmazonSQSClientMock.java | 16 ++++-
.../aws2/sqs/SqsProducerDelayedQueueuTest.java | 79 ++++++++++++++++++++++
3 files changed, 136 insertions(+), 28 deletions(-)
diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 806216e3481..a51cb819492 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -65,6 +65,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
private SqsClient client;
private String queueUrl;
+ private boolean queueUrlInitialized;
@UriPath(description = "Queue name or ARN")
@Metadata(required = true)
@@ -151,6 +152,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
if (configuration.getQueueUrl() != null) {
queueUrl = configuration.getQueueUrl();
+ queueUrlInitialized = true;
} else {
// If both region and Account ID is provided the queue URL can be
// built manually.
@@ -159,39 +161,16 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
if (configuration.getRegion() != null && configuration.getQueueOwnerAWSAccountId() != null) {
queueUrl = getAwsEndpointUri() + "/" + configuration.getQueueOwnerAWSAccountId() + "/"
+ configuration.getQueueName();
+ queueUrlInitialized = true;
} else if (configuration.getQueueOwnerAWSAccountId() != null) {
GetQueueUrlRequest.Builder getQueueUrlRequest = GetQueueUrlRequest.builder();
getQueueUrlRequest.queueName(configuration.getQueueName());
getQueueUrlRequest.queueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId());
GetQueueUrlResponse getQueueUrlResult = client.getQueueUrl(getQueueUrlRequest.build());
queueUrl = getQueueUrlResult.queueUrl();
+ queueUrlInitialized = true;
} else {
- // check whether the queue already exists
- String queueNamePath = "/" + configuration.getQueueName();
- ListQueuesRequest.Builder listQueuesRequestBuilder
- = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName());
-
- for (;;) {
- ListQueuesResponse listQueuesResult = client.listQueues(listQueuesRequestBuilder.build());
- for (String url : listQueuesResult.queueUrls()) {
- if (url.endsWith(queueNamePath)) {
- queueUrl = url;
- LOG.trace("Queue available at '{}'.", queueUrl);
- break;
- }
- }
-
- if (queueUrl != null) {
- break;
- }
-
- String token = listQueuesResult.nextToken();
- if (token == null) {
- break;
- }
-
- listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token);
- }
+ initQueueUrl();
}
}
@@ -203,6 +182,36 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
}
}
+ private void initQueueUrl() {
+ // Look for an existing queue whose URL ends with "/<queueName>", paging through the listQueues results.
+ String queueNamePath = "/" + configuration.getQueueName();
+ ListQueuesRequest.Builder listQueuesRequestBuilder
+ = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName());
+
+ for (;;) {
+ ListQueuesResponse listQueuesResult = client.listQueues(listQueuesRequestBuilder.build());
+ for (String url : listQueuesResult.queueUrls()) {
+ if (url.endsWith(queueNamePath)) {
+ queueUrl = url;
+ LOG.trace("Queue available at '{}'.", queueUrl);
+ break; // exits only the inner loop; the check below ends the paging loop
+ }
+ }
+
+ if (queueUrl != null) {
+ queueUrlInitialized = true; // lets getQueueUrl() skip further lookups
+ break;
+ }
+
+ String token = listQueuesResult.nextToken();
+ if (token == null) {
+ break; // last page reached without a match; queueUrlInitialized is not set here
+ }
+
+ listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token);
+ }
+ }
+
private boolean queueExists(SqsClient client) {
LOG.trace("Checking if queue '{}' exists", configuration.getQueueName());
@@ -366,7 +375,15 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
this.client = client;
}
+ /**
+ * Returns the queue URL. If it has not been resolved yet (e.g. the queue did not exist when the endpoint
+ * started), the lookup is retried on each call. See https://issues.apache.org/jira/browse/CAMEL-18968.
+ */
protected String getQueueUrl() {
+ if (!queueUrlInitialized) {
+ LOG.trace("Queue url was not initialized during the start of the component. Initializing again.");
+ initQueueUrl();
+ }
return queueUrl;
}
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index c50ea3b10a1..1a06a45bff7 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -68,6 +68,7 @@ public class AmazonSQSClientMock implements SqsClient {
private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>();
private ScheduledExecutorService scheduler;
private String queueName;
+ private boolean verifyQueueUrl;
public AmazonSQSClientMock() {
}
@@ -88,8 +89,8 @@ public class AmazonSQSClientMock implements SqsClient {
if (queueName != null) {
queues.add("/" + queueName);
} else {
- queues.add("queue1");
- queues.add("queue2");
+ queues.add("/queue1");
+ queues.add("/queue2");
}
result.queueUrls(queues);
return result.build();
@@ -106,6 +107,9 @@ public class AmazonSQSClientMock implements SqsClient {
@Override
public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) {
+ if (verifyQueueUrl && sendMessageRequest.queueUrl() == null) {
+ throw new RuntimeException("QueueUrl can not be null.");
+ }
Message.Builder message = Message.builder();
message.body(sendMessageRequest.messageBody());
message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
@@ -272,4 +276,12 @@ public class AmazonSQSClientMock implements SqsClient {
.queueUrl("https://queue.amazonaws.com/queue/camel-836")
.build();
}
+
+ public void setVerifyQueueUrl(boolean verifyQueueUrl) { // when true, sendMessage throws if the request's queueUrl is null
+ this.verifyQueueUrl = verifyQueueUrl;
+ }
+
+ public void setQueueName(String queueName) { // sets the name this mock uses when building its list of queue URLs
+ this.queueName = queueName;
+ }
}
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java
new file mode 100644
index 00000000000..ae821b9e016
--- /dev/null
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SqsProducerDelayedQueueuTest extends CamelTestSupport {
+ // Checks that the producer picks up the queue URL once the queue becomes visible after the route was created.
+ @BindToRegistry("client")
+ AmazonSQSClientMock mock = new AmazonSQSClientMock();
+
+ @EndpointInject("direct:start")
+ private ProducerTemplate template;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint result;
+
+ @Test
+ public void testDelayed() throws Exception {
+ mock.setVerifyQueueUrl(true); // make the mock reject sends whose queue URL is null
+ result.expectedMessageCount(1); // only the second send is expected to reach mock:result
+
+ // The first send is expected to fail: queue3 is not yet listed by the mock client.
+ template.send("direct:start", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("Hi from sqs 1");
+ }
+ });
+ // Make queue3 visible only now, after the first attempt (the "delayed" queue).
+ mock.setQueueName("queue3");
+
+ template.send("direct:start", new Processor() {
+ @Override
+ public void process(Exchange exchange) {
+ exchange.getIn().setBody("Hi from sqs 2");
+ }
+ });
+
+ MockEndpoint.assertIsSatisfied(context);
+ String res = result.getExchanges().get(0).getIn().getBody(String.class);
+ assertEquals("Hi from sqs 2", res);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("direct:start").to("aws2-sqs://queue3")
+ .to("mock:result");
+ }
+ };
+ }
+}