You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/01/25 06:38:48 UTC

[camel] branch camel-3.x updated: CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.x by this push:
     new 64060a1962b CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.
64060a1962b is described below

commit 64060a1962b6aebe7184fd48f1e3801120f29fd8
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Mon Jan 23 13:57:27 2023 +0100

    CAMEL-18968 Camel-aws2-sqs - Queue url might stay empty for the delayed queue.
---
 .../camel/component/aws2/sqs/Sqs2Endpoint.java     | 69 ++++++++++++-------
 .../component/aws2/sqs/AmazonSQSClientMock.java    | 16 ++++-
 .../aws2/sqs/SqsProducerDelayedQueueuTest.java     | 79 ++++++++++++++++++++++
 3 files changed, 136 insertions(+), 28 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 806216e3481..a51cb819492 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -65,6 +65,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
 
     private SqsClient client;
     private String queueUrl;
+    private boolean queueUrlInitialized;
 
     @UriPath(description = "Queue name or ARN")
     @Metadata(required = true)
@@ -151,6 +152,7 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
 
         if (configuration.getQueueUrl() != null) {
             queueUrl = configuration.getQueueUrl();
+            queueUrlInitialized = true;
         } else {
             // If both region and Account ID is provided the queue URL can be
             // built manually.
@@ -159,39 +161,16 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
             if (configuration.getRegion() != null && configuration.getQueueOwnerAWSAccountId() != null) {
                 queueUrl = getAwsEndpointUri() + "/" + configuration.getQueueOwnerAWSAccountId() + "/"
                            + configuration.getQueueName();
+                queueUrlInitialized = true;
             } else if (configuration.getQueueOwnerAWSAccountId() != null) {
                 GetQueueUrlRequest.Builder getQueueUrlRequest = GetQueueUrlRequest.builder();
                 getQueueUrlRequest.queueName(configuration.getQueueName());
                 getQueueUrlRequest.queueOwnerAWSAccountId(configuration.getQueueOwnerAWSAccountId());
                 GetQueueUrlResponse getQueueUrlResult = client.getQueueUrl(getQueueUrlRequest.build());
                 queueUrl = getQueueUrlResult.queueUrl();
+                queueUrlInitialized = true;
             } else {
-                // check whether the queue already exists
-                String queueNamePath = "/" + configuration.getQueueName();
-                ListQueuesRequest.Builder listQueuesRequestBuilder
-                        = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName());
-
-                for (;;) {
-                    ListQueuesResponse listQueuesResult = client.listQueues(listQueuesRequestBuilder.build());
-                    for (String url : listQueuesResult.queueUrls()) {
-                        if (url.endsWith(queueNamePath)) {
-                            queueUrl = url;
-                            LOG.trace("Queue available at '{}'.", queueUrl);
-                            break;
-                        }
-                    }
-
-                    if (queueUrl != null) {
-                        break;
-                    }
-
-                    String token = listQueuesResult.nextToken();
-                    if (token == null) {
-                        break;
-                    }
-
-                    listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token);
-                }
+                initQueueUrl();
             }
         }
 
@@ -203,6 +182,36 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
         }
     }
 
+    private void initQueueUrl() {
+        // check whether the queue already exists
+        String queueNamePath = "/" + configuration.getQueueName();
+        ListQueuesRequest.Builder listQueuesRequestBuilder
+                = ListQueuesRequest.builder().maxResults(1000).queueNamePrefix(configuration.getQueueName());
+
+        for (;;) {
+            ListQueuesResponse listQueuesResult = client.listQueues(listQueuesRequestBuilder.build());
+            for (String url : listQueuesResult.queueUrls()) {
+                if (url.endsWith(queueNamePath)) {
+                    queueUrl = url;
+                    LOG.trace("Queue available at '{}'.", queueUrl);
+                    break;
+                }
+            }
+
+            if (queueUrl != null) {
+                queueUrlInitialized = true;
+                break;
+            }
+
+            String token = listQueuesResult.nextToken();
+            if (token == null) {
+                break;
+            }
+
+            listQueuesRequestBuilder = listQueuesRequestBuilder.nextToken(token);
+        }
+    }
+
     private boolean queueExists(SqsClient client) {
         LOG.trace("Checking if queue '{}' exists", configuration.getQueueName());
 
@@ -366,7 +375,15 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS
         this.client = client;
     }
 
+    /**
+     * If the queue does not exist during endpoint initialization, the queueUrl has to be initialized again. See
+     * https://issues.apache.org/jira/browse/CAMEL-18968 for more details.
+     */
     protected String getQueueUrl() {
+        if (!queueUrlInitialized) {
+            LOG.trace("Queue url was not initialized during the start of the component. Initializing again.");
+            initQueueUrl();
+        }
         return queueUrl;
     }
 
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
index c50ea3b10a1..1a06a45bff7 100644
--- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/AmazonSQSClientMock.java
@@ -68,6 +68,7 @@ public class AmazonSQSClientMock implements SqsClient {
     private Map<String, ScheduledFuture<?>> inFlight = new LinkedHashMap<>();
     private ScheduledExecutorService scheduler;
     private String queueName;
+    private boolean verifyQueueUrl;
 
     public AmazonSQSClientMock() {
     }
@@ -88,8 +89,8 @@ public class AmazonSQSClientMock implements SqsClient {
         if (queueName != null) {
             queues.add("/" + queueName);
         } else {
-            queues.add("queue1");
-            queues.add("queue2");
+            queues.add("/queue1");
+            queues.add("/queue2");
         }
         result.queueUrls(queues);
         return result.build();
@@ -106,6 +107,9 @@ public class AmazonSQSClientMock implements SqsClient {
 
     @Override
     public SendMessageResponse sendMessage(SendMessageRequest sendMessageRequest) {
+        if (verifyQueueUrl && sendMessageRequest.queueUrl() == null) {
+            throw new RuntimeException("QueueUrl can not be null.");
+        }
         Message.Builder message = Message.builder();
         message.body(sendMessageRequest.messageBody());
         message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
@@ -272,4 +276,12 @@ public class AmazonSQSClientMock implements SqsClient {
                 .queueUrl("https://queue.amazonaws.com/queue/camel-836")
                 .build();
     }
+
+    public void setVerifyQueueUrl(boolean verifyQueueUrl) {
+        this.verifyQueueUrl = verifyQueueUrl;
+    }
+
+    public void setQueueName(String queueName) {
+        this.queueName = queueName;
+    }
 }
diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java
new file mode 100644
index 00000000000..ae821b9e016
--- /dev/null
+++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsProducerDelayedQueueuTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SqsProducerDelayedQueueuTest extends CamelTestSupport {
+
+    @BindToRegistry("client")
+    AmazonSQSClientMock mock = new AmazonSQSClientMock();
+
+    @EndpointInject("direct:start")
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void testDelayed() throws Exception {
+        mock.setVerifyQueueUrl(true);
+        result.expectedMessageCount(1);
+
+        //should fail, because queue3 is not yet registered in the client
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody("Hi from sqs 1");
+            }
+        });
+        //adding queue3 later (delayed)
+        mock.setQueueName("queue3");
+
+        template.send("direct:start", new Processor() {
+            @Override
+            public void process(Exchange exchange) {
+                exchange.getIn().setBody("Hi from sqs 2");
+            }
+        });
+
+        MockEndpoint.assertIsSatisfied(context);
+        String res = result.getExchanges().get(0).getIn().getBody(String.class);
+        assertEquals("Hi from sqs 2", res);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start").to("aws2-sqs://queue3")
+                        .to("mock:result");
+            }
+        };
+    }
+}