You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/09/19 22:24:12 UTC
svn commit: r1387739 - in
/camel/branches/camel-2.10.x/components/camel-aws/src:
main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
Author: cmueller
Date: Wed Sep 19 20:24:11 2012
New Revision: 1387739
URL: http://svn.apache.org/viewvc?rev=1387739&view=rev
Log:
CAMEL-5414: SqsEndpoint can't retrieve the URL of an existing queue whose visibility timeout differs from the default
Modified:
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
Modified: camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1387739&r1=1387738&r2=1387739&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original)
+++ camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep 19 20:24:11 2012
@@ -25,6 +25,7 @@ import com.amazonaws.services.sqs.model.
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.QueueAttributeName;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
@@ -81,35 +82,63 @@ public class SqsEndpoint extends Schedul
for (String url : listQueuesResult.getQueueUrls()) {
if (url.endsWith("/" + configuration.getQueueName())) {
queueUrl = url;
- LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl);
+ LOG.trace("Queue available at '{}'.", queueUrl);
break;
}
}
if (queueUrl == null) {
- LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+ createQueue(client);
+ } else {
+ updateQueueAttributes(client);
+ }
+ }
- // creates a new queue, or returns the URL of an existing one
- CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
- if (getConfiguration().getDefaultVisibilityTimeout() != null) {
- request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
- }
- if (getConfiguration().getMaximumMessageSize() != null) {
- request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
- }
- if (getConfiguration().getMessageRetentionPeriod() != null) {
- request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
- }
- if (getConfiguration().getPolicy() != null) {
- request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
- }
- LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
-
- CreateQueueResult queueResult = client.createQueue(request);
- queueUrl = queueResult.getQueueUrl();
-
- LOG.trace("Queue created and available at: {}", queueUrl);
+ private void createQueue(AmazonSQSClient client) {
+ LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+
+ // creates a new queue, or returns the URL of an existing one
+ CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
+ if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+ request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
+ }
+ if (getConfiguration().getMaximumMessageSize() != null) {
+ request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+ }
+ if (getConfiguration().getMessageRetentionPeriod() != null) {
+ request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+ }
+ if (getConfiguration().getPolicy() != null) {
+ request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+ }
+ LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+
+ CreateQueueResult queueResult = client.createQueue(request);
+ queueUrl = queueResult.getQueueUrl();
+
+ LOG.trace("Queue created and available at: {}", queueUrl);
+ }
+
+ private void updateQueueAttributes(AmazonSQSClient client) {
+ LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName());
+
+ SetQueueAttributesRequest request = new SetQueueAttributesRequest();
+ request.setQueueUrl(queueUrl);
+ if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+ request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
}
+ if (getConfiguration().getMaximumMessageSize() != null) {
+ request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+ }
+ if (getConfiguration().getMessageRetentionPeriod() != null) {
+ request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+ }
+ if (getConfiguration().getPolicy() != null) {
+ request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+ }
+ client.setQueueAttributes(request);
+
+ LOG.trace("Queue '{}' updated and available at {}'", configuration.getQueueName(), queueUrl);
}
@Override
Modified: camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1387739&r1=1387738&r2=1387739&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (original)
+++ camel/branches/camel-2.10.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep 19 20:24:11 2012
@@ -28,6 +28,7 @@ import com.amazonaws.services.sqs.model.
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
@@ -90,6 +91,11 @@ public class SqsEndpointUseExistingQueue
}
@Override
+ public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
+ // noop
+ }
+
+ @Override
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException {
ReceiveMessageResult result = new ReceiveMessageResult();
List<Message> resultMessages = result.getMessages();