You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/10/19 23:19:46 UTC
[3/8] usergrid git commit: Merge branch 'refs/heads/2.1-release' into
USERGRID-1048
Merge branch 'refs/heads/2.1-release' into USERGRID-1048
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3e155852
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3e155852
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3e155852
Branch: refs/heads/remove-inmemory-event-service
Commit: 3e1558524728c96834cfd66d9e53e3f1b6a7d3d6
Parents: 04a3f47 a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:30:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:30:04 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 149 ++++++++++---------
.../asyncevents/AsyncIndexProvider.java | 26 ++--
.../asyncevents/model/AsyncEvent.java | 14 +-
.../asyncevents/model/EdgeDeleteEvent.java | 6 +-
.../asyncevents/model/EdgeIndexEvent.java | 9 +-
.../asyncevents/model/EntityDeleteEvent.java | 8 +-
.../asyncevents/model/EntityIndexEvent.java | 6 +-
.../model/InitializeApplicationIndexEvent.java | 4 +-
.../index/AmazonAsyncEventServiceTest.java | 6 +-
.../cache/CachedEntityCollectionManager.java | 147 ------------------
.../EntityCollectionManagerFactoryImpl.java | 6 -
.../usergrid/persistence/queue/QueueFig.java | 2 +-
.../queue/impl/SNSQueueManagerImpl.java | 8 +-
13 files changed, 135 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d319ac8,c198674..f8ef5e7
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -97,7 -84,9 +100,8 @@@ public class AmazonAsyncEventService im
public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
private final QueueManager queue;
- private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
+ private final QueueFig queueFig;
private final IndexProducer indexProducer;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -125,33 -113,28 +129,35 @@@
@Inject
- public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
- final IndexProcessorFig indexProcessorFig,
- final IndexProducer indexProducer,
- final MetricsFactory metricsFactory,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory,
- final EventBuilder eventBuilder,
- final RxTaskScheduler rxTaskScheduler,
- QueueFig queueFig) {
+ public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
+ final IndexProcessorFig indexProcessorFig,
+ final IndexProducer indexProducer,
+ final MetricsFactory metricsFactory,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
+ final EventBuilder eventBuilder,
+ final MapManagerFactory mapManagerFactory,
++ final QueueFig queueFig,
+ final RxTaskScheduler rxTaskScheduler ) {
this.indexProducer = indexProducer;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.eventBuilder = eventBuilder;
+
+ final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "indexEvents");
+
+ this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
+
this.rxTaskScheduler = rxTaskScheduler;
- this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+ QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
this.queue = queueManagerFactory.getQueueManager(queueScope);
+
this.indexProcessorFig = indexProcessorFig;
+ this.queueFig = queueFig;
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
@@@ -271,70 -259,64 +277,78 @@@
logger.debug("callEventHandlers with {} message", messages.size());
}
-- Stream<IndexEventResult> indexEventResults = messages.stream()
-- .map(message -> {
-- AsyncEvent event = null;
-- try {
-- event = (AsyncEvent) message.getBody();
-- } catch (ClassCastException cce) {
-- logger.error("Failed to deserialize message body", cce);
++ Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
++ AsyncEvent event = null;
++ try {
++ event = ( AsyncEvent ) message.getBody();
++ }
++ catch ( ClassCastException cce ) {
++ logger.error( "Failed to deserialize message body", cce );
++ }
++
++ if ( event == null ) {
++ logger.error( "AsyncEvent type or event is null!" );
++ return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
++ System.currentTimeMillis() );
++ }
++
++ final AsyncEvent thisEvent = event;
++ if ( logger.isDebugEnabled() ) {
++ logger.debug( "Processing {} event", event );
++ }
++
++ try {
++ //check for empty sets if this is true
++ boolean validateEmptySets = true;
++ Observable<IndexOperationMessage> indexoperationObservable;
++ //merge each operation to a master observable;
++ if ( event instanceof EdgeDeleteEvent ) {
++ indexoperationObservable = handleEdgeDelete( message );
++ }
++ else if ( event instanceof EdgeIndexEvent ) {
++ indexoperationObservable = handleEdgeIndex( message );
++ }
++ else if ( event instanceof EntityDeleteEvent ) {
++ indexoperationObservable = handleEntityDelete( message );
++ }
++ else if ( event instanceof EntityIndexEvent ) {
++ indexoperationObservable = handleEntityIndexUpdate( message );
++ }
++ else if ( event instanceof InitializeApplicationIndexEvent ) {
++ //does not return observable
++ handleInitializeApplicationIndex( event, message );
++ indexoperationObservable = Observable.just( new IndexOperationMessage() );
++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
++ }
++ else if ( event instanceof ElasticsearchIndexEvent ) {
++ handleIndexOperation( ( ElasticsearchIndexEvent ) event );
++ indexoperationObservable = Observable.just( new IndexOperationMessage() );
++ validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
}
-- if (event == null) {
-- logger.error("AsyncEvent type or event is null!");
-- return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
++ else {
++ throw new Exception( "Unknown EventType" );//TODO: print json instead
}
-- final AsyncEvent thisEvent = event;
-- if (logger.isDebugEnabled()) {
-- logger.debug("Processing {} event", event);
++ //collect all of the
++ IndexOperationMessage indexOperationMessage = indexoperationObservable
++ .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
++ .toBlocking().lastOrDefault( null );
++
++ if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
++ logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
++ message.getStringBody() );
++ throw new Exception( "Received empty index sequence." );
}
-- try {
-- //check for empty sets if this is true
-- boolean validateEmptySets = true;
-- Observable<IndexOperationMessage> indexoperationObservable;
-- //merge each operation to a master observable;
-- if (event instanceof EdgeDeleteEvent) {
-- indexoperationObservable = handleEdgeDelete(message);
-- } else if (event instanceof EdgeIndexEvent) {
-- indexoperationObservable = handleEdgeIndex(message);
-- } else if (event instanceof EntityDeleteEvent) {
-- indexoperationObservable = handleEntityDelete(message);
-- } else if (event instanceof EntityIndexEvent) {
-- indexoperationObservable = handleEntityIndexUpdate(message);
-- } else if (event instanceof InitializeApplicationIndexEvent) {
-- //does not return observable
-- handleInitializeApplicationIndex(event, message);
-- indexoperationObservable = Observable.just(new IndexOperationMessage());
- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- } else if (event instanceof ElasticsearchIndexEvent){
- handleIndexOperation( (ElasticsearchIndexEvent)event );
- indexoperationObservable = Observable.just( new IndexOperationMessage() );
-- validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
- }
-
- else {
- } else {
-- throw new Exception("Unknown EventType");//TODO: print json instead
-- }
--
-- //collect all of the
-- IndexOperationMessage indexOperationMessage =
-- indexoperationObservable
-- .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
-- .toBlocking().lastOrDefault(null);
--
-- if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
-- logger.error("Received empty index sequence message:({}), body:({}) ",
-- message.getMessageId(), message.getStringBody());
-- throw new Exception("Received empty index sequence.");
-- }
--
-- //return type that can be indexed and ack'd later
-- return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
-- } catch (Exception e) {
-- logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
-- return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
++ //return type that can be indexed and ack'd later
++ return new IndexEventResult( Optional.fromNullable( message ),
++ Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
++ }
++ catch ( Exception e ) {
++ logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
++ return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
++ event.getCreationTime());
}
});
@@@ -346,8 -328,7 +360,7 @@@
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
applicationScope );
- offerTopic(
- new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
- offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
++ offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}
@@@ -413,8 -394,8 +426,8 @@@
final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
-- final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
-- applicationScope, entity, edge));
++ final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
++ entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
return edgeIndexObservable;
}
@@@ -450,84 -431,9 +463,84 @@@
@Override
public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
- offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
+ offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
}
+
+ /**
+ * Queue up an indexOperationMessage for multi region execution
+ * @param indexOperationMessage
+ */
+ public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
+
+ final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
+
+ final UUID newMessageId = UUIDGenerator.newTimeUUID();
+
+ //write to the map in ES
+ esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
+
+
+
+ //now queue up the index message
+
+ final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
+
+ //send to the topic so all regions index the batch
+
+ offerTopic( elasticsearchIndexEvent );
+ }
+
+ public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
+ Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
+
+ final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
+
+ Preconditions.checkNotNull( messageId, "messageId must not be null" );
+
+
+ //load the entity
+
+ final String message = esMapPersistence.getString( messageId.toString() );
+
+ String highConsistency = null;
+
+ if(message == null){
+ logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
+
+ highConsistency = esMapPersistence.getStringHighConsistency( messageId.toString() );
+
+ }
+
+ //read the value from the string
+
+ final IndexOperationMessage indexOperationMessage;
+
+ //our original local read has it, parse it.
+ if(message != null){
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
+ }
+ //we tried to read it at a higher consistency level and it works
+ else if (highConsistency != null){
+ indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
+ }
+
+ //we couldn't find it, bail
+ else{
+ logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
+
+ throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
+ }
+
+
+
+ //now execute it
+ indexProducer.put(indexOperationMessage).toBlocking().last();
+
+ }
+
+
+
@Override
public long getQueueDepth() {
return queue.getQueueDepth();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 3865ecb,8b44714..1649046
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -28,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.impl.IndexProducer;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@@ -52,18 -51,21 +52,24 @@@ public class AsyncIndexProvider impleme
private final IndexLocationStrategyFactory indexLocationStrategyFactory;
private final EntityIndexFactory entityIndexFactory;
private final IndexProducer indexProducer;
+ private final MapManagerFactory mapManagerFactory;
+ private final QueueFig queueFig;
private AsyncEventService asyncEventService;
@Inject
- public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
- final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
- EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
- final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
- final MapManagerFactory mapManagerFactory ) {
+ public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
+ final QueueManagerFactory queueManagerFactory,
+ final MetricsFactory metricsFactory,
+ final RxTaskScheduler rxTaskScheduler,
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EventBuilder eventBuilder,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory,
- final IndexProducer indexProducer, QueueFig queueFig) {
++ final IndexProducer indexProducer,
++ final MapManagerFactory mapManagerFactory,
++ final QueueFig queueFig) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@@ -74,7 -76,7 +80,8 @@@
this.indexLocationStrategyFactory = indexLocationStrategyFactory;
this.entityIndexFactory = entityIndexFactory;
this.indexProducer = indexProducer;
+ this.mapManagerFactory = mapManagerFactory;
+ this.queueFig = queueFig;
}
@@@ -98,11 -100,11 +105,10 @@@
case LOCAL:
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
case SQS:
-- return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
++ throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
++ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index e83d6f8,5b921d9..8ee47a2
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@@ -89,7 -89,7 +93,7 @@@ public class AmazonAsyncEventServiceTes
@Override
protected AsyncEventService getAsyncEventService() {
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
- return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig );
++ return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory, entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index a3fa05e,5ab1a4b..58b2a4d
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -226,44 -171,43 +226,44 @@@ public class SNSQueueManagerImpl implem
String multiRegion = fig.getRegionList();
- if (logger.isDebugEnabled())
- logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
+ }
- String[] regionNames = multiRegion.split(",");
+ String[] regionNames = multiRegion.split( "," );
- final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
- final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
+ final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
+ final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
- arrQueueArns.put( primaryQueueArn, fig.getRegion() );
- topicArns.put( primaryTopicArn, fig.getRegion() );
+ arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
+ topicArns.put(primaryTopicArn, fig.getPrimaryRegion());
- for (String regionName : regionNames) {
+ for ( String regionName : regionNames ) {
regionName = regionName.trim();
- Regions regions = Regions.fromName(regionName);
- Region region = Region.getRegion(regions);
+ Regions regions = Regions.fromName( regionName );
+ Region region = Region.getRegion( regions );
- AmazonSQSClient sqsClient = createSQSClient(region);
- AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+ AmazonSQSClient sqsClient = createSQSClient( region );
+ AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
// getTopicArn will create the SNS topic if it doesn't exist
- String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
- topicArns.put(topicArn, regionName);
+ String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
+ topicArns.put( topicArn, regionName );
// create the SQS queue if it doesn't exist
- String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
- if (queueArn == null) {
- queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
- queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
+ String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
+ if ( queueArn == null ) {
+ queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
+ queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
}
- arrQueueArns.put(queueArn, regionName);
+ arrQueueArns.put( queueArn, regionName );
}
- logger.debug("Creating Subscriptions...");
+ logger.debug( "Creating Subscriptions..." );
- for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
+ for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
String queueARN = queueArnEntry.getKey();
String strSqsRegion = queueArnEntry.getValue();
@@@ -650,10 -519,12 +650,10 @@@
/**
* Get the region
- *
- * @return
*/
private Region getRegion() {
- Regions regions = Regions.fromName( fig.getRegion() );
- return Region.getRegion( regions );
+ Regions regions = Regions.fromName(fig.getPrimaryRegion());
+ return Region.getRegion(regions);
}