You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 01:52:44 UTC
[08/15] incubator-usergrid git commit: helper class for Amazon queues
helper class for Amazon queues
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7298853b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7298853b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7298853b
Branch: refs/heads/two-dot-o-dev
Commit: 7298853b7c37b5f29792775f77374d4c86559b29
Parents: 8adf048
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:07:21 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:07:21 2015 -0700
----------------------------------------------------------------------
.../queue/util/AmazonNotificationUtils.java | 177 +++++++++++++++++++
1 file changed, 177 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7298853b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
new file mode 100644
index 0000000..bdceb2c
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -0,0 +1,177 @@
+package org.apache.usergrid.persistence.queue.util;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers for creating Amazon SQS queues (with a companion dead-letter queue),
+ * resolving queue and topic ARNs, and subscribing SQS queues to SNS topics.
+ *
+ * Created by Jeff West on 5/25/15.
+ */
+public class AmazonNotificationUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(AmazonNotificationUtils.class);
+
+    /**
+     * Creates the SQS queue {@code queueName} together with a dead-letter queue named
+     * {@code <queueName>_dead}, and points the primary queue's redrive policy at it.
+     *
+     * @param queueName name of the primary queue to create
+     * @param sqs       SQS client used for all calls
+     * @param fig       supplies the retention periods and the delivery limit
+     *                  (used as {@code maxReceiveCount} in the redrive policy)
+     * @return the URL of the primary queue
+     * @throws Exception if any SQS call fails, or if the ARN of the dead-letter queue
+     *                   cannot be resolved
+     */
+    public static String createQueue(final String queueName,
+                                     final AmazonSQSClient sqs,
+                                     final QueueFig fig)
+        throws Exception {
+
+        final String deadletterQueueName = String.format("%s_dead", queueName);
+
+        final Map<String, String> deadLetterAttributes = new HashMap<>();
+        deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+
+        final CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
+            .withQueueName(deadletterQueueName)
+            .withAttributes(deadLetterAttributes);
+
+        final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+        final String deadletterUrl = deadletterResult.getQueueUrl();
+
+        logger.info("Created deadletter queue with url {}", deadletterUrl);
+
+        // The URL was just returned by createQueue, so resolve the ARN from it directly
+        // instead of looking the queue up by name again.
+        final String deadletterArn = getQueueArnByUrl(deadletterUrl, sqs);
+
+        if (deadletterArn == null) {
+            // Without an ARN the redrive policy below would literally contain "null".
+            throw new IllegalStateException(
+                String.format("Unable to resolve ARN for deadletter queue [%s]", deadletterQueueName));
+        }
+
+        final String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
+            " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+
+        // These attributes belong to the primary queue. Putting them into the dead-letter
+        // map (which has already been sent) would create the primary queue with no
+        // retention period and no redrive policy at all.
+        final Map<String, String> queueAttributes = new HashMap<>();
+        queueAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
+        queueAttributes.put("RedrivePolicy", redrivePolicy);
+
+        final CreateQueueRequest createQueueRequest = new CreateQueueRequest()
+            .withQueueName(queueName)
+            .withAttributes(queueAttributes);
+
+        final String url = sqs.createQueue(createQueueRequest).getQueueUrl();
+
+        logger.info("Created SQS queue with url {}", url);
+
+        return url;
+    }
+
+
+    /**
+     * Looks up the ARN of the queue with the given name.
+     *
+     * @param queueName name of the queue
+     * @param sqs       SQS client used for the lookup
+     * @return the queue ARN, or {@code null} if the queue does not exist or SQS reports no
+     *         URL or ARN for it
+     * @throws Exception if SQS fails for any reason other than the queue not existing
+     */
+    public static String getQueueArnByName(final String queueName,
+                                           final AmazonSQSClient sqs)
+        throws Exception {
+
+        final String queueUrl;
+
+        try {
+            queueUrl = sqs.getQueueUrl(queueName).getQueueUrl();
+
+        } catch (QueueDoesNotExistException queueDoesNotExistException) {
+            // A missing queue is an expected outcome here; report it as "no ARN".
+            logger.warn("Queue {} does not exist", queueName);
+            return null;
+
+        } catch (Exception e) {
+            logger.error(String.format("Failed to get URL for Queue [%s] from SQS", queueName), e);
+            throw e;
+        }
+
+        if (queueUrl == null) {
+            return null;
+        }
+
+        return getQueueArnByUrl(queueUrl, sqs);
+    }
+
+    /**
+     * Fetches the {@code QueueArn} attribute of the queue at the given URL.
+     *
+     * @param queueUrl URL of the queue
+     * @param sqs      SQS client used for the lookup
+     * @return the queue ARN, or {@code null} if the response carries no {@code QueueArn}
+     * @throws Exception if the attribute request fails (logged and rethrown unchanged)
+     */
+    public static String getQueueArnByUrl(final String queueUrl,
+                                          final AmazonSQSClient sqs)
+        throws Exception {
+
+        try {
+            // Only the ARN is needed, so do not ask SQS for every attribute.
+            final GetQueueAttributesRequest queueAttributesRequest =
+                new GetQueueAttributesRequest(queueUrl).withAttributeNames("QueueArn");
+
+            final GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes(queueAttributesRequest);
+            final Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
+
+            return sqsAttributeMap.get("QueueArn");
+
+        } catch (Exception e) {
+            logger.error(String.format("Failed to get ARN for queue URL [%s] from SQS", queueUrl), e);
+            throw e;
+        }
+    }
+
+    /**
+     * Subscribes an SQS queue to an SNS topic using the {@code sqs} protocol.
+     *
+     * This method only creates the subscription; it does not modify the queue's
+     * access policy.
+     *
+     * @param topicArn ARN of the SNS topic
+     * @param queueArn ARN of the SQS queue to subscribe
+     * @param sns      SNS client used for the call
+     * @throws Exception if the subscription fails; {@link AmazonServiceException}s
+     *                   (including authorization and subscription-limit errors) are
+     *                   wrapped with the original exception as the cause
+     */
+    public static void subscribeQueueToTopic(final String topicArn,
+                                             final String queueArn,
+                                             final AmazonSNSClient sns)
+        throws Exception {
+
+        try {
+            final SubscribeRequest subscribeRequest = new SubscribeRequest(topicArn, "sqs", queueArn);
+            final SubscribeResult subscribeResult = sns.subscribe(subscribeRequest);
+            final String subscriptionArn = subscribeResult.getSubscriptionArn();
+
+            logger.info("Successfully subscribed SQS Queue {} to SNS arn {} with Subscription arn {}", queueArn, topicArn,
+                subscriptionArn);
+
+        } catch (AuthorizationErrorException e) {
+            logger.error(String.format("AuthorizationErrorException creating/subscribing SQS Queue [%s] to SNS arn [%s]: %s", queueArn, topicArn, e.getMessage()), e);
+            throw new Exception("AuthorizationErrorException creating/subscribing SQS queue to SNS", e);
+        } catch (SubscriptionLimitExceededException e) {
+            logger.error(String.format("SubscriptionLimitExceededException creating/subscribing SQS Queue [%s] to SNS arn [%s]: %s", queueArn, topicArn, e.getMessage()), e);
+            throw new Exception("SubscriptionLimitExceededException creating/subscribing SQS queue to SNS", e);
+        } catch (AmazonServiceException e) {
+            logger.error(String.format("AmazonServiceException creating/subscribing SQS Queue [%s] to SNS arn [%s]: %s", queueArn, topicArn, e.getMessage()), e);
+            throw new Exception("AmazonServiceException creating/subscribing SQS queue to SNS", e);
+        } catch (Exception e) {
+            logger.error(String.format("Failed creating/subscribing SQS Queue [%s] to SNS arn [%s]: %s", queueArn, topicArn, e.getMessage()), e);
+            throw e;
+        }
+    }
+
+    /**
+     * Resolves the ARN of the SNS topic whose name equals {@code queueName}, optionally
+     * creating the topic when none exists.
+     *
+     * @param queueName       name of the topic (the queue name is reused as the topic name)
+     * @param sns             SNS client used for all calls
+     * @param createOnMissing whether to create the topic when it is not found
+     * @return the topic ARN, or {@code null} if the topic does not exist and
+     *         {@code createOnMissing} is {@code false}
+     * @throws Exception if an SNS call fails
+     */
+    public static String getTopicArn(final String queueName,
+                                     final AmazonSNSClient sns,
+                                     final boolean createOnMissing)
+        throws Exception {
+
+        String topicArn = findTopicArn(queueName, sns);
+
+        if (topicArn == null && createOnMissing) {
+            final CreateTopicResult createTopicResult = sns.createTopic(queueName);
+            topicArn = createTopicResult.getTopicArn();
+            logger.info("Created topic with name {} and arn {}", queueName, topicArn);
+        }
+
+        return topicArn;
+    }
+
+    /**
+     * Scans every page of the account's topic list for a topic with the given name.
+     * Returns the matching ARN, or {@code null} if no topic has that name.
+     */
+    private static String findTopicArn(final String topicName,
+                                       final AmazonSNSClient sns) {
+
+        String nextToken = null;
+
+        do {
+            final ListTopicsResult page = sns.listTopics(new ListTopicsRequest().withNextToken(nextToken));
+
+            for (Topic topic : page.getTopics()) {
+                final String arn = topic.getTopicArn();
+
+                // The topic name is the ARN segment after the last ':'. The +1 skips the
+                // colon itself; without it the comparison can never succeed.
+                if (topicName.equals(arn.substring(arn.lastIndexOf(':') + 1))) {
+                    return arn;
+                }
+            }
+
+            // ListTopics is paginated; a token means there are more topics to inspect.
+            nextToken = page.getNextToken();
+
+        } while (nextToken != null && !nextToken.isEmpty());
+
+        return null;
+    }
+
+}