You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2015/10/20 23:07:16 UTC
[15/33] usergrid git commit: Fixes serialization tests and verifies
full end to end functionality.
Fixes serialization tests and verifies full end to end functionality.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/04a3f47b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/04a3f47b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/04a3f47b
Branch: refs/heads/usergrid-1007-shiro-cache
Commit: 04a3f47bd86ec0674ff487f479327e0f174f0425
Parents: 94a9078
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 12:00:56 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 12:00:56 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 70 ++-
.../asyncevents/model/AsyncEvent.java | 5 +-
.../index/IndexProcessorFig.java | 7 +-
.../util/ObjectJsonSerializer.java | 28 +-
.../persistence/queue/QueueManager.java | 2 +-
.../persistence/queue/guice/QueueModule.java | 1 -
.../queue/impl/SNSQueueManagerImpl.java | 515 ++++++++++---------
.../queue/impl/SQSQueueManagerImpl.java | 362 -------------
8 files changed, 336 insertions(+), 654 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c9f0953..d319ac8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,19 +29,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import com.google.common.base.Optional;
-
-import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
-import org.apache.usergrid.exception.NotImplementedException;
-import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent;
+import org.apache.usergrid.corepersistence.asyncevents.model.ElasticsearchIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent;
import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent;
@@ -50,6 +44,8 @@ import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.corepersistence.util.ObjectJsonSerializer;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -61,6 +57,7 @@ import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
+import org.apache.usergrid.persistence.index.impl.IndexProducer;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
@@ -78,6 +75,7 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -94,8 +92,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final Logger logger = LoggerFactory.getLogger(AmazonAsyncEventService.class);
- private static final ObjectJsonSerializer OBJECT_JSON_SERIALIZER = new ObjectJsonSerializer( );
-
// SQS maximum receive messages is 10
private static final int MAX_TAKE = 10;
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
@@ -192,6 +188,22 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
}
+
+ private void offerTopic( final Serializable operation ) {
+ final Timer.Context timer = this.writeTimer.time();
+
+ try {
+ //signal to SQS
+ this.queue.sendMessageToTopic( operation );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to queue message", e );
+ }
+ finally {
+ timer.stop();
+ }
+ }
+
private void offerBatch(final List operations){
final Timer.Context timer = this.writeTimer.time();
@@ -226,27 +238,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
- /**
- * Ack message in SQS
- */
- public void ack(final QueueMessage message) {
-
- final Timer.Context timer = this.ackTimer.time();
-
- try{
- queue.commitMessage( message );
-
- //decrement our in-flight counter
- inFlight.decrementAndGet();
-
- }catch(Exception e){
- throw new RuntimeException("Unable to ack messages", e);
- }finally {
- timer.stop();
- }
-
-
- }
/**
* Ack message in SQS
@@ -355,7 +346,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
+ offerTopic(
+ new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
}
@@ -468,7 +460,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
*/
public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
- final String jsonValue = OBJECT_JSON_SERIALIZER.toByteBuffer( indexOperationMessage );
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
final UUID newMessageId = UUIDGenerator.newTimeUUID();
@@ -482,12 +474,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
//send to the topic so all regions index the batch
- try {
- queue.sendMessageToTopic( elasticsearchIndexEvent );
- }
- catch ( IOException e ) {
- throw new RuntimeException( "Unable to pulish to topic", e );
- }
+
+ offerTopic( elasticsearchIndexEvent );
}
public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
@@ -505,7 +493,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
String highConsistency = null;
if(message == null){
- logger.error( "Receive message with id {} to process, unable to find it, reading with higher consistency level" );
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
@@ -517,11 +505,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
//our original local read has it, parse it.
if(message != null){
- indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( message, IndexOperationMessage.class );
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
}
//we tried to read it at a higher consistency level and it works
else if (highConsistency != null){
- indexOperationMessage = OBJECT_JSON_SERIALIZER.fromString( highConsistency, IndexOperationMessage.class );
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
}
//we couldn't find it, bail
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 1af54e3..7c51003 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -30,8 +30,11 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
/**
* Marker class for serialization
+ *
+ * Note that when you add a subtype, you will need to add it's serialization value below in the JsonSubTypes annotation.
+ *
+ * Each name must be unique, and must map to a subclass that is serialized
*/
-
@JsonIgnoreProperties( ignoreUnknown = true )
@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" )
@JsonSubTypes( {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 6fd73b4..ec9b315 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -105,10 +105,13 @@ public interface IndexProcessorFig extends GuicyFig {
boolean resolveSynchronously();
/**
- * Get the message TTL in milliseconds
+ * Get the message TTL in milliseconds. Defaults to 24 hours
+ *
+ * 24 * 60 * 60 * 1000
+ *
* @return
*/
- @Default("604800000")
+ @Default("86400000")
@Key( "elasticsearch.message.ttl" )
int getIndexMessageTtl();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
index dbd5ca3..4e5873a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/ObjectJsonSerializer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -33,16 +34,33 @@ import com.google.common.base.Preconditions;
public final class ObjectJsonSerializer {
- private final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
- private final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+ private static final ObjectMapper MAPPER = new ObjectMapper( JSON_FACTORY );
+
+ static{
+
+ /**
+ * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation
+ * here for how SNS borks the message body
+ *
+ * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+ */
+ MAPPER.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+ }
+
+ /**
+ * Singleton instance of our serializer, instantiating it and configuring the mapper is expensive.
+ */
+ public static final ObjectJsonSerializer INSTANCE = new ObjectJsonSerializer();
+
+
+ private ObjectJsonSerializer( ) {
- public ObjectJsonSerializer( ) {
- MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
}
- public <T extends Serializable> String toByteBuffer( final T toSerialize ) {
+ public <T extends Serializable> String toString( final T toSerialize ) {
Preconditions.checkNotNull( toSerialize, "toSerialize must not be null" );
final String stringValue;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dc3d1b5..34a3654 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -68,7 +68,7 @@ public interface QueueManager {
* @param body
* @throws IOException
*/
- <T extends Serializable> void sendMessage(T body)throws IOException;
+ <T extends Serializable> void sendMessage(T body)throws IOException;
/**
* Send a messae to the topic to be sent to other queues
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index dd1fe16..caf61bf 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -26,7 +26,6 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 59ecd24..a3fa05e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -68,6 +68,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageResult;
import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
@@ -77,9 +78,10 @@ import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
+
public class SNSQueueManagerImpl implements QueueManager {
- private static final Logger logger = LoggerFactory.getLogger(SNSQueueManagerImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
private final QueueScope scope;
private final QueueFig fig;
@@ -91,55 +93,64 @@ public class SNSQueueManagerImpl implements QueueManager {
private final AmazonSQSAsyncClient sqsAsync;
- private final JsonFactory JSON_FACTORY = new JsonFactory();
- private final ObjectMapper mapper = new ObjectMapper(JSON_FACTORY);
+ private static final JsonFactory JSON_FACTORY = new JsonFactory();
+ private static final ObjectMapper mapper = new ObjectMapper( JSON_FACTORY );
+
+ static {
+
+ /**
+ * Because of the way SNS escapes all our json, we have to tell jackson to accept it. See the documentation
+ * here for how SNS borks the message body
+ *
+ * http://docs.aws.amazon.com/sns/latest/dg/SendMessageToHttp.html
+ */
+ mapper.configure( JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, true );
+ }
- private final LoadingCache<String, String> writeTopicArnMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, String>() {
+ private final LoadingCache<String, String> writeTopicArnMap =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, String>() {
@Override
- public String load(String queueName)
- throws Exception {
+ public String load( String queueName ) throws Exception {
- return setupTopics(queueName);
+ return setupTopics( queueName );
}
- });
+ } );
- private final LoadingCache<String, Queue> readQueueUrlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, Queue>() {
+ private final LoadingCache<String, Queue> readQueueUrlMap =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
@Override
- public Queue load(String queueName) throws Exception {
+ public Queue load( String queueName ) throws Exception {
Queue queue = null;
try {
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
- queue = new Queue(result.getQueueUrl());
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- logger.error("Queue {} does not exist, will create", queueName);
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
+ GetQueueUrlResult result = sqs.getQueueUrl( queueName );
+ queue = new Queue( result.getQueueUrl() );
+ }
+ catch ( QueueDoesNotExistException queueDoesNotExistException ) {
+ logger.error( "Queue {} does not exist, will create", queueName );
+ }
+ catch ( Exception e ) {
+ logger.error( "failed to get queue from service", e );
throw e;
}
- if (queue == null) {
- String url = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
- queue = new Queue(url);
+ if ( queue == null ) {
+ String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+ queue = new Queue( url );
}
- setupTopics(queueName);
+ setupTopics( queueName );
return queue;
}
- });
-
+ } );
@Inject
- public SNSQueueManagerImpl(@Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
- CassandraFig cassandraFig, QueueFig queueFig) {
+ public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
+ CassandraFig cassandraFig, QueueFig queueFig ) {
this.scope = scope;
this.fig = fig;
this.clusterFig = clusterFig;
@@ -148,177 +159,179 @@ public class SNSQueueManagerImpl implements QueueManager {
// create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
final ExecutorService executor = TaskExecutorFactory
- .createTaskExecutor("amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
- TaskExecutorFactory.RejectionAction.CALLERRUNS);
+ .createTaskExecutor( "amazon-async-io", queueFig.getAsyncMaxThreads(), queueFig.getAsyncQueueSize(),
+ TaskExecutorFactory.RejectionAction.CALLERRUNS );
final Region region = getRegion();
try {
- sqs = createSQSClient(region);
- sns = createSNSClient(region);
- snsAsync = createAsyncSNSClient(region, executor);
+ sqs = createSQSClient( region );
+ sns = createSNSClient( region );
+ snsAsync = createAsyncSNSClient( region, executor );
sqsAsync = createAsyncSQSClient( region, executor );
-
- } catch (Exception e) {
- throw new RuntimeException("Error setting up mapper", e);
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Error setting up mapper", e );
}
}
- private String setupTopics(final String queueName)
- throws Exception {
- logger.info("Setting up setupTopics SNS/SQS...");
+ private String setupTopics( final String queueName ) throws Exception {
- String primaryTopicArn = AmazonNotificationUtils.getTopicArn(sns, queueName, true);
+ logger.info( "Setting up setupTopics SNS/SQS..." );
- if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn);
+ String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );
- String queueUrl = AmazonNotificationUtils.getQueueUrlByName(sqs, queueName);
- String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName(sqs, queueName);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryTopicArn=" + primaryTopicArn );
+ }
+
+ String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
+ String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );
- if (logger.isDebugEnabled()) logger.debug("SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryQueueArn=" + primaryQueueArn );
+ }
- if (primaryQueueArn == null) {
- if (logger.isDebugEnabled())
- logger.debug("SNS/SQS Setup: primaryQueueArn is null, creating queue...");
+ if ( primaryQueueArn == null ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
+ }
- queueUrl = AmazonNotificationUtils.createQueue(sqs, queueName, fig);
- primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl(sqs, queueUrl);
+ queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
+ primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );
- if (logger.isDebugEnabled())
- logger.debug("SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
+ }
}
try {
- SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
- sns.subscribe(primarySubscribeRequest);
+ SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
+ sns.subscribe( primarySubscribeRequest );
// ensure the SNS primary topic has permission to send to the primary SQS queue
List<String> primaryTopicArnList = new ArrayList<>();
- primaryTopicArnList.add(primaryTopicArn);
- AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
- } catch (AmazonServiceException e) {
- logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
+ primaryTopicArnList.add( primaryTopicArn );
+ AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
+ }
+ catch ( AmazonServiceException e ) {
+ logger.error(
+ String.format( "Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn ), e );
}
- if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL) {
+ if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
String multiRegion = fig.getRegionList();
- if (logger.isDebugEnabled())
- logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+ }
- String[] regionNames = multiRegion.split(",");
+ String[] regionNames = multiRegion.split( "," );
- final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
- final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+ final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+ final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
- arrQueueArns.put(primaryQueueArn, fig.getRegion());
- topicArns.put(primaryTopicArn, fig.getRegion());
+ arrQueueArns.put( primaryQueueArn, fig.getRegion() );
+ topicArns.put( primaryTopicArn, fig.getRegion() );
- for (String regionName : regionNames) {
+ for ( String regionName : regionNames ) {
regionName = regionName.trim();
- Regions regions = Regions.fromName(regionName);
- Region region = Region.getRegion(regions);
+ Regions regions = Regions.fromName( regionName );
+ Region region = Region.getRegion( regions );
- AmazonSQSClient sqsClient = createSQSClient(region);
- AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+ AmazonSQSClient sqsClient = createSQSClient( region );
+ AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
// getTopicArn will create the SNS topic if it doesn't exist
- String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
- topicArns.put(topicArn, regionName);
+ String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+ topicArns.put( topicArn, regionName );
// create the SQS queue if it doesn't exist
- String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
- if (queueArn == null) {
- queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
- queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+ String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+ if ( queueArn == null ) {
+ queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+ queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
}
- arrQueueArns.put(queueArn, regionName);
+ arrQueueArns.put( queueArn, regionName );
}
- logger.debug("Creating Subscriptions...");
+ logger.debug( "Creating Subscriptions..." );
- for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+ for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
String queueARN = queueArnEntry.getKey();
String strSqsRegion = queueArnEntry.getValue();
- Regions sqsRegions = Regions.fromName(strSqsRegion);
- Region sqsRegion = Region.getRegion(sqsRegions);
+ Regions sqsRegions = Regions.fromName( strSqsRegion );
+ Region sqsRegion = Region.getRegion( sqsRegions );
- AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
+ AmazonSQSClient subscribeSqsClient = createSQSClient( sqsRegion );
// ensure the URL used to subscribe is for the correct name/region
- String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
+ String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );
// this list used later for adding permissions to queues
List<String> topicArnList = new ArrayList<>();
- for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
+ for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {
String topicARN = topicArnEntry.getKey();
- topicArnList.add(topicARN);
+ topicArnList.add( topicARN );
String strSnsRegion = topicArnEntry.getValue();
- Regions snsRegions = Regions.fromName(strSnsRegion);
- Region snsRegion = Region.getRegion(snsRegions);
+ Regions snsRegions = Regions.fromName( strSnsRegion );
+ Region snsRegion = Region.getRegion( snsRegions );
- AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
- SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
+ AmazonSNSClient subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
+ SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );
try {
- logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion
- );
+ logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
+ strSqsRegion, topicARN, strSnsRegion );
- SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+ SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
String subscriptionARN = subscribeResult.getSubscriptionArn();
- if(logger.isDebugEnabled()){
- logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug(
+ "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
+ queueARN, topicARN, subscriptionARN );
}
-
-
- } catch (Exception e) {
- logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion), e);
-
-
+ }
+ catch ( Exception e ) {
+ logger.error( String
+ .format( "ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
+ queueARN, strSqsRegion, topicARN, strSnsRegion ), e );
}
}
- logger.info("Adding permission to receive messages...");
+ logger.info( "Adding permission to receive messages..." );
// add permission to each queue, providing a list of topics that it's subscribed to
- AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
-
+ AmazonNotificationUtils
+ .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
}
}
return primaryTopicArn;
}
+
/**
* The Asynchronous SNS client is used for publishing events to SNS.
- *
*/
- private AmazonSNSAsyncClient createAsyncSNSClient(final Region region, final ExecutorService executor) {
+ private AmazonSNSAsyncClient createAsyncSNSClient( final Region region, final ExecutorService executor ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient(ugProvider.getCredentials(), executor);
+ final AmazonSNSAsyncClient sns = new AmazonSNSAsyncClient( ugProvider.getCredentials(), executor );
- sns.setRegion(region);
+ sns.setRegion( region );
return sns;
}
@@ -326,11 +339,8 @@ public class SNSQueueManagerImpl implements QueueManager {
/**
* Create the async sqs client
- * @param region
- * @param executor
- * @return
*/
- private AmazonSQSAsyncClient createAsyncSQSClient(final Region region, final ExecutorService executor){
+ private AmazonSQSAsyncClient createAsyncSQSClient( final Region region, final ExecutorService executor ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
final AmazonSQSAsyncClient sqs = new AmazonSQSAsyncClient( ugProvider.getCredentials(), executor );
@@ -338,173 +348,209 @@ public class SNSQueueManagerImpl implements QueueManager {
sqs.setRegion( region );
return sqs;
-
}
+
/**
* The Synchronous SNS client is used for creating topics and subscribing queues.
- *
*/
- private AmazonSNSClient createSNSClient(final Region region) {
+ private AmazonSNSClient createSNSClient( final Region region ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+ final AmazonSNSClient sns = new AmazonSNSClient( ugProvider.getCredentials() );
- sns.setRegion(region);
+ sns.setRegion( region );
return sns;
}
private String getName() {
- String name = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
+ String name =
+ clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+ + scope.getRegionImplementation();
name = name.toLowerCase(); //user lower case values
- Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
+ Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );
return name;
}
+
public Queue getReadQueue() {
String queueName = getName();
try {
- return readQueueUrlMap.get(queueName);
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
+ return readQueueUrlMap.get( queueName );
+ }
+ catch ( ExecutionException ee ) {
+ throw new RuntimeException( ee );
}
}
+
public String getWriteTopicArn() {
try {
- return writeTopicArnMap.get(getName());
-
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
+ return writeTopicArnMap.get( getName() );
+ }
+ catch ( ExecutionException ee ) {
+ throw new RuntimeException( ee );
}
}
+
@Override
- public rx.Observable<QueueMessage> getMessages(final int limit,
- final int transactionTimeout,
- final int waitTime,
- final Class klass) {
+ public rx.Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+ final Class klass ) {
- if (sqs == null) {
- logger.error("SQS is null - was not initialized properly");
+ if ( sqs == null ) {
+ logger.error( "SQS is null - was not initialized properly" );
return rx.Observable.empty();
}
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Getting up to {} messages from {}", limit, url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Getting up to {} messages from {}", limit, url );
+ }
- ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
- receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(Math.max(1, transactionTimeout / 1000));
- receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
+ ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest( url );
+ receiveMessageRequest.setMaxNumberOfMessages( limit );
+ receiveMessageRequest.setVisibilityTimeout( Math.max( 1, transactionTimeout / 1000 ) );
+ receiveMessageRequest.setWaitTimeSeconds( waitTime / 1000 );
try {
- ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
+ ReceiveMessageResult result = sqs.receiveMessage( receiveMessageRequest );
List<Message> messages = result.getMessages();
- if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Received {} messages from {}", messages.size(), url );
+ }
+
+ List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
+ for ( Message message : messages ) {
- for (Message message : messages) {
- Object body;
+ Object payload;
final String originalBody = message.getBody();
try {
- final JsonNode bodyNode = mapper.readTree(message.getBody());
- JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
+ final JsonNode bodyNode = mapper.readTree( message.getBody() );
+ /**
+ * When a message originates from SNS it has a "Message" we have to extract
+ * it and then process it seperately
+ */
- final String bodyText = mapper.writeValueAsString( bodyObj );;
+ if ( bodyNode.has( "Message" ) ) {
+ final String snsNode = bodyNode.get( "Message" ).asText();
- body = fromString(bodyText, klass);
- } catch (Exception e) {
- logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
- throw new RuntimeException(e);
+ payload = deSerializeSQSMessage( snsNode, klass );
+ }
+ else {
+ payload = deSerializeSQSMessage( originalBody, klass );
+ }
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "failed to deserialize message: %s", message.getBody() ), e );
+ throw new RuntimeException( e );
}
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
- queueMessage.setStringBody(originalBody);
- queueMessages.add(queueMessage);
+ QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+ message.getAttributes().get( "type" ) );
+ queueMessage.setStringBody( originalBody );
+ queueMessages.add( queueMessage );
}
- return rx.Observable.from(queueMessages);
-
- } catch (com.amazonaws.services.sqs.model.QueueDoesNotExistException dne) {
- logger.error(String.format("Queue does not exist! [%s]", url), dne);
- } catch (Exception e) {
- logger.error(String.format("Programming error getting messages from queue=[%s] exist!", url), e);
+ return rx.Observable.from( queueMessages );
+ }
+ catch ( com.amazonaws.services.sqs.model.QueueDoesNotExistException dne ) {
+ logger.error( String.format( "Queue does not exist! [%s]", url ), dne );
+ }
+ catch ( Exception e ) {
+ logger.error( String.format( "Programming error getting messages from queue=[%s] exist!", url ), e );
}
- return rx.Observable.from(new ArrayList<>(0));
+ return rx.Observable.from( new ArrayList<>( 0 ) );
}
+
+ /**
+ * Take a string, possibly escaped via SNS, and run it through our mapper to create an object)
+ */
+ private Object deSerializeSQSMessage( final String message, final Class type ) {
+ try {
+ final Object o = mapper.readValue( message, type );
+ return o;
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Unable to deserialize message " + message + " for class " + type, e );
+ }
+ }
+
+
@Override
public long getQueueDepth() {
String key = "ApproximateNumberOfMessages";
try {
- GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key));
- String depthString = result.getAttributes().get(key);
- return depthString != null ? Long.parseLong(depthString) : 0;
- }catch (Exception e){
- logger.error("Exception getting queue depth",e);
+ GetQueueAttributesResult result =
+ sqs.getQueueAttributes( getReadQueue().getUrl(), Collections.singletonList( key ) );
+ String depthString = result.getAttributes().get( key );
+ return depthString != null ? Long.parseLong( depthString ) : 0;
+ }
+ catch ( Exception e ) {
+ logger.error( "Exception getting queue depth", e );
return -1;
-
}
}
@Override
public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
- return;
- }
-
- final String stringBody = toString(body);
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
+ return;
+ }
- String topicArn = getWriteTopicArn();
+ final String stringBody = toString( body );
- if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
+ String topicArn = getWriteTopicArn();
- PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Publishing Message...{} to arn: {}", stringBody, topicArn );
+ }
- snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
- @Override
- public void onError( Exception e ) {
- logger.error( "Error publishing message... {}", e );
- }
+ PublishRequest publishRequest = new PublishRequest( topicArn, stringBody );
+ snsAsync.publishAsync( publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ @Override
+ public void onError( Exception e ) {
+ logger.error( "Error publishing message... {}", e );
+ }
- @Override
- public void onSuccess( PublishRequest request, PublishResult result ) {
- if ( logger.isDebugEnabled() ) logger
- .debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
- request.getTopicArn() );
- }
- } );
+ @Override
+ public void onSuccess( PublishRequest request, PublishResult result ) {
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Successfully published... messageID=[{}], arn=[{}]", result.getMessageId(),
+ request.getTopicArn() );
+ }
+ }
+ } );
}
@Override
- public void sendMessages(final List bodies) throws IOException {
+ public void sendMessages( final List bodies ) throws IOException {
- if (snsAsync == null) {
- logger.error("SNS client is null, perhaps it failed to initialize successfully");
+ if ( snsAsync == null ) {
+ logger.error( "SNS client is null, perhaps it failed to initialize successfully" );
return;
}
- for (Object body : bodies) {
- sendMessage((Serializable)body);
+ for ( Object body : bodies ) {
+ sendMessage( ( Serializable ) body );
}
-
}
@@ -545,94 +591,81 @@ public class SNSQueueManagerImpl implements QueueManager {
} );
}
+
@Override
public void deleteQueue() {
- logger.warn("Deleting queue: "+getReadQueue().getUrl());
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()));
- logger.warn("Deleting queue: "+getReadQueue().getUrl()+"_dead");
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getReadQueue().getUrl()+"_dead"));
-
+ logger.warn( "Deleting queue: " + getReadQueue().getUrl() );
+ sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() ) );
+ logger.warn( "Deleting queue: " + getReadQueue().getUrl() + "_dead" );
+ sqs.deleteQueue( new DeleteQueueRequest().withQueueUrl( getReadQueue().getUrl() + "_dead" ) );
}
@Override
- public void commitMessage(final QueueMessage queueMessage) {
+ public void commitMessage( final QueueMessage queueMessage ) {
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled())
- logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
+ }
- sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
+ sqs.deleteMessage(
+ new DeleteMessageRequest().withQueueUrl( url ).withReceiptHandle( queueMessage.getHandle() ) );
}
@Override
- public void commitMessages(final List<QueueMessage> queueMessages) {
+ public void commitMessages( final List<QueueMessage> queueMessages ) {
String url = getReadQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Commit messages {} to queue {}", queueMessages.size(), url );
+ }
List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
- for (QueueMessage message : queueMessages) {
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
+ for ( QueueMessage message : queueMessages ) {
+ entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
}
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
- DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
+ DeleteMessageBatchRequest request = new DeleteMessageBatchRequest( url, entries );
+ DeleteMessageBatchResult result = sqs.deleteMessageBatch( request );
boolean successful = result.getFailed().size() <= 0;
- if (!successful) {
- for (BatchResultErrorEntry failed : result.getFailed()) {
- logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
+ if ( !successful ) {
+ for ( BatchResultErrorEntry failed : result.getFailed() ) {
+ logger.error( "Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId() );
}
}
}
/**
- * Read the object from Base64 string.
- */
-
- private Object fromString(final String s, final Class klass)
- throws IOException, ClassNotFoundException {
-
- Object o = mapper.readValue(s, klass);
- return o;
- }
-
- /**
* Write the object to a Base64 string.
*/
- private String toString(final Object o) throws IOException {
- return mapper.writeValueAsString(o);
+ private String toString( final Object o ) throws IOException {
+ return mapper.writeValueAsString( o );
}
/**
* Get the region
- *
- * @return
*/
private Region getRegion() {
- Regions regions = Regions.fromName(fig.getRegion());
- return Region.getRegion(regions);
+ Regions regions = Regions.fromName( fig.getRegion() );
+ return Region.getRegion( regions );
}
/**
* Create the SQS client for the specified settings
*/
- private AmazonSQSClient createSQSClient(final Region region) {
+ private AmazonSQSClient createSQSClient( final Region region ) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
+ final AmazonSQSClient sqs = new AmazonSQSClient( ugProvider.getCredentials() );
- sqs.setRegion(region);
+ sqs.setRegion( region );
return sqs;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/04a3f47b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
deleted file mode 100644
index 0c56c05..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-
-import com.amazonaws.services.sqs.model.*;
-import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-public class SQSQueueManagerImpl implements QueueManager {
- private static final Logger logger = LoggerFactory.getLogger(SQSQueueManagerImpl.class);
-
-
- private final QueueScope scope;
- private ObjectMapper mapper;
- protected final QueueFig fig;
- private final ClusterFig clusterFig;
- protected final AmazonSQSClient sqs;
-
- private static SmileFactory smileFactory = new SmileFactory();
-
- private LoadingCache<String, Queue> urlMap = CacheBuilder.newBuilder()
- .maximumSize(1000)
- .build(new CacheLoader<String, Queue>() {
- @Override
- public Queue load(String queueName) throws Exception {
-
- //the amazon client is not thread safe, we need to create one per queue
- Queue queue = null;
-
- try {
-
- GetQueueUrlResult result = sqs.getQueueUrl(queueName);
- queue = new Queue(result.getQueueUrl());
-
- } catch (QueueDoesNotExistException queueDoesNotExistException) {
- //no op, swallow
- logger.error("Queue {} does not exist, creating", queueName);
-
- } catch (Exception e) {
- logger.error("failed to get queue from service", e);
- throw e;
- }
-
- if (queue == null) {
-
- final String deadletterQueueName = String.format("%s_dead", queueName);
- final Map<String, String> deadLetterAttributes = new HashMap<>(2);
-
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getDeadletterRetentionPeriod());
- CreateQueueRequest createDeadLetterQueueRequest = new CreateQueueRequest()
- .withQueueName(deadletterQueueName).withAttributes(deadLetterAttributes);
-
- final CreateQueueResult deadletterResult = sqs.createQueue(createDeadLetterQueueRequest);
- logger.info("Created deadletter queue with url {}", deadletterResult.getQueueUrl());
-
- final String deadletterArn = AmazonNotificationUtils.getQueueArnByName(sqs, deadletterQueueName);
-
- String redrivePolicy = String.format("{\"maxReceiveCount\":\"%s\"," +
- " \"deadLetterTargetArn\":\"%s\"}", fig.getQueueDeliveryLimit(), deadletterArn);
-
- final Map<String, String> queueAttributes = new HashMap<>(2);
- deadLetterAttributes.put("MessageRetentionPeriod", fig.getRetentionPeriod());
- deadLetterAttributes.put("RedrivePolicy", redrivePolicy);
-
- CreateQueueRequest createQueueRequest = new CreateQueueRequest().
- withQueueName(queueName)
- .withAttributes(queueAttributes);
-
- CreateQueueResult result = sqs.createQueue(createQueueRequest);
-
- String url = result.getQueueUrl();
- queue = new Queue(url);
-
- logger.info("Created queue with url {}", url);
- }
-
- return queue;
- }
- });
-
-
- @Inject
- public SQSQueueManagerImpl(@Assisted QueueScope scope, final QueueFig fig, final ClusterFig clusterFig) {
-
- this.scope = scope;
- this.fig = fig;
- this.clusterFig = clusterFig;
- try {
-
- smileFactory.delegateToTextual(true);
- mapper = new ObjectMapper(smileFactory);
- //pretty print, disabling for speed
-// mapper.enable(SerializationFeature.INDENT_OUTPUT);
- mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
- sqs = createClient();
-
- } catch (Exception e) {
- throw new RuntimeException("Error setting up mapper", e);
- }
- }
-
-
- protected String getName() {
-
- String name = clusterFig.getClusterName() + "_" + scope.getName();
-
- Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
-
- return name;
- }
-
- public Queue getQueue() {
-
- try {
- Queue queue = urlMap.get(getName());
- return queue;
- } catch (ExecutionException ee) {
- throw new RuntimeException(ee);
- }
- }
-
- @Override
- public rx.Observable<QueueMessage> getMessages(final int limit,
- final int transactionTimeout,
- final int waitTime,
- final Class klass) {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return rx.Observable.empty();
- }
-
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Getting Max {} messages from {}", limit, url);
-
- ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(url);
- receiveMessageRequest.setMaxNumberOfMessages(limit);
- receiveMessageRequest.setVisibilityTimeout(transactionTimeout / 1000);
- receiveMessageRequest.setWaitTimeSeconds(waitTime / 1000);
- ReceiveMessageResult result = sqs.receiveMessage(receiveMessageRequest);
- List<Message> messages = result.getMessages();
-
- if (logger.isDebugEnabled()) logger.debug("Received {} messages from {}", messages.size(), url);
-
- List<QueueMessage> queueMessages = new ArrayList<>(messages.size());
-
- for (Message message : messages) {
- Object body;
-
- try {
- body = fromString(message.getBody(), klass);
- } catch (Exception e) {
- logger.error("failed to deserialize message", e);
- throw new RuntimeException(e);
- }
-
- QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
- queueMessage.setStringBody(message.getBody());
- queueMessages.add(queueMessage);
- }
-
- return rx.Observable.from(queueMessages);
- }
-
- @Override
- public long getQueueDepth() {
- String key = "ApproximateNumberOfMessages";
- try {
- GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
- String depthString = result.getAttributes().get(key);
- return depthString != null ? Long.parseLong(depthString) : 0;
- }catch (Exception e){
- logger.error("Exception getting queue depth",e);
- return -1;
-
- }
- }
- @Override
- public void sendMessages(final List bodies) throws IOException {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Sending Messages...{} to {}", bodies.size(), url);
-
- SendMessageBatchRequest request = new SendMessageBatchRequest(url);
- List<SendMessageBatchRequestEntry> entries = new ArrayList<>(bodies.size());
-
- for (Object body : bodies) {
- SendMessageBatchRequestEntry entry = new SendMessageBatchRequestEntry();
- entry.setId(UUID.randomUUID().toString());
- entry.setMessageBody(toString(body));
- entry.addMessageAttributesEntry("type", new MessageAttributeValue().withStringValue("mytype"));
- entries.add(entry);
- }
-
- request.setEntries(entries);
- sqs.sendMessageBatch(request);
-
- }
-
-
- @Override
- public <T extends Serializable> void sendMessage( final T body ) throws IOException {
-
- if (sqs == null) {
- logger.error("Sqs is null");
- return;
- }
-
- String url = getQueue().getUrl();
-
- if (logger.isDebugEnabled()) logger.debug("Sending Message...{} to {}", body.toString(), url);
-
- final String stringBody = toString(body);
-
- SendMessageRequest request = new SendMessageRequest(url, stringBody);
- sqs.sendMessage(request);
- }
-
-
-
- @Override
- public <T extends Serializable> void sendMessageToTopic( final T body ) throws IOException {
- sendMessage( body );
- }
-
-
-
- @Override
- public void commitMessage(final QueueMessage queueMessage) {
-
- String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit message {} to queue {}", queueMessage.getMessageId(), url);
-
- sqs.deleteMessage(new DeleteMessageRequest()
- .withQueueUrl(url)
- .withReceiptHandle(queueMessage.getHandle()));
- }
-
-
- @Override
- public void commitMessages(final List<QueueMessage> queueMessages) {
-
- String url = getQueue().getUrl();
- if (logger.isDebugEnabled()) logger.debug("Commit messages {} to queue {}", queueMessages.size(), url);
-
- List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
-
- for (QueueMessage message : queueMessages) {
- entries.add(new DeleteMessageBatchRequestEntry(message.getMessageId(), message.getHandle()));
- }
-
- DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(url, entries);
- DeleteMessageBatchResult result = sqs.deleteMessageBatch(request);
-
- boolean successful = result.getFailed().size() <= 0;
-
- if (!successful) {
-
- for (BatchResultErrorEntry failed : result.getFailed()) {
- logger.error("Commit failed reason: {} messages id: {}", failed.getMessage(), failed.getId());
- }
- }
- }
-
-
- /**
- * Read the object from Base64 string.
- */
- private Object fromString(final String s,
- final Class klass) throws IOException, ClassNotFoundException {
- Object o = mapper.readValue(s, klass);
- return o;
- }
-
- /**
- * Write the object to a Base64 string.
- */
- protected String toString(final Object o) throws IOException {
- return mapper.writeValueAsString(o);
- }
-
-
- /**
- * Get the region
- *
- * @return
- */
- protected Region getRegion() {
- Regions regions = Regions.fromName(fig.getRegion());
- Region region = Region.getRegion(regions);
- return region;
- }
-
- @Override
- public void deleteQueue() {
- logger.warn("Deleting queue: "+getQueue().getUrl());
- sqs.deleteQueue(new DeleteQueueRequest().withQueueUrl(getQueue().getUrl()));
- }
-
-
-
- /**
- * Create the SQS client for the specified settings
- */
- private AmazonSQSClient createClient() {
- final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
- final AmazonSQSClient sqs = new AmazonSQSClient(ugProvider.getCredentials());
- final Region region = getRegion();
- sqs.setRegion(region);
-
- return sqs;
- }
-
-
-}