You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/14 02:25:17 UTC
[6/9] incubator-usergrid git commit: Add support for having single region and all region AWS queue implementations.
Add support for having single region and all region AWS queue implementations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c467a385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c467a385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c467a385
Branch: refs/heads/two-dot-o-dev
Commit: c467a3858ed859443666d6f497832bae8d1652f9
Parents: ad3916f
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 16:31:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 16:31:18 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 3 ++-
.../apache/usergrid/persistence/queue/QueueScope.java | 14 ++++++++++++++
.../persistence/queue/impl/QueueScopeImpl.java | 7 ++++++-
.../persistence/queue/impl/SNSQueueManagerImpl.java | 4 ++--
.../usergrid/persistence/queue/QueueManagerTest.java | 2 +-
.../services/notifications/NotificationsService.java | 2 +-
.../services/notifications/QueueListener.java | 2 +-
.../usergrid/services/queues/QueueListener.java | 2 +-
8 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..bd97d66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -73,6 +73,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final String QUEUE_NAME = "es_queue";
private final QueueManager queue;
+ private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -105,7 +106,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.rxTaskScheduler = rxTaskScheduler;
- final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+ this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
this.queue = queueManagerFactory.getQueueManager(queueScope);
this.indexProcessorFig = indexProcessorFig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
index cf6bf24..4beacf6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -24,8 +24,22 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
public interface QueueScope {
/**
+ * LOCALREGION will create a SNS topic with a queue subscription in a single AWS region.
+ * ALLREGIONS will create SNS topics and queue subscriptions in ALL AWS regions.
+ */
+ enum RegionImplementation {
+ LOCALREGION,
+ ALLREGIONS
+ }
+
+ /**
* Get the name of the the map
* @return
*/
public String getName();
+
+ /**
+ * Get the Usergrid region enum
+ */
+ public RegionImplementation getRegionImplementation();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
index 381cd8e..09a0bcd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -26,9 +26,11 @@ import org.apache.usergrid.persistence.queue.QueueScope;
public class QueueScopeImpl implements QueueScope {
private final String name;
+ private final RegionImplementation regionImpl;
- public QueueScopeImpl( final String name ) {
+ public QueueScopeImpl( final String name, final RegionImplementation regionImpl) {
this.name = name;
+ this.regionImpl = regionImpl;
}
@@ -40,6 +42,9 @@ public class QueueScopeImpl implements QueueScope {
}
@Override
+ public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index c5a0f30..60138ee 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -158,7 +158,7 @@ public class SNSQueueManagerImpl implements QueueManager {
logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
}
- if (fig.isMultiRegion()) {
+ if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALLREGIONS) {
String multiRegion = fig.getRegionList();
@@ -299,7 +299,7 @@ public class SNSQueueManagerImpl implements QueueManager {
private String getName() {
- String name = clusterFig.getClusterName() + "_" + scope.getName();
+ String name = clusterFig.getClusterName() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 33fa1f5..0ed6065 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -65,7 +65,7 @@ public class QueueManagerTest {
@Before
public void mockApp() {
- this.scope = new QueueScopeImpl( "testQueue" );
+ this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCALREGION );
qm = qmf.getQueueManager(scope);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 76f10c9..130756d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -108,7 +108,7 @@ public class NotificationsService extends AbstractCollectionService {
postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
JobScheduler jobScheduler = new JobScheduler(sm,em);
String name = ApplicationQueueManagerImpl.getQueueNames( props );
- QueueScope queueScope = new QueueScopeImpl( name );
+ QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCALREGION );
queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 8c765ac..eba0060 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -146,7 +146,7 @@ public class QueueListener {
com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
svcMgr = smf.getServiceManager(smf.getManagementAppId());
LOG.info("getting from queue {} ", queueName);
- QueueScope queueScope = new QueueScopeImpl( queueName ) {};
+ QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION );
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
final AtomicLong runCount = new AtomicLong(0);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index d38db95..bd57bb3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -160,7 +160,7 @@ public abstract class QueueListener {
LOG.info("QueueListener: Starting execute process.");
svcMgr = smf.getServiceManager(smf.getManagementAppId());
LOG.info("getting from queue {} ", queueName);
- QueueScope queueScope = new QueueScopeImpl( queueName);
+ QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION);
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
long runCount = 0;