You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/03/11 14:41:21 UTC
incubator-usergrid git commit: add deadletter queue
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-473 [created] c021d5d6a
add deadletter queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c021d5d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c021d5d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c021d5d6
Branch: refs/heads/USERGRID-473
Commit: c021d5d6a4c73e8dcb7d40c8d432c13a55b4b068
Parents: 5c7a5f8
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Mar 10 17:35:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Mar 10 17:35:35 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/QueueFig.java | 3 ++
.../queue/impl/SQSQueueManagerImpl.java | 35 +++++++++++++++-----
2 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 197b791..b6c6ad7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -16,4 +16,7 @@ public interface QueueFig extends GuicyFig {
@Default("usergrid")
public String getPrefix();
+ @Key( "usergrid.queue.max.receive.count" ) // presumably the property name this value is read from (GuicyFig @Key) - confirm
+ @Default("5") // presumably the fallback when the property is unset (GuicyFig @Default) - confirm
+ public Integer getMaxReceiveCount(); // inserted as "maxReceiveCount" into the SQS RedrivePolicy built by SQSQueueManagerImpl.setDeadLetterQueue
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c021d5d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index f202fda..e859437 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -36,12 +36,11 @@ import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -74,15 +73,30 @@ public class SQSQueueManagerImpl implements QueueManager {
CreateQueueRequest createQueueRequest = new CreateQueueRequest()
.withQueueName(name);
CreateQueueResult result = queueLoader.getClient().createQueue(createQueueRequest);
- String url = result.getQueueUrl();
- queue = new Queue(url);
- LOG.info("Created queue with url {}", url);
+ String queueUrl = result.getQueueUrl();
+ setDeadLetterQueue(queueLoader.client,queueLoader.config(), queueUrl, name+"_dead_letter");
+ queue = new Queue(queueUrl);
+ LOG.info("Created queue with url {}", queueUrl);
}
return queue;
}
}
);
+ /**
+  * Creates the dead-letter queue named {@code deadLetterName} and attaches it to the queue at
+  * {@code queueUrl} by setting that queue's {@code RedrivePolicy} attribute.
+  *
+  * The two String parameters are ordered to match the call site, which passes
+  * {@code (client, config, queueUrl, name + "_dead_letter")}.
+  *
+  * @param client         SQS client used for every request issued here
+  * @param fig            configuration supplying {@code getMaxReceiveCount()} for the policy
+  * @param queueUrl       URL of the source queue that receives the redrive policy
+  * @param deadLetterName name of the dead-letter queue to create
+  */
+ private static void setDeadLetterQueue(AmazonSQSClient client, QueueFig fig, String queueUrl, String deadLetterName) {
+     CreateQueueRequest deadLetterQueueRequest = new CreateQueueRequest()
+         .withQueueName(deadLetterName);
+     CreateQueueResult deadLetterResult = client.createQueue(deadLetterQueueRequest);
+     String deadLetterUrl = deadLetterResult.getQueueUrl();
+
+     // RedrivePolicy.deadLetterTargetArn must hold the queue's ARN, not its URL, so look the ARN up.
+     // Fully qualified because the file's import block is not guaranteed to cover this class.
+     String deadLetterArn = client.getQueueAttributes(
+             new com.amazonaws.services.sqs.model.GetQueueAttributesRequest(deadLetterUrl)
+                 .withAttributeNames("QueueArn"))
+         .getAttributes()
+         .get("QueueArn");
+
+     String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\", \"deadLetterTargetArn\":\"%s\"}", fig.getMaxReceiveCount(), deadLetterArn);
+
+     Map<String, String> attributes = new HashMap<>();
+     attributes.put("RedrivePolicy", redrivePolicy);
+
+     SetQueueAttributesRequest queueAttributes = new SetQueueAttributesRequest();
+     queueAttributes.setAttributes(attributes);
+     queueAttributes.setQueueUrl(queueUrl);
+     client.setQueueAttributes(queueAttributes);
+ }
+
@Inject
public SQSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig){
this.fig = fig;
@@ -111,7 +125,7 @@ public class SQSQueueManagerImpl implements QueueManager {
public Queue getQueue() {
try {
- Queue queue = urlMap.get(new SqsLoader(getName(),sqs));
+ Queue queue = urlMap.get(new SqsLoader(getName(),sqs,fig));
return queue;
} catch (ExecutionException ee) {
throw new RuntimeException(ee);
@@ -127,6 +141,7 @@ public class SQSQueueManagerImpl implements QueueManager {
waitTime = waitTime/1000;
String url = getQueue().getUrl();
LOG.info("Getting {} messages from {}", limit, url);
+
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
@@ -230,10 +245,12 @@ public class SQSQueueManagerImpl implements QueueManager {
public class SqsLoader {
private final String key;
private final AmazonSQSClient client;
+ private final QueueFig fig;
- public SqsLoader(String key, AmazonSQSClient client) {
+ public SqsLoader(String key, AmazonSQSClient client,QueueFig fig) {
this.key = key;
this.client = client;
+ this.fig = fig;
}
public AmazonSQSClient getClient() {
@@ -265,5 +282,7 @@ public class SQSQueueManagerImpl implements QueueManager {
return getKey();
}
+ public QueueFig config(){return fig;} // returns the QueueFig handed to the SqsLoader constructor
+
}
}