You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/29 20:21:43 UTC

[02/22] git commit: rewrite remove inactive devices

rewrite remove inactive devices


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e1b2e736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e1b2e736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e1b2e736

Branch: refs/heads/two-dot-o-events
Commit: e1b2e7365e84c4d0a5a02fdae10d7adac80e5263
Parents: d5d859b
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 16:07:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 16:07:40 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/entities/Notifier.java | 10 ++-
 .../notifications/ApplicationQueueManager.java  | 90 +++++---------------
 .../notifications/InactiveDeviceManager.java    | 77 +++++++++++++++++
 .../notifications/NotificationsService.java     |  1 +
 .../services/notifications/ProviderAdapter.java |  2 +-
 .../services/notifications/QueueListener.java   | 24 ++++--
 .../notifications/apns/APNsAdapter.java         | 68 ++++-----------
 .../notifications/apns/EntityPushManager.java   | 72 ++++++++++++++++
 .../apns/ExpiredTokenListener.java              | 60 +++++++++++++
 .../services/notifications/gcm/GCMAdapter.java  |  7 +-
 10 files changed, 280 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
index be0b447..ba3f2fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.entities;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.TypedEntity;
 import org.apache.usergrid.persistence.annotations.EntityProperty;
 
@@ -34,6 +35,8 @@ public class Notifier extends TypedEntity {
 
     public static final String ENTITY_TYPE = "notifier";
 
+    protected EntityManager entityManager;
+
     @EntityProperty(aliasProperty = true, unique = true, basic = true)
     protected String name;
 
@@ -63,8 +66,6 @@ public class Notifier extends TypedEntity {
         uuid = id;
     }
 
-
-
     @Override
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
     public String getName() {
@@ -131,5 +132,10 @@ public class Notifier extends TypedEntity {
         this.apiKey = apiKey;
     }
 
+    @JsonIgnore
+    public EntityManager getEntityManager(){return entityManager;}
+
+    public void setEntityManager(EntityManager entityManager){ this.entityManager = entityManager;}
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index a83513f..3fd84a6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -63,16 +63,7 @@ public class ApplicationQueueManager  {
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
-    public final Map<String, ProviderAdapter> providerAdapters =   new HashMap<String, ProviderAdapter>(3);
-    {
-        providerAdapters.put("apple", APNS_ADAPTER);
-        providerAdapters.put("google", new GCMAdapter());
-        providerAdapters.put("noop", TEST_ADAPTER);
-    };
-    // these 2 can be static, but GCM can't. future: would be nice to get gcm
-    // static as well...
-    public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
-    public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+    public final Map<String, ProviderAdapter> providerAdapters;
 
 
     public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
@@ -81,6 +72,12 @@ public class ApplicationQueueManager  {
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
         this.queueName = getQueueNames(properties);
+        providerAdapters =   new HashMap<String, ProviderAdapter>(3);
+        {
+            providerAdapters.put("apple", new APNsAdapter());
+            providerAdapters.put("google", new GCMAdapter());
+            providerAdapters.put("noop", new TestAdapter());
+        };
     }
 
     public boolean scheduleQueueJob(Notification notification) throws Exception{
@@ -254,6 +251,7 @@ public class ApplicationQueueManager  {
             int count = 0;
             while (notifierIterator.hasNext()) {
                 Notifier notifier = notifierIterator.next();
+                notifier.setEntityManager(em);
                 String name = notifier.getName() != null ? notifier.getName() : "";
                 UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
                 notifierHashMap.put(name.toLowerCase(), notifier);
@@ -454,71 +452,27 @@ public class ApplicationQueueManager  {
         }
     }
 
-    public void asyncCheckForInactiveDevices(Set<Notifier> notifiers)  throws Exception {
+    public void asyncCheckForInactiveDevices() throws Exception {
+        Collection<Notifier> notifiers = getNotifierMap().values();
         for (final Notifier notifier : notifiers) {
-            INACTIVE_DEVICE_CHECK_POOL.execute(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        checkForInactiveDevices(notifier);
-                    } catch (Exception e) {
-                        LOG.error("checkForInactiveDevices", e); // not
-                        // essential so
-                        // don't fail,
-                        // but log
-                    }
-                }
-            });
-        }
-    }
+            try {
+                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                if (providerAdapter != null) {
+                    LOG.debug("checking notifier {} for inactive devices", notifier);
+                    providerAdapter.removeInactiveDevices(notifier, em);
 
-    /** gets the list of inactive devices from the Provider and updates them */
-    private void checkForInactiveDevices(Notifier notifier) throws Exception {
-        ProviderAdapter providerAdapter = providerAdapters.get(notifier
-                .getProvider());
-        if (providerAdapter != null) {
-            LOG.debug("checking notifier {} for inactive devices", notifier);
-            Map<String, Date> inactiveDeviceMap = providerAdapter
-                    .getInactiveDevices(notifier, em);
-
-            if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
-                LOG.debug("processing {} inactive devices",
-                        inactiveDeviceMap.size());
-                Map<String, Object> clearPushtokenMap = new HashMap<String, Object>(
-                        2);
-                clearPushtokenMap.put(notifier.getName() + NOTIFIER_ID_POSTFIX,
-                        "");
-                clearPushtokenMap.put(notifier.getUuid() + NOTIFIER_ID_POSTFIX,
-                        "");
-
-                // todo: this could be done in a single query
-                for (Map.Entry<String, Date> entry : inactiveDeviceMap
-                        .entrySet()) {
-                    // name
-                    Query query = new Query();
-                    query.addEqualityFilter(notifier.getName()
-                            + NOTIFIER_ID_POSTFIX, entry.getKey());
-                    Results results = em.searchCollection(em.getApplication(),
-                            "devices", query);
-                    for (Entity e : results.getEntities()) {
-                        em.updateProperties(e, clearPushtokenMap);
-                    }
-                    // uuid
-                    query = new Query();
-                    query.addEqualityFilter(notifier.getUuid()
-                            + NOTIFIER_ID_POSTFIX, entry.getKey());
-                    results = em.searchCollection(em.getApplication(),
-                            "devices", query);
-                    for (Entity e : results.getEntities()) {
-                        em.updateProperties(e, clearPushtokenMap);
-                    }
+                    LOG.debug("finished checking notifier {} for inactive devices",notifier);
                 }
+            } catch (Exception e) {
+                LOG.error("checkForInactiveDevices", e); // not
+                // essential so
+                // don't fail,
+                // but log
             }
-            LOG.debug("finished checking notifier {} for inactive devices",
-                    notifier);
         }
     }
 
+
     private boolean isOkToSend(Notification notification) {
         Map<String,Long> stats = notification.getStatistics();
         if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
new file mode 100644
index 0000000..ecee485
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -0,0 +1,77 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Blanks a notifier's push-token properties on "devices" entities that match a supplied map of inactive tokens.
+ */
+public class InactiveDeviceManager {
+    private static final Logger LOG = LoggerFactory.getLogger(InactiveDeviceManager.class);
+    private final Notifier notifier;
+
+    public InactiveDeviceManager(Notifier notifier){
+        this.notifier = notifier;
+    }
+    public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap  ){
+        final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+        final EntityManager em = notifier.getEntityManager();
+        if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
+            LOG.debug("processing {} inactive devices",  inactiveDeviceMap.size());
+            Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
+            clearPushtokenMap.put(notifier.getName() + notfierPostFix,  "");
+            clearPushtokenMap.put(notifier.getUuid() + notfierPostFix,  "");
+
+            // todo: this could be done in a single query
+            for (Map.Entry<String, Date> entry : inactiveDeviceMap.entrySet()) {
+                try {
+                    // name
+                    Query query = new Query();
+                    query.addEqualityFilter(notifier.getName() + notfierPostFix, entry.getKey());
+                    Results results = em.searchCollection(em.getApplication(), "devices", query);
+                    for (Entity e : results.getEntities()) {
+                        em.updateProperties(e, clearPushtokenMap);
+                    }
+                    // uuid
+                    query = new Query();
+                    query.addEqualityFilter(notifier.getUuid() + notfierPostFix, entry.getKey());
+                    results = em.searchCollection(em.getApplication(),  "devices", query);
+                    for (Entity e : results.getEntities()) {
+                        em.updateProperties(e, clearPushtokenMap);
+                    }
+                }catch (Exception e){
+                    LOG.error("failed to remove token",e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 69f66e5..5420d29 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -356,6 +356,7 @@ public class NotificationsService extends AbstractCollectionService {
     public void testConnection(Notifier notifier) throws Exception {
         ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
         if (providerAdapter != null) {
+            notifier.setEntityManager(em);
             providerAdapter.testConnection(notifier);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
index 5268674..33e921f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -46,7 +46,7 @@ public interface ProviderAdapter {
      */
     public void doneSendingNotifications() throws Exception;
 
-    public Map<String, Date> getInactiveDevices(Notifier notifier,
+    public void removeInactiveDevices(Notifier notifier,
                                                 EntityManager em) throws Exception;
 
     public Object translatePayload(Object payload) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7ce315b..432ad7f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import javax.annotation.PostConstruct;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -151,7 +152,7 @@ public class QueueListener  {
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {
-
+                    Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>();
                     HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
                     for (QueueMessage message : messages) {
@@ -172,14 +173,18 @@ public class QueueListener  {
                         UUID applicationId = entry.getKey();
                         EntityManager entityManager = emf.getEntityManager(applicationId);
                         ServiceManager serviceManager = smf.getServiceManager(applicationId);
-                        final ApplicationQueueManager manager = new ApplicationQueueManager(
-                                new JobScheduler(serviceManager, entityManager),
-                                entityManager,
-                                queueManager,
-                                metricsService,
-                                properties
-                        );
 
+                        ApplicationQueueManager manager = queueManagerMap.get(applicationId);
+                        if(manager==null) {
+                            manager = new ApplicationQueueManager(
+                                    new JobScheduler(serviceManager, entityManager),
+                                    entityManager,
+                                    queueManager,
+                                    metricsService,
+                                    properties
+                            );
+                            queueManagerMap.put(applicationId,manager);
+                        }
                         LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
                         Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                         if(merge == null)
@@ -192,6 +197,9 @@ public class QueueListener  {
                         merge.toBlocking().lastOrDefault(null);
                     }
                     queueManager.commitMessages(messages);
+                    for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
+                        applicationQueueManager.asyncCheckForInactiveDevices();
+                    }
                     meter.mark(messages.size());
                     LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 4277b8a..7ab1dc3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -47,8 +47,7 @@ import javax.net.ssl.SSLContext;
  */
 public class APNsAdapter implements ProviderAdapter {
 
-    private static final Logger logger = LoggerFactory
-            .getLogger(APNsAdapter.class);
+    private static final Logger logger = LoggerFactory.getLogger(APNsAdapter.class);
 
     public static int MAX_CONNECTION_POOL_SIZE = 15;
     private static final Set<String> validEnvironments = new HashSet<String>();
@@ -72,7 +71,7 @@ public class APNsAdapter implements ProviderAdapter {
         try {
             CountDownLatch latch = new CountDownLatch(1);
             notification.setLatch(latch);
-                PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+            EntityPushManager pushManager = getPushManager(notifier);
                 addToQueue(pushManager, notification);
                 latch.await(10000,TimeUnit.MILLISECONDS);
                 if(notification.hasFailed()){
@@ -80,10 +79,10 @@ public class APNsAdapter implements ProviderAdapter {
                     throw new ConnectionException("Bad certificate. Double-check your environment.",notification.getCause() != null ? notification.getCause() : new Exception("Bad certificate."));
                 }
                 notification.finished();
-            } catch (Exception e) {
-                notification.finished();
+        } catch (Exception e) {
+            notification.finished();
 
-                if (e instanceof ConnectionException) {
+            if (e instanceof ConnectionException) {
                 throw (ConnectionException) e;
             }
             if (e instanceof InterruptedException) {
@@ -120,16 +119,13 @@ public class APNsAdapter implements ProviderAdapter {
     }
 
     @Override
-    public Map<String, Date> getInactiveDevices(Notifier notifier,
-            EntityManager em) throws Exception {
-        Map<String,Date> map = new HashMap<String,Date>();
+    public void removeInactiveDevices(Notifier notifier,EntityManager em) throws Exception {
         PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
         pushManager.requestExpiredTokens();
-        return map;
     }
 
-    private PushManager<SimpleApnsPushNotification> getPushManager(Notifier notifier) throws ExecutionException {
-        PushManager<SimpleApnsPushNotification> pushManager = apnsServiceMap.get(notifier);
+    private EntityPushManager getPushManager(Notifier notifier) throws ExecutionException {
+        EntityPushManager pushManager = apnsServiceMap.get(notifier);
         if(pushManager != null &&  !pushManager.isStarted() && pushManager.isShutDown()){
             apnsServiceMap.invalidate(notifier);
             pushManager = apnsServiceMap.get(notifier);
@@ -138,15 +134,15 @@ public class APNsAdapter implements ProviderAdapter {
     }
 
     //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
-    private static LoadingCache<Notifier, PushManager<SimpleApnsPushNotification>> apnsServiceMap = CacheBuilder
+    private static LoadingCache<Notifier, EntityPushManager> apnsServiceMap = CacheBuilder
             .newBuilder()
             .expireAfterAccess(10, TimeUnit.MINUTES)
-            .removalListener(new RemovalListener<Notifier, PushManager<SimpleApnsPushNotification>>() {
+            .removalListener(new RemovalListener<Notifier, EntityPushManager>() {
                 @Override
                 public void onRemoval(
-                        RemovalNotification<Notifier, PushManager<SimpleApnsPushNotification>> notification) {
+                        RemovalNotification<Notifier,EntityPushManager> notification) {
                     try {
-                        PushManager<SimpleApnsPushNotification> manager = notification.getValue();
+                        EntityPushManager manager = notification.getValue();
                         if (!manager.isShutDown()) {
                             List<SimpleApnsPushNotification> notifications = manager.shutdown(3000);
                             for (SimpleApnsPushNotification notification1 : notifications) {
@@ -161,31 +157,20 @@ public class APNsAdapter implements ProviderAdapter {
                         logger.error("Failed to shutdown from cache", ie);
                     }
                 }
-            }).build(new CacheLoader<Notifier, PushManager<SimpleApnsPushNotification>>() {
+            }).build(new CacheLoader<Notifier, EntityPushManager>() {
                 @Override
-                public PushManager<SimpleApnsPushNotification> load(Notifier notifier) {
+                public EntityPushManager load(final Notifier notifier) {
                     try {
-                        LinkedBlockingQueue<SimpleApnsPushNotification> queue = new LinkedBlockingQueue<SimpleApnsPushNotification>();
-                        NioEventLoopGroup group = new NioEventLoopGroup();
                         PushManagerConfiguration config = new PushManagerConfiguration();
                         config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
-                        PushManager<SimpleApnsPushNotification> pushManager =  new PushManager<>(getApnsEnvironment(notifier), getSSLContext(notifier), group,null , queue, config,notifier.getName());
+                        EntityPushManager pushManager =  new EntityPushManager(notifier, config);
                         //only tested when a message is sent
                         pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
                         //this will get tested when start is called
                         pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+                        //unregistered expired devices
+                        pushManager.registerExpiredTokenListener(new ExpiredTokenListener());
 
-                        pushManager.registerExpiredTokenListener(new ExpiredTokenListener<SimpleApnsPushNotification>() {
-                            @Override
-                            public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
-                                Map<String,Date> map = new HashMap<String,Date>();
-                                for(ExpiredToken token : expiredTokens){
-                                    String expiredToken = new String(token.getToken());
-                                    map.put(expiredToken, token.getExpiration());
-                                }
-                                //TODO figure out way to call back and clear out em references
-                            }
-                        });
                         try {
                             if (!pushManager.isStarted()) { //ensure manager is started
                                 pushManager.start();
@@ -253,24 +238,7 @@ public class APNsAdapter implements ProviderAdapter {
         return wasDelayed;
     }
 
-    private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
-        return  notifier.isProduction()
-                ? ApnsEnvironment.getProductionEnvironment()
-                : ApnsEnvironment.getSandboxEnvironment();
-    }
 
 
-    private static SSLContext getSSLContext(Notifier notifier) {
-        try {
-            KeyStore keyStore = KeyStore.getInstance("PKCS12");
-            String password = notifier.getCertificatePassword();
-            char[] passChars =(password != null ? password : "").toCharArray();
-            InputStream stream = notifier.getP12CertificateStream();
-            keyStore.load(stream,passChars);
-            SSLContext context =  SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
-            return context;
-        }catch (Exception e){
-            throw new RuntimeException("Error getting certificate",e);
-        }
-    }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
new file mode 100644
index 0000000..29841f0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
@@ -0,0 +1,72 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ApnsEnvironment;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.PushManagerConfiguration;
+import com.relayrides.pushy.apns.util.SSLContextUtil;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.entities.Notifier;
+
+import javax.net.ssl.SSLContext;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Pushy {@link PushManager} for APNs that is configured from, and keeps a reference to, a {@link Notifier}.
+ */
+public class EntityPushManager extends PushManager<SimpleApnsPushNotification> {
+    private final Notifier notifier;
+
+    public EntityPushManager( Notifier notifier, PushManagerConfiguration configuration) {
+        super(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, new LinkedBlockingDeque<SimpleApnsPushNotification>(), configuration, notifier.getName());
+        this.notifier = notifier;
+    }
+
+    public EntityManager getEntityManager() {
+        return notifier.getEntityManager();
+    }
+
+    public Notifier getNotifier() {
+        return notifier;
+    }
+    private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
+        return  notifier.isProduction()
+                ? ApnsEnvironment.getProductionEnvironment()
+                : ApnsEnvironment.getSandboxEnvironment();
+    }
+    private static SSLContext getSSLContext(Notifier notifier) {
+        try {
+            KeyStore keyStore = KeyStore.getInstance("PKCS12");
+            String password = notifier.getCertificatePassword();
+            char[] passChars =(password != null ? password : "").toCharArray();
+            InputStream stream = notifier.getP12CertificateStream();
+            keyStore.load(stream,passChars);
+            SSLContext context =  SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
+            return context;
+        }catch (Exception e){
+            throw new RuntimeException("Error getting certificate",e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
new file mode 100644
index 0000000..4c38013
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -0,0 +1,60 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ExpiredToken;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.ApplicationQueueManager;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pushy expired-token callback that forwards the tokens reported for an
+ * {@link EntityPushManager} to {@link InactiveDeviceManager} so the matching devices can
+ * be removed.
+ */
+public class ExpiredTokenListener implements com.relayrides.pushy.apns.ExpiredTokenListener<SimpleApnsPushNotification> {
+
+    /**
+     * Hands every expired token, keyed by its hex string form and mapped to its expiration
+     * date, to an {@link InactiveDeviceManager} created for the push manager's notifier.
+     *
+     * Push managers that are not {@link EntityPushManager} instances expose no notifier
+     * here, so their tokens are ignored.
+     *
+     * @param pushManager   manager that reported the expired tokens
+     * @param expiredTokens tokens reported as expired
+     */
+    @Override
+    public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
+        if (!(pushManager instanceof EntityPushManager)) {
+            // no notifier to work against, so there is nothing to build or remove
+            return;
+        }
+        EntityPushManager entityPushManager = (EntityPushManager) pushManager;
+
+        Map<String, Date> inactiveDeviceMap = new HashMap<>();
+        for (ExpiredToken token : expiredTokens) {
+            // The token is raw bytes; decoding it as text via new String(byte[]) depends on
+            // the platform charset and yields an unusable key, so render it as hex instead.
+            // NOTE(review): assumes devices store the token as a hex string - confirm
+            // against the lookup done in InactiveDeviceManager.
+            String expiredToken = com.relayrides.pushy.apns.util.TokenUtil.tokenBytesToString(token.getToken());
+            inactiveDeviceMap.put(expiredToken, token.getExpiration());
+        }
+
+        InactiveDeviceManager inactiveDeviceManager = new InactiveDeviceManager(entityPushManager.getNotifier());
+        inactiveDeviceManager.removeInactiveDevices(inactiveDeviceMap);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index dca4a73..f8de5ff 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.gcm;
 import com.google.android.gcm.server.*;
 import org.apache.usergrid.persistence.entities.Notification;
 import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
 import org.mortbay.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,14 +88,16 @@ public class GCMAdapter implements ProviderAdapter {
     }
 
     @Override
-    public Map<String, Date> getInactiveDevices(Notifier notifier,
+    public void removeInactiveDevices(Notifier notifier,
             EntityManager em) throws Exception {
+        // NOTE(review): the em parameter is not used in the lines visible here.
         Batch batch = getBatch(notifier, null);
         Map<String,Date> map = null;
         if(batch != null) {
+            // Take the inactive devices recorded on the batch (cleared there as well, going
+            // by the method name) and let InactiveDeviceManager remove them for this
+            // notifier. Nothing happens when no batch is available.
             map = batch.getAndClearInactiveDevices();
+            InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier);
+            deviceManager.removeInactiveDevices(map);
         }
-        return map;
+
     }
 
     @Override