You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/06/10 18:45:34 UTC
[12/35] incubator-usergrid git commit: added debug logging and exception handling I should have done last time...
added debug logging and exception handling I should have done last time...
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1ba2ce40
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1ba2ce40
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1ba2ce40
Branch: refs/heads/rest-test-framework-token-fix
Commit: 1ba2ce40bf98c8b0a8b9b79d9d6c5de1f742823b
Parents: 2e6b4e5
Author: Jeff West <jw...@apigee.com>
Authored: Fri Jun 5 16:59:50 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Fri Jun 5 16:59:50 2015 -0700
----------------------------------------------------------------------
.../queue/impl/SNSQueueManagerImpl.java | 68 ++++++++++++++------
.../queue/util/AmazonNotificationUtils.java | 17 +++--
2 files changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1ba2ce40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 802c2ce..e5c0c2a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -118,13 +118,25 @@ public class SNSQueueManagerImpl implements QueueManager {
private String setupMultiRegion(final String queueName)
throws Exception {
+ logger.info("Setting up SNS/SQS...");
+
String primaryTopicArn = AmazonNotificationUtils.getTopicArn(queueName, sns, true);
+ if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+
String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(queueName, sqs);
+ if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+
if (primaryQueueArn == null) {
+ if (logger.isDebugEnabled())
+ logger.debug("SNS/SQS Setup: primaryQueueArn is null, setting creating queue...");
+
String queueUrl = AmazonNotificationUtils.createQueue(queueName, sqs, fig);
primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(queueUrl, sqs);
+
+ if (logger.isDebugEnabled())
+ logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
}
AmazonNotificationUtils.subscribeQueueToTopic(primaryTopicArn, primaryQueueArn, sns);
@@ -132,6 +144,10 @@ public class SNSQueueManagerImpl implements QueueManager {
if (fig.isMultiRegion()) {
String multiRegion = fig.getRegionList();
+
+ if (logger.isDebugEnabled())
+ logger.debug("MultiRegion Setup specified, regions: {}", multiRegion);
+
String[] regionNames = multiRegion.split(",");
final Set<String> arrQueueArns = new HashSet<>(regionNames.length + 1);
@@ -213,45 +229,55 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public rx.Observable<QueueMessage> getMessages(final int limit,
- final int transactionTimeout,
- final int waitTime,
- final Class klass) {
+ final int transactionTimeout,
+ final int waitTime,
+ final Class klass) {
if (sqs == null) {
logger.error("SQS is null - was not initialized properly");
return rx.Observable.empty();
}
-
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Getting {} messages from {}", limit, url);
+ if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
- ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
- List<Message> messages = result.getMessages();
- if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+ try {
+ ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+ List<Message> messages = result.getMessages();
+
+ if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+ List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
- for (Message message : messages) {
- Object body;
+ for (Message message : messages) {
+ Object body;
- try {
- body = fromString(message.getBody(), klass);
- } catch (Exception e) {
- logger.error("failed to deserialize message", e);
- throw new RuntimeException(e);
+ try {
+ body = fromString(message.getBody(), klass);
+ } catch (Exception e) {
+ logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
+ throw new RuntimeException(e);
+ }
+
+ QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
+ queueMessages.add(queueMessage);
}
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
- queueMessages.add(queueMessage);
+ return rx.Observable.from(queueMessages);
+
+ } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
+ logger.error(String.format("Queue does not exist! [%s]", url), dne);
+ } catch (Exception e) {
+ logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
}
- return rx.Observable.from( queueMessages);
+
+ return rx.Observable.from(new ArrayList<>(0));
}
@Override
@@ -292,7 +318,8 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void commitMessage(final QueueMessage queueMessage) {
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+ if (logger.isDebugEnabled())
+ logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
sqs.deleteMessage(new DeleteMessageRequest()
.withQueueUrl(url)
@@ -328,6 +355,7 @@ public class SNSQueueManagerImpl implements QueueManager {
/**
* Read the object from Base64 string.
*/
+
private Object fromString(final String s, final Class klass)
throws IOException, ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1ba2ce40/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index bdceb2c..852fc43 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -80,7 +80,6 @@ public class AmazonNotificationUtils {
}
if (queueUrl != null) {
- String queueArn = null;
try {
GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
@@ -104,8 +103,6 @@ public class AmazonNotificationUtils {
final AmazonSQSClient sqs)
throws Exception {
- String queueArn = null;
-
try {
GetQueueAttributesRequest queueAttributesRequest = new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("All");
@@ -154,6 +151,9 @@ public class AmazonNotificationUtils {
final boolean createOnMissing)
throws Exception {
+ if (logger.isDebugEnabled())
+ logger.debug("Looking up Topic ARN: {}", queueName);
+
ListTopicsResult listTopicsResult = sns.listTopics();
String topicArn = null;
@@ -162,15 +162,24 @@ public class AmazonNotificationUtils {
if (queueName.equals(arn.substring(arn.lastIndexOf(':')))) {
topicArn = arn;
+
+ logger.info("Found existing topic arn=[{}] for queue=[{}]", topicArn, queueName);
}
}
if (topicArn == null && createOnMissing) {
+ logger.info("Creating topic for queue=[{}]...", queueName);
+
CreateTopicResult createTopicResult = sns.createTopic(queueName);
topicArn = createTopicResult.getTopicArn();
- logger.info("Created topic with name {} and arn {}", queueName, topicArn);
+
+ logger.info("Successfully created topic with name {} and arn {}", queueName, topicArn);
}
+ if (logger.isDebugEnabled())
+ logger.debug("Returning Topic ARN=[{}] for Queue=[{}]", topicArn, queueName);
+
+
return topicArn;
}