You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/29 20:21:43 UTC
[02/22] git commit: rewrite remove inactive devices
rewrite remove inactive devices
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e1b2e736
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e1b2e736
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e1b2e736
Branch: refs/heads/two-dot-o-events
Commit: e1b2e7365e84c4d0a5a02fdae10d7adac80e5263
Parents: d5d859b
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 24 16:07:40 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 24 16:07:40 2014 -0600
----------------------------------------------------------------------
.../usergrid/persistence/entities/Notifier.java | 10 ++-
.../notifications/ApplicationQueueManager.java | 90 +++++---------------
.../notifications/InactiveDeviceManager.java | 77 +++++++++++++++++
.../notifications/NotificationsService.java | 1 +
.../services/notifications/ProviderAdapter.java | 2 +-
.../services/notifications/QueueListener.java | 24 ++++--
.../notifications/apns/APNsAdapter.java | 68 ++++-----------
.../notifications/apns/EntityPushManager.java | 72 ++++++++++++++++
.../apns/ExpiredTokenListener.java | 60 +++++++++++++
.../services/notifications/gcm/GCMAdapter.java | 7 +-
10 files changed, 280 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
index be0b447..ba3f2fc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notifier.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.persistence.entities;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.annotations.EntityProperty;
@@ -34,6 +35,8 @@ public class Notifier extends TypedEntity {
public static final String ENTITY_TYPE = "notifier";
+ protected EntityManager entityManager;
+
@EntityProperty(aliasProperty = true, unique = true, basic = true)
protected String name;
@@ -63,8 +66,6 @@ public class Notifier extends TypedEntity {
uuid = id;
}
-
-
@Override
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public String getName() {
@@ -131,5 +132,10 @@ public class Notifier extends TypedEntity {
this.apiKey = apiKey;
}
+ @JsonIgnore
+ public EntityManager getEntityManager(){return entityManager;}
+
+ public void setEntityManager(EntityManager entityManager){ this.entityManager = entityManager;}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index a83513f..3fd84a6 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -63,16 +63,7 @@ public class ApplicationQueueManager {
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
- public final Map<String, ProviderAdapter> providerAdapters = new HashMap<String, ProviderAdapter>(3);
- {
- providerAdapters.put("apple", APNS_ADAPTER);
- providerAdapters.put("google", new GCMAdapter());
- providerAdapters.put("noop", TEST_ADAPTER);
- };
- // these 2 can be static, but GCM can't. future: would be nice to get gcm
- // static as well...
- public static ProviderAdapter APNS_ADAPTER = new APNsAdapter();
- public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
+ public final Map<String, ProviderAdapter> providerAdapters;
public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
@@ -81,6 +72,12 @@ public class ApplicationQueueManager {
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueName = getQueueNames(properties);
+ providerAdapters = new HashMap<String, ProviderAdapter>(3);
+ {
+ providerAdapters.put("apple", new APNsAdapter());
+ providerAdapters.put("google", new GCMAdapter());
+ providerAdapters.put("noop", new TestAdapter());
+ };
}
public boolean scheduleQueueJob(Notification notification) throws Exception{
@@ -254,6 +251,7 @@ public class ApplicationQueueManager {
int count = 0;
while (notifierIterator.hasNext()) {
Notifier notifier = notifierIterator.next();
+ notifier.setEntityManager(em);
String name = notifier.getName() != null ? notifier.getName() : "";
UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
notifierHashMap.put(name.toLowerCase(), notifier);
@@ -454,71 +452,27 @@ public class ApplicationQueueManager {
}
}
- public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception {
+ public void asyncCheckForInactiveDevices() throws Exception {
+ Collection<Notifier> notifiers = getNotifierMap().values();
for (final Notifier notifier : notifiers) {
- INACTIVE_DEVICE_CHECK_POOL.execute(new Runnable() {
- @Override
- public void run() {
- try {
- checkForInactiveDevices(notifier);
- } catch (Exception e) {
- LOG.error("checkForInactiveDevices", e); // not
- // essential so
- // don't fail,
- // but log
- }
- }
- });
- }
- }
+ try {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ if (providerAdapter != null) {
+ LOG.debug("checking notifier {} for inactive devices", notifier);
+ providerAdapter.removeInactiveDevices(notifier, em);
- /** gets the list of inactive devices from the Provider and updates them */
- private void checkForInactiveDevices(Notifier notifier) throws Exception {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier
- .getProvider());
- if (providerAdapter != null) {
- LOG.debug("checking notifier {} for inactive devices", notifier);
- Map<String, Date> inactiveDeviceMap = providerAdapter
- .getInactiveDevices(notifier, em);
-
- if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
- LOG.debug("processing {} inactive devices",
- inactiveDeviceMap.size());
- Map<String, Object> clearPushtokenMap = new HashMap<String, Object>(
- 2);
- clearPushtokenMap.put(notifier.getName() + NOTIFIER_ID_POSTFIX,
- "");
- clearPushtokenMap.put(notifier.getUuid() + NOTIFIER_ID_POSTFIX,
- "");
-
- // todo: this could be done in a single query
- for (Map.Entry<String, Date> entry : inactiveDeviceMap
- .entrySet()) {
- // name
- Query query = new Query();
- query.addEqualityFilter(notifier.getName()
- + NOTIFIER_ID_POSTFIX, entry.getKey());
- Results results = em.searchCollection(em.getApplication(),
- "devices", query);
- for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
- }
- // uuid
- query = new Query();
- query.addEqualityFilter(notifier.getUuid()
- + NOTIFIER_ID_POSTFIX, entry.getKey());
- results = em.searchCollection(em.getApplication(),
- "devices", query);
- for (Entity e : results.getEntities()) {
- em.updateProperties(e, clearPushtokenMap);
- }
+ LOG.debug("finished checking notifier {} for inactive devices",notifier);
}
+ } catch (Exception e) {
+ LOG.error("checkForInactiveDevices", e); // not
+ // essential so
+ // don't fail,
+ // but log
}
- LOG.debug("finished checking notifier {} for inactive devices",
- notifier);
}
}
+
private boolean isOkToSend(Notification notification) {
Map<String,Long> stats = notification.getStatistics();
if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
new file mode 100644
index 0000000..ecee485
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications;
+
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Classy class class.
+ */
+public class InactiveDeviceManager {
+ private static final Logger LOG = LoggerFactory.getLogger(InactiveDeviceManager.class);
+ private final Notifier notifier;
+
+ public InactiveDeviceManager(Notifier notifier){
+ this.notifier = notifier;
+ }
+ public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap ){
+ final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+ final EntityManager em = notifier.getEntityManager();
+ if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
+ LOG.debug("processing {} inactive devices", inactiveDeviceMap.size());
+ Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
+ clearPushtokenMap.put(notifier.getName() + notfierPostFix, "");
+ clearPushtokenMap.put(notifier.getUuid() + notfierPostFix, "");
+
+ // todo: this could be done in a single query
+ for (Map.Entry<String, Date> entry : inactiveDeviceMap.entrySet()) {
+ try {
+ // name
+ Query query = new Query();
+ query.addEqualityFilter(notifier.getName() + notfierPostFix, entry.getKey());
+ Results results = em.searchCollection(em.getApplication(), "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ // uuid
+ query = new Query();
+ query.addEqualityFilter(notifier.getUuid() + notfierPostFix, entry.getKey());
+ results = em.searchCollection(em.getApplication(), "devices", query);
+ for (Entity e : results.getEntities()) {
+ em.updateProperties(e, clearPushtokenMap);
+ }
+ }catch (Exception e){
+ LOG.error("failed to remove token",e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 69f66e5..5420d29 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -356,6 +356,7 @@ public class NotificationsService extends AbstractCollectionService {
public void testConnection(Notifier notifier) throws Exception {
ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
if (providerAdapter != null) {
+ notifier.setEntityManager(em);
providerAdapter.testConnection(notifier);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
index 5268674..33e921f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ProviderAdapter.java
@@ -46,7 +46,7 @@ public interface ProviderAdapter {
*/
public void doneSendingNotifications() throws Exception;
- public Map<String, Date> getInactiveDevices(Notifier notifier,
+ public void removeInactiveDevices(Notifier notifier,
EntityManager em) throws Exception;
public Object translatePayload(Object payload) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7ce315b..432ad7f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import javax.annotation.PostConstruct;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -151,7 +152,7 @@ public class QueueListener {
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
-
+ Map<UUID,ApplicationQueueManager> queueManagerMap = new ConcurrentHashMap<>();
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -172,14 +173,18 @@ public class QueueListener {
UUID applicationId = entry.getKey();
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
- final ApplicationQueueManager manager = new ApplicationQueueManager(
- new JobScheduler(serviceManager, entityManager),
- entityManager,
- queueManager,
- metricsService,
- properties
- );
+ ApplicationQueueManager manager = queueManagerMap.get(applicationId);
+ if(manager==null) {
+ manager = new ApplicationQueueManager(
+ new JobScheduler(serviceManager, entityManager),
+ entityManager,
+ queueManager,
+ metricsService,
+ properties
+ );
+ queueManagerMap.put(applicationId,manager);
+ }
LOG.info("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
if(merge == null)
@@ -192,6 +197,9 @@ public class QueueListener {
merge.toBlocking().lastOrDefault(null);
}
queueManager.commitMessages(messages);
+ for(ApplicationQueueManager applicationQueueManager : queueManagerMap.values()){
+ applicationQueueManager.asyncCheckForInactiveDevices();
+ }
meter.mark(messages.size());
LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
index 4277b8a..7ab1dc3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsAdapter.java
@@ -47,8 +47,7 @@ import javax.net.ssl.SSLContext;
*/
public class APNsAdapter implements ProviderAdapter {
- private static final Logger logger = LoggerFactory
- .getLogger(APNsAdapter.class);
+ private static final Logger logger = LoggerFactory.getLogger(APNsAdapter.class);
public static int MAX_CONNECTION_POOL_SIZE = 15;
private static final Set<String> validEnvironments = new HashSet<String>();
@@ -72,7 +71,7 @@ public class APNsAdapter implements ProviderAdapter {
try {
CountDownLatch latch = new CountDownLatch(1);
notification.setLatch(latch);
- PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
+ EntityPushManager pushManager = getPushManager(notifier);
addToQueue(pushManager, notification);
latch.await(10000,TimeUnit.MILLISECONDS);
if(notification.hasFailed()){
@@ -80,10 +79,10 @@ public class APNsAdapter implements ProviderAdapter {
throw new ConnectionException("Bad certificate. Double-check your environment.",notification.getCause() != null ? notification.getCause() : new Exception("Bad certificate."));
}
notification.finished();
- } catch (Exception e) {
- notification.finished();
+ } catch (Exception e) {
+ notification.finished();
- if (e instanceof ConnectionException) {
+ if (e instanceof ConnectionException) {
throw (ConnectionException) e;
}
if (e instanceof InterruptedException) {
@@ -120,16 +119,13 @@ public class APNsAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
- EntityManager em) throws Exception {
- Map<String,Date> map = new HashMap<String,Date>();
+ public void removeInactiveDevices(Notifier notifier,EntityManager em) throws Exception {
PushManager<SimpleApnsPushNotification> pushManager = getPushManager(notifier);
pushManager.requestExpiredTokens();
- return map;
}
- private PushManager<SimpleApnsPushNotification> getPushManager(Notifier notifier) throws ExecutionException {
- PushManager<SimpleApnsPushNotification> pushManager = apnsServiceMap.get(notifier);
+ private EntityPushManager getPushManager(Notifier notifier) throws ExecutionException {
+ EntityPushManager pushManager = apnsServiceMap.get(notifier);
if(pushManager != null && !pushManager.isStarted() && pushManager.isShutDown()){
apnsServiceMap.invalidate(notifier);
pushManager = apnsServiceMap.get(notifier);
@@ -138,15 +134,15 @@ public class APNsAdapter implements ProviderAdapter {
}
//cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
- private static LoadingCache<Notifier, PushManager<SimpleApnsPushNotification>> apnsServiceMap = CacheBuilder
+ private static LoadingCache<Notifier, EntityPushManager> apnsServiceMap = CacheBuilder
.newBuilder()
.expireAfterAccess(10, TimeUnit.MINUTES)
- .removalListener(new RemovalListener<Notifier, PushManager<SimpleApnsPushNotification>>() {
+ .removalListener(new RemovalListener<Notifier, EntityPushManager>() {
@Override
public void onRemoval(
- RemovalNotification<Notifier, PushManager<SimpleApnsPushNotification>> notification) {
+ RemovalNotification<Notifier,EntityPushManager> notification) {
try {
- PushManager<SimpleApnsPushNotification> manager = notification.getValue();
+ EntityPushManager manager = notification.getValue();
if (!manager.isShutDown()) {
List<SimpleApnsPushNotification> notifications = manager.shutdown(3000);
for (SimpleApnsPushNotification notification1 : notifications) {
@@ -161,31 +157,20 @@ public class APNsAdapter implements ProviderAdapter {
logger.error("Failed to shutdown from cache", ie);
}
}
- }).build(new CacheLoader<Notifier, PushManager<SimpleApnsPushNotification>>() {
+ }).build(new CacheLoader<Notifier, EntityPushManager>() {
@Override
- public PushManager<SimpleApnsPushNotification> load(Notifier notifier) {
+ public EntityPushManager load(final Notifier notifier) {
try {
- LinkedBlockingQueue<SimpleApnsPushNotification> queue = new LinkedBlockingQueue<SimpleApnsPushNotification>();
- NioEventLoopGroup group = new NioEventLoopGroup();
PushManagerConfiguration config = new PushManagerConfiguration();
config.setConcurrentConnectionCount(Runtime.getRuntime().availableProcessors() * 2);
- PushManager<SimpleApnsPushNotification> pushManager = new PushManager<>(getApnsEnvironment(notifier), getSSLContext(notifier), group,null , queue, config,notifier.getName());
+ EntityPushManager pushManager = new EntityPushManager(notifier, config);
//only tested when a message is sent
pushManager.registerRejectedNotificationListener(new RejectedAPNsListener());
//this will get tested when start is called
pushManager.registerFailedConnectionListener(new FailedConnectionListener());
+ //unregistered expired devices
+ pushManager.registerExpiredTokenListener(new ExpiredTokenListener());
- pushManager.registerExpiredTokenListener(new ExpiredTokenListener<SimpleApnsPushNotification>() {
- @Override
- public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
- Map<String,Date> map = new HashMap<String,Date>();
- for(ExpiredToken token : expiredTokens){
- String expiredToken = new String(token.getToken());
- map.put(expiredToken, token.getExpiration());
- }
- //TODO figure out way to call back and clear out em references
- }
- });
try {
if (!pushManager.isStarted()) { //ensure manager is started
pushManager.start();
@@ -253,24 +238,7 @@ public class APNsAdapter implements ProviderAdapter {
return wasDelayed;
}
- private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
- return notifier.isProduction()
- ? ApnsEnvironment.getProductionEnvironment()
- : ApnsEnvironment.getSandboxEnvironment();
- }
- private static SSLContext getSSLContext(Notifier notifier) {
- try {
- KeyStore keyStore = KeyStore.getInstance("PKCS12");
- String password = notifier.getCertificatePassword();
- char[] passChars =(password != null ? password : "").toCharArray();
- InputStream stream = notifier.getP12CertificateStream();
- keyStore.load(stream,passChars);
- SSLContext context = SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
- return context;
- }catch (Exception e){
- throw new RuntimeException("Error getting certificate",e);
- }
- }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
new file mode 100644
index 0000000..29841f0
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/EntityPushManager.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ApnsEnvironment;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.PushManagerConfiguration;
+import com.relayrides.pushy.apns.util.SSLContextUtil;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.entities.Notifier;
+
+import javax.net.ssl.SSLContext;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * Classy class class.
+ */
+public class EntityPushManager extends PushManager<SimpleApnsPushNotification> {
+ private final Notifier notifier;
+
+ public EntityPushManager( Notifier notifier, PushManagerConfiguration configuration) {
+ super(getApnsEnvironment(notifier), getSSLContext(notifier), null, null, new LinkedBlockingDeque<SimpleApnsPushNotification>(), configuration, notifier.getName());
+ this.notifier = notifier;
+ }
+
+ public EntityManager getEntityManager() {
+ return notifier.getEntityManager();
+ }
+
+ public Notifier getNotifier() {
+ return notifier;
+ }
+ private static ApnsEnvironment getApnsEnvironment(Notifier notifier){
+ return notifier.isProduction()
+ ? ApnsEnvironment.getProductionEnvironment()
+ : ApnsEnvironment.getSandboxEnvironment();
+ }
+ private static SSLContext getSSLContext(Notifier notifier) {
+ try {
+ KeyStore keyStore = KeyStore.getInstance("PKCS12");
+ String password = notifier.getCertificatePassword();
+ char[] passChars =(password != null ? password : "").toCharArray();
+ InputStream stream = notifier.getP12CertificateStream();
+ keyStore.load(stream,passChars);
+ SSLContext context = SSLContextUtil.createDefaultSSLContext(keyStore, passChars);
+ return context;
+ }catch (Exception e){
+ throw new RuntimeException("Error getting certificate",e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
new file mode 100644
index 0000000..4c38013
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.services.notifications.apns;
+
+import com.relayrides.pushy.apns.ExpiredToken;
+import com.relayrides.pushy.apns.PushManager;
+import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
+import org.apache.usergrid.persistence.Entity;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.Results;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.ApplicationQueueManager;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Classy class class.
+ */
+public class ExpiredTokenListener implements com.relayrides.pushy.apns.ExpiredTokenListener<SimpleApnsPushNotification> {
+
+
+ @Override
+ public void handleExpiredTokens(PushManager<? extends SimpleApnsPushNotification> pushManager, Collection<ExpiredToken> expiredTokens) {
+ Map<String,Date> inactiveDeviceMap = new HashMap<>();
+ for(ExpiredToken token : expiredTokens){
+ String expiredToken = new String(token.getToken());
+ inactiveDeviceMap.put(expiredToken, token.getExpiration());
+ }
+ if(pushManager instanceof EntityPushManager){
+ EntityPushManager entityPushManager = (EntityPushManager) pushManager;
+ InactiveDeviceManager inactiveDeviceManager = new InactiveDeviceManager(entityPushManager.getNotifier());
+ inactiveDeviceManager.removeInactiveDevices(inactiveDeviceMap);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e1b2e736/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
index dca4a73..f8de5ff 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/gcm/GCMAdapter.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.gcm;
import com.google.android.gcm.server.*;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.services.notifications.InactiveDeviceManager;
import org.mortbay.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,14 +88,16 @@ public class GCMAdapter implements ProviderAdapter {
}
@Override
- public Map<String, Date> getInactiveDevices(Notifier notifier,
+ public void removeInactiveDevices(Notifier notifier,
EntityManager em) throws Exception {
Batch batch = getBatch(notifier, null);
Map<String,Date> map = null;
if(batch != null) {
map = batch.getAndClearInactiveDevices();
+ InactiveDeviceManager deviceManager = new InactiveDeviceManager(notifier);
+ deviceManager.removeInactiveDevices(map);
}
- return map;
+
}
@Override