You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/14 02:25:17 UTC

[6/9] incubator-usergrid git commit: Add support for having single region and all region AWS queue implementations.

Add support for having single region and all region AWS queue implementations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c467a385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c467a385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c467a385

Branch: refs/heads/two-dot-o-dev
Commit: c467a3858ed859443666d6f497832bae8d1652f9
Parents: ad3916f
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 16:31:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 16:31:18 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java          |  3 ++-
 .../apache/usergrid/persistence/queue/QueueScope.java | 14 ++++++++++++++
 .../persistence/queue/impl/QueueScopeImpl.java        |  7 ++++++-
 .../persistence/queue/impl/SNSQueueManagerImpl.java   |  4 ++--
 .../usergrid/persistence/queue/QueueManagerTest.java  |  2 +-
 .../services/notifications/NotificationsService.java  |  2 +-
 .../services/notifications/QueueListener.java         |  2 +-
 .../usergrid/services/queues/QueueListener.java       |  2 +-
 8 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..bd97d66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -73,6 +73,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private static final String QUEUE_NAME = "es_queue";
 
     private final QueueManager queue;
+    private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
     private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -105,7 +106,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.rxTaskScheduler = rxTaskScheduler;
 
-        final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
         this.indexProcessorFig = indexProcessorFig;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
index cf6bf24..4beacf6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -24,8 +24,22 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 public interface QueueScope  {
 
     /**
+     * LOCALREGION will create a SNS topic with a queue subscription in a single AWS region.
+     * ALLREGIONS will create SNS topics and queue subscriptions  in ALL AWS regions.
+     */
+    enum RegionImplementation {
+        LOCALREGION,
+        ALLREGIONS
+    }
+
+    /**
      * Get the name of the the map
      * @return
      */
     public String getName();
+
+    /**
+     * Get the Usergrid region enum
+     */
+    public RegionImplementation getRegionImplementation();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
index 381cd8e..09a0bcd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -26,9 +26,11 @@ import org.apache.usergrid.persistence.queue.QueueScope;
 public class QueueScopeImpl implements QueueScope {
 
     private final String name;
+    private final RegionImplementation regionImpl;
 
-    public QueueScopeImpl(  final String name ) {
+    public QueueScopeImpl(  final String name, final RegionImplementation regionImpl) {
         this.name = name;
+        this.regionImpl = regionImpl;
     }
 
 
@@ -40,6 +42,9 @@ public class QueueScopeImpl implements QueueScope {
     }
 
     @Override
+    public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index c5a0f30..60138ee 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -158,7 +158,7 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
         }
 
-        if (fig.isMultiRegion()) {
+        if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALLREGIONS) {
 
             String multiRegion = fig.getRegionList();
 
@@ -299,7 +299,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     private String getName() {
-        String name = clusterFig.getClusterName() + "_" + scope.getName();
+        String name = clusterFig.getClusterName() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
 
         Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 33fa1f5..0ed6065 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -65,7 +65,7 @@ public class QueueManagerTest {
 
     @Before
     public void mockApp() {
-        this.scope = new QueueScopeImpl(  "testQueue" );
+        this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCALREGION );
         qm = qmf.getQueueManager(scope);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 76f10c9..130756d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -108,7 +108,7 @@ public class NotificationsService extends AbstractCollectionService {
         postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
         String name = ApplicationQueueManagerImpl.getQueueNames( props );
-        QueueScope queueScope = new QueueScopeImpl( name );
+        QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCALREGION );
         queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
         QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
         notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 8c765ac..eba0060 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -146,7 +146,7 @@ public class QueueListener  {
         com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
         svcMgr = smf.getServiceManager(smf.getManagementAppId());
         LOG.info("getting from queue {} ", queueName);
-        QueueScope queueScope = new QueueScopeImpl( queueName ) {};
+        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION );
         QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
         final AtomicLong runCount = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index d38db95..bd57bb3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -160,7 +160,7 @@ public abstract class QueueListener  {
         LOG.info("QueueListener: Starting execute process.");
         svcMgr = smf.getServiceManager(smf.getManagementAppId());
         LOG.info("getting from queue {} ", queueName);
-        QueueScope queueScope = new QueueScopeImpl( queueName);
+        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION);
         QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
         long runCount = 0;