You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/06/10 18:45:34 UTC

[12/35] incubator-usergrid git commit: added debug logging and exception handling I should have done last time...

added debug logging and exception handling I should have done last time...


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1ba2ce40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1ba2ce40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1ba2ce40

Branch: refs/heads/rest-test-framework-token-fix
Commit: 1ba2ce40bf98c8b0a8b9b79d9d6c5de1f742823b
Parents: 2e6b4e5
Author: Jeff West <jw...@apigee.com>
Authored: Fri Jun 5 16:59:50 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Fri Jun 5 16:59:50 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SNSQueueManagerImpl.java         | 68 ++++++++++++++------
 .../queue/util/AmazonNotificationUtils.java     | 17 +++--
 2 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1ba2ce40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 802c2ce..e5c0c2a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -118,13 +118,25 @@ public class SNSQueueManagerImpl implements QueueManager {
     private String setupMultiRegion(final String queueName)
         throws Exception {
 
+        logger.info("Setting up SNS/SQS...");
+
         String primaryTopicArn = AmazonNotificationUtils.getTopicArn(queueName, sns, true);
 
+        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+
         String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqs);
 
+        if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+
         if (primaryQueueArn == null) {
+            if (logger.isDebugEnabled())
+                logger.debug("SNS/SQS Setup: primaryQueueArn is null, setting creating queue...");
+
             String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqs, fig);
             primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqs);
+
+            if (logger.isDebugEnabled())
+                logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
         }
 
         AmazonNotificationUtils.subscribeQueueToTopic(primaryTopicArn, primaryQueueArn, sns);
@@ -132,6 +144,10 @@ public class SNSQueueManagerImpl implements QueueManager {
         if (fig.isMultiRegion()) {
 
             String multiRegion = fig.getRegionList();
+
+            if (logger.isDebugEnabled())
+                logger.debug("MultiRegion Setup specified, regions: {}", multiRegion);
+
             String[] regionNames = multiRegion.split(",");
 
             final Set<String> arrQueueArns = new HashSet<>(regionNames.length + 1);
@@ -213,45 +229,55 @@ public class SNSQueueManagerImpl implements QueueManager {
 
     @Override
     public rx.Observable<QueueMessage> getMessages(final int limit,
-                                          final int transactionTimeout,
-                                          final int waitTime,
-                                          final Class klass) {
+                                                   final int transactionTimeout,
+                                                   final int waitTime,
+                                                   final Class klass) {
 
         if (sqs == null) {
             logger.error("SQS is null - was not initialized properly");
             return rx.Observable.empty();
         }
 
-
         String url = getReadQueue().getUrl();
 
-        if (logger.isDebugEnabled()) logger.debug("Getting {} messages from {}", limit, url);
+        if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
 
         ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
         receiveMessageRequest.setMaxNumberOfMessages(limit);
         receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
         receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
-        ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
-        List<Message> messages = result.getMessages();
 
-        if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+        try {
+            ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+            List<Message> messages = result.getMessages();
+
+            if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
 
-        List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+            List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
 
-        for (Message message : messages) {
-            Object body;
+            for (Message message : messages) {
+                Object body;
 
-            try {
-                body = fromString(message.getBody(), klass);
-            } catch (Exception e) {
-                logger.error("failed to deserialize message", e);
-                throw new RuntimeException(e);
+                try {
+                    body = fromString(message.getBody(), klass);
+                } catch (Exception e) {
+                    logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
+                    throw new RuntimeException(e);
+                }
+
+                QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+                queueMessages.add(queueMessage);
             }
 
-            QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
-            queueMessages.add(queueMessage);
+            return rx.Observable.from(queueMessages);
+
+        } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
+            logger.error(String.format("Queue does not exist! [%s]", url), dne);
+        } catch (Exception e) {
+            logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
         }
-        return rx.Observable.from( queueMessages);
+
+        return rx.Observable.from(new ArrayList<>(0));
     }
 
     @Override
@@ -292,7 +318,8 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void commitMessage(final QueueMessage queueMessage) {
         String url = getReadQueue().getUrl();
-        if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+        if (logger.isDebugEnabled())
+            logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
 
         sqs.deleteMessage(new DeleteMessageRequest()
             .withQueueUrl(url)
@@ -328,6 +355,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     /**
      * Read the object from Base64 string.
      */
+
     private Object fromString(final String s, final Class klass)
         throws IOException, ClassNotFoundException {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1ba2ce40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index bdceb2c..852fc43 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -80,7 +80,6 @@ public class AmazonNotificationUtils {
         }
 
         if (queueUrl != null) {
-            String queueArn = null;
 
             try {
                 GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
@@ -104,8 +103,6 @@ public class AmazonNotificationUtils {
                                           final AmazonSQSClient sqs)
         throws Exception {
 
-        String queueArn = null;
-
         try {
             GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
                 .withAttributeNames("All");
@@ -154,6 +151,9 @@ public class AmazonNotificationUtils {
                                      final boolean createOnMissing)
         throws Exception {
 
+        if (logger.isDebugEnabled())
+            logger.debug("Looking up Topic ARN: {}", queueName);
+
         ListTopicsResult listTopicsResult = sns.listTopics();
         String topicArn = null;
 
@@ -162,15 +162,24 @@ public class AmazonNotificationUtils {
 
             if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) {
                 topicArn = arn;
+
+                logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName);
             }
         }
 
         if (topicArn == null && createOnMissing) {
+            logger.info("Creating topic for queue=[{}]...", queueName);
+
             CreateTopicResult createTopicResult = sns.createTopic(queueName);
             topicArn = createTopicResult.getTopicArn();
-            logger.info("Created topic with name {} and arn {}", queueName, topicArn);
+
+            logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn);
         }
 
+        if (logger.isDebugEnabled())
+            logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName);
+
+
         return topicArn;
     }