You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/09/05 21:55:09 UTC

svn commit: r1381327 - in /camel/trunk/components/camel-aws/src: main/java/org/apache/camel/component/aws/sqs/ test/java/org/apache/camel/component/aws/sqs/

Author: cmueller
Date: Wed Sep  5 19:55:09 2012
New Revision: 1381327

URL: http://svn.apache.org/viewvc?rev=1381327&view=rev
Log:
CAMEL-5414: SqsEndpoint can't retrieve existing queue url with visibility timeout different than default

Added:
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
Modified:
    camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
    camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java

Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1381327&r1=1381326&r2=1381327&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep  5 19:55:09 2012
@@ -23,6 +23,7 @@ import com.amazonaws.auth.BasicAWSCreden
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.CreateQueueRequest;
 import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
 import com.amazonaws.services.sqs.model.QueueAttributeName;
 
 import org.apache.camel.Consumer;
@@ -74,27 +75,41 @@ public class SqsEndpoint extends Schedul
     protected void doStart() throws Exception {
         client = getConfiguration().getAmazonSQSClient() != null
                 ? getConfiguration().getAmazonSQSClient() : getClient();
-        
-        // creates a new queue, or returns the URL of an existing one
-        CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
-        if (getConfiguration().getDefaultVisibilityTimeout() != null) {
-            request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
-        }
-        if (getConfiguration().getMaximumMessageSize() != null) {
-            request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
-        }
-        if (getConfiguration().getMessageRetentionPeriod() != null) {
-            request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+
+        // check whether the queue already exists
+        ListQueuesResult listQueuesResult = client.listQueues();
+        for (String url : listQueuesResult.getQueueUrls()) {
+            if (url.endsWith("/" + configuration.getQueueName())) {
+                queueUrl = url;
+                LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl);
+                break;
+            }
         }
-        if (getConfiguration().getPolicy() != null) {
-            request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
-        }        
-        LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
         
-        CreateQueueResult queueResult = client.createQueue(request);
-        queueUrl = queueResult.getQueueUrl();
-        
-        LOG.trace("Queue created and available at: {}", queueUrl);
+        if (queueUrl == null) {
+            LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+
+            // creates a new queue, or returns the URL of an existing one
+            CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
+            if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+                request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
+            }
+            if (getConfiguration().getMaximumMessageSize() != null) {
+                request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+            }
+            if (getConfiguration().getMessageRetentionPeriod() != null) {
+                request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+            }
+            if (getConfiguration().getPolicy() != null) {
+                request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+            }        
+            LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+            
+            CreateQueueResult queueResult = client.createQueue(request);
+            queueUrl = queueResult.getQueueUrl();
+            
+            LOG.trace("Queue created and available at: {}", queueUrl);
+        }
     }
 
     @Override

Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java?rev=1381327&r1=1381326&r2=1381327&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java (original)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java Wed Sep  5 19:55:09 2012
@@ -31,6 +31,7 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.CreateQueueRequest;
 import com.amazonaws.services.sqs.model.CreateQueueResult;
 import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
 import com.amazonaws.services.sqs.model.Message;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@@ -49,6 +50,12 @@ public class AmazonSQSClientMock extends
     }
 
     @Override
+    public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
+        ListQueuesResult result = new ListQueuesResult();
+        return result;
+    }
+
+    @Override
     public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
         CreateQueueResult result = new CreateQueueResult();
         result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue");

Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1381327&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep  5 19:55:09 2012
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class SqsEndpointUseExistingQueueTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+
+    @Test
+    public void defaultsToDisabled() throws Exception {
+        this.mock.expectedMessageCount(1);
+
+        assertMockEndpointsSatisfied(); // Wait for message to arrive.
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry registry = super.createRegistry();
+        
+        AmazonSQSClientMock clientMock = new SqsEndpointUseExistingQueueTest.AmazonSQSClientMock();
+        registry.bind("amazonSQSClient", clientMock);
+        
+        return registry;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient")
+                    .to("mock:result");
+            }
+        };
+    }
+    
+    static class AmazonSQSClientMock extends AmazonSQSClient {
+        
+        public AmazonSQSClientMock() {
+            super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
+        }
+
+        @Override
+        public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
+            ListQueuesResult result = new ListQueuesResult();
+            result.getQueueUrls().add("http://queue.amazonaws.com/0815/Foo");
+            result.getQueueUrls().add("http://queue.amazonaws.com/0815/MyQueue");
+            result.getQueueUrls().add("http://queue.amazonaws.com/0815/Bar");
+            return result;
+        }
+        
+        @Override
+        public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
+            throw new AmazonServiceException("forced exception for test if this method is called");
+        }
+        
+        @Override
+        public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException {
+            ReceiveMessageResult result = new ReceiveMessageResult();
+            List<Message> resultMessages = result.getMessages();
+            Message message = new Message();
+            resultMessages.add(message);
+            
+            return result;
+        }
+    }
+}