You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 01:52:45 UTC
[09/15] incubator-usergrid git commit: Reformatted for readability,
small changes to support full impl of async events
Reformatted for readability, small changes to support full impl of async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8cb287ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8cb287ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8cb287ac
Branch: refs/heads/two-dot-o-dev
Commit: 8cb287ac6d43acab4d68506b35d4a7725ff344d0
Parents: 7298853
Author: Jeff West <jw...@apigee.com>
Authored: Tue May 26 09:08:10 2015 -0700
Committer: Jeff West <jw...@apigee.com>
Committed: Tue May 26 09:08:10 2015 -0700
----------------------------------------------------------------------
.../queue/impl/SQSQueueManagerImpl.java | 239 ++++++++++++-------
1 file changed, 151 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8cb287ac/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 088359a..e28e805 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -19,11 +19,10 @@ package org.apache.usergrid.persistence.queue.impl;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.ExecutionException;
+import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,69 +61,100 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class SQSQueueManagerImpl implements QueueManager {
- private static final Logger LOG = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
- private final QueueScope scope;
- private ObjectMapper mapper;
- private final QueueFig fig;
- private final AmazonSQSClient sqs;
+ private final QueueScope scope;
+ private ObjectMapper mapper;
+ protected final QueueFig fig;
+ protected final AmazonSQSClient sqs;
private static SmileFactory smileFactory = new SmileFactory();
private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
- .maximumSize( 1000 )
- .build( new CacheLoader<String, Queue>() {
- @Override
- public Queue load( String queueName ) throws Exception {
-
- //the amazon client is not thread safe, we need to create one per queue
- Queue queue = null;
- try {
- GetQueueUrlResult result = sqs.getQueueUrl( queueName );
- queue = new Queue( result.getQueueUrl() );
- }catch ( QueueDoesNotExistException queueDoesNotExistException ) {
- //no op, swallow
- LOG.error( "Queue {} does not exist, creating", queueName );
-
- }
- catch ( Exception e ) {
- LOG.error( "failed to get queue from service", e );
- throw e;
- }
- if ( queue == null ) {
- CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName( queueName );
- CreateQueueResult result = sqs.createQueue( createQueueRequest );
- String url = result.getQueueUrl();
- queue = new Queue( url );
- LOG.info( "Created queue with url {}", url );
- }
- return queue;
+ .maximumSize(1000)
+ .build(new CacheLoader<String, Queue>() {
+ @Override
+ public Queue load(String queueName) throws Exception {
+
+ //the amazon client is not thread safe, we need to create one per queue
+ Queue queue = null;
+
+ try {
+
+ GetQueueUrlResult result = sqs.getQueueUrl(queueName);
+ queue = new Queue(result.getQueueUrl());
+
+ } catch (QueueDoesNotExistException queueDoesNotExistException) {
+ //no op, swallow
+ logger.error("Queue {} does not exist, creating", queueName);
+
+ } catch (Exception e) {
+ logger.error("failed to get queue from service", e);
+ throw e;
+ }
+
+ if (queue == null) {
+ // Create the dead-letter queue first: the main queue's redrive policy needs its ARN.
+ final String deadletterQueueName = String.format("%s_dead", queueName);
+ final Map<String, String> deadLetterAttributes = new HashMap<>(2);
+
+ deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
+ CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
+ .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
+
+ final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
+ logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
+
+ final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(deadletterQueueName, sqs);
+
+ String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
+ " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
+ // Attributes of the main queue itself; they must go into queueAttributes, not the dead-letter map.
+ final Map<String, String> queueAttributes = new HashMap<>(2);
+ queueAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
+ queueAttributes.put("RedrivePolicy", redrivePolicy);
+
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest().
+ withQueueName(queueName)
+ .withAttributes(queueAttributes);
+
+ CreateQueueResult result = sqs.createQueue(createQueueRequest);
+
+ String url = result.getQueueUrl();
+ queue = new Queue(url);
+
+ logger.info("Created queue with url {}", url);
+ }
- } );
+
+ return queue;
+ }
+ });
@Inject
- public SQSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig ){
+ public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig) {
+
this.scope = scope;
this.fig = fig;
try {
smileFactory.delegateToTextual(true);
- mapper = new ObjectMapper( smileFactory );
+ mapper = new ObjectMapper(smileFactory);
//pretty print, disabling for speed
// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
sqs = createClient();
- } catch ( Exception e ) {
+ } catch (Exception e) {
throw new RuntimeException("Error setting up mapper", e);
}
}
- private String getName() {
+ protected String getName() {
+
String name = fig.getPrefix() + "_" + scope.getName();
Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
@@ -133,6 +163,7 @@ public class SQSQueueManagerImpl implements QueueManager {
}
public Queue getQueue() {
+
try {
Queue queue = urlMap.get(getName());
return queue;
@@ -142,67 +173,86 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
- public List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
- if(sqs == null){
- LOG.error("Sqs is null");
+ public List<QueueMessage> getMessages(final int limit,
+ final int transactionTimeout,
+ final int waitTime,
+ final Class klass) {
+
+ if (sqs == null) {
+ logger.error("Sqs is null");
return new ArrayList<>();
}
- waitTime = waitTime/1000;
+
String url = getQueue().getUrl();
- LOG.debug( "Getting {} messages from {}", limit, url);
+
+ if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
+
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(transactionTimeout);
- receiveMessageRequest.setWaitTimeSeconds(waitTime);
+ receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
+ receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
List<Message> messages = result.getMessages();
- LOG.debug( "Received {} messages from {}", messages.size(), url);
+
+ if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+
List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+
for (Message message : messages) {
- Object body ;
- try{
- body = fromString(message.getBody(),klass);
- }catch (Exception e){
- LOG.error("failed to deserialize message", e);
+ Object body;
+
+ try {
+ body = fromString(message.getBody(), klass);
+ } catch (Exception e) {
+ logger.error("failed to deserialize message", e);
throw new RuntimeException(e);
}
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(),message.getReceiptHandle(),body,message.getAttributes().get( "type" ));
+
+ QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
queueMessages.add(queueMessage);
}
+
return queueMessages;
}
@Override
- public void sendMessages(List bodies) throws IOException {
- if(sqs == null){
- LOG.error("Sqs is null");
+ public void sendMessages(final List bodies) throws IOException {
+
+ if (sqs == null) {
+ logger.error("Sqs is null");
return;
}
String url = getQueue().getUrl();
- LOG.debug( "Sending Messages...{} to {}", bodies.size(), url);
+
+ if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
SendMessageBatchRequest request = new SendMessageBatchRequest(url);
List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
- for(Object body : bodies){
+
+ for (Object body : bodies) {
SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
entry.setId(UUID.randomUUID().toString());
- entry.setMessageBody( toString( body ) );
- entry.addMessageAttributesEntry( "type",new MessageAttributeValue().withStringValue( "mytype" ) );
+ entry.setMessageBody(toString(body));
+ entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
entries.add(entry);
}
+
request.setEntries(entries);
sqs.sendMessageBatch(request);
}
@Override
- public void sendMessage(Object body) throws IOException {
- if(sqs == null){
- LOG.error("Sqs is null");
+ public void sendMessage(final Object body) throws IOException {
+
+ if (sqs == null) {
+ logger.error("Sqs is null");
return;
}
+
String url = getQueue().getUrl();
- LOG.debug( "Sending Message...{} to {}", body.toString(), url);
+
+ if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
final String stringBody = toString(body);
@@ -212,55 +262,68 @@ public class SQSQueueManagerImpl implements QueueManager {
@Override
- public void commitMessage(QueueMessage queueMessage) {
+ public void commitMessage(final QueueMessage queueMessage) {
+
String url = getQueue().getUrl();
- LOG.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url);
+ if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
+ .withQueueUrl(url)
+ .withReceiptHandle(queueMessage.getHandle()));
}
@Override
- public void commitMessages(List<QueueMessage> queueMessages) {
+ public void commitMessages(final List<QueueMessage> queueMessages) {
+
String url = getQueue().getUrl();
- LOG.debug( "Commit messages {} to queue {}", queueMessages.size(), url);
+ if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
- for(QueueMessage message : queueMessages){
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(),message.getHandle()));
+
+ for (QueueMessage message : queueMessages) {
+ entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
}
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url,entries);
+
+ DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+
boolean successful = result.getFailed().size() <= 0;
- if(!successful){
- for( BatchResultErrorEntry failed : result.getFailed()) {
- LOG.error("Commit failed reason: {} messages id: {}", failed.getMessage(),failed.getId());
+
+ if (!successful) {
+
+ for (BatchResultErrorEntry failed : result.getFailed()) {
+ logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
}
}
}
-
- /** Read the object from Base64 string. */
- private Object fromString( String s, Class klass ) throws IOException, ClassNotFoundException {
- Object o = mapper.readValue(s,klass);
+ /**
+ * Deserialize an object of the given class from its string form using the ObjectMapper.
+ */
+ private Object fromString(final String s,
+ final Class klass) throws IOException, ClassNotFoundException {
+ Object o = mapper.readValue(s, klass);
return o;
}
- /** Write the object to a Base64 string. */
- private String toString( Object o ) throws IOException {
+ /**
+ * Serialize the object to its string form using the ObjectMapper.
+ */
+ protected String toString(final Object o) throws IOException {
return mapper.writeValueAsString(o);
}
/**
* Get the region
+ *
* @return
*/
- private Region getRegion() {
- Regions regions = Regions.fromName( fig.getRegion() );
- Region region = Region.getRegion( regions );
+ protected Region getRegion() {
+ Regions regions = Regions.fromName(fig.getRegion());
+ Region region = Region.getRegion(regions);
return region;
}
@@ -270,9 +333,9 @@ public class SQSQueueManagerImpl implements QueueManager {
*/
private AmazonSQSClient createClient() {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
+ final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
final Region region = getRegion();
- sqs.setRegion( region );
+ sqs.setRegion(region);
return sqs;
}