You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 01:52:46 UTC
[10/15] incubator-usergrid git commit: New class for SNS-based queue
impl
New class for SNS-based queue impl
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6825b607
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6825b607
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6825b607
Branch: refs/heads/two-dot-o-dev
Commit: 6825b607497c72a88ec47df55a206246cd73d663
Parents: 8cb287a
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:08:43 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:08:43 2015 -0700
----------------------------------------------------------------------
.../queue/impl/SNSQueueManagerImpl.java | 370 +++++++++++++++++++
1 file changed, 370 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6825b607/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
new file mode 100644
index 0000000..0f1661d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.sns.AmazonSNSClient;
+import com.amazonaws.services.sns.model.*;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+public class SNSQueueManagerImpl implements QueueManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+
+ private final QueueScope scope;
+ private ObjectMapper mapper;
+ private final QueueFig fig;
+ private final AmazonSQSClient sqs;
+ private final AmazonSNSClient sns;
+
+ private static SmileFactory smileFactory = new SmileFactory();
+
+ private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(new CacheLoader<String, String>() {
+ @Override
+ public String load(String queueName)
+ throws Exception {
+
+ String primaryTopicArn = setupMultiRegion(queueName);
+
+ return primaryTopicArn;
+ }
+ });
+
+ private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .build(new CacheLoader<String, Queue>() {
+ @Override
+ public Queue load(String queueName) throws Exception {
+
+ Queue queue = null;
+
+ try {
+ GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ queue = new Queue(result.getQueueUrl());
+ } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ logger.error("Queue {} does not exist, creating", queueName);
+ } catch (Exception e) {
+ logger.error("failed to get queue from service", e);
+ throw e;
+ }
+
+ if (queue == null) {
+ String primaryTopicArn = setupMultiRegion(queueName);
+
+ String url = AmazonNotificationUtils.getQueueArnByName(queueName, sqs);
+ queue = new Queue(url);
+ }
+
+ return queue;
+ }
+ });
+
+
+ @Inject
+ public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig) {
+ this.scope = scope;
+ this.fig = fig;
+
+ try {
+ smileFactory.delegateToTextual(true);
+ mapper = new ObjectMapper(smileFactory);
+ mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
+
+ sqs = createSQSClient(getRegion());
+ sns = createSNSClient(getRegion());
+
+ } catch (Exception e) {
+ throw new RuntimeException("Error setting up mapper", e);
+ }
+ }
+
+ private String setupMultiRegion(final String queueName)
+ throws Exception {
+
+ String primaryTopicArn = AmazonNotificationUtils.getTopicArn(queueName, sns, true);
+
+ String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqs);
+
+ if (primaryQueueArn == null) {
+ String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqs, fig);
+ primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqs);
+ }
+
+ AmazonNotificationUtils.subscribeQueueToTopic(primaryTopicArn, primaryQueueArn, sns);
+
+ if (fig.isMultiRegion()) {
+
+ String multiRegion = fig.getRegionList();
+ String[] regionNames = multiRegion.split(",");
+
+ final Set<String> arrQueueArns = new HashSet<>(regionNames.length + 1);
+ final Map<String, AmazonSNSClient> topicArns = new HashMap<>(regionNames.length + 1);
+
+ arrQueueArns.add(primaryQueueArn);
+ topicArns.put(primaryTopicArn, sns);
+
+ for (String regionName : regionNames) {
+
+ Regions regions = Regions.fromName(regionName);
+ Region region = Region.getRegion(regions);
+
+ AmazonSQSClient sqsClient = createSQSClient(region);
+ AmazonSNSClient snsClient = createSNSClient(region);
+
+ String topicArn = AmazonNotificationUtils.getTopicArn(queueName, snsClient, true);
+
+ topicArns.put(topicArn, snsClient);
+
+ String queueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqsClient);
+
+ if (queueArn == null) {
+ String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqsClient, fig);
+ queueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqsClient);
+ }
+
+ arrQueueArns.add(queueArn);
+ }
+
+ for (String queueArn : arrQueueArns) {
+ for (Map.Entry<String, AmazonSNSClient> topicArnEntry : topicArns.entrySet()) {
+ String topicArn = topicArnEntry.getKey();
+ AmazonSNSClient snsClient = topicArnEntry.getValue();
+ AmazonNotificationUtils.subscribeQueueToTopic(topicArn, queueArn, snsClient);
+ }
+
+ }
+ }
+
+ return primaryTopicArn;
+ }
+
+
+ private AmazonSNSClient createSNSClient(final Region region) {
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+ final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+
+ sns.setRegion(region);
+
+ return sns;
+ }
+
+
+ private String getName() {
+ String name = fig.getPrefix() + "_" + scope.getName();
+
+ Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
+
+ return name;
+ }
+
+ public Queue getReadQueue() {
+ try {
+ return readQueueUrlMap.get(getName());
+ } catch (ExecutionException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+
+ public String getWriteTopicArn() {
+ try {
+ return writeTopicArnMap.get(getName());
+
+ } catch (ExecutionException ee) {
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public List<QueueMessage> getMessages(final int limit,
+ final int transactionTimeout,
+ final int waitTime,
+ final Class klass) {
+
+ if (sqs == null) {
+ logger.error("SQS is null - was not initialized properly");
+ return new ArrayList<>();
+ }
+
+
+ String url = getReadQueue().getUrl();
+
+ if (logger.isDebugEnabled()) logger.debug("Getting {} messages from {}", limit, url);
+
+ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
+ receiveMessageRequest.setMaxNumberOfMessages(limit);
+ receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
+ receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+ ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+ List<Message> messages = result.getMessages();
+
+ if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+
+ List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+
+ for (Message message : messages) {
+ Object body;
+
+ try {
+ body = fromString(message.getBody(), klass);
+ } catch (Exception e) {
+ logger.error("failed to deserialize message", e);
+ throw new RuntimeException(e);
+ }
+
+ QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+ queueMessages.add(queueMessage);
+ }
+ return queueMessages;
+ }
+
+ @Override
+ public void sendMessages(final List bodies) throws IOException {
+
+ if (sns == null) {
+ logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ return;
+ }
+
+ for (Object body : bodies) {
+ sendMessage(body);
+ }
+
+ }
+
+ @Override
+ public void sendMessage(final Object body) throws IOException {
+
+ if (sns == null) {
+ logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ return;
+ }
+
+ final String stringBody = toString(body);
+
+ String topicArn = getWriteTopicArn();
+
+ if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+
+ PublishResult publishResult = sns.publish(topicArn, toString(body));
+
+ if (logger.isDebugEnabled())
+ logger.debug("Published Message ID: {} to arn: {}", publishResult.getMessageId(), topicArn);
+ }
+
+
+ @Override
+ public void commitMessage(final QueueMessage queueMessage) {
+ String url = getReadQueue().getUrl();
+ if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+
+ sqs.deleteMessage(new DeleteMessageRequest()
+ .withQueueUrl(url)
+ .withReceiptHandle(queueMessage.getHandle()));
+ }
+
+
+ @Override
+ public void commitMessages(final List<QueueMessage> queueMessages) {
+ String url = getReadQueue().getUrl();
+
+ if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+
+ List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+
+ for (QueueMessage message : queueMessages) {
+ entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+ }
+
+ DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
+ DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+
+ boolean successful = result.getFailed().size() <= 0;
+
+ if (!successful) {
+ for (BatchResultErrorEntry failed : result.getFailed()) {
+ logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+ }
+ }
+ }
+
+
+ /**
+ * Read the object from Base64 string.
+ */
+ private Object fromString(final String s, final Class klass)
+ throws IOException, ClassNotFoundException {
+
+ Object o = mapper.readValue(s, klass);
+ return o;
+ }
+
+ /**
+ * Write the object to a Base64 string.
+ */
+ private String toString(final Object o) throws IOException {
+ return mapper.writeValueAsString(o);
+ }
+
+
+ /**
+ * Get the region
+ *
+ * @return
+ */
+ private Region getRegion() {
+ Regions regions = Regions.fromName(fig.getRegion());
+ return Region.getRegion(regions);
+ }
+
+
+ /**
+ * Create the SQS client for the specified settings
+ */
+ private AmazonSQSClient createSQSClient(final Region region) {
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+ final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
+
+ sqs.setRegion(region);
+
+ return sqs;
+ }
+
+
+}