You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/29 20:21:57 UTC

[16/22] git commit: move queue maanger behind interface

move queue maanger behind interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8f4720db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8f4720db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8f4720db

Branch: refs/heads/two-dot-o-events
Commit: 8f4720db79534172f964378c08b5f51884e046d0
Parents: 955a92b
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:05:27 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:05:27 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 532 +------------------
 .../notifications/InactiveDeviceManager.java    |   3 +-
 .../notifications/NotificationsService.java     |  14 +-
 .../services/notifications/QueueListener.java   |   7 +-
 .../services/notifications/TaskManager.java     |  17 +-
 .../apns/ExpiredTokenListener.java              |   8 -
 .../impl/ApplicationQueueManagerImpl.java       | 523 ++++++++++++++++++
 .../apns/NotificationsServiceIT.java            |   7 +-
 .../gcm/NotificationsServiceIT.java             |   7 +-
 9 files changed, 569 insertions(+), 549 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 1058c34..8012b42 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -1,526 +1,48 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
  */
+
 package org.apache.usergrid.services.notifications;
 
-import com.clearspring.analytics.hash.MurmurHash;
-import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.security.Provider;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
 
+import java.util.List;
 
-public class ApplicationQueueManager  {
+/**
+ * Classy class class.
+ */
+public interface ApplicationQueueManager {
 
-    public static  String DEFAULT_QUEUE_NAME = "push_v1";
     public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
-    private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
-
-    //this is for tests, will not mark initial post complete, set to false for tests
 
-    private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
     public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
 
-    private final EntityManager em;
-    private final QueueManager qm;
-    private final JobScheduler jobScheduler;
-    private final MetricsFactory metricsFactory;
-    private final String queueName;
-
-    HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
-
-
-    public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
-        this.em = entityManager;
-        this.qm = queueManager;
-        this.jobScheduler = jobScheduler;
-        this.metricsFactory = metricsFactory;
-        this.queueName = getQueueNames(properties);
-
-    }
-
-    public boolean scheduleQueueJob(Notification notification) throws Exception{
-        return jobScheduler.scheduleQueueJob(notification);
-    }
-
-    public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
-        if(scheduleQueueJob(notification)){
-            em.update(notification);
-            return;
-        }
-        final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManager.class,"queue");
-        long startTime = System.currentTimeMillis();
-
-        if (notification.getCanceled() == Boolean.TRUE) {
-            LOG.info("notification " + notification.getUuid() + " canceled");
-            if (jobExecution != null) {
-                jobExecution.killed();
-            }
-            return;
-        }
-
-        LOG.info("notification {} start queuing", notification.getUuid());
-
-        final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
-        final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
-        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
-
-        final HashMap<Object,ProviderAdapter> notifierMap =  getNotifierMap();
-
-        //get devices in querystring, and make sure you have access
-        if (pathQuery != null) {
-            LOG.info("notification {} start query", notification.getUuid());
-            final Iterator<Device> iterator = pathQuery.iterator(em);
-            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
-            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
-                jobScheduler.scheduleQueueJob(notification, true);
-                em.update(notification);
-                return;
-            }
-            final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
-            final UUID appId = em.getApplication().getUuid();
-            final Map<String,Object> payloads = notification.getPayloads();
-
-            final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
-                @Override
-                public Entity call(Entity entity) {
-
-                    try {
-
-                        long now = System.currentTimeMillis();
-                        List<EntityRef> devicesRef = getDevices(entity); // resolve group
-
-                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
-
-                        for (EntityRef deviceRef : devicesRef) {
-                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
-                            long hash = MurmurHash.hash(deviceRef.getUuid());
-                            if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
-                                continue;
-                            } else {
-                                sketch.add(hash, 1);
-                            }
-                            String notifierId = null;
-                            String notifierKey = null;
-
-                            //find the device notifier info, match it to the payload
-                            for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-                                ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
-                                now = System.currentTimeMillis();
-                                String providerId = getProviderId(deviceRef, adapter.getNotifier());
-                                if (providerId != null) {
-                                    notifierId = providerId;
-                                    notifierKey = entry.getKey().toLowerCase();
-                                    break;
-                                }
-                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
-                            }
-
-                            if (notifierId == null) {
-                                LOG.info("Notifier did not match for device {} ", deviceRef);
-                                continue;
-                            }
-
-                            ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
-                            if (notification.getQueued() == null) {
-                                // update queued time
-                                now = System.currentTimeMillis();
-                                notification.setQueued(System.currentTimeMillis());
-                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
-                            }
-                            now = System.currentTimeMillis();
-                            qm.sendMessage(message);
-                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
-                            deviceCount.incrementAndGet();
-                            queueMeter.mark();
-                        }
-                    } catch (Exception deviceLoopException) {
-                        LOG.error("Failed to add devices", deviceLoopException);
-                        errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
-                    }
-                    return entity;
-                }
-            };
-
-            long now = System.currentTimeMillis();
-            Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
-                    .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
-                        @Override
-                        public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
-                            return deviceObservable.map(entityListFunct);
-                        }
-                    }, Schedulers.io())
-                    .doOnError(new Action1<Throwable>() {
-                        @Override
-                        public void call(Throwable throwable) {
-                            LOG.error("Failed while writing", throwable);
-                        }
-                    });
-            o.toBlocking().lastOrDefault(null);
-            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-        }
-
-        // update queued time
-        Map<String, Object> properties = new HashMap<String, Object>(2);
-        properties.put("queued", notification.getQueued());
-        properties.put("state", notification.getState());
-        if(errorMessages.size()>0){
-            if (notification.getErrorMessage() == null) {
-                notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
-            }
-        }
-
-        notification.setExpectedCount(deviceCount.get());
-        notification.addProperties(properties);
-        long now = System.currentTimeMillis();
-
-
-        LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-
-        //do i have devices, and have i already started batching.
-        if (deviceCount.get() <= 0 || !notification.getDebug()) {
-            TaskManager taskManager = new TaskManager(em, this, notification);
-            //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
-            taskManager.finishedBatch(false,true);
-        }else {
-            em.update(notification);
-        }
-
-        long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
-        LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
-    }
-
-    /**
-     * only need to get notifiers once. will reset on next batch
-     * @return
-     */
-    public HashMap<Object,ProviderAdapter> getNotifierMap(){
-        if(notifierHashMap == null) {
-            long now = System.currentTimeMillis();
-            notifierHashMap = new HashMap<Object, ProviderAdapter>();
-            Query query = new Query();
-            query.setCollection("notifiers");
-            query.setLimit(100);
-            PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
-                    new SimpleEntityRef(em.getApplicationRef()),
-                    query
-            );
-            Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
-            int count = 0;
-            while (notifierIterator.hasNext()) {
-                Notifier notifier = notifierIterator.next();
-                String name = notifier.getName() != null ? notifier.getName() : "";
-                UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
-                ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
-                notifierHashMap.put(name.toLowerCase(), providerAdapter);
-                notifierHashMap.put(uuid, providerAdapter);
-                notifierHashMap.put(uuid.toString(), providerAdapter);
-                if(count++ >= 100){
-                    LOG.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
-                    break;
-                }
-            }
-            LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
-        }
-        return notifierHashMap;
-    }
-
-    /**
-     * send batches of notifications to provider
-     * @param messages
-     * @throws Exception
-     */
-    public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
-        LOG.info("sending batch of {} notifications.", messages.size());
-        final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
-
-        final Map<Object, ProviderAdapter> notifierMap = getNotifierMap();
-        final ApplicationQueueManager proxy = this;
-        final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
-        final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
-
-        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
-            @Override
-            public ApplicationQueueMessage call(QueueMessage queueMessage) {
-                boolean messageCommitted = false;
-                ApplicationQueueMessage message = null;
-                try {
-                    message = (ApplicationQueueMessage) queueMessage.getBody();
-                    LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
-
-                    UUID deviceUUID = message.getDeviceId();
-
-                    Notification notification = notificationMap.get(message.getNotificationId());
-                    if (notification == null) {
-                        notification = em.get(message.getNotificationId(), Notification.class);
-                        notificationMap.put(message.getNotificationId(), notification);
-                    }
-                    TaskManager taskManager = taskMap.get(message.getNotificationId());
-                    if (taskManager == null) {
-                        taskManager = new TaskManager(em, proxy, notification);
-                        taskMap.putIfAbsent(message.getNotificationId(), taskManager);
-                        taskManager = taskMap.get(message.getNotificationId());
-                    }
-
-                    final Map<String, Object> payloads = notification.getPayloads();
-                    final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
-                    LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
-
-                    try {
-                        String notifierName = message.getNotifierKey().toLowerCase();
-                        ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
-                        Object payload = translatedPayloads.get(notifierName);
-                        Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
-                        TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
-                        if(!isOkToSend(notification)){
-                             tracker.failed(0, "Notification is duplicate/expired/cancelled.");
-                        }else {
-                            if (payload == null) {
-                                LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
-                                tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
-                            } else {
-                                long now = System.currentTimeMillis();
-                                try {
-                                    providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
-                                } catch (Exception e) {
-                                    tracker.failed(0, e.getMessage());
-                                } finally {
-                                    LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
-                                }
-                            }
-                        }
-                        messageCommitted = true;
-                    } finally {
-                        sendMeter.mark();
-                    }
-
-                } catch (Exception e) {
-                    LOG.error("Failure while sending",e);
-                    try {
-                        if(!messageCommitted && queuePath != null) {
-                            qm.commitMessage(queueMessage);
-                        }
-                    }catch (Exception queueException){
-                        LOG.error("Failed to commit message.",queueException);
-                    }
-                }
-                return message;
-            }
-        };
-        Observable o = rx.Observable.from(messages)
-                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
-                    @Override
-                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
-                        return messageObservable.map(func);
-                    }
-                }, Schedulers.io())
-                .buffer(messages.size())
-                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
-                    @Override
-                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
-                        //for gcm this will actually send notification
-                        for (ProviderAdapter providerAdapter : notifierMap.values()) {
-                            try {
-                                providerAdapter.doneSendingNotifications();
-                            } catch (Exception e) {
-                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
-                            }
-                        }
-                        //TODO: check if a notification is done and mark it
-                        HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
-                        for (ApplicationQueueMessage message : queueMessages) {
-                            if (notifications.get(message.getNotificationId()) == null) {
-                                try {
-                                    TaskManager taskManager = taskMap.get(message.getNotificationId());
-                                    notifications.put(message.getNotificationId(), message);
-                                    taskManager.finishedBatch();
-                                } catch (Exception e) {
-                                    LOG.error("Failed to finish batch", e);
-                                }
-                            }
-
-                        }
-                        return notifications;
-                    }
-                })
-                .doOnError(new Action1<Throwable>() {
-                    @Override
-                    public void call(Throwable throwable) {
-                        LOG.error("Failed while sending",throwable);
-                    }
-                });
-        return o;
-    }
-
-    public void stop(){
-        for(ProviderAdapter adapter : getNotifierMap().values()){
-            try {
-                adapter.stop();
-            }catch (Exception e){
-                LOG.error("failed to stop adapter",e);
-            }
-        }
-    }
-
-
-    /**
-     * Call the adapter with the notifier
-     */
-    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
-        Map<String, Object> translatedPayloads = new HashMap<String, Object>(  payloads.size());
-        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-            String payloadKey = entry.getKey().toLowerCase();
-            Object payloadValue = entry.getValue();
-            ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
-            if (providerAdapter != null) {
-                Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
-                if (translatedPayload != null) {
-                    translatedPayloads.put(payloadKey, translatedPayload);
-                }
-            }
-        }
-        return translatedPayloads;
-    }
-
-    public static String getQueueNames(Properties properties) {
-        String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
-        return name;
-    }
-
-    private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
-        private final Iterator<T> input;
-        private IteratorObservable( final Iterator input ) {this.input = input;}
-
-        @Override
-        public void call( final Subscriber<? super T> subscriber ) {
-
-            /**
-             * You would replace this code with your file reading.  Instead of emitting from an iterator,
-             * you would create a bean object that represents the entity, and then emit it
-             */
-
-            try {
-                while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
-                    //send our input to the next
-                    subscriber.onNext( (T) input.next() );
-                }
-
-                //tell the subscriber we don't have any more data
-                subscriber.onCompleted();
-            }
-            catch ( Throwable t ) {
-                LOG.error("failed on subscriber",t);
-                subscriber.onError( t );
-            }
-        }
-    }
-
-    public void asyncCheckForInactiveDevices() throws Exception {
-        Collection<ProviderAdapter> providerAdapters = getNotifierMap().values();
-        for (final ProviderAdapter providerAdapter : providerAdapters) {
-            try {
-                if (providerAdapter != null) {
-                    LOG.debug("checking notifier {} for inactive devices", providerAdapter.getNotifier());
-                    providerAdapter.removeInactiveDevices();
-
-                    LOG.debug("finished checking notifier {} for inactive devices",providerAdapter.getNotifier());
-                }
-            } catch (Exception e) {
-                LOG.error("checkForInactiveDevices", e); // not
-                // essential so
-                // don't fail,
-                // but log
-            }
-        }
-    }
-
-
-    private boolean isOkToSend(Notification notification) {
-        Map<String,Long> stats = notification.getStatistics();
-        if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
-            LOG.info("notification {} already processed. not sending.",
-                    notification.getUuid());
-            return false;
-        }
-        if (notification.getCanceled() == Boolean.TRUE) {
-            LOG.info("notification {} canceled. not sending.",
-                    notification.getUuid());
-            return false;
-        }
-        if (notification.isExpired()) {
-            LOG.info("notification {} expired. not sending.",
-                    notification.getUuid());
-            return false;
-        }
-        return true;
-    }
-
-    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
-        List<EntityRef> devices = Collections.EMPTY_LIST;
-        if ("device".equals(ref.getType())) {
-            devices = Collections.singletonList(ref);
-        } else if ("user".equals(ref.getType())) {
-            devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
-                    Query.Level.REFS, false).getRefs();
-        } else if ("group".equals(ref.getType())) {
-            devices = new ArrayList<EntityRef>();
-            for (EntityRef r : em.getCollection(ref, "users", null,
-                    Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
-                devices.addAll(getDevices(r));
-            }
-        }
-        return devices;
-    }
+    public static final  String DEFAULT_QUEUE_NAME = "push_v1";
 
+    void queueNotification(Notification notification, JobExecution jobExecution) throws Exception;
 
-    private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
-        try {
-            Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
-            if (value == null) {
-                value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
-            }
-            return value != null ? value.toString() : null;
-        } catch (Exception e) {
-            LOG.error("Errer getting provider ID, proceding with rest of batch", e);
-            return null;
-        }
-    }
+    Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
 
+    void stop();
 
-}
\ No newline at end of file
+    void asyncCheckForInactiveDevices() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index 82841d2..108a4a0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.Results;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +46,7 @@ public class InactiveDeviceManager {
         this.entityManager = entityManager;
     }
     public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap  ){
-        final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+        final String notfierPostFix = ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
         if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
             LOG.debug("processing {} inactive devices",  inactiveDeviceMap.size());
             Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index a64704c..5de4d7d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -29,13 +29,12 @@ import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.apache.usergrid.persistence.queue.QueueScope;
 import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.*;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,9 +43,6 @@ import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundExcept
 import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
 import static org.apache.usergrid.utils.InflectionUtils.pluralize;
 
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.springframework.beans.factory.annotation.Autowired;
 import rx.Observable;
 import rx.functions.Func1;
 import rx.schedulers.Schedulers;
@@ -63,8 +59,6 @@ public class NotificationsService extends AbstractCollectionService {
     //need a mocking framework, this is to substitute for no mocking
     public static QueueManager TEST_QUEUE_MANAGER = null;
 
-    public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
-
     static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
 
     static {
@@ -94,12 +88,12 @@ public class NotificationsService extends AbstractCollectionService {
         postMeter = metricsService.getMeter(NotificationsService.class, "requests");
         postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
-        String name = ApplicationQueueManager.getQueueNames(props);
+        String name = ApplicationQueueManagerImpl.getQueueNames(props);
         QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
         QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
         queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
         QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
-        notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
+        notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
         gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
     }
 
@@ -262,7 +256,7 @@ public class NotificationsService extends AbstractCollectionService {
                         throw new IllegalArgumentException("notifier \""
                                 + notifierId + "\" not found");
                     }
-                    ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+                    ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
                     Object payload = entry.getValue();
                     try {
                         return providerAdapter.translatePayload(payload); // validate

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 3c788a9..b5aaeda 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -25,12 +25,11 @@ import org.apache.usergrid.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
-import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.apache.usergrid.persistence.queue.*;
 import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import rx.Observable;
@@ -96,7 +95,7 @@ public class QueueListener  {
                 sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
                 batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
                 consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
-                queueName = ApplicationQueueManager.getQueueNames(properties);
+                queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
 
                 int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
 
@@ -251,7 +250,7 @@ public class QueueListener  {
                                  EntityManager entityManager = emf.getEntityManager(applicationId);
                                  ServiceManager serviceManager = smf.getServiceManager(applicationId);
 
-                                 ApplicationQueueManager manager = new ApplicationQueueManager(
+                                 ApplicationQueueManagerImpl manager = new ApplicationQueueManagerImpl(
                                          new JobScheduler(serviceManager, entityManager),
                                          entityManager,
                                          queueManager,

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 5902a93..148a2dc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,19 +23,15 @@ import org.apache.usergrid.persistence.entities.Device;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TaskManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
-    private final ApplicationQueueManager proxy;
 
     private Notification notification;
     private AtomicLong successes = new AtomicLong();
@@ -43,10 +39,9 @@ public class TaskManager {
     private EntityManager em;
     private boolean hasFinished;
 
-    public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification) {
+    public TaskManager(EntityManager em, Notification notification) {
         this.em = em;
         this.notification = notification;
-        this.proxy = proxy;
         hasFinished = false;
     }
 
@@ -132,14 +127,14 @@ public class TaskManager {
     protected void replaceProviderId(EntityRef device, Notifier notifier,
                                      String newProviderId) throws Exception {
         Object value = em.getProperty(device, notifier.getName()
-                + NotificationsService.NOTIFIER_ID_POSTFIX);
+                + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
         if (value != null) {
-            em.setProperty(device, notifier.getName() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+            em.setProperty(device, notifier.getName() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
         } else {
             value = em.getProperty(device, notifier.getUuid()
-                    + NotificationsService.NOTIFIER_ID_POSTFIX);
+                    + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
             if (value != null) {
-                em.setProperty(device, notifier.getUuid() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+                em.setProperty(device, notifier.getUuid() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
             }
         }
     }
@@ -183,7 +178,7 @@ public class TaskManager {
             LOG.info("notification finished batch: {} of {} devices in " + latency + "ms", notification.getUuid(), totals);
 
             em.update(notification);
-//        Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
+//        Set<Notifier> notifiers = new HashSet<>(proxy.getAdapterMap().values()); // remove dups
 //        proxy.asyncCheckForInactiveDevices(notifiers);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
index 6408dfd..1f7984a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -23,15 +23,7 @@ package org.apache.usergrid.services.notifications.apns;
 import com.relayrides.pushy.apns.ExpiredToken;
 import com.relayrides.pushy.apns.PushManager;
 import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.notifications.ApplicationQueueManager;
 import org.apache.usergrid.services.notifications.InactiveDeviceManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Date;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
new file mode 100644
index 0000000..c8c5165
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.impl;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.frequency.CountMinSketch;
+import com.codahale.metrics.Meter;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.services.notifications.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class);
+
+    //this is for tests, will not mark initial post complete, set to false for tests
+
+    private final EntityManager em;
+    private final QueueManager qm;
+    private final JobScheduler jobScheduler;
+    private final MetricsFactory metricsFactory;
+    private final String queueName;
+
+    HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
+
+
+    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+        this.em = entityManager;
+        this.qm = queueManager;
+        this.jobScheduler = jobScheduler;
+        this.metricsFactory = metricsFactory;
+        this.queueName = getQueueNames(properties);
+
+    }
+
+    private boolean scheduleQueueJob(Notification notification) throws Exception{
+        return jobScheduler.scheduleQueueJob(notification);
+    }
+
+    @Override
+    public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+        if(scheduleQueueJob(notification)){
+            em.update(notification);
+            return;
+        }
+        final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class,"queue");
+        long startTime = System.currentTimeMillis();
+
+        if (notification.getCanceled() == Boolean.TRUE) {
+            LOG.info("notification " + notification.getUuid() + " canceled");
+            if (jobExecution != null) {
+                jobExecution.killed();
+            }
+            return;
+        }
+
+        LOG.info("notification {} start queuing", notification.getUuid());
+
+        final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
+        final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+
+
+        //get devices in querystring, and make sure you have access
+        if (pathQuery != null) {
+            final HashMap<Object,ProviderAdapter> notifierMap =  getAdapterMap();
+            LOG.info("notification {} start query", notification.getUuid());
+            final Iterator<Device> iterator = pathQuery.iterator(em);
+            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+                jobScheduler.scheduleQueueJob(notification, true);
+                em.update(notification);
+                return;
+            }
+            final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
+            final UUID appId = em.getApplication().getUuid();
+            final Map<String,Object> payloads = notification.getPayloads();
+
+            final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
+                @Override
+                public Entity call(Entity entity) {
+
+                    try {
+
+                        long now = System.currentTimeMillis();
+                        List<EntityRef> devicesRef = getDevices(entity); // resolve group
+
+                        LOG.info("notification {} queue  {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+
+                        for (EntityRef deviceRef : devicesRef) {
+                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                            long hash = MurmurHash.hash(deviceRef.getUuid());
+                            if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                continue;
+                            } else {
+                                sketch.add(hash, 1);
+                            }
+                            String notifierId = null;
+                            String notifierKey = null;
+
+                            //find the device notifier info, match it to the payload
+                            for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                                ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                                now = System.currentTimeMillis();
+                                String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                                if (providerId != null) {
+                                    notifierId = providerId;
+                                    notifierKey = entry.getKey().toLowerCase();
+                                    break;
+                                }
+                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                            }
+
+                            if (notifierId == null) {
+                                LOG.info("Notifier did not match for device {} ", deviceRef);
+                                continue;
+                            }
+
+                            ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                            if (notification.getQueued() == null) {
+                                // update queued time
+                                now = System.currentTimeMillis();
+                                notification.setQueued(System.currentTimeMillis());
+                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
+                            }
+                            now = System.currentTimeMillis();
+                            qm.sendMessage(message);
+                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                            deviceCount.incrementAndGet();
+                            queueMeter.mark();
+                        }
+                    } catch (Exception deviceLoopException) {
+                        LOG.error("Failed to add devices", deviceLoopException);
+                        errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                    }
+                    return entity;
+                }
+            };
+
+            long now = System.currentTimeMillis();
+            Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
+                    .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
+                        @Override
+                        public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
+                            return deviceObservable.map(entityListFunct);
+                        }
+                    }, Schedulers.io())
+                    .doOnError(new Action1<Throwable>() {
+                        @Override
+                        public void call(Throwable throwable) {
+                            LOG.error("Failed while writing", throwable);
+                        }
+                    });
+            o.toBlocking().lastOrDefault(null);
+            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+        }
+
+        // update queued time
+        Map<String, Object> properties = new HashMap<String, Object>(2);
+        properties.put("queued", notification.getQueued());
+        properties.put("state", notification.getState());
+        if(errorMessages.size()>0){
+            if (notification.getErrorMessage() == null) {
+                notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
+            }
+        }
+
+        notification.setExpectedCount(deviceCount.get());
+        notification.addProperties(properties);
+        long now = System.currentTimeMillis();
+
+
+        LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
+        //do i have devices, and have i already started batching.
+        if (deviceCount.get() <= 0 || !notification.getDebug()) {
+            TaskManager taskManager = new TaskManager(em, notification);
+            //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
+            taskManager.finishedBatch(false,true);
+        }else {
+            em.update(notification);
+        }
+
+        long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
+        LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
+    }
+
+    /**
+     * only need to get notifiers once. will reset on next batch
+     * @return
+     */
+    private HashMap<Object,ProviderAdapter> getAdapterMap(){
+        if(notifierHashMap == null) {
+            long now = System.currentTimeMillis();
+            notifierHashMap = new HashMap<Object, ProviderAdapter>();
+            Query query = new Query();
+            query.setCollection("notifiers");
+            query.setLimit(100);
+            PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
+                    new SimpleEntityRef(em.getApplicationRef()),
+                    query
+            );
+            Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
+            int count = 0;
+            while (notifierIterator.hasNext()) {
+                Notifier notifier = notifierIterator.next();
+                String name = notifier.getName() != null ? notifier.getName() : "";
+                UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
+                ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+                notifierHashMap.put(name.toLowerCase(), providerAdapter);
+                notifierHashMap.put(uuid, providerAdapter);
+                notifierHashMap.put(uuid.toString(), providerAdapter);
+                if(count++ >= 100){
+                    LOG.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
+                    break;
+                }
+            }
+            LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
+        }
+        return notifierHashMap;
+    }
+
+    /**
+     * send batches of notifications to provider
+     * @param messages
+     * @throws Exception
+     */
+    @Override
+    public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) {
+        LOG.info("sending batch of {} notifications.", messages.size());
+        final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
+
+        final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
+        final ApplicationQueueManagerImpl proxy = this;
+        final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
+        final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
+
+        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
+            @Override
+            public ApplicationQueueMessage call(QueueMessage queueMessage) {
+                boolean messageCommitted = false;
+                ApplicationQueueMessage message = null;
+                try {
+                    message = (ApplicationQueueMessage) queueMessage.getBody();
+                    LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
+
+                    UUID deviceUUID = message.getDeviceId();
+
+                    Notification notification = notificationMap.get(message.getNotificationId());
+                    if (notification == null) {
+                        notification = em.get(message.getNotificationId(), Notification.class);
+                        notificationMap.put(message.getNotificationId(), notification);
+                    }
+                    TaskManager taskManager = taskMap.get(message.getNotificationId());
+                    if (taskManager == null) {
+                        taskManager = new TaskManager(em, notification);
+                        taskMap.putIfAbsent(message.getNotificationId(), taskManager);
+                        taskManager = taskMap.get(message.getNotificationId());
+                    }
+
+                    final Map<String, Object> payloads = notification.getPayloads();
+                    final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+                    LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
+
+                    try {
+                        String notifierName = message.getNotifierKey().toLowerCase();
+                        ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
+                        Object payload = translatedPayloads.get(notifierName);
+                        Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
+                        TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
+                        if(!isOkToSend(notification)){
+                             tracker.failed(0, "Notification is duplicate/expired/cancelled.");
+                        }else {
+                            if (payload == null) {
+                                LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                                tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                            } else {
+                                long now = System.currentTimeMillis();
+                                try {
+                                    providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
+                                } catch (Exception e) {
+                                    tracker.failed(0, e.getMessage());
+                                } finally {
+                                    LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
+                                }
+                            }
+                        }
+                        messageCommitted = true;
+                    } finally {
+                        sendMeter.mark();
+                    }
+
+                } catch (Exception e) {
+                    LOG.error("Failure while sending",e);
+                    try {
+                        if(!messageCommitted && queuePath != null) {
+                            qm.commitMessage(queueMessage);
+                        }
+                    }catch (Exception queueException){
+                        LOG.error("Failed to commit message.",queueException);
+                    }
+                }
+                return message;
+            }
+        };
+        Observable o = rx.Observable.from(messages)
+                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                    @Override
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
+                        return messageObservable.map(func);
+                    }
+                }, Schedulers.io())
+                .buffer(messages.size())
+                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
+                    @Override
+                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
+                        //for gcm this will actually send notification
+                        for (ProviderAdapter providerAdapter : notifierMap.values()) {
+                            try {
+                                providerAdapter.doneSendingNotifications();
+                            } catch (Exception e) {
+                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
+                            }
+                        }
+                        //TODO: check if a notification is done and mark it
+                        HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
+                        for (ApplicationQueueMessage message : queueMessages) {
+                            if (notifications.get(message.getNotificationId()) == null) {
+                                try {
+                                    TaskManager taskManager = taskMap.get(message.getNotificationId());
+                                    notifications.put(message.getNotificationId(), message);
+                                    taskManager.finishedBatch();
+                                } catch (Exception e) {
+                                    LOG.error("Failed to finish batch", e);
+                                }
+                            }
+
+                        }
+                        return notifications;
+                    }
+                })
+                .doOnError(new Action1<Throwable>() {
+                    @Override
+                    public void call(Throwable throwable) {
+                        LOG.error("Failed while sending",throwable);
+                    }
+                });
+        return o;
+    }
+
+    @Override
+    public void stop(){
+        for(ProviderAdapter adapter : getAdapterMap().values()){
+            try {
+                adapter.stop();
+            }catch (Exception e){
+                LOG.error("failed to stop adapter",e);
+            }
+        }
+    }
+
+
+    /**
+     * Call the adapter with the notifier
+     */
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+        Map<String, Object> translatedPayloads = new HashMap<String, Object>(  payloads.size());
+        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+            String payloadKey = entry.getKey().toLowerCase();
+            Object payloadValue = entry.getValue();
+            ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
+            if (providerAdapter != null) {
+                Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
+                if (translatedPayload != null) {
+                    translatedPayloads.put(payloadKey, translatedPayload);
+                }
+            }
+        }
+        return translatedPayloads;
+    }
+
+    public static String getQueueNames(Properties properties) {
+        String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
+        return name;
+    }
+
+    private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
+        private final Iterator<T> input;
+        private IteratorObservable( final Iterator input ) {this.input = input;}
+
+        @Override
+        public void call( final Subscriber<? super T> subscriber ) {
+
+            /**
+             * You would replace this code with your file reading.  Instead of emitting from an iterator,
+             * you would create a bean object that represents the entity, and then emit it
+             */
+
+            try {
+                while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
+                    //send our input to the next
+                    subscriber.onNext( (T) input.next() );
+                }
+
+                //tell the subscriber we don't have any more data
+                subscriber.onCompleted();
+            }
+            catch ( Throwable t ) {
+                LOG.error("failed on subscriber",t);
+                subscriber.onError( t );
+            }
+        }
+    }
+
+    @Override
+    public void asyncCheckForInactiveDevices() throws Exception {
+        Collection<ProviderAdapter> providerAdapters = getAdapterMap().values();
+        for (final ProviderAdapter providerAdapter : providerAdapters) {
+            try {
+                if (providerAdapter != null) {
+                    LOG.debug("checking notifier {} for inactive devices", providerAdapter.getNotifier());
+                    providerAdapter.removeInactiveDevices();
+
+                    LOG.debug("finished checking notifier {} for inactive devices",providerAdapter.getNotifier());
+                }
+            } catch (Exception e) {
+                LOG.error("checkForInactiveDevices", e); // not
+                // essential so
+                // don't fail,
+                // but log
+            }
+        }
+    }
+
+
+    private boolean isOkToSend(Notification notification) {
+        Map<String,Long> stats = notification.getStatistics();
+        if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
+            LOG.info("notification {} already processed. not sending.",
+                    notification.getUuid());
+            return false;
+        }
+        if (notification.getCanceled() == Boolean.TRUE) {
+            LOG.info("notification {} canceled. not sending.",
+                    notification.getUuid());
+            return false;
+        }
+        if (notification.isExpired()) {
+            LOG.info("notification {} expired. not sending.",
+                    notification.getUuid());
+            return false;
+        }
+        return true;
+    }
+
+    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+        List<EntityRef> devices = Collections.EMPTY_LIST;
+        if ("device".equals(ref.getType())) {
+            devices = Collections.singletonList(ref);
+        } else if ("user".equals(ref.getType())) {
+            devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
+                    Query.Level.REFS, false).getRefs();
+        } else if ("group".equals(ref.getType())) {
+            devices = new ArrayList<EntityRef>();
+            for (EntityRef r : em.getCollection(ref, "users", null,
+                    Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
+                devices.addAll(getDevices(r));
+            }
+        }
+        return devices;
+    }
+
+
+    private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
+        try {
+            Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
+            if (value == null) {
+                value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
+            }
+            return value != null ? value.toString() : null;
+        } catch (Exception e) {
+            LOG.error("Errer getting provider ID, proceding with rest of batch", e);
+            return null;
+        }
+    }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 2a4ec73..02f881d 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -16,10 +16,8 @@
  */
 package org.apache.usergrid.services.notifications.apns;
 
-import com.relayrides.pushy.apns.*;
 import com.relayrides.pushy.apns.util.*;
 import org.apache.commons.io.IOUtils;
-import org.apache.usergrid.services.ServiceParameter;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.index.query.Query;
@@ -30,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
-import java.net.SocketException;
 import java.util.*;
 
 import org.apache.usergrid.services.ServiceAction;
@@ -40,7 +37,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
 
 // todo: test reschedule on delivery time change
 // todo: test restart of queuing
@@ -68,7 +65,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 3795be1..ad1c9f2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -18,22 +18,19 @@ package org.apache.usergrid.services.notifications.gcm;
 
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.entities.*;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.ServiceParameter;
 import org.apache.usergrid.services.TestQueueManager;
 import org.apache.usergrid.services.notifications.*;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.*;
 
 import org.apache.usergrid.services.ServiceAction;
 
 import static org.junit.Assert.*;
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
 
 public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
@@ -59,7 +56,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
 
     @BeforeClass
     public static void setup(){
-        ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
     }
     @Override
     @Before