You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/11 14:48:11 UTC

[3/3] incubator-usergrid git commit: adding dead letter

adding dead letter


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0aeaa881
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0aeaa881
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0aeaa881

Branch: refs/heads/USERGRID-473
Commit: 0aeaa881556fa85d330fb877deca522c0b28e2f5
Parents: 3bd96c9
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Mar 11 07:47:57 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Mar 11 07:47:57 2015 -0600

----------------------------------------------------------------------
 .../queue/impl/SQSQueueManagerImpl.java         | 84 ++++++++++++--------
 .../persistence/queue/QueueManagerTest.java     |  2 -
 2 files changed, 52 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0aeaa881/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..cfaf0b1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -36,12 +36,11 @@ import com.google.inject.assistedinject.Assisted;
 import org.apache.commons.lang.StringUtils;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -55,33 +54,49 @@ public class SQSQueueManagerImpl implements QueueManager {
     private static SmileFactory smileFactory = new SmileFactory();
 
     private static LoadingCache<SqsLoader, Queue> urlMap = CacheBuilder.newBuilder()
-            .maximumSize(1000)
-            .build(new CacheLoader<SqsLoader, Queue>() {
-                       @Override
-                       public Queue load(SqsLoader queueLoader) throws Exception {
-                           Queue queue = null;
-                           try {
-                               GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
-                               queue = new Queue(result.getQueueUrl());
-                           } catch (QueueDoesNotExistException queueDoesNotExistException) {
-                               queue = null;
-                           } catch (Exception e) {
-                               LOG.error("failed to get queue from service", e);
-                               throw e;
-                           }
-                           if (queue == null) {
-                               String name = queueLoader.getKey();
-                               CreateQueueRequest createQueueRequest = new CreateQueueRequest()
-                                       .withQueueName(name);
-                               CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
-                               String url = result.getQueueUrl();
-                               queue = new Queue(url);
-                               LOG.info("Created queue with url {}", url);
-                           }
-                           return queue;
+        .maximumSize(1000)
+        .build(new CacheLoader<SqsLoader, Queue>() {
+                   @Override
+                   public Queue load(SqsLoader queueLoader) throws Exception {
+                       Queue queue = null;
+                       try {
+                           GetQueueUrlResult result = queueLoader.getClient().getQueueUrl(queueLoader.getKey());
+                           queue = new Queue(result.getQueueUrl());
+                       } catch (QueueDoesNotExistException queueDoesNotExistException) {
+                           queue = null;
+                       } catch (Exception e) {
+                           LOG.error("failed to get queue from service", e);
+                           throw e;
                        }
+                       if (queue == null) {
+                           String name = queueLoader.getKey();
+                           CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+                               .withQueueName(name);
+                           CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
+                           String queueUrl = result.getQueueUrl();
+
+                           setDeadLetterQueue(queueLoader.client,queueLoader.config(), queueUrl, name+"_dead");
+                           queue = new Queue(queueUrl);
+                           LOG.info("Created queue with url {}", queueUrl);
+                       }
+                       return queue;
                    }
-            );
+               }
+        );
+
+    private static void setDeadLetterQueue(AmazonSQSClient client, QueueFig fig,  String queueUrl, String deadLetterName) {
+        CreateQueueRequest deadLetterQueueRequest = new CreateQueueRequest()
+            .withQueueName(deadLetterName);
+        CreateQueueResult deadLetterResult = client.createQueue(deadLetterQueueRequest);
+        String deadLetterUrl = deadLetterResult.getQueueUrl();
+        String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\", \"deadLetterTargetArn\":\"%s\"}", fig.getMaxReceiveCount(), deadLetterUrl);
+        SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
+        Map<String, String> attributes = new HashMap<>();
+        attributes.put("RedrivePolicy", redrivePolicy);
+        queueAttributes.setAttributes(attributes);
+        queueAttributes.setQueueUrl(queueUrl);
+        client.setQueueAttributes(queueAttributes);
+    }
 
     @Inject
     public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
@@ -111,7 +126,7 @@ public class SQSQueueManagerImpl implements QueueManager {
 
     public Queue getQueue() {
         try {
-            Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+            Queue queue = urlMap.get(new SqsLoader(getName(),sqs,fig));
             return queue;
         } catch (ExecutionException ee) {
             throw new RuntimeException(ee);
@@ -127,6 +142,7 @@ public class SQSQueueManagerImpl implements QueueManager {
         waitTime = waitTime/1000;
         String url = getQueue().getUrl();
         LOG.info("Getting {} messages from {}", limit, url);
+
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
@@ -191,8 +207,8 @@ public class SQSQueueManagerImpl implements QueueManager {
         LOG.info("Commit message {} to queue {}",queueMessage.getMessageId(),url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
-                .withQueueUrl(url)
-                .withReceiptHandle(queueMessage.getHandle()));
+            .withQueueUrl(url)
+            .withReceiptHandle(queueMessage.getHandle()));
     }
 
 
@@ -230,10 +246,12 @@ public class SQSQueueManagerImpl implements QueueManager {
     public class SqsLoader {
         private final String key;
         private final AmazonSQSClient client;
+        private final QueueFig fig;
 
-        public SqsLoader(String key, AmazonSQSClient client) {
+        public SqsLoader(String key, AmazonSQSClient client,QueueFig fig) {
             this.key = key;
             this.client = client;
+            this.fig = fig;
         }
 
         public AmazonSQSClient getClient() {
@@ -265,5 +283,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             return getKey();
         }
 
+        public QueueFig config(){return fig;}
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0aeaa881/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 935fd16..89fb9cd 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -76,7 +76,6 @@ public class QueueManagerTest {
         assertEquals(scope.getApplication().getUuid(),uuid);
     }
 
-    @Ignore("need aws creds")
     @Test
     public void send() throws IOException,ClassNotFoundException{
         String value = "bodytest";
@@ -92,7 +91,6 @@ public class QueueManagerTest {
 
     }
 
-    @Ignore("need aws creds")
     @Test
     public void sendMore() throws IOException,ClassNotFoundException{
         HashMap<String,String> values = new HashMap<>();