You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/09/19 22:24:12 UTC

svn commit: r1387739 - in /camel/branches/camel-2.10.x/components/camel-aws/src: main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java

Author: cmueller
Date: Wed Sep 19 20:24:11 2012
New Revision: 1387739

URL: http://svn.apache.org/viewvc?rev=1387739&view=rev
Log:
CAMEL-5414: SqsEndpoint can't retrieve existing queue url with visibility timeout different than default

Modified:
    camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
    camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java

Modified: camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1387739&r1=1387738&r2=1387739&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep 19 20:24:11 2012
@@ -25,6 +25,7 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.CreateQueueResult;
 import com.amazonaws.services.sqs.model.ListQueuesResult;
 import com.amazonaws.services.sqs.model.QueueAttributeName;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
@@ -81,35 +82,63 @@ public class SqsEndpoint extends Schedul
         for (String url : listQueuesResult.getQueueUrls()) {
             if (url.endsWith("/" + configuration.getQueueName())) {
                 queueUrl = url;
-                LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl);
+                LOG.trace("Queue available at '{}'.", queueUrl);
                 break;
             }
         }
         
         if (queueUrl == null) {
-            LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+            createQueue(client);
+        } else {
+            updateQueueAttributes(client);
+        }
+    }
 
-            // creates a new queue, or returns the URL of an existing one
-            CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
-            if (getConfiguration().getDefaultVisibilityTimeout() != null) {
-                request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
-            }
-            if (getConfiguration().getMaximumMessageSize() != null) {
-                request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
-            }
-            if (getConfiguration().getMessageRetentionPeriod() != null) {
-                request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
-            }
-            if (getConfiguration().getPolicy() != null) {
-                request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
-            }        
-            LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
-            
-            CreateQueueResult queueResult = client.createQueue(request);
-            queueUrl = queueResult.getQueueUrl();
-            
-            LOG.trace("Queue created and available at: {}", queueUrl);
+    private void createQueue(AmazonSQSClient client) {
+        LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+
+        // creates a new queue, or returns the URL of an existing one
+        CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
+        if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+            request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
+        }
+        if (getConfiguration().getMaximumMessageSize() != null) {
+            request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+        }
+        if (getConfiguration().getMessageRetentionPeriod() != null) {
+            request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+        }
+        if (getConfiguration().getPolicy() != null) {
+            request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+        }        
+        LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+        
+        CreateQueueResult queueResult = client.createQueue(request);
+        queueUrl = queueResult.getQueueUrl();
+        
+        LOG.trace("Queue created and available at: {}", queueUrl);
+    }
+
+    private void updateQueueAttributes(AmazonSQSClient client) {
+        LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName());
+        
+        SetQueueAttributesRequest request = new SetQueueAttributesRequest();
+        request.setQueueUrl(queueUrl);
+        if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+            request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
         }
+        if (getConfiguration().getMaximumMessageSize() != null) {
+            request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+        }
+        if (getConfiguration().getMessageRetentionPeriod() != null) {
+            request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+        }
+        if (getConfiguration().getPolicy() != null) {
+            request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+        }
+        client.setQueueAttributes(request);
+        
+        LOG.trace("Queue '{}' updated and available at {}'", configuration.getQueueName(), queueUrl);
     }
 
     @Override

Modified: camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1387739&r1=1387738&r2=1387739&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep 19 20:24:11 2012
@@ -28,6 +28,7 @@ import com.amazonaws.services.sqs.model.
 import com.amazonaws.services.sqs.model.Message;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 import org.apache.camel.EndpointInject;
 import org.apache.camel.builder.RouteBuilder;
@@ -90,6 +91,11 @@ public class SqsEndpointUseExistingQueue
         }
         
         @Override
+        public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
+            // noop
+        }
+        
+        @Override
         public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException {
             ReceiveMessageResult result = new ReceiveMessageResult();
             List<Message> resultMessages = result.getMessages();