You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/23 22:16:26 UTC

[03/18] usergrid git commit: Merge branch 'refs/heads/2.1-release' into USERGRID-1048

Merge branch 'refs/heads/2.1-release' into USERGRID-1048


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/3e155852
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/3e155852
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/3e155852

Branch: refs/heads/USERGRID-1052
Commit: 3e1558524728c96834cfd66d9e53e3f1b6a7d3d6
Parents: 04a3f47 a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Oct 19 13:30:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Oct 19 13:30:04 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 149 ++++++++++---------
 .../asyncevents/AsyncIndexProvider.java         |  26 ++--
 .../asyncevents/model/AsyncEvent.java           |  14 +-
 .../asyncevents/model/EdgeDeleteEvent.java      |   6 +-
 .../asyncevents/model/EdgeIndexEvent.java       |   9 +-
 .../asyncevents/model/EntityDeleteEvent.java    |   8 +-
 .../asyncevents/model/EntityIndexEvent.java     |   6 +-
 .../model/InitializeApplicationIndexEvent.java  |   4 +-
 .../index/AmazonAsyncEventServiceTest.java      |   6 +-
 .../cache/CachedEntityCollectionManager.java    | 147 ------------------
 .../EntityCollectionManagerFactoryImpl.java     |   6 -
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../queue/impl/SNSQueueManagerImpl.java         |   8 +-
 13 files changed, 135 insertions(+), 256 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index d319ac8,c198674..f8ef5e7
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -97,7 -84,9 +100,8 @@@ public class AmazonAsyncEventService im
      public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
  
      private final QueueManager queue;
 -    private final QueueScope queueScope;
      private final IndexProcessorFig indexProcessorFig;
+     private final QueueFig queueFig;
      private final IndexProducer indexProducer;
      private final EntityCollectionManagerFactory entityCollectionManagerFactory;
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@@ -125,33 -113,28 +129,35 @@@
  
  
      @Inject
 -    public AmazonAsyncEventService(final QueueManagerFactory queueManagerFactory,
 -                                   final IndexProcessorFig indexProcessorFig,
 -                                   final IndexProducer indexProducer,
 -                                   final MetricsFactory metricsFactory,
 -                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
 -                                   final IndexLocationStrategyFactory indexLocationStrategyFactory,
 -                                   final EntityIndexFactory entityIndexFactory,
 -                                   final EventBuilder eventBuilder,
 -                                   final RxTaskScheduler rxTaskScheduler,
 -                                   QueueFig queueFig) {
 +    public AmazonAsyncEventService( final QueueManagerFactory queueManagerFactory,
 +                                    final IndexProcessorFig indexProcessorFig,
 +                                    final IndexProducer indexProducer,
 +                                    final MetricsFactory metricsFactory,
 +                                    final EntityCollectionManagerFactory entityCollectionManagerFactory,
 +                                    final IndexLocationStrategyFactory indexLocationStrategyFactory,
 +                                    final EntityIndexFactory entityIndexFactory,
 +                                    final EventBuilder eventBuilder,
 +                                    final MapManagerFactory mapManagerFactory,
++                                    final QueueFig queueFig,
 +                                    final RxTaskScheduler rxTaskScheduler ) {
          this.indexProducer = indexProducer;
  
          this.entityCollectionManagerFactory = entityCollectionManagerFactory;
          this.indexLocationStrategyFactory = indexLocationStrategyFactory;
          this.entityIndexFactory = entityIndexFactory;
          this.eventBuilder = eventBuilder;
 +
 +        final MapScope mapScope = new MapScopeImpl( CpNamingUtils.getManagementApplicationId(),  "indexEvents");
 +
 +        this.esMapPersistence = mapManagerFactory.createMapManager( mapScope );
 +
          this.rxTaskScheduler = rxTaskScheduler;
  
 -        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
 +        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
          this.queue = queueManagerFactory.getQueueManager(queueScope);
 +
          this.indexProcessorFig = indexProcessorFig;
+         this.queueFig = queueFig;
  
          this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
          this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
@@@ -271,70 -259,64 +277,78 @@@
              logger.debug("callEventHandlers with {} message", messages.size());
          }
  
--        Stream<IndexEventResult> indexEventResults = messages.stream()
--            .map(message -> {
--                AsyncEvent event = null;
--                try {
--                    event = (AsyncEvent) message.getBody();
--                } catch (ClassCastException cce) {
--                    logger.error("Failed to deserialize message body", cce);
++        Stream<IndexEventResult> indexEventResults = messages.stream().map( message -> {
++            AsyncEvent event = null;
++            try {
++                event = ( AsyncEvent ) message.getBody();
++            }
++            catch ( ClassCastException cce ) {
++                logger.error( "Failed to deserialize message body", cce );
++            }
++
++            if ( event == null ) {
++                logger.error( "AsyncEvent type or event is null!" );
++                return new IndexEventResult( Optional.fromNullable( message ), Optional.<IndexOperationMessage>absent(),
++                    System.currentTimeMillis() );
++            }
++
++            final AsyncEvent thisEvent = event;
++            if ( logger.isDebugEnabled() ) {
++                logger.debug( "Processing {} event", event );
++            }
++
++            try {
++                //check for empty sets if this is true
++                boolean validateEmptySets = true;
++                Observable<IndexOperationMessage> indexoperationObservable;
++                //merge each operation to a master observable;
++                if ( event instanceof EdgeDeleteEvent ) {
++                    indexoperationObservable = handleEdgeDelete( message );
++                }
++                else if ( event instanceof EdgeIndexEvent ) {
++                    indexoperationObservable = handleEdgeIndex( message );
++                }
++                else if ( event instanceof EntityDeleteEvent ) {
++                    indexoperationObservable = handleEntityDelete( message );
++                }
++                else if ( event instanceof EntityIndexEvent ) {
++                    indexoperationObservable = handleEntityIndexUpdate( message );
++                }
++                else if ( event instanceof InitializeApplicationIndexEvent ) {
++                    //does not return observable
++                    handleInitializeApplicationIndex( event, message );
++                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
++                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
++                }
++                else if ( event instanceof ElasticsearchIndexEvent ) {
++                    handleIndexOperation( ( ElasticsearchIndexEvent ) event );
++                    indexoperationObservable = Observable.just( new IndexOperationMessage() );
++                    validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
                  }
  
--                if (event == null) {
--                    logger.error("AsyncEvent type or event is null!");
--                    return new IndexEventResult(Optional.fromNullable(message), Optional.<IndexOperationMessage>absent(), System.currentTimeMillis());
++                else {
++                    throw new Exception( "Unknown EventType" );//TODO: print json instead
                  }
  
--                final AsyncEvent thisEvent = event;
--                if (logger.isDebugEnabled()) {
--                    logger.debug("Processing {} event", event);
++                //collect all of the
++                IndexOperationMessage indexOperationMessage = indexoperationObservable
++                    .collect( () -> new IndexOperationMessage(), ( collector, single ) -> collector.ingest( single ) )
++                    .toBlocking().lastOrDefault( null );
++
++                if ( validateEmptySets && ( indexOperationMessage == null || indexOperationMessage.isEmpty() ) ) {
++                    logger.error( "Received empty index sequence message:({}), body:({}) ", message.getMessageId(),
++                        message.getStringBody() );
++                    throw new Exception( "Received empty index sequence." );
                  }
  
--                try {
--                    //check for empty sets if this is true
--                    boolean validateEmptySets = true;
--                    Observable<IndexOperationMessage> indexoperationObservable;
--                    //merge each operation to a master observable;
--                    if (event instanceof EdgeDeleteEvent) {
--                        indexoperationObservable = handleEdgeDelete(message);
--                    } else if (event instanceof EdgeIndexEvent) {
--                        indexoperationObservable = handleEdgeIndex(message);
--                    } else if (event instanceof EntityDeleteEvent) {
--                        indexoperationObservable = handleEntityDelete(message);
--                    } else if (event instanceof EntityIndexEvent) {
--                        indexoperationObservable = handleEntityIndexUpdate(message);
--                    } else if (event instanceof InitializeApplicationIndexEvent) {
--                        //does not return observable
--                        handleInitializeApplicationIndex(event, message);
--                        indexoperationObservable = Observable.just(new IndexOperationMessage());
-                         validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                     } else if (event instanceof ElasticsearchIndexEvent){
-                         handleIndexOperation( (ElasticsearchIndexEvent)event );
-                         indexoperationObservable = Observable.just( new IndexOperationMessage() );
--                        validateEmptySets = false; //do not check this one for an empty set b/c it will be empty.
-                     }
- 
-                     else {
 -                    } else {
--                        throw new Exception("Unknown EventType");//TODO: print json instead
--                    }
--
--                    //collect all of the
--                    IndexOperationMessage indexOperationMessage =
--                        indexoperationObservable
--                            .collect(() -> new IndexOperationMessage(), (collector, single) -> collector.ingest(single))
--                            .toBlocking().lastOrDefault(null);
--
--                    if (validateEmptySets && (indexOperationMessage == null || indexOperationMessage.isEmpty())) {
--                        logger.error("Received empty index sequence message:({}), body:({}) ",
--                            message.getMessageId(), message.getStringBody());
--                        throw new Exception("Received empty index sequence.");
--                    }
--
--                    //return type that can be indexed and ack'd later
--                    return new IndexEventResult(Optional.fromNullable(message), Optional.fromNullable(indexOperationMessage), thisEvent.getCreationTime());
--                } catch (Exception e) {
--                    logger.error("Failed to index message: " + message.getMessageId(), message.getStringBody(), e);
--                    return new IndexEventResult(Optional.absent(), Optional.<IndexOperationMessage>absent(), event.getCreationTime());
++                //return type that can be indexed and ack'd later
++                return new IndexEventResult( Optional.fromNullable( message ),
++                    Optional.fromNullable( indexOperationMessage ), thisEvent.getCreationTime() );
++            }
++            catch ( Exception e ) {
++                logger.error( "Failed to index message: " + message.getMessageId(), message.getStringBody(), e );
++                return new IndexEventResult( Optional.absent(), Optional.<IndexOperationMessage>absent(),
++                    event.getCreationTime());
                  }
              });
  
@@@ -346,8 -328,7 +360,7 @@@
      public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
          IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(
              applicationScope );
-         offerTopic(
-             new InitializeApplicationIndexEvent( new ReplicatedIndexLocationStrategy( indexLocationStrategy ) ) );
 -        offer(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
++        offerTopic(new InitializeApplicationIndexEvent(queueFig.getPrimaryRegion(), new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
      }
  
  
@@@ -413,8 -394,8 +426,8 @@@
  
          final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope );
  
--        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap(entity -> eventBuilder.buildNewEdge(
--            applicationScope, entity, edge));
++        final Observable<IndexOperationMessage> edgeIndexObservable = ecm.load( edgeIndexEvent.getEntityId() ).flatMap(
++            entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge));
          return edgeIndexObservable;
      }
  
@@@ -450,84 -431,9 +463,84 @@@
      @Override
      public void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId) {
  
-         offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
+         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
      }
  
 +
 +    /**
 +     * Queue up an indexOperationMessage for multi region execution
 +     * @param indexOperationMessage
 +     */
 +    public void queueIndexOperationMessage( final IndexOperationMessage indexOperationMessage ) {
 +
 +        final String jsonValue = ObjectJsonSerializer.INSTANCE.toString( indexOperationMessage );
 +
 +        final UUID newMessageId = UUIDGenerator.newTimeUUID();
 +
 +        //write to the map in ES
 +        esMapPersistence.putString( newMessageId.toString(), jsonValue, indexProcessorFig.getIndexMessageTtl() );
 +
 +
 +
 +        //now queue up the index message
 +
 +        final ElasticsearchIndexEvent elasticsearchIndexEvent = new ElasticsearchIndexEvent( newMessageId );
 +
 +        //send to the topic so all regions index the batch
 +
 +        offerTopic( elasticsearchIndexEvent );
 +    }
 +
 +    public void handleIndexOperation(final ElasticsearchIndexEvent elasticsearchIndexEvent){
 +         Preconditions.checkNotNull( elasticsearchIndexEvent, "elasticsearchIndexEvent cannot be null" );
 +
 +        final UUID messageId = elasticsearchIndexEvent.getIndexBatchId();
 +
 +        Preconditions.checkNotNull( messageId, "messageId must not be null" );
 +
 +
 +        //load the entity
 +
 +        final String message = esMapPersistence.getString( messageId.toString() );
 +
 +        String highConsistency = null;
 +
 +        if(message == null){
 +            logger.error( "Received message with id {} to process, unable to find it, reading with higher consistency level" );
 +
 +            highConsistency =  esMapPersistence.getStringHighConsistency( messageId.toString() );
 +
 +        }
 +
 +        //read the value from the string
 +
 +        final IndexOperationMessage indexOperationMessage;
 +
 +        //our original local read has it, parse it.
 +        if(message != null){
 +             indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( message, IndexOperationMessage.class );
 +        }
 +        //we tried to read it at a higher consistency level and it works
 +        else if (highConsistency != null){
 +            indexOperationMessage = ObjectJsonSerializer.INSTANCE.fromString( highConsistency, IndexOperationMessage.class );
 +        }
 +
 +        //we couldn't find it, bail
 +        else{
 +            logger.error( "Unable to find the ES batch with id {} to process at a higher consistency level" );
 +
 +            throw new RuntimeException( "Unable to find the ES batch to process with message id " + messageId );
 +        }
 +
 +
 +
 +        //now execute it
 +        indexProducer.put(indexOperationMessage).toBlocking().last();
 +
 +    }
 +
 +
 +
      @Override
      public long getQueueDepth() {
          return queue.getQueueDepth();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index 3865ecb,8b44714..1649046
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@@ -28,7 -27,7 +27,8 @@@ import org.apache.usergrid.persistence.
  import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
  import org.apache.usergrid.persistence.index.EntityIndexFactory;
  import org.apache.usergrid.persistence.index.impl.IndexProducer;
 +import org.apache.usergrid.persistence.map.MapManagerFactory;
+ import org.apache.usergrid.persistence.queue.QueueFig;
  import org.apache.usergrid.persistence.queue.QueueManagerFactory;
  
  import com.google.inject.Inject;
@@@ -52,18 -51,21 +52,24 @@@ public class AsyncIndexProvider impleme
      private final IndexLocationStrategyFactory indexLocationStrategyFactory;
      private final EntityIndexFactory entityIndexFactory;
      private final IndexProducer indexProducer;
 +    private final MapManagerFactory mapManagerFactory;
+     private final QueueFig queueFig;
  
      private AsyncEventService asyncEventService;
  
  
      @Inject
-     public AsyncIndexProvider( final IndexProcessorFig indexProcessorFig, final QueueManagerFactory queueManagerFactory,
-                                final MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler, final
-                                    EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                final EventBuilder eventBuilder, final IndexLocationStrategyFactory indexLocationStrategyFactory,
-                                final EntityIndexFactory entityIndexFactory, final IndexProducer indexProducer,
-                                final MapManagerFactory mapManagerFactory ) {
+     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
+                               final QueueManagerFactory queueManagerFactory,
+                               final MetricsFactory metricsFactory,
+                               final RxTaskScheduler rxTaskScheduler,
+                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                               final EventBuilder eventBuilder,
+                               final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                               final EntityIndexFactory entityIndexFactory,
 -                              final IndexProducer indexProducer, QueueFig queueFig) {
++                              final IndexProducer indexProducer,
++                              final MapManagerFactory mapManagerFactory,
++                              final QueueFig queueFig) {
  
          this.indexProcessorFig = indexProcessorFig;
          this.queueManagerFactory = queueManagerFactory;
@@@ -74,7 -76,7 +80,8 @@@
          this.indexLocationStrategyFactory = indexLocationStrategyFactory;
          this.entityIndexFactory = entityIndexFactory;
          this.indexProducer = indexProducer;
 +        this.mapManagerFactory = mapManagerFactory;
+         this.queueFig = queueFig;
      }
  
  
@@@ -98,11 -100,11 +105,10 @@@
              case LOCAL:
                  return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProducer,indexProcessorFig.resolveSynchronously());
              case SQS:
--                return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler,queueFig );
++                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead");
              case SNS:
                  return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                     entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig);
++                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, rxTaskScheduler );
              default:
                  throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
          }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --cc stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index e83d6f8,5b921d9..8ee47a2
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@@ -89,7 -89,7 +93,7 @@@ public class AmazonAsyncEventServiceTes
  
      @Override
      protected AsyncEventService getAsyncEventService() {
-         return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, mapManagerFactory, rxTaskScheduler );
 -        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler, queueFig );
++        return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,  entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder, rxTaskScheduler );
      }
  
  

http://git-wip-us.apache.org/repos/asf/usergrid/blob/3e155852/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index a3fa05e,5ab1a4b..58b2a4d
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -226,44 -171,43 +226,44 @@@ public class SNSQueueManagerImpl implem
  
              String multiRegion = fig.getRegionList();
  
 -            if (logger.isDebugEnabled())
 -                logger.debug("MultiRegion Setup specified, regions: [{}]", multiRegion);
 +            if ( logger.isDebugEnabled() ) {
 +                logger.debug( "MultiRegion Setup specified, regions: [{}]", multiRegion );
 +            }
  
 -            String[] regionNames = multiRegion.split(",");
 +            String[] regionNames = multiRegion.split( "," );
  
 -            final Map<String, String> arrQueueArns = new HashMap<>(regionNames.length + 1);
 -            final Map<String, String> topicArns = new HashMap<>(regionNames.length + 1);
 +            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
 +            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );
  
-             arrQueueArns.put( primaryQueueArn, fig.getRegion() );
-             topicArns.put( primaryTopicArn, fig.getRegion() );
+             arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
+             topicArns.put(primaryTopicArn, fig.getPrimaryRegion());
  
 -            for (String regionName : regionNames) {
 +            for ( String regionName : regionNames ) {
  
                  regionName = regionName.trim();
 -                Regions regions = Regions.fromName(regionName);
 -                Region region = Region.getRegion(regions);
 +                Regions regions = Regions.fromName( regionName );
 +                Region region = Region.getRegion( regions );
  
 -                AmazonSQSClient sqsClient = createSQSClient(region);
 -                AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
 +                AmazonSQSClient sqsClient = createSQSClient( region );
 +                AmazonSNSClient snsClient = createSNSClient( region ); // do this stuff synchronously
  
                  // getTopicArn will create the SNS topic if it doesn't exist
 -                String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
 -                topicArns.put(topicArn, regionName);
 +                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
 +                topicArns.put( topicArn, regionName );
  
                  // create the SQS queue if it doesn't exist
 -                String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
 -                if (queueArn == null) {
 -                    queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
 -                    queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
 +                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
 +                if ( queueArn == null ) {
 +                    queueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
 +                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, queueUrl );
                  }
  
 -                arrQueueArns.put(queueArn, regionName);
 +                arrQueueArns.put( queueArn, regionName );
              }
  
 -            logger.debug("Creating Subscriptions...");
 +            logger.debug( "Creating Subscriptions..." );
  
 -            for (Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet()) {
 +            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                  String queueARN = queueArnEntry.getKey();
                  String strSqsRegion = queueArnEntry.getValue();
  
@@@ -650,10 -519,12 +650,10 @@@
  
      /**
       * Get the region
 -     *
 -     * @return
       */
      private Region getRegion() {
-         Regions regions = Regions.fromName( fig.getRegion() );
-         return Region.getRegion( regions );
+         Regions regions = Regions.fromName(fig.getPrimaryRegion());
+         return Region.getRegion(regions);
      }