You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 01:52:45 UTC

[09/15] incubator-usergrid git commit: Reformatted for readability, small changes to support full impl of async events

Reformatted for readability, small changes to support full impl of async events


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cb287ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cb287ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cb287ac

Branch: refs/heads/two-dot-o-dev
Commit: 8cb287ac6d43acab4d68506b35d4a7725ff344d0
Parents: 7298853
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:08:10 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:08:10 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SQSQueueManagerImpl.java         | 239 ++++++++++++-------
 1 file changed, 151 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cb287ac/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 088359a..e28e805 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,11 +19,10 @@ package org.apache.usergrid.persistence.queue.impl;
 
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,69 +61,100 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 public class SQSQueueManagerImpl implements QueueManager {
-    private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
 
 
-    private  final QueueScope scope;
-    private  ObjectMapper mapper;
-    private final QueueFig fig;
-    private final AmazonSQSClient sqs;
+    private final QueueScope scope;
+    private ObjectMapper mapper;
+    protected final QueueFig fig;
+    protected final AmazonSQSClient sqs;
 
     private static SmileFactory smileFactory = new SmileFactory();
 
     private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
-            .maximumSize( 1000 )
-            .build( new CacheLoader<String, Queue>() {
-                @Override
-                public Queue load( String queueName ) throws Exception {
-
-                    //the amazon client is not thread safe, we need to create one per queue
-                    Queue queue = null;
-                    try {
-                        GetQueueUrlResult result = sqs.getQueueUrl( queueName );
-                        queue = new Queue( result.getQueueUrl() );
-                    }catch ( QueueDoesNotExistException queueDoesNotExistException ) {
-                        //no op, swallow
-                        LOG.error( "Queue {} does not exist, creating", queueName );
-
-                    }
-                    catch ( Exception e ) {
-                        LOG.error( "failed to get queue from service", e );
-                        throw e;
-                    }
-                    if ( queue == null ) {
-                        CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueName );
-                        CreateQueueResult result = sqs.createQueue( createQueueRequest );
-                        String url = result.getQueueUrl();
-                        queue = new Queue( url );
-                        LOG.info( "Created queue with url {}", url );
-                    }
-                    return queue;
+        .maximumSize(1000)
+        .build(new CacheLoader<String, Queue>() {
+            @Override
+            public Queue load(String queueName) throws Exception {
+                // Resolve the SQS queue for queueName; if it does not exist, create it plus a "<name>_dead" queue.
+                //the amazon client is not thread safe, we need to create one per queue
+                Queue queue = null;
+
+                try {
+
+                    GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+                    queue = new Queue(result.getQueueUrl());
+
+                } catch (QueueDoesNotExistException queueDoesNotExistException) {
+                    //queue is missing: leave queue null so it gets created below
+                    logger.error("Queue {} does not exist, creating", queueName);
+
+                } catch (Exception e) {
+                    logger.error("failed to get queue from service", e);
+                    throw e;
+                }
+
+                if (queue == null) {
+                    // Create the dead-letter queue first: its ARN is needed for the main queue's redrive policy.
+                    final String deadletterQueueName = String.format("%s_dead", queueName);
+                    final Map<String, String> deadLetterAttributes = new HashMap<>(2);
+
+                    deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+                    CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
+                        .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
+
+                    final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+                    logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
+
+                    final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(deadletterQueueName, sqs);
+
+                    String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
+                        " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+                    // Attributes of the main queue itself; these must go into queueAttributes, not the dead-letter map.
+                    final Map<String, String> queueAttributes = new HashMap<>(2);
+                    queueAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
+                    queueAttributes.put("RedrivePolicy", redrivePolicy);
+
+                    CreateQueueRequest createQueueRequest = new CreateQueueRequest().
+                        withQueueName(queueName)
+                        .withAttributes(queueAttributes);
+
+                    CreateQueueResult result = sqs.createQueue(createQueueRequest);
+
+                    String url = result.getQueueUrl();
+                    queue = new Queue(url);
+
+                    logger.info("Created queue with url {}", url);
                 }
-            } );
+
+                return queue;
+            }
+        });
 
 
     @Inject
-    public SQSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig ){
+    public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig) {
+
         this.scope = scope;
         this.fig = fig;
         try {
 
             smileFactory.delegateToTextual(true);
-            mapper = new ObjectMapper( smileFactory );
+            mapper = new ObjectMapper(smileFactory);
             //pretty print, disabling for speed
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
 
             sqs = createClient();
 
-        } catch ( Exception e ) {
+        } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
         }
     }
 
 
-    private String getName() {
+    protected String getName() {
+
         String name = fig.getPrefix() + "_" + scope.getName();
 
         Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
@@ -133,6 +163,7 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     public Queue getQueue() {
+
         try {
             Queue queue = urlMap.get(getName());
             return queue;
@@ -142,67 +173,86 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
-        if(sqs == null){
-            LOG.error("Sqs is null");
+    public List<QueueMessage> getMessages(final int limit,
+                                          final int transactionTimeout,
+                                          final int waitTime,
+                                          final Class klass) {
+
+        if (sqs == null) {
+            logger.error("Sqs is null");
             return new ArrayList<>();
         }
-        waitTime = waitTime/1000;
+
         String url = getQueue().getUrl();
-        LOG.debug( "Getting {} messages from {}", limit, url);
+
+        if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
+
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
-        receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
-        receiveMessageRequest.setWaitTimeSeconds(waitTime);
+        receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
+        receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
         ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
         List<Message> messages = result.getMessages();
-        LOG.debug( "Received {} messages from {}", messages.size(), url);
+
+        if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+
         List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+
         for (Message message : messages) {
-            Object body ;
-            try{
-                body = fromString(message.getBody(),klass);
-            }catch (Exception e){
-                LOG.error("failed to deserialize message", e);
+            Object body;
+
+            try {
+                body = fromString(message.getBody(), klass);
+            } catch (Exception e) {
+                logger.error("failed to deserialize message", e);
                 throw new RuntimeException(e);
             }
-            QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body,message.getAttributes().get( "type" ));
+
+            QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
             queueMessages.add(queueMessage);
         }
+
         return queueMessages;
     }
 
     @Override
-    public void sendMessages(List bodies) throws IOException {
-        if(sqs == null){
-            LOG.error("Sqs is null");
+    public void sendMessages(final List bodies) throws IOException {
+
+        if (sqs == null) {
+            logger.error("Sqs is null");
             return;
         }
         String url = getQueue().getUrl();
-        LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
+
+        if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
 
         SendMessageBatchRequest request = new SendMessageBatchRequest(url);
         List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-        for(Object body : bodies){
+
+        for (Object body : bodies) {
             SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
             entry.setId(UUID.randomUUID().toString());
-            entry.setMessageBody( toString( body ) );
-            entry.addMessageAttributesEntry( "type",new MessageAttributeValue().withStringValue( "mytype" ) );
+            entry.setMessageBody(toString(body));
+            entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
             entries.add(entry);
         }
+
         request.setEntries(entries);
         sqs.sendMessageBatch(request);
 
     }
 
     @Override
-    public void sendMessage(Object body) throws IOException {
-        if(sqs == null){
-            LOG.error("Sqs is null");
+    public void sendMessage(final Object body) throws IOException {
+
+        if (sqs == null) {
+            logger.error("Sqs is null");
             return;
         }
+
         String url = getQueue().getUrl();
-        LOG.debug( "Sending Message...{} to {}", body.toString(), url);
+
+        if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
 
         final String stringBody = toString(body);
 
@@ -212,55 +262,68 @@ public class SQSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public void commitMessage(QueueMessage queueMessage) {
+    public void commitMessage(final QueueMessage queueMessage) {
+
         String url = getQueue().getUrl();
-        LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
+        if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
-                .withQueueUrl(url)
-                .withReceiptHandle(queueMessage.getHandle()));
+            .withQueueUrl(url)
+            .withReceiptHandle(queueMessage.getHandle()));
     }
 
 
     @Override
-    public void commitMessages(List<QueueMessage> queueMessages) {
+    public void commitMessages(final List<QueueMessage> queueMessages) {
+
         String url = getQueue().getUrl();
-        LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
+        if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-        for(QueueMessage message : queueMessages){
-            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
+
+        for (QueueMessage message : queueMessages) {
+            entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
         }
-        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
+
+        DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
         DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+
         boolean successful = result.getFailed().size() <= 0;
-        if(!successful){
-            for( BatchResultErrorEntry failed : result.getFailed()) {
-                LOG.error("Commit failed reason: {} messages id: {}", failed.getMessage(),failed.getId());
+
+        if (!successful) {
+
+            for (BatchResultErrorEntry failed : result.getFailed()) {
+                logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
             }
         }
     }
 
 
-
-    /** Read the object from Base64 string. */
-    private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException {
-        Object o =  mapper.readValue(s,klass);
+    /**
+     * Deserialize the string into an instance of klass using the ObjectMapper.
+     */
+    private Object fromString(final String s,
+                              final Class klass) throws IOException, ClassNotFoundException {
+        Object o = mapper.readValue(s, klass);
         return o;
     }
 
-    /** Write the object to a Base64 string. */
-    private  String toString( Object o ) throws IOException {
+    /**
+     * Serialize the object to a string using the ObjectMapper.
+     */
+    protected String toString(final Object o) throws IOException {
         return mapper.writeValueAsString(o);
     }
 
 
     /**
      * Get the region
+     *
      * @return
      */
-    private Region getRegion() {
-        Regions regions = Regions.fromName( fig.getRegion() );
-        Region region = Region.getRegion( regions );
+    protected Region getRegion() {
+        Regions regions = Regions.fromName(fig.getRegion());
+        Region region = Region.getRegion(regions);
         return region;
     }
 
@@ -270,9 +333,9 @@ public class SQSQueueManagerImpl implements QueueManager {
      */
     private AmazonSQSClient createClient() {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
-        final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
+        final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
         final Region region = getRegion();
-        sqs.setRegion( region );
+        sqs.setRegion(region);
 
         return sqs;
     }