You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/10/11 12:33:19 UTC

[2/2] git commit: CAMEL-6850: Allow AWS SQS to not ack or even if it doesn't encounter an exception. Thanks to Christian Posta for the patch.

CAMEL-6850: Allow AWS SQS to not ack or even if it doesn't encounter an exception. Thanks to Christian Posta for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8257f46b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8257f46b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8257f46b

Branch: refs/heads/camel-2.12.x
Commit: 8257f46beb04dc2b2af9552e092ca1c296e99f45
Parents: ddc6b43
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 11 12:32:41 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 11 12:32:55 2013 +0200

----------------------------------------------------------------------
 .../component/aws/sqs/SqsConfiguration.java     |  10 ++
 .../camel/component/aws/sqs/SqsConsumer.java    |  19 ++-
 .../component/aws/sqs/AmazonSQSClientMock.java  |  67 +++++++++-
 .../sqs/SqsFilterMessagesWithNoDeleteTest.java  | 127 +++++++++++++++++++
 4 files changed, 214 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
index e306221..87f102c 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConfiguration.java
@@ -36,6 +36,7 @@ public class SqsConfiguration {
     
     // consumer properties
     private Boolean deleteAfterRead = Boolean.TRUE;
+    private Boolean deleteIfFiltered = Boolean.TRUE;
     private Integer visibilityTimeout;
     private Collection<String> attributeNames;
     private Integer waitTimeSeconds;
@@ -187,6 +188,14 @@ public class SqsConfiguration {
         this.queueOwnerAWSAccountId = queueOwnerAWSAccountId;
     }
 
+    public Boolean isDeleteIfFiltered() {
+        return deleteIfFiltered;
+    }
+
+    public void setDeleteIfFiltered(Boolean deleteIfFiltered) {
+        this.deleteIfFiltered = deleteIfFiltered;
+    }
+
     @Override
     public String toString() {
         return "SqsConfiguration[queueName=" + queueName
@@ -194,6 +203,7 @@ public class SqsConfiguration {
             + ", accessKey=" + accessKey
             + ", secretKey=xxxxxxxxxxxxxxx" 
             + ", deleteAfterRead=" + deleteAfterRead
+            + ", deleteIfFiltered=" + deleteIfFiltered
             + ", visibilityTimeout=" + visibilityTimeout
             + ", attributeNames=" + attributeNames
             + ", waitTimeSeconds=" + waitTimeSeconds

http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
index 4fae7d4..4c76c15 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
@@ -180,12 +180,13 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
      */
     protected void processCommit(Exchange exchange) {
         try {
-            if (getConfiguration().isDeleteAfterRead()) {
+
+            if (shouldDelete(exchange)) {
                 String receiptHandle = exchange.getIn().getHeader(SqsConstants.RECEIPT_HANDLE, String.class);
                 DeleteMessageRequest deleteRequest = new DeleteMessageRequest(getQueueUrl(), receiptHandle);
-                
+
                 LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
-                
+
                 getClient().deleteMessage(deleteRequest);
 
                 LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
@@ -195,6 +196,18 @@ public class SqsConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
+    private boolean shouldDelete(Exchange exchange) {
+        return getConfiguration().isDeleteAfterRead()
+                && (getConfiguration().isDeleteIfFiltered()
+                    || (!getConfiguration().isDeleteIfFiltered()
+                        && passedThroughFilter(exchange)));
+    }
+
+    private boolean passedThroughFilter(Exchange exchange) {
+        return exchange.getProperties().containsKey(Exchange.FILTER_MATCHED)
+                && ((Boolean) exchange.getProperties().get(Exchange.FILTER_MATCHED));
+    }
+
     /**
      * Strategy when processing the exchange failed.
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
index ac0838f..4d227a7 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/AmazonSQSClientMock.java
@@ -20,9 +20,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
@@ -41,11 +46,14 @@ import com.amazonaws.services.sqs.model.SendMessageResult;
 import com.amazonaws.services.sqs.model.SetQueueAttributesRequest;
 
 public class AmazonSQSClientMock extends AmazonSQSClient {
-    
+
     List<Message> messages = new ArrayList<Message>();
     Map<String, Map<String, String>> queueAttributes = new HashMap<String, Map<String, String>>();
     List<ChangeMessageVisibilityRequest> changeMessageVisibilityRequests = new CopyOnWriteArrayList<ChangeMessageVisibilityRequest>();
-    
+    private Map<String, CreateQueueRequest> queues = new LinkedHashMap<String, CreateQueueRequest>();
+    private Map<String, ScheduledFuture> inFlight = new LinkedHashMap<String, ScheduledFuture>();
+    private ScheduledExecutorService scheduler;
+
     public AmazonSQSClientMock() {
         super(new BasicAWSCredentials("myAccessKey", "mySecretKey"));
     }
@@ -58,8 +66,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
 
     @Override
     public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
+        String queueName = "https://queue.amazonaws.com/541925086079/" + createQueueRequest.getQueueName();
+        queues.put(queueName, createQueueRequest);
         CreateQueueResult result = new CreateQueueResult();
-        result.setQueueUrl("https://queue.amazonaws.com/541925086079/MyQueue");
+        result.setQueueUrl(queueName);
         return result;
     }
 
@@ -91,8 +101,10 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
         synchronized (messages) {
             int fetchSize = 0;
             for (Iterator<Message> iterator = messages.iterator(); iterator.hasNext() && fetchSize < maxNumberOfMessages; fetchSize++) {
-                resultMessages.add(iterator.next());
+                Message rc = iterator.next();
+                resultMessages.add(rc);
                 iterator.remove();
+                scheduleCancelInflight(receiveMessageRequest.getQueueUrl(), rc);
             }
         }
         
@@ -100,9 +112,52 @@ public class AmazonSQSClientMock extends AmazonSQSClient {
         return result;
     }
 
+    /*
+     * Cancel (put back onto queue) in flight messages if the visibility time has expired
+     * and has not been manually deleted (ack'd)
+     */
+    private void scheduleCancelInflight(final String queueUrl, final Message message) {
+        if (scheduler != null) {
+            int visibility = getVisibilityForQueue(queueUrl);
+            if (visibility > 0) {
+                ScheduledFuture task = scheduler.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        synchronized (messages) {
+                            // put it back!
+                            messages.add(message);
+                        }
+                    }
+                }, visibility, TimeUnit.SECONDS);
+
+                inFlight.put(message.getReceiptHandle(), task);
+            }
+        }
+    }
+
+    private int getVisibilityForQueue(String queueUrl) {
+        Map<String, String> queueAttr = queues.get(queueUrl).getAttributes();
+        if (queueAttr.containsKey("VisibilityTimeout")) {
+            return Integer.parseInt(queueAttr.get("VisibilityTimeout"));
+        }
+        return 0;
+    }
+
+    public ScheduledExecutorService getScheduler() {
+        return scheduler;
+    }
+
+    public void setScheduler(ScheduledExecutorService scheduler) {
+        this.scheduler = scheduler;
+    }
+
     @Override
-    public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonServiceException, AmazonClientException {
-        // noop
+    public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws AmazonClientException {
+        String receiptHandle = deleteMessageRequest.getReceiptHandle();
+        if (inFlight.containsKey(receiptHandle)) {
+            ScheduledFuture inFlightTask = inFlight.get(receiptHandle);
+            inFlightTask.cancel(true);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/8257f46b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
new file mode 100644
index 0000000..2c72d7e
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsFilterMessagesWithNoDeleteTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.sqs;
+
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.services.sqs.model.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+
+public class SqsFilterMessagesWithNoDeleteTest extends CamelTestSupport {
+
+
+    // put some test messages onto the 'queue'
+    private void populateMessages(AmazonSQSClientMock clientMock) {
+        Message message = new Message();
+        message.setBody("Message: hello, world!");
+        message.setMD5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+        message.setMessageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+        message.setReceiptHandle("0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5");
+
+        clientMock.messages.add(message);
+    }
+
+    @Test
+    public void testDoesNotGetThroughFilter() throws Exception {
+
+        final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+                // note we will NOT delete if this message gets filtered out
+                + "&deleteIfFiltered=false"
+                + "&defaultVisibilityTimeout=1");
+
+        AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+        populateMessages(clientMock);
+        JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+        DefaultCamelContext ctx = new DefaultCamelContext(registry);
+        ctx.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sqsURI)
+                        // try to filter using a non-existent header... should not go through
+                        .filter(simple("${header.login} == true"))
+                        .to("mock:result");
+
+            }
+        });
+        MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+        clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+        registry.bind("amazonSQSClient", clientMock);
+
+        result.expectedMessageCount(0);
+
+        ctx.start();
+
+        // we shouldn't get
+        result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+
+        // however, the message should not be deleted, that is, it should be left on the queue
+        String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+        assertEquals(response, "Message: hello, world!");
+
+    }
+
+    @Test
+    public void testGetThroughFilter() throws Exception {
+        final String sqsURI = String.format("aws-sqs://MyQueue?amazonSQSClient=#amazonSQSClient"
+                // note we will NOT delete if this message gets filtered out, but if it goes
+                // through filter, it should be deleted!
+                + "&deleteIfFiltered=false"
+                + "&defaultVisibilityTimeout=1");
+
+        AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
+        populateMessages(clientMock);
+        JndiRegistry registry = new JndiRegistry(createJndiContext());
+
+        DefaultCamelContext ctx = new DefaultCamelContext(registry);
+        ctx.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(sqsURI)
+                        .setHeader("login", constant(true))
+
+                        // this filter should allow the message to pass..
+                        .filter(simple("${header.login} == true"))
+                        .to("mock:result");
+
+            }
+        });
+        MockEndpoint result = MockEndpoint.resolve(ctx, "mock:result");
+        registry.bind("amazonSQSClient", clientMock);
+        clientMock.setScheduler(ctx.getExecutorServiceManager().newScheduledThreadPool(clientMock, "ClientMock Scheduler", 1));
+
+        result.expectedMessageCount(1);
+        ctx.start();
+
+        // the message should get through filter and mock should assert this
+        result.assertIsSatisfied(1000, TimeUnit.MILLISECONDS);
+
+        // however, the message should not be deleted, that is, it should be left on the queue
+        String response = ctx.createConsumerTemplate().receiveBody(sqsURI, 5000, String.class);
+
+        assertNull(response);
+    }
+
+}