You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/11 14:41:21 UTC

incubator-usergrid git commit: add deadletter queue

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-473 [created] c021d5d6a


add deadletter queue


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c021d5d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c021d5d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c021d5d6

Branch: refs/heads/USERGRID-473
Commit: c021d5d6a4c73e8dcb7d40c8d432c13a55b4b068
Parents: 5c7a5f8
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Mar 10 17:35:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Mar 10 17:35:35 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/queue/QueueFig.java    |  3 ++
 .../queue/impl/SQSQueueManagerImpl.java         | 35 +++++++++++++++-----
 2 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 197b791..b6c6ad7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -16,4 +16,7 @@ public interface QueueFig extends GuicyFig {
     @Default("usergrid")
     public String getPrefix();
 
+    /**
+     * Receive-count limit used as {@code maxReceiveCount} in the SQS redrive policy
+     * that attaches a dead-letter queue. Read from
+     * {@code usergrid.queue.max.receive.count}; defaults to 5.
+     */
+    @Key( "usergrid.queue.max.receive.count" )
+    @Default("5")
+    public Integer getMaxReceiveCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..e859437 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -36,12 +36,11 @@ import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -74,15 +73,30 @@ public class SQSQueueManagerImpl implements QueueManager {
                                CreateQueueRequest createQueueRequest = new CreateQueueRequest()
                                        .withQueueName(name);
                                CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
-                               String url = result.getQueueUrl();
-                               queue = new Queue(url);
-                               LOG.info("Created queue with url {}", url);
+                               String queueUrl = result.getQueueUrl();
+                               setDeadLetterQueue(queueLoader.client,queueLoader.config(), queueUrl, name+"_dead_letter");
+                               queue = new Queue(queueUrl);
+                               LOG.info("Created queue with url {}", queueUrl);
                            }
                            return queue;
                        }
                    }
             );
 
+    /**
+     * Creates the dead-letter queue and attaches it to the primary queue through a
+     * {@code RedrivePolicy} attribute.
+     *
+     * <p>The two string arguments are declared in the order the caller supplies them:
+     * primary queue URL first, dead-letter queue name second.
+     *
+     * @param client         SQS client used for every call below
+     * @param fig            supplies {@code maxReceiveCount} for the redrive policy
+     * @param queueUrl       URL of the primary queue that receives the redrive policy
+     * @param deadLetterName name of the dead-letter queue to create
+     */
+    private static void setDeadLetterQueue(AmazonSQSClient client, QueueFig fig, String queueUrl, String deadLetterName) {
+        CreateQueueRequest deadLetterQueueRequest = new CreateQueueRequest()
+            .withQueueName(deadLetterName);
+        String deadLetterUrl = client.createQueue(deadLetterQueueRequest).getQueueUrl();
+
+        // deadLetterTargetArn must be the queue's ARN, not its URL, so resolve it from the queue attributes.
+        GetQueueAttributesRequest arnRequest = new GetQueueAttributesRequest(deadLetterUrl)
+            .withAttributeNames("QueueArn");
+        String deadLetterArn = client.getQueueAttributes(arnRequest).getAttributes().get("QueueArn");
+
+        String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\", \"deadLetterTargetArn\":\"%s\"}", fig.getMaxReceiveCount(), deadLetterArn);
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("RedrivePolicy", redrivePolicy);
+
+        SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
+        queueAttributes.setQueueUrl(queueUrl);
+        queueAttributes.setAttributes(attributes);
+        client.setQueueAttributes(queueAttributes);
+    }
+
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
         this.fig = fig;
@@ -111,7 +125,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public Queue getQueue() {
         try {
-            Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+            Queue queue = urlMap.get(new SqsLoader(getName(),sqs,fig));
             return queue;
         } catch (ExecutionException ee) {
             throw new RuntimeException(ee);
@@ -127,6 +141,7 @@ public class SQSQueueManagerImpl implements QueueManager {
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}", limit, url);
+
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
@@ -230,10 +245,12 @@ public class SQSQueueManagerImpl implements QueueManager {
     public class SqsLoader {
         private final String key;
         private final AmazonSQSClient client;
+        private final QueueFig fig;
 
-        public SqsLoader(String key, AmazonSQSClient client) {
+        public SqsLoader(String key, AmazonSQSClient client,QueueFig fig) {
             this.key = key;
             this.client = client;
+            this.fig = fig;
         }
 
         public AmazonSQSClient getClient() {
@@ -265,5 +282,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return getKey();
         }
 
+        public QueueFig config(){return fig;}
+
     }
 }