You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/23 23:48:41 UTC
[5/7] git commit: add queuescope factory
add queuescope factory
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/daebd252
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/daebd252
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/daebd252
Branch: refs/heads/two-dot-o-events
Commit: daebd252b14f57c8ce091ece5b2104310c94b188
Parents: 4e2e411
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Oct 23 14:30:15 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Oct 23 14:30:15 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/QueueFig.java | 3 ++
.../persistence/queue/QueueScopeFactory.java | 34 ++++++++++++++
.../persistence/queue/guice/QueueModule.java | 10 ++++-
.../queue/impl/QueueScopeFactoryImpl.java | 47 ++++++++++++++++++++
.../persistence/queue/QueueManagerTest.java | 27 ++++++++---
.../notifications/ApplicationQueueManager.java | 1 -
.../notifications/NotificationsService.java | 5 ++-
.../services/notifications/QueueListener.java | 5 ++-
8 files changed, 122 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index fd71f9e..7fd2258 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -12,5 +12,8 @@ public interface QueueFig extends GuicyFig {
@Default("us-east-1")
public String getRegion();
+ @Key( "queue.prefix" )
+ @Default("usergrid")
+ public String getPrefix();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScopeFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScopeFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScopeFactory.java
new file mode 100644
index 0000000..3a508a9
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScopeFactory.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+
+import java.util.UUID;
+
+/**
+ * Factory that builds the {@link QueueScope} identifying a named queue that
+ * belongs to a particular application.
+ */
+public interface QueueScopeFactory {
+
+    /**
+     * Builds the scope for a queue.
+     *
+     * @param applicationId UUID of the application that owns the queue
+     * @param queueName name of the queue within that application
+     * @return the scope describing the queue
+     */
+    QueueScope getScope( UUID applicationId, String queueName );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index e8fc7c8..4e487e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -22,6 +22,8 @@ import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.apache.usergrid.persistence.queue.QueueFig;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeFactoryImpl;
import org.apache.usergrid.persistence.queue.impl.SQSQueueManagerImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -37,10 +39,14 @@ public class QueueModule extends AbstractModule {
@Override
protected void configure() {
+
install( new GuicyFigModule( QueueFig.class) );
+
// create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( QueueManager.class, SQSQueueManagerImpl.class )
- .build( QueueManagerFactory.class ) );
+ install(new FactoryModuleBuilder()
+ .implement(QueueManager.class, SQSQueueManagerImpl.class)
+ .build(QueueManagerFactory.class));
+ bind( QueueScopeFactory.class ).to( QueueScopeFactoryImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
new file mode 100644
index 0000000..92ed075
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeFactoryImpl.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.queue.impl;
+
+import com.google.inject.Inject;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.QueueScopeFactory;
+
+import java.util.UUID;
+
+/**
+ * Default {@link QueueScopeFactory} that builds {@link QueueScopeImpl} instances
+ * using the prefix configured in {@link QueueFig}.
+ */
+public class QueueScopeFactoryImpl implements QueueScopeFactory {
+
+    /** Queue configuration; its prefix is read on every {@code getScope} call. */
+    private final QueueFig fig;
+
+    /**
+     * Creates the factory.
+     *
+     * @param fig queue configuration, supplied through Guice constructor injection
+     */
+    @Inject
+    public QueueScopeFactoryImpl( final QueueFig fig ) {
+        this.fig = fig;
+    }
+
+    /**
+     * Builds a {@link QueueScopeImpl} for the given application and queue name.
+     *
+     * @param applicationId UUID of the application that owns the queue
+     * @param queueName name of the queue
+     * @return a new scope whose application id is a {@link SimpleId} built from
+     *         {@code applicationId} and the configured queue prefix
+     */
+    @Override
+    public QueueScope getScope( final UUID applicationId, final String queueName ) {
+        // The prefix is looked up at call time rather than cached in a field.
+        // NOTE(review): the second SimpleId argument is presumably the id's type - confirm.
+        final SimpleId application = new SimpleId( applicationId, fig.getPrefix() );
+        return new QueueScopeImpl( application, queueName );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 26a350c..1d3c049 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.queue;
import com.amazonaws.services.glacier.TreeHashGenerator;
import org.apache.usergrid.persistence.collection.util.InvalidEntityGenerator;
import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeFactoryImpl;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.jukito.UseModules;
import org.junit.Before;
@@ -33,12 +34,13 @@ import org.apache.usergrid.persistence.core.cassandra.ITRunner;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.google.inject.Inject;
+import org.safehaus.guicyfig.Bypass;
+import org.safehaus.guicyfig.OptionState;
+import org.safehaus.guicyfig.Overrides;
+import java.beans.PropertyChangeListener;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static org.junit.Assert.*;
@@ -46,10 +48,14 @@ import static org.junit.Assert.*;
@UseModules( { TestQueueModule.class } )
public class QueueManagerTest {
-
+ @Inject
+ protected QueueFig queueFig;
@Inject
protected QueueManagerFactory qmf;
+ @Inject
+ protected QueueScopeFactory queueScopeFactory;
+
protected QueueScope scope;
private QueueManager qm;
@@ -58,6 +64,16 @@ public class QueueManagerTest {
public void mockApp() {
this.scope = new QueueScopeImpl( new SimpleId( "application" ), "testQueue" );
qm = qmf.getQueueManager(scope);
+ queueScopeFactory = new QueueScopeFactoryImpl(queueFig);
+ }
+
+ @Test
+ public void scopeFactory(){
+ UUID uuid = UUID.randomUUID();
+ String key = "test";
+ QueueScope scope =queueScopeFactory.getScope(uuid,key);
+ assertEquals(key,scope.getName());
+ assertEquals(scope.getApplication().getUuid(),uuid);
}
@Ignore("need aws creds")
@@ -97,4 +113,5 @@ public class QueueManagerTest {
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index e819356..a83513f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -54,7 +54,6 @@ public class ApplicationQueueManager {
private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
- public static final String QUEUE_PREFIX = "usergrid";
private final EntityManager em;
private final QueueManager qm;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 750b305..69f66e5 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.QueueScopeFactory;
import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.*;
import org.slf4j.Logger;
@@ -99,13 +100,15 @@ public class NotificationsService extends AbstractCollectionService {
super.init(info);
smf = getApplicationContext().getBean(ServiceManagerFactory.class);
emf = getApplicationContext().getBean(EntityManagerFactory.class);
+
Properties props = (Properties)getApplicationContext().getBean("properties");
metricsService = getApplicationContext().getBean(MetricsFactory.class);
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
String name = ApplicationQueueManager.getQueueNames(props);
- QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),ApplicationQueueManager.QUEUE_PREFIX),name);
+ QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
+ QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daebd252/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 3ef9198..7ce315b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000;
private final QueueManagerFactory queueManagerFactory;
+ private final QueueScopeFactory queueScopeFactory;
public long DEFAULT_SLEEP = 5000;
@@ -78,6 +79,8 @@ public class QueueListener {
this.emf = emf;
this.metricsService = metricsService;
this.properties = props;
+ this.queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
+
}
@PostConstruct
@@ -137,7 +140,7 @@ public class QueueListener {
com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "dequeue");
svcMgr = smf.getServiceManager(smf.getManagementAppId());
LOG.info("getting from queue {} ", queueName);
- QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),ApplicationQueueManager.QUEUE_PREFIX),queueName);
+ QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), queueName);
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
while ( true ) {