You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by cm...@apache.org on 2012/09/05 21:55:09 UTC
svn commit: r1381327 - in /camel/trunk/components/camel-aws/src:
main/java/org/apache/camel/component/aws/sqs/
test/java/org/apache/camel/component/aws/sqs/
Author: cmueller
Date: Wed Sep 5 19:55:09 2012
New Revision: 1381327
URL: http://svn.apache.org/viewvc?rev=1381327&view=rev
Log:
CAMEL-5414: SqsEndpoint can't retrieve existing queue url with visibility timeout different than default
Added:
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
Modified:
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
Modified: camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1381327&r1=1381326&r2=1381327&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original)
+++ camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep 5 19:55:09 2012
@@ -23,6 +23,7 @@ import com.amazonaws.auth.BasicAWSCreden
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.QueueAttributeName;
import org.apache.camel.Consumer;
@@ -74,27 +75,41 @@ public class SqsEndpoint extends Schedul
protected void doStart() throws Exception {
client = getConfiguration().getAmazonSQSClient() != null
? getConfiguration().getAmazonSQSClient() : getClient();
-
- // creates a new queue, or returns the URL of an existing one
- CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
- if (getConfiguration().getDefaultVisibilityTimeout() != null) {
- request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
- }
- if (getConfiguration().getMaximumMessageSize() != null) {
- request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
- }
- if (getConfiguration().getMessageRetentionPeriod() != null) {
- request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+
+ // check whether the queue already exists: the first listed URL ending in "/<queueName>" is reused
+ ListQueuesResult listQueuesResult = client.listQueues(); // NOTE(review): unfiltered listing; confirm the service returns every queue here (result-size limit, name-prefix filter)
+ for (String url : listQueuesResult.getQueueUrls()) {
+ if (url.endsWith("/" + configuration.getQueueName())) {
+ queueUrl = url; // the configured attributes are only put on the create request below, so they are not applied to a reused queue
+ LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl);
+ break;
+ }
}
- if (getConfiguration().getPolicy() != null) {
- request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
- }
- LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
- CreateQueueResult queueResult = client.createQueue(request);
- queueUrl = queueResult.getQueueUrl();
-
- LOG.trace("Queue created and available at: {}", queueUrl);
+ if (queueUrl == null) { // no listed URL matched; assumes queueUrl (declared outside this hunk) was null on entry - TODO confirm
+ LOG.trace("Queue '{}' doesn't exist. Will create it...", configuration.getQueueName());
+
+ // creates a new queue, or returns the URL of an existing one; only non-null configuration values are sent as attributes
+ CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName());
+ if (getConfiguration().getDefaultVisibilityTimeout() != null) {
+ request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout()));
+ }
+ if (getConfiguration().getMaximumMessageSize() != null) {
+ request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize()));
+ }
+ if (getConfiguration().getMessageRetentionPeriod() != null) {
+ request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod()));
+ }
+ if (getConfiguration().getPolicy() != null) {
+ request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy()));
+ }
+ LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request);
+
+ CreateQueueResult queueResult = client.createQueue(request);
+ queueUrl = queueResult.getQueueUrl();
+
+ LOG.trace("Queue created and available at: {}", queueUrl);
+ }
}
@Override
Modified: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java?rev=1381327&r1=1381326&r2=1381327&view=diff
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java (original)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java Wed Sep 5 19:55:09 2012
@@ -31,6 +31,7 @@ import com.amazonaws.services.sqs.model.
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@@ -49,6 +50,12 @@ public class AmazonSQSClientMock extends
}
@Override
+ public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
+ ListQueuesResult result = new ListQueuesResult(); // no URL is added, so this mock reports no existing queue (presumably an empty list - confirm)
+ return result;
+ }
+
+ @Override
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
CreateQueueResult result = new CreateQueueResult();
result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue");
Added: camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1381327&view=auto
==============================================================================
--- camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (added)
+++ camel/trunk/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep 5 19:55:09 2012
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class SqsEndpointUseExistingQueueTest extends CamelTestSupport {
+ // Scenario: the stub client's listQueues() already reports a URL ending in /MyQueue, and its createQueue() throws if it is ever called.
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mock; // target of the aws-sqs route built in createRouteBuilder()
+
+ @Test
+ public void defaultsToDisabled() throws Exception { // NOTE(review): the name does not describe the scenario (reuse of an existing queue)
+ this.mock.expectedMessageCount(1);
+
+ assertMockEndpointsSatisfied(); // Wait for message to arrive.
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry registry = super.createRegistry();
+ // bind the nested stub under the name the route URI references as #amazonSQSClient
+ AmazonSQSClientMock clientMock = new SqsEndpointUseExistingQueueTest.AmazonSQSClientMock();
+ registry.bind("amazonSQSClient", clientMock);
+
+ return registry;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient")
+ .to("mock:result");
+ }
+ };
+ }
+
+ static class AmazonSQSClientMock extends AmazonSQSClient {
+ // Stub SQS client: lists three fixed queue URLs, throws on createQueue, and returns one blank Message per receiveMessage call.
+ public AmazonSQSClientMock() {
+ super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
+ }
+
+ @Override
+ public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
+ ListQueuesResult result = new ListQueuesResult();
+ result.getQueueUrls().add("http://queue.amazonaws.com/0815/Foo");
+ result.getQueueUrls().add("http://queue.amazonaws.com/0815/MyQueue"); // the entry matching the queue name used in the route URI
+ result.getQueueUrls().add("http://queue.amazonaws.com/0815/Bar");
+ return result;
+ }
+
+ @Override
+ public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
+ throw new AmazonServiceException("forced exception for test if this method is called"); // a create attempt means the listed queue was not reused
+ }
+
+ @Override
+ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException {
+ ReceiveMessageResult result = new ReceiveMessageResult();
+ List<Message> resultMessages = result.getMessages();
+ Message message = new Message(); // no body or attributes are set on it
+ resultMessages.add(message);
+
+ return result;
+ }
+ }
+}