You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/04 03:11:56 UTC

[2/4] git commit: dependency issues, removing old queueing mechanism

dependency issues, removing old queueing mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fadf5277
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fadf5277
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fadf5277

Branch: refs/heads/sqs_queues
Commit: fadf5277011d980525e8a35302cd776068446c4a
Parents: eb5204b
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 16:12:35 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 16:12:35 2014 -0600

----------------------------------------------------------------------
 stack/core/pom.xml                              | 15 +++++
 .../usergrid/corepersistence/GuiceModule.java   |  4 ++
 stack/corepersistence/map/pom.xml               | 16 +++++
 stack/corepersistence/pom.xml                   | 16 +++++
 stack/corepersistence/queue/pom.xml             |  1 +
 .../persistence/queue/QueueManagerFactory.java  |  3 -
 .../queue/impl/SQSQueueManagerImpl.java         |  3 +-
 stack/pom.xml                                   |  6 +-
 stack/queue/pom.xml                             | 16 -----
 .../notifications/ApplicationQueueManager.java  | 69 +++++++-------------
 .../notifications/ApplicationQueueMessage.java  | 67 +++++++------------
 .../notifications/NotificationsService.java     | 15 ++++-
 .../services/notifications/QueueListener.java   | 56 ++++++++--------
 .../services/notifications/QueueManager.java    | 31 ---------
 .../services/notifications/TaskManager.java     | 27 ++++----
 .../apns/NotificationsServiceIT.java            |  2 +-
 .../gcm/NotificationsServiceIT.java             |  2 +-
 17 files changed, 164 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6cd5dac..30a14d1 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -526,6 +526,21 @@
 	    <type>jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>map</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.usergrid</groupId>
+      <artifactId>queue</artifactId>
+      <version>2.0.0-SNAPSHOT</version>
+      <type>jar</type>
+    </dependency>
+
+
     <!--<dependency>-->
       <!--<artifactId>lucene-core</artifactId>-->
       <!--<groupId>org.apache.lucene</groupId>-->

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 1f3d615..42f81d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -17,6 +17,8 @@
 package org.apache.usergrid.corepersistence;
 
 
+import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,6 +43,8 @@ public class GuiceModule  extends AbstractModule {
         install(new CollectionModule());
         install(new GraphModule());
         install(new IndexModule());
+        install(new MapModule());
+        install(new QueueModule());
 
         bind(CpEntityDeleteListener.class).asEagerSingleton();
         bind(CpEntityIndexDeleteListener.class).asEagerSingleton();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/map/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/pom.xml b/stack/corepersistence/map/pom.xml
index 93b1030..e9cb5ab 100644
--- a/stack/corepersistence/map/pom.xml
+++ b/stack/corepersistence/map/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 90b4838..7482271 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -1,4 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 6795248..94f11a8 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -85,6 +85,7 @@
       <version>1.8.11</version>
     </dependency>
 
+
   </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
index 79bcba9..4cdb5e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
@@ -17,9 +17,6 @@
  */
 package org.apache.usergrid.persistence.queue;
 
-/**
- * Created by ApigeeCorporation on 10/3/14.
- */
 public interface QueueManagerFactory {
     public QueueManager getQueueManager( final QueueScope scope );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 84e3cab..4c898f3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -67,7 +67,8 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     private String getName() {
-        return scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+        String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+        return name;
     }
 
     public Queue getQueue(){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index e105c79..2e0d835 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -276,7 +276,7 @@
       <dependency>
         <groupId>org.apache.httpcomponents</groupId>
         <artifactId>httpclient</artifactId>
-        <version>4.1.3</version>
+        <version>4.2</version>
         <exclusions>
           <exclusion>
             <groupId>commons-codec</groupId>
@@ -457,6 +457,10 @@
         <version>${cassandra-version}</version>
         <exclusions>
           <exclusion>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+          </exclusion>
+          <exclusion>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
           </exclusion>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/queue/pom.xml b/stack/queue/pom.xml
deleted file mode 100644
index 4cdf79d..0000000
--- a/stack/queue/pom.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <parent>
-    <artifactId>persistence</artifactId>
-    <groupId>org.apache.usergrid</groupId>
-    <version>2.0.0-SNAPSHOT</version>
-    <relativePath>../corepersistence/pom.xml</relativePath>
-  </parent>
-  <modelVersion>4.0.0</modelVersion>
-
-  <artifactId>queue</artifactId>
-
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index c88cead..bb0061d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.apache.usergrid.services.notifications.apns.APNsAdapter;
 import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
 import org.slf4j.Logger;
@@ -42,13 +44,10 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 
-/**
- * Created by ApigeeCorporation on 8/27/14.
- */
-public class ApplicationQueueManager implements QueueManager {
+public class ApplicationQueueManager  {
 
-    public static  String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_40;notifications/queuelistenerv1_41;notifications/queuelistenerv1_42";
-    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
+    public static  String DEFAULT_QUEUE_NAME = "queuelistenerv1_60";
+    public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
     private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
 
     //this is for tests, will not mark initial post complete, set to false for tests
@@ -57,11 +56,10 @@ public class ApplicationQueueManager implements QueueManager {
     public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
     private final EntityManager em;
-    private final org.apache.usergrid.mq.QueueManager qm;
+    private final QueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
-    private final String[] queueNames;
-    private boolean sendNow = true;
+    private final String queueName;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -77,13 +75,12 @@ public class ApplicationQueueManager implements QueueManager {
     public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
 
 
-    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
         this.em = entityManager;
         this.qm = queueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
-        this.queueNames = getQueueNames(properties);
-        this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
+        this.queueName = getQueueNames(properties);
     }
 
 
@@ -115,7 +112,6 @@ public class ApplicationQueueManager implements QueueManager {
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
-        final String queueName = getRandomQueue(queueNames);
         final List<ApplicationQueueMessage> messages = new ArrayList<>();
 
         //get devices in querystring, and make sure you have access
@@ -132,7 +128,6 @@ public class ApplicationQueueManager implements QueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            final boolean sendNow = this.sendNow; //&& jobExecution == null;
 
             final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
                 @Override
@@ -183,11 +178,8 @@ public class ApplicationQueueManager implements QueueManager {
                                 LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                             }
                             now = System.currentTimeMillis();
-                            if(sendNow){ //if(jobExecution == null && sendNow) {
-                                messages.add(message);
-                            }else{
-                                qm.postToQueue(queueName, message);
-                            }
+
+                                qm.sendMessage(message);
                             LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
                             deviceCount.incrementAndGet();
                             queueMeter.mark();
@@ -242,7 +234,7 @@ public class ApplicationQueueManager implements QueueManager {
 
         //do i have devices, and have i already started batching.
         if (deviceCount.get() <= 0) {
-            TaskManager taskManager = new TaskManager(em, qm, this, notification,queueName);
+            TaskManager taskManager = new TaskManager(em, this, notification,this.qm);
             //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
             taskManager.finishedBatch();
         }
@@ -251,13 +243,6 @@ public class ApplicationQueueManager implements QueueManager {
             long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
             LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
         }
-
-        if(messages.size()>0){
-            now = System.currentTimeMillis();
-            sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done sending to "+messages.size()+" devicess in {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-        }
-
     }
 
     /**
@@ -300,20 +285,22 @@ public class ApplicationQueueManager implements QueueManager {
      * @throws Exception
      */
 
-    public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
+    public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
 
         final Map<Object, Notifier> notifierMap = getNotifierMap();
-        final QueueManager proxy = this;
+        final ApplicationQueueManager proxy = this;
         final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
         final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
 
-        final Func1<ApplicationQueueMessage, ApplicationQueueMessage> func = new Func1<ApplicationQueueMessage, ApplicationQueueMessage>() {
+        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
             @Override
-            public ApplicationQueueMessage call(ApplicationQueueMessage message) {
+            public ApplicationQueueMessage call(QueueMessage queueMessage) {
                 boolean messageCommitted = false;
+                ApplicationQueueMessage message = null;
                 try {
+                    message = (ApplicationQueueMessage) queueMessage.getBody();
                     LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
 
                     UUID deviceUUID = message.getDeviceId();
@@ -325,7 +312,7 @@ public class ApplicationQueueManager implements QueueManager {
                     }
                     TaskManager taskManager = taskMap.get(message.getNotificationId());
                     if (taskManager == null) {
-                        taskManager = new TaskManager(em, qm, proxy, notification,queuePath);
+                        taskManager = new TaskManager(em, proxy, notification, qm);
                         taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                         taskManager = taskMap.get(message.getNotificationId());
                     }
@@ -334,7 +321,7 @@ public class ApplicationQueueManager implements QueueManager {
                     final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
                     LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
 
-                    taskManager.addMessage(deviceUUID,message);
+                    taskManager.addMessage(deviceUUID,queueMessage);
                     try {
                         String notifierName = message.getNotifierKey().toLowerCase();
                         Notifier notifier = notifierMap.get(notifierName.toLowerCase());
@@ -368,7 +355,7 @@ public class ApplicationQueueManager implements QueueManager {
                     LOG.error("Failure while sending",e);
                     try {
                         if(!messageCommitted && queuePath != null) {
-                            qm.commitTransaction(queuePath, message.getTransaction(), null);
+                            qm.commitMessage(queueMessage);
                         }
                     }catch (Exception queueException){
                         LOG.error("Failed to commit message.",queueException);
@@ -378,9 +365,9 @@ public class ApplicationQueueManager implements QueueManager {
             }
         };
         Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<ApplicationQueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
                     @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<ApplicationQueueMessage> messageObservable) {
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
                         return messageObservable.map(func);
                     }
                 }, Schedulers.io())
@@ -446,14 +433,8 @@ public class ApplicationQueueManager implements QueueManager {
         return translatedPayloads;
     }
 
-    public static String[] getQueueNames(Properties properties) {
-        String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
-        return names;
-    }
-    public static String getRandomQueue(String[] queueNames) {
-        int size = queueNames.length;
-        Random random = new Random();
-        String name = queueNames[random.nextInt(size)];
+    public static String getQueueNames(Properties properties) {
+        String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
         return name;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
index 91f1312..fa75531 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
@@ -16,6 +16,7 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.usergrid.mq.Message;
@@ -28,23 +29,25 @@ import org.slf4j.LoggerFactory;
 /**
  * Created by ApigeeCorporation on 9/4/14.
  */
-public class ApplicationQueueMessage extends Message {
+public class ApplicationQueueMessage implements Serializable {
 
     private static final Logger log = LoggerFactory.getLogger(ApplicationQueueMessage.class);
-
-    static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
-    static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
-    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
-    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
-    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
+    private UUID applicationId;
+    private UUID notificationId;
+    private UUID deviceId;
+    private String notifierKey;
+    private String notifierId;
 
 
     public ApplicationQueueMessage() {
     }
 
     public ApplicationQueueMessage(UUID applicationId, UUID notificationId, UUID deviceId, String notifierKey, String notifierId) {
-        setApplicationId(applicationId);
-        setDeviceId(deviceId);
+        this.applicationId = applicationId;
+        this.notificationId = notificationId;
+        this.deviceId = deviceId;
+        this.notifierKey = notifierKey;
+        this.notifierId = notifierId;
         setNotificationId(notificationId);
         setNotifierKey(notifierKey);
         setNotifierId(notifierId);
@@ -58,71 +61,45 @@ public class ApplicationQueueMessage extends Message {
         return new UUID( msb, lsb );
     }
 
-    public static ApplicationQueueMessage generate(Message message) {
-
-        // this crazyness may indicate that Core Persistence is not storing UUIDs correctly
-
-        byte[] mpaBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
-        UUID mpaUuid = bytesToUuid(mpaBytes);
-
-        byte[] mpnBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
-        UUID mpnUuid = bytesToUuid(mpnBytes);
-
-        final UUID mpdUuid;
-        Object o = message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        if ( o instanceof UUID ) {
-            mpdUuid = (UUID)message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
-        } else {
-            byte[] mpdBytes = (byte[])o;
-            mpdUuid =  bytesToUuid(mpdBytes);
-        }
-
-        // end of crazyness
-
-        return new ApplicationQueueMessage(
-                mpaUuid, mpnUuid, mpdUuid,
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME), 
-                message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
-    }
 
     public UUID getApplicationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+        return applicationId;
     }
 
     public void setApplicationId(UUID applicationId) {
-        this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID, applicationId);
+       this.applicationId = applicationId;
     }
 
     public UUID getDeviceId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+        return deviceId;
     }
 
     public void setDeviceId(UUID deviceId) {
-        this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceId);
+        this.deviceId = deviceId;
     }
 
     public UUID getNotificationId() {
-        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
+        return notificationId;
     }
 
     public void setNotificationId(UUID notificationId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID, notificationId);
+       this.notificationId = notificationId;
     }
 
     public String getNotifierId() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+        return notifierId;
     }
 
     public void setNotifierId(String notifierId) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID, notifierId);
+         this.notifierId = notifierId;
     }
 
     public String getNotifierKey() {
-        return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+        return notifierKey;
     }
 
     public void setNotifierKey(String name) {
-        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME, name);
+        notifierKey = name;
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 4e5692e..c5cd3c4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ import java.util.*;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.mq.Message;
 import org.apache.usergrid.persistence.*;
@@ -28,6 +29,11 @@ import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,10 +86,9 @@ public class NotificationsService extends AbstractCollectionService {
 
     private ApplicationQueueManager notificationQueueManager;
     private long gracePeriod;
-    @Autowired
     private ServiceManagerFactory smf;
-    @Autowired
     private EntityManagerFactory emf;
+    private QueueManagerFactory queueManagerFactory;
 
     public NotificationsService() {
         LOG.info("/notifications");
@@ -99,7 +104,11 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,smf.getServiceManager(smf.getManagementAppId()).getQueueManager(),metricsService,props);
+        String name = ApplicationQueueManager.getQueueNames(props);
+        QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),name);
+        queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
         gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d76ee9..a9e0702 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,11 +16,17 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.*;
+
+import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
@@ -35,7 +41,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class QueueListener  {
-    public  final long MESSAGE_TRANSACTION_TIMEOUT =  1 * 60 * 1000;
+    public  final int MESSAGE_TRANSACTION_TIMEOUT =  25 * 1000;
+    private final QueueManagerFactory queueManagerFactory;
 
     public   long DEFAULT_SLEEP = 5000;
 
@@ -50,7 +57,6 @@ public class QueueListener  {
 
     private Properties properties;
 
-    private org.apache.usergrid.mq.QueueManager queueManager;
 
     private ServiceManager svcMgr;
 
@@ -62,12 +68,11 @@ public class QueueListener  {
     private List<Future> futures;
 
     public  final String MAX_THREADS = "2";
-    private Integer batchSize = 100;
-    private String[] queueNames;
-
-
+    private Integer batchSize = 10;
+    private String queueName;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+        this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = metricsService;
@@ -76,7 +81,7 @@ public class QueueListener  {
 
     @PostConstruct
     public void start(){
-        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
+        boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "true"));
 
         if(shouldRun) {
             LOG.info("QueueListener: starting.");
@@ -86,7 +91,7 @@ public class QueueListener  {
                 sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
-                queueNames = ApplicationQueueManager.getQueueNames(properties);
+                queueName = ApplicationQueueManager.getQueueNames(properties);
 
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
                 futures = new ArrayList<Future>(maxThreads);
@@ -120,7 +125,7 @@ public class QueueListener  {
     }
 
     private void execute(){
-        Thread.currentThread().setDaemon(true);
+//        Thread.currentThread().setDaemon(true);
         Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
@@ -130,31 +135,30 @@ public class QueueListener  {
         while ( true ) {
             try {
                 svcMgr = smf.getServiceManager(smf.getManagementAppId());
-                queueManager = svcMgr.getQueueManager();
-                String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
                 LOG.info("getting from queue {} ", queueName);
-                QueueResults results = getDeliveryBatch(queueManager,queueName);
-                LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", results.size(),queueName);
+                QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
+                QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+                List<QueueMessage> messages = getDeliveryBatch(queueManager);
+                LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
-                List<Message> messages = results.getMessages();
                 if (messages.size() > 0) {
-                    HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+                    HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
-                    for (Message message : messages) {
-                        ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
+                    for (QueueMessage message : messages) {
+                        ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                         UUID applicationId = queueMessage.getApplicationId();
                         if (!messageMap.containsKey(applicationId)) {
-                            List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
-                            applicationQueueMessages.add(queueMessage);
+                            List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+                            applicationQueueMessages.add(message);
                             messageMap.put(applicationId, applicationQueueMessages);
                         } else {
-                            messageMap.get(applicationId).add(queueMessage);
+                            messageMap.get(applicationId).add(message);
                         }
                     }
                     long now = System.currentTimeMillis();
                     Observable merge = null;
                     //send each set of app ids together
-                    for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
+                    for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
                         UUID applicationId = entry.getKey();
                         EntityManager entityManager = emf.getEntityManager(applicationId);
                         ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -218,11 +222,9 @@ public class QueueListener  {
         pool.shutdownNow();
     }
 
-    private  QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
-        QueueQuery qq = new QueueQuery();
-        qq.setLimit(this.getBatchSize());
-        qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queuePath, qq);
+    private  List<QueueMessage> getDeliveryBatch(QueueManager queueManager) throws Exception {
+
+        List<QueueMessage> results = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT);
         return results;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
deleted file mode 100644
index 0024417..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.services.notifications;
-
-import org.apache.usergrid.persistence.entities.Notifier;
-
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Created by ApigeeCorporation on 9/4/14.
- */
-public interface QueueManager {
-
-    public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 08f067d..42bd65c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,6 +23,8 @@ import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,28 +35,26 @@ import java.util.concurrent.atomic.AtomicLong;
 public class TaskManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
-    private final QueueManager proxy;
-    private final String queuePath;
+    private final ApplicationQueueManager proxy;
+    private final QueueManager queueManager;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
-    private org.apache.usergrid.mq.QueueManager qm;
     private EntityManager em;
-    private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
+    private ConcurrentHashMap<UUID, QueueMessage> messageMap;
     private boolean hasFinished;
 
-    public TaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification, String queuePath) {
+    public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification, QueueManager queueManager) {
         this.em = em;
-        this.qm = qm;
         this.notification = notification;
         this.proxy = proxy;
-        this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
+        this.messageMap = new ConcurrentHashMap<UUID, QueueMessage>();
         hasFinished = false;
-        this.queuePath = queuePath;
+        this.queueManager = queueManager;
     }
 
-    public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
+    public void addMessage(UUID deviceId, QueueMessage message) {
         messageMap.put(deviceId, message);
     }
 
@@ -62,9 +62,7 @@ public class TaskManager {
         LOG.debug("REMOVED {}", deviceUUID);
         try {
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            if(queuePath!=null){
-                qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
-            }
+
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
             if (receipt != null) {
@@ -148,6 +146,11 @@ public class TaskManager {
     }
 
     public void finishedBatch() throws Exception {
+        if(queueManager!=null){
+            List<QueueMessage> list = new ArrayList<QueueMessage>();
+            list.addAll(messageMap.values());
+            queueManager.commitMessages(list);
+        }
         long successes = this.successes.getAndSet(0); //reset counters
         long failures = this.failures.getAndSet(0); //reset counters
         this.hasFinished = true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 27e6d27..12d0925 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -65,7 +65,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8907226..034723e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -61,7 +61,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
     }
     @Override
     @Before