You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/11 12:33:18 UTC
[1/2] git commit: CAMEL-6850: Allow AWS SQS to not ack or even if it
doesn't encounter an exception. Thanks to Christian Posta for the patch.
Updated Branches:
refs/heads/camel-2.12.x ddc6b43c1 -> 8257f46be
refs/heads/master e76d870de -> a562c867e
CAMEL-6850: Allow AWS SQS to not ack or even if it doesn't encounter an exception. Thanks to Christian Posta for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a562c867
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a562c867
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a562c867
Branch: refs/heads/master
Commit: a562c867ed2794fc1934709a3777dfeed69aff8f
Parents: e76d870
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 12:32:41 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 12:32:41 2013 +0200
----------------------------------------------------------------------
.../component/aws/sqs/SqsConfiguration.java | 10 ++
.../camel/component/aws/sqs/SqsConsumer.java | 19 ++-
.../component/aws/sqs/AmazonSQSClientMock.java | 67 +++++++++-
.../sqs/SqsFilterMessagesWithNoDeleteTest.java | 127 +++++++++++++++++++
4 files changed, 214 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index e306221..87f102c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -36,6 +36,7 @@ public class SqsConfiguration {
// consumer properties
private Boolean deleteAfterRead = Boolean.TRUE;
+ private Boolean deleteIfFiltered = Boolean.TRUE;
private Integer visibilityTimeout;
private Collection<String> attributeNames;
private Integer waitTimeSeconds;
@@ -187,6 +188,14 @@ public class SqsConfiguration {
this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
}
+ public Boolean isDeleteIfFiltered() {
+ return deleteIfFiltered;
+ }
+
+ public void setDeleteIfFiltered(Boolean deleteIfFiltered) {
+ this.deleteIfFiltered = deleteIfFiltered;
+ }
+
@Override
public String toString() {
return "SqsConfiguration[queueName=" + queueName
@@ -194,6 +203,7 @@ public class SqsConfiguration {
+ ", accessKey=" + accessKey
+ ", secretKey=xxxxxxxxxxxxxxx"
+ ", deleteAfterRead=" + deleteAfterRead
+ + ", deleteIfFiltered=" + deleteIfFiltered
+ ", visibilityTimeout=" + visibilityTimeout
+ ", attributeNames=" + attributeNames
+ ", waitTimeSeconds=" + waitTimeSeconds
http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 4fae7d4..4c76c15 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -180,12 +180,13 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
- if (getConfiguration().isDeleteAfterRead()) {
+
+ if (shouldDelete(exchange)) {
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
-
+
LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
-
+
getClient().deleteMessage(deleteRequest);
LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
@@ -195,6 +196,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
}
+ private boolean shouldDelete(Exchange exchange) {
+ return getConfiguration().isDeleteAfterRead()
+ && (getConfiguration().isDeleteIfFiltered()
+ || (!getConfiguration().isDeleteIfFiltered()
+ && passedThroughFilter(exchange)));
+ }
+
+ private boolean passedThroughFilter(Exchange exchange) {
+ return exchange.getProperties().containsKey(Exchange.FILTER_MATCHED)
+ && ((Boolean) exchange.getProperties().get(Exchange.FILTER_MATCHED));
+ }
+
/**
* Strategy when processing the exchange failed.
*
http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
index ac0838f..4d227a7 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
@@ -20,9 +20,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
@@ -41,11 +46,14 @@ import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
public class AmazonSQSClientMock extends AmazonSQSClient {
-
+
List<Message> messages = new ArrayList<Message>();
Map<String, Map<String, String>> queueAttributes = new HashMap<String, Map<String, String>>();
List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<ChangeMessageVisibilityRequest>();
-
+ private Map<String, CreateQueueRequest> queues = new LinkedHashMap<String, CreateQueueRequest>();
+ private Map<String, ScheduledFuture> inFlight = new LinkedHashMap<String, ScheduledFuture>();
+ private ScheduledExecutorService scheduler;
+
public AmazonSQSClientMock() {
super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
}
@@ -58,8 +66,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
@Override
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
+ String queueName = "https://queue.amazonaws.com/541925086079/" + createQueueRequest.getQueueName();
+ queues.put(queueName, createQueueRequest);
CreateQueueResult result = new CreateQueueResult();
- result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue");
+ result.setQueueUrl(queueName);
return result;
}
@@ -91,8 +101,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
synchronized (messages) {
int fetchSize = 0;
for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) {
- resultMessages.add(iterator.next());
+ Message rc = iterator.next();
+ resultMessages.add(rc);
iterator.remove();
+ scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc);
}
}
@@ -100,9 +112,52 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
return result;
}
+ /*
+ * Cancel (put back onto queue) in flight messages if the visibility time has expired
+ * and has not been manually deleted (ack'd)
+ */
+ private void scheduleCancelInflight(final String queueUrl, final Message message) {
+ if (scheduler != null) {
+ int visibility = getVisibilityForQueue(queueUrl);
+ if (visibility > 0) {
+ ScheduledFuture task = scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (messages) {
+ // put it back!
+ messages.add(message);
+ }
+ }
+ }, visibility, TimeUnit.SECONDS);
+
+ inFlight.put(message.getReceiptHandle(), task);
+ }
+ }
+ }
+
+ private int getVisibilityForQueue(String queueUrl) {
+ Map<String, String> queueAttr = queues.get(queueUrl).getAttributes();
+ if (queueAttr.containsKey("VisibilityTimeout")) {
+ return Integer.parseInt(queueAttr.get("VisibilityTimeout"));
+ }
+ return 0;
+ }
+
+ public ScheduledExecutorService getScheduler() {
+ return scheduler;
+ }
+
+ public void setScheduler(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
@Override
- public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonServiceException, AmazonClientException {
- // noop
+ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
+ String receiptHandle = deleteMessageRequest.getReceiptHandle();
+ if (inFlight.containsKey(receiptHandle)) {
+ ScheduledFuture inFlightTask = inFlight.get(receiptHandle);
+ inFlightTask.cancel(true);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/a562c867/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
new file mode 100644
index 0000000..2c72d7e
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class SqsFilterMessagesWithNoDeleteTest extends CamelTestSupport {
+
+
+ // put some test messages onto the 'queue'
+ private void populateMessages(AmazonSQSClientMock clientMock) {
+ Message message = new Message();
+ message.setBody("Message: hello, world!");
+ message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+ message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+ message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+
+ clientMock.messages.add(message);
+ }
+
+ @Test
+ public void testDoesNotGetThroughFilter() throws Exception {
+
+ final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+ // note we will NOT delete if this message gets filtered out
+ + "&deleteIfFiltered=false"
+ + "&defaultVisibilityTimeout=1");
+
+ AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+ populateMessages(clientMock);
+ JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+ DefaultCamelContext ctx = new DefaultCamelContext(registry);
+ ctx.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(sqsURI)
+ // try to filter using a non-existent header... should not go through
+ .filter(simple("${header.login} == true"))
+ .to("mock:result");
+
+ }
+ });
+ MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+ clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+ registry.bind("amazonSQSClient", clientMock);
+
+ result.expectedMessageCount(0);
+
+ ctx.start();
+
+ // we shouldn't get
+ result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+
+ // however, the message should not be deleted, that is, it should be left on the queue
+ String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+ assertEquals(response, "Message: hello, world!");
+
+ }
+
+ @Test
+ public void testGetThroughFilter() throws Exception {
+ final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+ // note we will NOT delete if this message gets filtered out, but if it goes
+ // through filter, it should be deleted!
+ + "&deleteIfFiltered=false"
+ + "&defaultVisibilityTimeout=1");
+
+ AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+ populateMessages(clientMock);
+ JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+ DefaultCamelContext ctx = new DefaultCamelContext(registry);
+ ctx.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(sqsURI)
+ .setHeader("login", constant(true))
+
+ // this filter should allow the message to pass..
+ .filter(simple("${header.login} == true"))
+ .to("mock:result");
+
+ }
+ });
+ MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+ registry.bind("amazonSQSClient", clientMock);
+ clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+
+ result.expectedMessageCount(1);
+ ctx.start();
+
+ // the message should get through filter and mock should assert this
+ result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+ // however, the message should not be deleted, that is, it should be left on the queue
+ String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+ assertNull(response);
+ }
+
+}
[2/2] git commit: CAMEL-6850: Allow AWS SQS to not ack or even if it
doesn't encounter an exception. Thanks to Christian Posta for the patch.
Posted by da...@apache.org.
CAMEL-6850: Allow AWS SQS to not ack or even if it doesn't encounter an exception. Thanks to Christian Posta for the patch.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8257f46b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8257f46b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8257f46b
Branch: refs/heads/camel-2.12.x
Commit: 8257f46beb04dc2b2af9552e092ca1c296e99f45
Parents: ddc6b43
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 12:32:41 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 12:32:55 2013 +0200
----------------------------------------------------------------------
.../component/aws/sqs/SqsConfiguration.java | 10 ++
.../camel/component/aws/sqs/SqsConsumer.java | 19 ++-
.../component/aws/sqs/AmazonSQSClientMock.java | 67 +++++++++-
.../sqs/SqsFilterMessagesWithNoDeleteTest.java | 127 +++++++++++++++++++
4 files changed, 214 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index e306221..87f102c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -36,6 +36,7 @@ public class SqsConfiguration {
// consumer properties
private Boolean deleteAfterRead = Boolean.TRUE;
+ private Boolean deleteIfFiltered = Boolean.TRUE;
private Integer visibilityTimeout;
private Collection<String> attributeNames;
private Integer waitTimeSeconds;
@@ -187,6 +188,14 @@ public class SqsConfiguration {
this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
}
+ public Boolean isDeleteIfFiltered() {
+ return deleteIfFiltered;
+ }
+
+ public void setDeleteIfFiltered(Boolean deleteIfFiltered) {
+ this.deleteIfFiltered = deleteIfFiltered;
+ }
+
@Override
public String toString() {
return "SqsConfiguration[queueName=" + queueName
@@ -194,6 +203,7 @@ public class SqsConfiguration {
+ ", accessKey=" + accessKey
+ ", secretKey=xxxxxxxxxxxxxxx"
+ ", deleteAfterRead=" + deleteAfterRead
+ + ", deleteIfFiltered=" + deleteIfFiltered
+ ", visibilityTimeout=" + visibilityTimeout
+ ", attributeNames=" + attributeNames
+ ", waitTimeSeconds=" + waitTimeSeconds
http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 4fae7d4..4c76c15 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -180,12 +180,13 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
*/
protected void processCommit(Exchange exchange) {
try {
- if (getConfiguration().isDeleteAfterRead()) {
+
+ if (shouldDelete(exchange)) {
String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
-
+
LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
-
+
getClient().deleteMessage(deleteRequest);
LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
@@ -195,6 +196,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
}
}
+ private boolean shouldDelete(Exchange exchange) {
+ return getConfiguration().isDeleteAfterRead()
+ && (getConfiguration().isDeleteIfFiltered()
+ || (!getConfiguration().isDeleteIfFiltered()
+ && passedThroughFilter(exchange)));
+ }
+
+ private boolean passedThroughFilter(Exchange exchange) {
+ return exchange.getProperties().containsKey(Exchange.FILTER_MATCHED)
+ && ((Boolean) exchange.getProperties().get(Exchange.FILTER_MATCHED));
+ }
+
/**
* Strategy when processing the exchange failed.
*
http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
index ac0838f..4d227a7 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
@@ -20,9 +20,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
@@ -41,11 +46,14 @@ import com.amazonaws.services.sqs.model.SendMessageResult;
import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
public class AmazonSQSClientMock extends AmazonSQSClient {
-
+
List<Message> messages = new ArrayList<Message>();
Map<String, Map<String, String>> queueAttributes = new HashMap<String, Map<String, String>>();
List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<ChangeMessageVisibilityRequest>();
-
+ private Map<String, CreateQueueRequest> queues = new LinkedHashMap<String, CreateQueueRequest>();
+ private Map<String, ScheduledFuture> inFlight = new LinkedHashMap<String, ScheduledFuture>();
+ private ScheduledExecutorService scheduler;
+
public AmazonSQSClientMock() {
super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
}
@@ -58,8 +66,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
@Override
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
+ String queueName = "https://queue.amazonaws.com/541925086079/" + createQueueRequest.getQueueName();
+ queues.put(queueName, createQueueRequest);
CreateQueueResult result = new CreateQueueResult();
- result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue");
+ result.setQueueUrl(queueName);
return result;
}
@@ -91,8 +101,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
synchronized (messages) {
int fetchSize = 0;
for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) {
- resultMessages.add(iterator.next());
+ Message rc = iterator.next();
+ resultMessages.add(rc);
iterator.remove();
+ scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc);
}
}
@@ -100,9 +112,52 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
return result;
}
+ /*
+ * Cancel (put back onto queue) in flight messages if the visibility time has expired
+ * and has not been manually deleted (ack'd)
+ */
+ private void scheduleCancelInflight(final String queueUrl, final Message message) {
+ if (scheduler != null) {
+ int visibility = getVisibilityForQueue(queueUrl);
+ if (visibility > 0) {
+ ScheduledFuture task = scheduler.schedule(new Runnable() {
+ @Override
+ public void run() {
+ synchronized (messages) {
+ // put it back!
+ messages.add(message);
+ }
+ }
+ }, visibility, TimeUnit.SECONDS);
+
+ inFlight.put(message.getReceiptHandle(), task);
+ }
+ }
+ }
+
+ private int getVisibilityForQueue(String queueUrl) {
+ Map<String, String> queueAttr = queues.get(queueUrl).getAttributes();
+ if (queueAttr.containsKey("VisibilityTimeout")) {
+ return Integer.parseInt(queueAttr.get("VisibilityTimeout"));
+ }
+ return 0;
+ }
+
+ public ScheduledExecutorService getScheduler() {
+ return scheduler;
+ }
+
+ public void setScheduler(ScheduledExecutorService scheduler) {
+ this.scheduler = scheduler;
+ }
+
@Override
- public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonServiceException, AmazonClientException {
- // noop
+ public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
+ String receiptHandle = deleteMessageRequest.getReceiptHandle();
+ if (inFlight.containsKey(receiptHandle)) {
+ ScheduledFuture inFlightTask = inFlight.get(receiptHandle);
+ inFlightTask.cancel(true);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
new file mode 100644
index 0000000..2c72d7e
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class SqsFilterMessagesWithNoDeleteTest extends CamelTestSupport {
+
+
+ // put some test messages onto the 'queue'
+ private void populateMessages(AmazonSQSClientMock clientMock) {
+ Message message = new Message();
+ message.setBody("Message: hello, world!");
+ message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+ message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+ message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+
+ clientMock.messages.add(message);
+ }
+
+ @Test
+ public void testDoesNotGetThroughFilter() throws Exception {
+
+ final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+ // note we will NOT delete if this message gets filtered out
+ + "&deleteIfFiltered=false"
+ + "&defaultVisibilityTimeout=1");
+
+ AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+ populateMessages(clientMock);
+ JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+ DefaultCamelContext ctx = new DefaultCamelContext(registry);
+ ctx.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(sqsURI)
+ // try to filter using a non-existent header... should not go through
+ .filter(simple("${header.login} == true"))
+ .to("mock:result");
+
+ }
+ });
+ MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+ clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+ registry.bind("amazonSQSClient", clientMock);
+
+ result.expectedMessageCount(0);
+
+ ctx.start();
+
+ // we shouldn't get
+ result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+
+ // however, the message should not be deleted, that is, it should be left on the queue
+ String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+ assertEquals(response, "Message: hello, world!");
+
+ }
+
+ @Test
+ public void testGetThroughFilter() throws Exception {
+ final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+ // note we will NOT delete if this message gets filtered out, but if it goes
+ // through filter, it should be deleted!
+ + "&deleteIfFiltered=false"
+ + "&defaultVisibilityTimeout=1");
+
+ AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+ populateMessages(clientMock);
+ JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+ DefaultCamelContext ctx = new DefaultCamelContext(registry);
+ ctx.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(sqsURI)
+ .setHeader("login", constant(true))
+
+ // this filter should allow the message to pass..
+ .filter(simple("${header.login} == true"))
+ .to("mock:result");
+
+ }
+ });
+ MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+ registry.bind("amazonSQSClient", clientMock);
+ clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+
+ result.expectedMessageCount(1);
+ ctx.start();
+
+ // the message should get through filter and mock should assert this
+ result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+ // however, the message should not be deleted, that is, it should be left on the queue
+ String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+ assertNull(response);
+ }
+
+}