You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 01:52:46 UTC

[10/15] incubator-usergrid git commit: New class for SNS-based queue impl

New class for SNS-based queue impl


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6825b607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6825b607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6825b607

Branch: refs/heads/two-dot-o-dev
Commit: 6825b607497c72a88ec47df55a206246cd73d663
Parents: 8cb287a
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:08:43 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:08:43 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SNSQueueManagerImpl.java         | 370 +++++++++++++++++++
 1 file changed, 370 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6825b607/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
new file mode 100644
index 0000000..0f1661d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class SNSQueueManagerImpl implements QueueManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+
+    /** Scope whose name is combined with the configured prefix to form the queue/topic name. */
+    private final QueueScope scope;
+    /** Mapper used to serialize and deserialize message bodies; assigned in the constructor. */
+    private ObjectMapper mapper;
+    /** Queue configuration (prefix, region, multi-region flag and region list). */
+    private final QueueFig fig;
+    /** SQS client bound to the region returned by {@code fig.getRegion()}. */
+    private final AmazonSQSClient sqs;
+    /** SNS client bound to the region returned by {@code fig.getRegion()}. */
+    private final AmazonSNSClient sns;
+
+    /** Factory shared by all instances; it is never reassigned, hence final. */
+    private static final SmileFactory smileFactory = new SmileFactory();
+
+    /**
+     * Cache of queue name to the primary SNS topic ARN. On a miss the loader runs
+     * {@link #setupMultiRegion(String)} and caches the ARN it returns.
+     */
+    private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
+        .maximumSize(1000)
+        .build(new CacheLoader<String, String>() {
+            @Override
+            public String load(final String queueName) throws Exception {
+                return setupMultiRegion(queueName);
+            }
+        });
+
+    /**
+     * Cache of queue name to the SQS queue that messages are read from.
+     *
+     * On a miss the loader asks SQS for the queue URL. If SQS reports that the queue does not
+     * exist, the topic/queue wiring is created through {@link #setupMultiRegion(String)} and the
+     * URL is looked up again. Any other failure is logged and rethrown.
+     */
+    private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
+        .maximumSize(1000)
+        .build(new CacheLoader<String, Queue>() {
+            @Override
+            public Queue load(String queueName) throws Exception {
+
+                try {
+                    return new Queue(sqs.getQueueUrl(queueName).getQueueUrl());
+                } catch (QueueDoesNotExistException queueDoesNotExistException) {
+                    logger.error("Queue {} does not exist, creating", queueName);
+                } catch (Exception e) {
+                    logger.error("failed to get queue from service", e);
+                    throw e;
+                }
+
+                // Only reached when the queue was missing: create the topic, the queue and
+                // their subscriptions. The returned topic ARN is not needed here.
+                setupMultiRegion(queueName);
+
+                // Callers use Queue.getUrl() as the SQS QueueUrl for receive/delete requests,
+                // so the wrapper must hold the queue URL, not the queue ARN.
+                return new Queue(sqs.getQueueUrl(queueName).getQueueUrl());
+            }
+        });
+
+
+    @Inject
+    public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig) {
+        this.scope = scope;
+        this.fig = fig;
+
+        try {
+            smileFactory.delegateToTextual(true);
+            mapper = new ObjectMapper(smileFactory);
+            mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+
+            sqs = createSQSClient(getRegion());
+            sns = createSNSClient(getRegion());
+
+        } catch (Exception e) {
+            throw new RuntimeException("Error setting up mapper", e);
+        }
+    }
+
+    /**
+     * Wires up the SNS topic and SQS queue named {@code queueName}.
+     *
+     * In the primary region the topic ARN is resolved, the queue is created when its ARN lookup
+     * returns null, and the queue is subscribed to the topic. When {@code fig.isMultiRegion()} is
+     * true the same is done for every region in {@code fig.getRegionList()}, after which every
+     * collected queue is subscribed to every collected topic.
+     *
+     * NOTE(review): the {@code true} flag passed to {@code getTopicArn} presumably requests
+     * creation of a missing topic - confirm in AmazonNotificationUtils.
+     *
+     * @param queueName name shared by the topic and the queue
+     * @return the topic ARN in the primary region
+     * @throws Exception if any AWS helper call fails
+     * @throws IllegalArgumentException if a configured region name is not a known AWS region
+     */
+    private String setupMultiRegion(final String queueName)
+        throws Exception {
+
+        String primaryTopicArn = AmazonNotificationUtils.getTopicArn(queueName, sns, true);
+
+        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqs);
+
+        if (primaryQueueArn == null) {
+            String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqs, fig);
+            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqs);
+        }
+
+        AmazonNotificationUtils.subscribeQueueToTopic(primaryTopicArn, primaryQueueArn, sns);
+
+        if (fig.isMultiRegion()) {
+
+            String multiRegion = fig.getRegionList();
+            String[] regionNames = multiRegion.split(",");
+
+            // A set and a map keyed by ARN de-duplicate the primary region if it is also listed.
+            final Set<String> arrQueueArns = new HashSet<>(regionNames.length + 1);
+            final Map<String, AmazonSNSClient> topicArns = new HashMap<>(regionNames.length + 1);
+
+            arrQueueArns.add(primaryQueueArn);
+            topicArns.put(primaryTopicArn, sns);
+
+            for (String rawRegionName : regionNames) {
+
+                // Tolerate whitespace around entries ("us-east-1, us-west-2") and empty
+                // entries, which Regions.fromName would reject.
+                final String regionName = rawRegionName.trim();
+                if (regionName.isEmpty()) {
+                    continue;
+                }
+
+                Regions regions = Regions.fromName(regionName);
+                Region region = Region.getRegion(regions);
+
+                AmazonSQSClient sqsClient = createSQSClient(region);
+                AmazonSNSClient snsClient = createSNSClient(region);
+
+                String topicArn = AmazonNotificationUtils.getTopicArn(queueName, snsClient, true);
+
+                topicArns.put(topicArn, snsClient);
+
+                String queueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqsClient);
+
+                if (queueArn == null) {
+                    String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqsClient, fig);
+                    queueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqsClient);
+                }
+
+                arrQueueArns.add(queueArn);
+            }
+
+            // Full mesh: each topic is subscribed with the SNS client of its own region.
+            for (String queueArn : arrQueueArns) {
+                for (Map.Entry<String, AmazonSNSClient> topicArnEntry : topicArns.entrySet()) {
+                    String topicArn = topicArnEntry.getKey();
+                    AmazonSNSClient snsClient = topicArnEntry.getValue();
+                    AmazonNotificationUtils.subscribeQueueToTopic(topicArn, queueArn, snsClient);
+                }
+
+            }
+        }
+
+        return primaryTopicArn;
+    }
+
+
+    /**
+     * Create the SNS client for the given region, using credentials obtained from a
+     * {@code UsergridAwsCredentialsProvider}.
+     */
+    private AmazonSNSClient createSNSClient(final Region region) {
+        final AmazonSNSClient client =
+            new AmazonSNSClient(new UsergridAwsCredentialsProvider().getCredentials());
+        client.setRegion(region);
+        return client;
+    }
+
+
+    /**
+     * Builds the queue/topic name as {@code <prefix>_<scope name>}.
+     *
+     * @return the name, guaranteed to be at most 80 characters long
+     * @throws IllegalArgumentException if the resulting name is longer than 80 characters
+     */
+    private String getName() {
+        String name = fig.getPrefix() + "_" + scope.getName();
+
+        // The message states the limit that is actually enforced (80 is allowed).
+        Preconditions.checkArgument(name.length() <= 80,
+            "Queue name '%s' must be at most 80 characters", name);
+
+        return name;
+    }
+
+    /**
+     * Returns the queue to read from, loading it through the read-queue cache on first use.
+     *
+     * @throws RuntimeException wrapping the {@link ExecutionException} raised by a failed cache load
+     */
+    public Queue getReadQueue() {
+        final String queueName = getName();
+        try {
+            return readQueueUrlMap.get(queueName);
+        } catch (ExecutionException loadFailure) {
+            throw new RuntimeException(loadFailure);
+        }
+    }
+
+    /**
+     * Returns the ARN of the topic to publish to, loading it through the topic cache on first use.
+     *
+     * @throws RuntimeException wrapping the {@link ExecutionException} raised by a failed cache load
+     */
+    public String getWriteTopicArn() {
+        final String topicName = getName();
+        try {
+            return writeTopicArnMap.get(topicName);
+        } catch (ExecutionException loadFailure) {
+            throw new RuntimeException(loadFailure);
+        }
+    }
+
+    @Override
+    public List<QueueMessage> getMessages(final int limit,
+                                          final int transactionTimeout,
+                                          final int waitTime,
+                                          final Class klass) {
+
+        if (sqs == null) {
+            logger.error("SQS is null - was not initialized properly");
+            return new ArrayList<>();
+        }
+
+
+        String url = getReadQueue().getUrl();
+
+        if (logger.isDebugEnabled()) logger.debug("Getting {} messages from {}", limit, url);
+
+        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
+        receiveMessageRequest.setMaxNumberOfMessages(limit);
+        receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
+        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+        List<Message> messages = result.getMessages();
+
+        if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+
+        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+
+        for (Message message : messages) {
+            Object body;
+
+            try {
+                body = fromString(message.getBody(), klass);
+            } catch (Exception e) {
+                logger.error("failed to deserialize message", e);
+                throw new RuntimeException(e);
+            }
+
+            QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+            queueMessages.add(queueMessage);
+        }
+        return queueMessages;
+    }
+
+    /**
+     * Publishes every element of {@code bodies}, in iteration order, by delegating to
+     * {@link #sendMessage(Object)}. Logs an error and does nothing if the SNS client is null.
+     *
+     * @param bodies message bodies to publish
+     * @throws IOException if serializing a body fails
+     */
+    @Override
+    public void sendMessages(final List bodies) throws IOException {
+
+        if (sns == null) {
+            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+            return;
+        }
+
+        final Iterator remaining = bodies.iterator();
+        while (remaining.hasNext()) {
+            sendMessage(remaining.next());
+        }
+    }
+
+    /**
+     * Serializes {@code body} with the object mapper and publishes it to the write topic.
+     * Logs an error and does nothing if the SNS client is null.
+     *
+     * @param body message body to publish
+     * @throws IOException if serializing the body fails
+     */
+    @Override
+    public void sendMessage(final Object body) throws IOException {
+
+        if (sns == null) {
+            logger.error("SNS client is null, perhaps it failed to initialize successfully");
+            return;
+        }
+
+        final String stringBody = toString(body);
+
+        final String topicArn = getWriteTopicArn();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+        }
+
+        // Reuse the serialized form so the body is serialized only once and the logged
+        // payload is exactly what gets published.
+        final PublishResult publishResult = sns.publish(topicArn, stringBody);
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Published Message ID: {} to arn: {}", publishResult.getMessageId(), topicArn);
+        }
+    }
+
+
+    /**
+     * Commits a single message by deleting it from the read queue using its receipt handle.
+     *
+     * @param queueMessage the message to delete
+     */
+    @Override
+    public void commitMessage(final QueueMessage queueMessage) {
+        final String queueUrl = getReadQueue().getUrl();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), queueUrl);
+        }
+
+        final DeleteMessageRequest deleteRequest = new DeleteMessageRequest()
+            .withQueueUrl(queueUrl)
+            .withReceiptHandle(queueMessage.getHandle());
+
+        sqs.deleteMessage(deleteRequest);
+    }
+
+
+    /**
+     * Commits the given messages by deleting them from the read queue in batches.
+     *
+     * An empty list is a no-op, because SQS rejects a batch request without entries. Larger
+     * lists are split into requests of at most 10 entries, the SQS per-request batch limit.
+     * Entries that SQS reports as failed are logged; no exception is raised for them.
+     *
+     * @param queueMessages the messages to delete
+     */
+    @Override
+    public void commitMessages(final List<QueueMessage> queueMessages) {
+
+        if (queueMessages.isEmpty()) {
+            return;
+        }
+
+        final String url = getReadQueue().getUrl();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+        }
+
+        // SQS accepts at most 10 entries per DeleteMessageBatch call.
+        final int maxBatchSize = 10;
+        final int total = queueMessages.size();
+
+        for (int start = 0; start < total; start += maxBatchSize) {
+            final int end = Math.min(start + maxBatchSize, total);
+            final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(end - start);
+
+            for (QueueMessage message : queueMessages.subList(start, end)) {
+                entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+            }
+
+            final DeleteMessageBatchResult result =
+                sqs.deleteMessageBatch(new DeleteMessageBatchRequest(url, entries));
+
+            for (BatchResultErrorEntry failed : result.getFailed()) {
+                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+            }
+        }
+    }
+
+
+    /**
+     * Deserialize the string {@code s} into an instance of {@code klass} using the object mapper.
+     */
+    private Object fromString(final String s, final Class klass)
+        throws IOException, ClassNotFoundException {
+
+        return mapper.readValue(s, klass);
+    }
+
+    /**
+     * Write the object to a Base64 string.
+     */
+    private String toString(final Object o) throws IOException {
+        return mapper.writeValueAsString(o);
+    }
+
+
+    /**
+     * Resolve the AWS region named by {@code fig.getRegion()}.
+     *
+     * @return the region object matching the configured region name
+     */
+    private Region getRegion() {
+        return Region.getRegion(Regions.fromName(fig.getRegion()));
+    }
+
+
+    /**
+     * Create the SQS client for the given region, using credentials obtained from a
+     * {@code UsergridAwsCredentialsProvider}.
+     */
+    private AmazonSQSClient createSQSClient(final Region region) {
+        final AmazonSQSClient client =
+            new AmazonSQSClient(new UsergridAwsCredentialsProvider().getCredentials());
+        client.setRegion(region);
+        return client;
+    }
+
+
+}