You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/10/29 18:23:18 UTC
[16/18] git commit: move queue manager behind interface
move queue manager behind interface
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8f4720db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8f4720db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8f4720db
Branch: refs/heads/two-dot-o
Commit: 8f4720db79534172f964378c08b5f51884e046d0
Parents: 955a92b
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 29 11:05:27 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 29 11:05:27 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 532 +------------------
.../notifications/InactiveDeviceManager.java | 3 +-
.../notifications/NotificationsService.java | 14 +-
.../services/notifications/QueueListener.java | 7 +-
.../services/notifications/TaskManager.java | 17 +-
.../apns/ExpiredTokenListener.java | 8 -
.../impl/ApplicationQueueManagerImpl.java | 523 ++++++++++++++++++
.../apns/NotificationsServiceIT.java | 7 +-
.../gcm/NotificationsServiceIT.java | 7 +-
9 files changed, 569 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 1058c34..8012b42 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -1,526 +1,48 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
*/
+
package org.apache.usergrid.services.notifications;
-import com.clearspring.analytics.hash.MurmurHash;
-import com.clearspring.analytics.stream.frequency.CountMinSketch;
-import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
-
-import java.security.Provider;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.List;
-public class ApplicationQueueManager {
+/**
+ * Queues notifications and sends batches of queued messages to their providers.
+ */
+public interface ApplicationQueueManager {
- public static String DEFAULT_QUEUE_NAME = "push_v1";
public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
-
- //this is for tests, will not mark initial post complete, set to false for tests
- private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
- private final EntityManager em;
- private final QueueManager qm;
- private final JobScheduler jobScheduler;
- private final MetricsFactory metricsFactory;
- private final String queueName;
-
- HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
-
-
- public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
- this.em = entityManager;
- this.qm = queueManager;
- this.jobScheduler = jobScheduler;
- this.metricsFactory = metricsFactory;
- this.queueName = getQueueNames(properties);
-
- }
-
- public boolean scheduleQueueJob(Notification notification) throws Exception{
- return jobScheduler.scheduleQueueJob(notification);
- }
-
- public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
- if(scheduleQueueJob(notification)){
- em.update(notification);
- return;
- }
- final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManager.class,"queue");
- long startTime = System.currentTimeMillis();
-
- if (notification.getCanceled() == Boolean.TRUE) {
- LOG.info("notification " + notification.getUuid() + " canceled");
- if (jobExecution != null) {
- jobExecution.killed();
- }
- return;
- }
-
- LOG.info("notification {} start queuing", notification.getUuid());
-
- final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
- final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
- final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
-
- final HashMap<Object,ProviderAdapter> notifierMap = getNotifierMap();
-
- //get devices in querystring, and make sure you have access
- if (pathQuery != null) {
- LOG.info("notification {} start query", notification.getUuid());
- final Iterator<Device> iterator = pathQuery.iterator(em);
- //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
- if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
- jobScheduler.scheduleQueueJob(notification, true);
- em.update(notification);
- return;
- }
- final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
- final UUID appId = em.getApplication().getUuid();
- final Map<String,Object> payloads = notification.getPayloads();
-
- final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
- @Override
- public Entity call(Entity entity) {
-
- try {
-
- long now = System.currentTimeMillis();
- List<EntityRef> devicesRef = getDevices(entity); // resolve group
-
- LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
-
- for (EntityRef deviceRef : devicesRef) {
- LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
- long hash = MurmurHash.hash(deviceRef.getUuid());
- if (sketch.estimateCount(hash) > 0) { //look for duplicates
- LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
- continue;
- } else {
- sketch.add(hash, 1);
- }
- String notifierId = null;
- String notifierKey = null;
-
- //find the device notifier info, match it to the payload
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
- now = System.currentTimeMillis();
- String providerId = getProviderId(deviceRef, adapter.getNotifier());
- if (providerId != null) {
- notifierId = providerId;
- notifierKey = entry.getKey().toLowerCase();
- break;
- }
- LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
- }
-
- if (notifierId == null) {
- LOG.info("Notifier did not match for device {} ", deviceRef);
- continue;
- }
-
- ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
- if (notification.getQueued() == null) {
- // update queued time
- now = System.currentTimeMillis();
- notification.setQueued(System.currentTimeMillis());
- LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
- }
- now = System.currentTimeMillis();
- qm.sendMessage(message);
- LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
- deviceCount.incrementAndGet();
- queueMeter.mark();
- }
- } catch (Exception deviceLoopException) {
- LOG.error("Failed to add devices", deviceLoopException);
- errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
- }
- return entity;
- }
- };
-
- long now = System.currentTimeMillis();
- Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
- .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
- @Override
- public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
- return deviceObservable.map(entityListFunct);
- }
- }, Schedulers.io())
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while writing", throwable);
- }
- });
- o.toBlocking().lastOrDefault(null);
- LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
- }
-
- // update queued time
- Map<String, Object> properties = new HashMap<String, Object>(2);
- properties.put("queued", notification.getQueued());
- properties.put("state", notification.getState());
- if(errorMessages.size()>0){
- if (notification.getErrorMessage() == null) {
- notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
- }
- }
-
- notification.setExpectedCount(deviceCount.get());
- notification.addProperties(properties);
- long now = System.currentTimeMillis();
-
-
- LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
-
- //do i have devices, and have i already started batching.
- if (deviceCount.get() <= 0 || !notification.getDebug()) {
- TaskManager taskManager = new TaskManager(em, this, notification);
- //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
- taskManager.finishedBatch(false,true);
- }else {
- em.update(notification);
- }
-
- long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
- LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
- }
-
- /**
- * only need to get notifiers once. will reset on next batch
- * @return
- */
- public HashMap<Object,ProviderAdapter> getNotifierMap(){
- if(notifierHashMap == null) {
- long now = System.currentTimeMillis();
- notifierHashMap = new HashMap<Object, ProviderAdapter>();
- Query query = new Query();
- query.setCollection("notifiers");
- query.setLimit(100);
- PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
- new SimpleEntityRef(em.getApplicationRef()),
- query
- );
- Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
- int count = 0;
- while (notifierIterator.hasNext()) {
- Notifier notifier = notifierIterator.next();
- String name = notifier.getName() != null ? notifier.getName() : "";
- UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
- ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
- notifierHashMap.put(name.toLowerCase(), providerAdapter);
- notifierHashMap.put(uuid, providerAdapter);
- notifierHashMap.put(uuid.toString(), providerAdapter);
- if(count++ >= 100){
- LOG.error("ApplicationQueueManager: too many notifiers...breaking out ", notifierHashMap.size());
- break;
- }
- }
- LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
- }
- return notifierHashMap;
- }
-
- /**
- * send batches of notifications to provider
- * @param messages
- * @throws Exception
- */
- public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
- LOG.info("sending batch of {} notifications.", messages.size());
- final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
-
- final Map<Object, ProviderAdapter> notifierMap = getNotifierMap();
- final ApplicationQueueManager proxy = this;
- final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
- final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
-
- final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
- @Override
- public ApplicationQueueMessage call(QueueMessage queueMessage) {
- boolean messageCommitted = false;
- ApplicationQueueMessage message = null;
- try {
- message = (ApplicationQueueMessage) queueMessage.getBody();
- LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
-
- UUID deviceUUID = message.getDeviceId();
-
- Notification notification = notificationMap.get(message.getNotificationId());
- if (notification == null) {
- notification = em.get(message.getNotificationId(), Notification.class);
- notificationMap.put(message.getNotificationId(), notification);
- }
- TaskManager taskManager = taskMap.get(message.getNotificationId());
- if (taskManager == null) {
- taskManager = new TaskManager(em, proxy, notification);
- taskMap.putIfAbsent(message.getNotificationId(), taskManager);
- taskManager = taskMap.get(message.getNotificationId());
- }
-
- final Map<String, Object> payloads = notification.getPayloads();
- final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
- LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
-
- try {
- String notifierName = message.getNotifierKey().toLowerCase();
- ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
- Object payload = translatedPayloads.get(notifierName);
- Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
- TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
- if(!isOkToSend(notification)){
- tracker.failed(0, "Notification is duplicate/expired/cancelled.");
- }else {
- if (payload == null) {
- LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
- tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
- } else {
- long now = System.currentTimeMillis();
- try {
- providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
- } catch (Exception e) {
- tracker.failed(0, e.getMessage());
- } finally {
- LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
- }
- }
- }
- messageCommitted = true;
- } finally {
- sendMeter.mark();
- }
-
- } catch (Exception e) {
- LOG.error("Failure while sending",e);
- try {
- if(!messageCommitted && queuePath != null) {
- qm.commitMessage(queueMessage);
- }
- }catch (Exception queueException){
- LOG.error("Failed to commit message.",queueException);
- }
- }
- return message;
- }
- };
- Observable o = rx.Observable.from(messages)
- .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
- @Override
- public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
- return messageObservable.map(func);
- }
- }, Schedulers.io())
- .buffer(messages.size())
- .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
- @Override
- public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
- //for gcm this will actually send notification
- for (ProviderAdapter providerAdapter : notifierMap.values()) {
- try {
- providerAdapter.doneSendingNotifications();
- } catch (Exception e) {
- LOG.error("providerAdapter.doneSendingNotifications: ", e);
- }
- }
- //TODO: check if a notification is done and mark it
- HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
- for (ApplicationQueueMessage message : queueMessages) {
- if (notifications.get(message.getNotificationId()) == null) {
- try {
- TaskManager taskManager = taskMap.get(message.getNotificationId());
- notifications.put(message.getNotificationId(), message);
- taskManager.finishedBatch();
- } catch (Exception e) {
- LOG.error("Failed to finish batch", e);
- }
- }
-
- }
- return notifications;
- }
- })
- .doOnError(new Action1<Throwable>() {
- @Override
- public void call(Throwable throwable) {
- LOG.error("Failed while sending",throwable);
- }
- });
- return o;
- }
-
- public void stop(){
- for(ProviderAdapter adapter : getNotifierMap().values()){
- try {
- adapter.stop();
- }catch (Exception e){
- LOG.error("failed to stop adapter",e);
- }
- }
- }
-
-
- /**
- * Call the adapter with the notifier
- */
- private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
- Map<String, Object> translatedPayloads = new HashMap<String, Object>( payloads.size());
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- String payloadKey = entry.getKey().toLowerCase();
- Object payloadValue = entry.getValue();
- ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
- if (providerAdapter != null) {
- Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
- if (translatedPayload != null) {
- translatedPayloads.put(payloadKey, translatedPayload);
- }
- }
- }
- return translatedPayloads;
- }
-
- public static String getQueueNames(Properties properties) {
- String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
- return name;
- }
-
- private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
- private final Iterator<T> input;
- private IteratorObservable( final Iterator input ) {this.input = input;}
-
- @Override
- public void call( final Subscriber<? super T> subscriber ) {
-
- /**
- * You would replace this code with your file reading. Instead of emitting from an iterator,
- * you would create a bean object that represents the entity, and then emit it
- */
-
- try {
- while ( !subscriber.isUnsubscribed() && input.hasNext() ) {
- //send our input to the next
- subscriber.onNext( (T) input.next() );
- }
-
- //tell the subscriber we don't have any more data
- subscriber.onCompleted();
- }
- catch ( Throwable t ) {
- LOG.error("failed on subscriber",t);
- subscriber.onError( t );
- }
- }
- }
-
- public void asyncCheckForInactiveDevices() throws Exception {
- Collection<ProviderAdapter> providerAdapters = getNotifierMap().values();
- for (final ProviderAdapter providerAdapter : providerAdapters) {
- try {
- if (providerAdapter != null) {
- LOG.debug("checking notifier {} for inactive devices", providerAdapter.getNotifier());
- providerAdapter.removeInactiveDevices();
-
- LOG.debug("finished checking notifier {} for inactive devices",providerAdapter.getNotifier());
- }
- } catch (Exception e) {
- LOG.error("checkForInactiveDevices", e); // not
- // essential so
- // don't fail,
- // but log
- }
- }
- }
-
-
- private boolean isOkToSend(Notification notification) {
- Map<String,Long> stats = notification.getStatistics();
- if (stats != null && notification.getExpectedCount() == (stats.get("sent")+ stats.get("errors"))) {
- LOG.info("notification {} already processed. not sending.",
- notification.getUuid());
- return false;
- }
- if (notification.getCanceled() == Boolean.TRUE) {
- LOG.info("notification {} canceled. not sending.",
- notification.getUuid());
- return false;
- }
- if (notification.isExpired()) {
- LOG.info("notification {} expired. not sending.",
- notification.getUuid());
- return false;
- }
- return true;
- }
-
- private List<EntityRef> getDevices(EntityRef ref) throws Exception {
- List<EntityRef> devices = Collections.EMPTY_LIST;
- if ("device".equals(ref.getType())) {
- devices = Collections.singletonList(ref);
- } else if ("user".equals(ref.getType())) {
- devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
- Query.Level.REFS, false).getRefs();
- } else if ("group".equals(ref.getType())) {
- devices = new ArrayList<EntityRef>();
- for (EntityRef r : em.getCollection(ref, "users", null,
- Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
- devices.addAll(getDevices(r));
- }
- }
- return devices;
- }
+ public static final String DEFAULT_QUEUE_NAME = "push_v1";
+ void queueNotification(Notification notification, JobExecution jobExecution) throws Exception;
- private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
- try {
- Object value = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
- if (value == null) {
- value = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
- }
- return value != null ? value.toString() : null;
- } catch (Exception e) {
- LOG.error("Errer getting provider ID, proceding with rest of batch", e);
- return null;
- }
- }
+ Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
+ void stop();
-}
\ No newline at end of file
+ void asyncCheckForInactiveDevices() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
index 82841d2..108a4a0 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/InactiveDeviceManager.java
@@ -25,6 +25,7 @@ import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public class InactiveDeviceManager {
this.entityManager = entityManager;
}
public void removeInactiveDevices( Map<String,Date> inactiveDeviceMap ){
- final String notfierPostFix = ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
+ final String notfierPostFix = ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
if (inactiveDeviceMap != null && inactiveDeviceMap.size() > 0) {
LOG.debug("processing {} inactive devices", inactiveDeviceMap.size());
Map<String, Object> clearPushtokenMap = new HashMap<String, Object>( 2);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index a64704c..5de4d7d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -29,13 +29,12 @@ import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.apache.usergrid.persistence.queue.QueueScope;
import org.apache.usergrid.persistence.queue.QueueScopeFactory;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.*;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,9 +43,6 @@ import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundExcept
import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import static org.apache.usergrid.utils.InflectionUtils.pluralize;
-import org.apache.usergrid.services.notifications.apns.APNsAdapter;
-import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
-import org.springframework.beans.factory.annotation.Autowired;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -63,8 +59,6 @@ public class NotificationsService extends AbstractCollectionService {
//need a mocking framework, this is to substitute for no mocking
public static QueueManager TEST_QUEUE_MANAGER = null;
- public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
-
static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
static {
@@ -94,12 +88,12 @@ public class NotificationsService extends AbstractCollectionService {
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
- String name = ApplicationQueueManager.getQueueNames(props);
+ String name = ApplicationQueueManagerImpl.getQueueNames(props);
QueueScopeFactory queueScopeFactory = CpSetup.getInjector().getInstance(QueueScopeFactory.class);
QueueScope queueScope = queueScopeFactory.getScope(smf.getManagementAppId(), name);
queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
QueueManager queueManager = TEST_QUEUE_MANAGER !=null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
- notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
+ notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
}
@@ -262,7 +256,7 @@ public class NotificationsService extends AbstractCollectionService {
throw new IllegalArgumentException("notifier \""
+ notifierId + "\" not found");
}
- ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+ ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier, em);
Object payload = entry.getValue();
try {
return providerAdapter.translatePayload(payload); // validate
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 3c788a9..b5aaeda 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -25,12 +25,11 @@ import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.queue.*;
import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -96,7 +95,7 @@ public class QueueListener {
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
- queueName = ApplicationQueueManager.getQueueNames(properties);
+ queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", ""+MAX_THREADS));
@@ -251,7 +250,7 @@ public class QueueListener {
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
- ApplicationQueueManager manager = new ApplicationQueueManager(
+ ApplicationQueueManagerImpl manager = new ApplicationQueueManagerImpl(
new JobScheduler(serviceManager, entityManager),
entityManager,
queueManager,
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 5902a93..148a2dc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,19 +23,15 @@ import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class TaskManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
- private final ApplicationQueueManager proxy;
private Notification notification;
private AtomicLong successes = new AtomicLong();
@@ -43,10 +39,9 @@ public class TaskManager {
private EntityManager em;
private boolean hasFinished;
- public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification) {
+ public TaskManager(EntityManager em, Notification notification) {
this.em = em;
this.notification = notification;
- this.proxy = proxy;
hasFinished = false;
}
@@ -132,14 +127,14 @@ public class TaskManager {
protected void replaceProviderId(EntityRef device, Notifier notifier,
String newProviderId) throws Exception {
Object value = em.getProperty(device, notifier.getName()
- + NotificationsService.NOTIFIER_ID_POSTFIX);
+ + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
if (value != null) {
- em.setProperty(device, notifier.getName() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+ em.setProperty(device, notifier.getName() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
} else {
value = em.getProperty(device, notifier.getUuid()
- + NotificationsService.NOTIFIER_ID_POSTFIX);
+ + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
if (value != null) {
- em.setProperty(device, notifier.getUuid() + NotificationsService.NOTIFIER_ID_POSTFIX, newProviderId);
+ em.setProperty(device, notifier.getUuid() + ApplicationQueueManager.NOTIFIER_ID_POSTFIX, newProviderId);
}
}
}
@@ -183,7 +178,7 @@ public class TaskManager {
LOG.info("notification finished batch: {} of {} devices in " + latency + "ms", notification.getUuid(), totals);
em.update(notification);
-// Set<Notifier> notifiers = new HashSet<>(proxy.getNotifierMap().values()); // remove dups
+// Set<Notifier> notifiers = new HashSet<>(proxy.getAdapterMap().values()); // remove dups
// proxy.asyncCheckForInactiveDevices(notifiers);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
index 6408dfd..1f7984a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/ExpiredTokenListener.java
@@ -23,15 +23,7 @@ package org.apache.usergrid.services.notifications.apns;
import com.relayrides.pushy.apns.ExpiredToken;
import com.relayrides.pushy.apns.PushManager;
import com.relayrides.pushy.apns.util.SimpleApnsPushNotification;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.notifications.ApplicationQueueManager;
import org.apache.usergrid.services.notifications.InactiveDeviceManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Date;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
new file mode 100644
index 0000000..c8c5165
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications.impl;
+
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.frequency.CountMinSketch;
+import com.codahale.metrics.Meter;
+import org.apache.usergrid.batch.JobExecution;
+import org.apache.usergrid.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.entities.Device;
+import org.apache.usergrid.persistence.entities.Notification;
+import org.apache.usergrid.persistence.entities.Notifier;
+import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.services.notifications.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
+
+ // Class-wide logger; also used by the nested IteratorObservable.
+ private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class);
+
+ //this is for tests, will not mark initial post complete, set to false for tests
+ // NOTE(review): the comment above looks orphaned - no test flag is declared in this class.
+
+ // Used to update/load notifications, iterate path queries and read device properties.
+ private final EntityManager em;
+ // Queue that per-device messages are sent to and failed messages are committed on.
+ private final QueueManager qm;
+ // Decides whether a notification is queued now or handed off to a scheduled job.
+ private final JobScheduler jobScheduler;
+ // Source of the "queue" and "send" meters.
+ private final MetricsFactory metricsFactory;
+ // Resolved once from the constructor's Properties via getQueueNames(); only used in log output here.
+ private final String queueName;
+
+ // Lazily filled by getAdapterMap(); each adapter is registered under the lower-cased
+ // notifier name, the notifier UUID and the UUID's string form.
+ HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
+
+
+    /**
+     * Wires up the collaborators this manager delegates to.
+     *
+     * @param jobScheduler   decides whether queuing is deferred to a scheduled job
+     * @param entityManager  entity manager used for all entity reads and updates
+     * @param queueManager   queue that device messages are sent to
+     * @param metricsFactory provider of the meters marked while queuing and sending
+     * @param properties     configuration the queue name is read from (see {@link #getQueueNames(Properties)})
+     */
+    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+        this.jobScheduler = jobScheduler;
+        this.metricsFactory = metricsFactory;
+        this.queueName = getQueueNames(properties);
+        this.em = entityManager;
+        this.qm = queueManager;
+    }
+
+    /**
+     * Asks the job scheduler whether this notification was handed off to a scheduled job.
+     *
+     * @return the scheduler's answer, passed through unchanged
+     */
+    private boolean scheduleQueueJob(Notification notification) throws Exception{
+        final boolean handedOff = jobScheduler.scheduleQueueJob(notification);
+        return handedOff;
+    }
+
+    /**
+     * Resolves the devices targeted by a notification and sends one queue message per device.
+     *
+     * <p>Flow:
+     * <ol>
+     *   <li>If the job scheduler takes the notification, it is updated and nothing else happens.</li>
+     *   <li>If the notification is cancelled, the job execution (when present) is marked killed.</li>
+     *   <li>If the notification has a path query, its results are iterated. When the result set has
+     *       more pages and this call is not already running inside a job, the work is rescheduled
+     *       as a job instead.</li>
+     *   <li>Each entity is expanded to device refs. The first payload key whose notifier has a
+     *       provider id on the device decides the message that is sent to the queue.</li>
+     *   <li>Finally the expected count plus the "queued"/"state" properties are stored on the
+     *       notification, and it is either finished via a {@link TaskManager} or updated.</li>
+     * </ol>
+     *
+     * <p>Duplicate devices are filtered with a count-min sketch. The sketch is probabilistic, so a
+     * device may occasionally be skipped as a suspected duplicate.
+     *
+     * @param notification the notification to queue
+     * @param jobExecution the surrounding job execution, or {@code null} when called outside a job
+     * @throws Exception if scheduling, querying or updating the notification fails
+     */
+    @Override
+    public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+        if(scheduleQueueJob(notification)){
+            em.update(notification);
+            return;
+        }
+        final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class,"queue");
+        long startTime = System.currentTimeMillis();
+
+        // equals() instead of ==: if getCanceled() returns a boxed Boolean, a reference comparison
+        // only matches the canonical Boolean.TRUE instance.
+        if (Boolean.TRUE.equals(notification.getCanceled())) {
+            LOG.info("notification " + notification.getUuid() + " canceled");
+            if (jobExecution != null) {
+                jobExecution.killed();
+            }
+            return;
+        }
+
+        LOG.info("notification {} start queuing", notification.getUuid());
+
+        final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
+        final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
+        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+
+
+        //get devices in querystring, and make sure you have access
+        if (pathQuery != null) {
+            final HashMap<Object,ProviderAdapter> notifierMap = getAdapterMap();
+            LOG.info("notification {} start query", notification.getUuid());
+            final Iterator<Device> iterator = pathQuery.iterator(em);
+            //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
+            if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+                jobScheduler.scheduleQueueJob(notification, true);
+                em.update(notification);
+                return;
+            }
+            final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probabilistic counter to find dups
+            final UUID appId = em.getApplication().getUuid();
+            final Map<String,Object> payloads = notification.getPayloads();
+
+            // Expands one query result (device, user or group) into devices and queues a message for each.
+            final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
+                @Override
+                public Entity call(Entity entity) {
+
+                    try {
+
+                        long now = System.currentTimeMillis();
+                        List<EntityRef> devicesRef = getDevices(entity); // resolve group
+
+                        LOG.info("notification {} queue {} devices, duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), devicesRef.size());
+
+                        for (EntityRef deviceRef : devicesRef) {
+                            LOG.info("notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                            long hash = MurmurHash.hash(deviceRef.getUuid());
+                            if (sketch.estimateCount(hash) > 0) { //look for duplicates
+                                LOG.warn("Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                continue;
+                            } else {
+                                sketch.add(hash, 1);
+                            }
+                            String notifierId = null;
+                            String notifierKey = null;
+
+                            //find the device notifier info, match it to the payload
+                            for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                                ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
+                                if (adapter == null) {
+                                    // No notifier is registered under this payload key. Try the next payload
+                                    // rather than hitting an NPE that would abort every remaining device of this entity.
+                                    LOG.debug("no notifier found for payload key {} on notification {}", entry.getKey(), notification.getUuid());
+                                    continue;
+                                }
+                                now = System.currentTimeMillis();
+                                String providerId = getProviderId(deviceRef, adapter.getNotifier());
+                                if (providerId != null) {
+                                    notifierId = providerId;
+                                    notifierKey = entry.getKey().toLowerCase();
+                                    break;
+                                }
+                                LOG.info("Provider query for notification {} device {} took "+(System.currentTimeMillis()-now)+" ms",notification.getUuid(),deviceRef.getUuid());
+                            }
+
+                            if (notifierId == null) {
+                                LOG.info("Notifier did not match for device {} ", deviceRef);
+                                continue;
+                            }
+
+                            ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
+                            if (notification.getQueued() == null) {
+                                // update queued time
+                                now = System.currentTimeMillis();
+                                notification.setQueued(System.currentTimeMillis());
+                                LOG.info("notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
+                            }
+                            now = System.currentTimeMillis();
+                            qm.sendMessage(message);
+                            LOG.info("notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
+                            deviceCount.incrementAndGet();
+                            queueMeter.mark();
+                        }
+                    } catch (Exception deviceLoopException) {
+                        // A failure is recorded per entity; the remaining entities are still processed.
+                        LOG.error("Failed to add devices", deviceLoopException);
+                        errorMessages.add("Failed to add devices for entity: " + entity.getUuid() + " error:" + deviceLoopException);
+                    }
+                    return entity;
+                }
+            };
+
+            long now = System.currentTimeMillis();
+            Observable o = rx.Observable.create(new IteratorObservable<Entity>(iterator))
+                    .parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
+                        @Override
+                        public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
+                            return deviceObservable.map(entityListFunct);
+                        }
+                    }, Schedulers.io())
+                    .doOnError(new Action1<Throwable>() {
+                        @Override
+                        public void call(Throwable throwable) {
+                            LOG.error("Failed while writing", throwable);
+                        }
+                    });
+            // Block until every entity has been processed so deviceCount is final below.
+            o.toBlocking().lastOrDefault(null);
+            LOG.info("notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+        }
+
+        // update queued time
+        Map<String, Object> properties = new HashMap<String, Object>(2);
+        properties.put("queued", notification.getQueued());
+        properties.put("state", notification.getState());
+        if(errorMessages.size()>0){
+            // NOTE(review): the collected messages themselves are not stored on the notification here.
+            if (notification.getErrorMessage() == null) {
+                notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
+            }
+        }
+
+        notification.setExpectedCount(deviceCount.get());
+        notification.addProperties(properties);
+        long now = System.currentTimeMillis();
+
+
+        // NOTE(review): no work happens between taking `now` and this log line, so the reported duration is ~0.
+        LOG.info("notification {} updated notification duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
+        //do i have devices, and have i already started batching.
+        if (deviceCount.get() <= 0 || !notification.getDebug()) {
+            TaskManager taskManager = new TaskManager(em, notification);
+            //if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
+            taskManager.finishedBatch(false,true);
+        }else {
+            em.update(notification);
+        }
+
+        long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
+        LOG.info("notification {} done queuing to {} devices in " + elapsed + " ms", notification.getUuid().toString(), deviceCount.get());
+    }
+
+    /**
+     * Returns the notifier-to-adapter lookup table, building it on first use.
+     *
+     * <p>The "notifiers" collection of the application is queried (limit 100) and every notifier's
+     * adapter is registered under three keys: the lower-cased notifier name, the notifier UUID and
+     * the UUID's string form. A notifier without a name is registered under the empty string, and
+     * one without a UUID under a random UUID. The result is cached in {@code notifierHashMap} and
+     * reused by later calls on this instance.
+     *
+     * <p>NOTE(review): the lazy initialisation is unsynchronised; confirm callers never race on a
+     * fresh instance.
+     *
+     * @return the cached map of lookup key to provider adapter, never {@code null}
+     */
+    private HashMap<Object,ProviderAdapter> getAdapterMap(){
+        if(notifierHashMap == null) {
+            long now = System.currentTimeMillis();
+            notifierHashMap = new HashMap<Object, ProviderAdapter>();
+            Query query = new Query();
+            query.setCollection("notifiers");
+            query.setLimit(100);
+            PathQuery<Notifier> pathQuery = new PathQuery<Notifier>(
+                    new SimpleEntityRef(em.getApplicationRef()),
+                    query
+            );
+            Iterator<Notifier> notifierIterator = pathQuery.iterator(em);
+            int count = 0;
+            while (notifierIterator.hasNext()) {
+                Notifier notifier = notifierIterator.next();
+                String name = notifier.getName() != null ? notifier.getName() : "";
+                UUID uuid = notifier.getUuid() != null ? notifier.getUuid() : UUID.randomUUID();
+                ProviderAdapter providerAdapter = ProviderAdapterFactory.getProviderAdapter(notifier,em);
+                notifierHashMap.put(name.toLowerCase(), providerAdapter);
+                notifierHashMap.put(uuid, providerAdapter);
+                notifierHashMap.put(uuid.toString(), providerAdapter);
+                // Safety valve: stop once 101 notifiers have been added.
+                if(count++ >= 100){
+                    // The original message had no {} placeholder, so the size argument was silently dropped.
+                    LOG.error("ApplicationQueueManager: too many notifiers...breaking out size={}", notifierHashMap.size());
+                    break;
+                }
+            }
+            LOG.info("ApplicationQueueManager: fetching notifiers finished size={}, duration {} ms", notifierHashMap.size(),System.currentTimeMillis() - now);
+        }
+        return notifierHashMap;
+    }
+
+    /**
+     * Builds the pipeline that delivers a batch of queue messages to their providers.
+     *
+     * <p>Each message is mapped in parallel on the io scheduler: its notification is loaded (cached
+     * per batch), a {@link TaskManager} is created per notification, the payloads are translated and
+     * the matching adapter is asked to send. After the whole batch has been mapped, every adapter's
+     * {@code doneSendingNotifications()} is called and {@code finishedBatch()} is invoked once per
+     * notification seen in the batch.
+     *
+     * <p>This method only assembles the Observable; it does not subscribe to it.
+     *
+     * @param messages  the queue messages to deliver; their bodies are expected to be
+     *                  {@link ApplicationQueueMessage} instances
+     * @param queuePath only checked for {@code null}: when non-null, a message whose processing
+     *                  failed is committed on the queue
+     * @return an Observable emitting, per buffered batch, a map from notification id to the first
+     *         message seen for that notification
+     */
+    @Override
+    public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) {
+        LOG.info("sending batch of {} notifications.", messages.size());
+        final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
+
+        final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
+        final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
+        final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
+
+        // Delivers a single message; returns its body, or null when the body could not be read.
+        final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
+            @Override
+            public ApplicationQueueMessage call(QueueMessage queueMessage) {
+                boolean messageCommitted = false;
+                ApplicationQueueMessage message = null;
+                try {
+                    message = (ApplicationQueueMessage) queueMessage.getBody();
+                    LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
+
+                    UUID deviceUUID = message.getDeviceId();
+
+                    Notification notification = notificationMap.get(message.getNotificationId());
+                    if (notification == null) {
+                        notification = em.get(message.getNotificationId(), Notification.class);
+                        notificationMap.put(message.getNotificationId(), notification);
+                    }
+                    TaskManager taskManager = taskMap.get(message.getNotificationId());
+                    if (taskManager == null) {
+                        // putIfAbsent + re-read so concurrent workers share one TaskManager per notification
+                        taskManager = new TaskManager(em, notification);
+                        taskMap.putIfAbsent(message.getNotificationId(), taskManager);
+                        taskManager = taskMap.get(message.getNotificationId());
+                    }
+
+                    final Map<String, Object> payloads = notification.getPayloads();
+                    final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
+                    LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
+
+                    try {
+                        String notifierName = message.getNotifierKey().toLowerCase();
+                        // NOTE(review): a key with no registered adapter yields null here and the
+                        // resulting NPE is handled by the outer catch (logged, message committed).
+                        ProviderAdapter providerAdapter = notifierMap.get(notifierName);
+                        Object payload = translatedPayloads.get(notifierName);
+                        Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID);
+                        TaskTracker tracker = new TaskTracker(providerAdapter.getNotifier(), taskManager, receipt, deviceUUID);
+                        if(!isOkToSend(notification)){
+                            tracker.failed(0, "Notification is duplicate/expired/cancelled.");
+                        }else {
+                            if (payload == null) {
+                                LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
+                                tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                            } else {
+                                long now = System.currentTimeMillis();
+                                try {
+                                    providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
+                                } catch (Exception e) {
+                                    tracker.failed(0, e.getMessage());
+                                } finally {
+                                    LOG.info("sending to device {} for Notification: {} duration " + (System.currentTimeMillis() - now) + " ms", deviceUUID, notification.getUuid());
+                                }
+                            }
+                        }
+                        messageCommitted = true;
+                    } finally {
+                        sendMeter.mark();
+                    }
+
+                } catch (Exception e) {
+                    LOG.error("Failure while sending",e);
+                    try {
+                        if(!messageCommitted && queuePath != null) {
+                            qm.commitMessage(queueMessage);
+                        }
+                    }catch (Exception queueException){
+                        LOG.error("Failed to commit message.",queueException);
+                    }
+                }
+                return message;
+            }
+        };
+        Observable o = rx.Observable.from(messages)
+                .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+                    @Override
+                    public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
+                        return messageObservable.map(func);
+                    }
+                }, Schedulers.io())
+                .buffer(messages.size())
+                .map(new Func1<List<ApplicationQueueMessage>, HashMap<UUID, ApplicationQueueMessage>>() {
+                    @Override
+                    public HashMap<UUID, ApplicationQueueMessage> call(List<ApplicationQueueMessage> queueMessages) {
+                        //for gcm this will actually send notification
+                        for (ProviderAdapter providerAdapter : notifierMap.values()) {
+                            try {
+                                providerAdapter.doneSendingNotifications();
+                            } catch (Exception e) {
+                                LOG.error("providerAdapter.doneSendingNotifications: ", e);
+                            }
+                        }
+                        //TODO: check if a notification is done and mark it
+                        HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<UUID, ApplicationQueueMessage>();
+                        for (ApplicationQueueMessage message : queueMessages) {
+                            if (message == null) {
+                                // The per-message function returns null when the body could not be read;
+                                // skip it so one bad message cannot fail the whole batch with an NPE.
+                                continue;
+                            }
+                            if (notifications.get(message.getNotificationId()) == null) {
+                                try {
+                                    TaskManager taskManager = taskMap.get(message.getNotificationId());
+                                    notifications.put(message.getNotificationId(), message);
+                                    taskManager.finishedBatch();
+                                } catch (Exception e) {
+                                    LOG.error("Failed to finish batch", e);
+                                }
+                            }
+
+                        }
+                        return notifications;
+                    }
+                })
+                .doOnError(new Action1<Throwable>() {
+                    @Override
+                    public void call(Throwable throwable) {
+                        LOG.error("Failed while sending",throwable);
+                    }
+                });
+        return o;
+    }
+
+    /**
+     * Stops every known provider adapter. A failure in one adapter is logged and does not
+     * prevent the remaining adapters from being stopped.
+     */
+    @Override
+    public void stop(){
+        final Iterator<ProviderAdapter> adapters = getAdapterMap().values().iterator();
+        while (adapters.hasNext()) {
+            final ProviderAdapter current = adapters.next();
+            try {
+                current.stop();
+            } catch (Exception e) {
+                LOG.error("failed to stop adapter",e);
+            }
+        }
+    }
+
+
+    /**
+     * Converts raw payloads into provider-specific payloads.
+     *
+     * <p>Each payload key is lower-cased and looked up in {@code notifierMap}. Entries with no
+     * matching adapter, a {@code null} value, or a {@code null} translation are left out.
+     *
+     * @param payloads    raw payloads keyed by notifier name
+     * @param notifierMap lookup of adapters by lower-cased notifier name
+     * @return translated payloads keyed by lower-cased notifier name
+     * @throws Exception if an adapter fails to translate its payload
+     */
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter> notifierMap) throws Exception {
+        final Map<String, Object> result = new HashMap<String, Object>( payloads.size());
+        for (Map.Entry<String, Object> payloadEntry : payloads.entrySet()) {
+            final String key = payloadEntry.getKey().toLowerCase();
+            final Object raw = payloadEntry.getValue();
+            final ProviderAdapter adapter = notifierMap.get(key);
+            if (adapter == null || raw == null) {
+                continue;
+            }
+            final Object translated = adapter.translatePayload(raw);
+            if (translated != null) {
+                result.put(key, translated);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Reads the notification queue name from configuration.
+     *
+     * @param properties configuration to read from
+     * @return the value stored under {@code DEFAULT_QUEUE_PROPERTY}, or {@code DEFAULT_QUEUE_NAME}
+     *         when that property is not set
+     */
+    public static String getQueueNames(Properties properties) {
+        final String propertyKey = ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY;
+        final String fallback = ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME;
+        return properties.getProperty(propertyKey, fallback);
+    }
+
+    /**
+     * Bridges a plain {@link Iterator} into an rx {@code OnSubscribe} source: every element is
+     * pushed to the subscriber until the iterator is exhausted or the subscriber unsubscribes,
+     * after which completion is signalled.
+     */
+    private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
+        private final Iterator<T> input;
+
+        // The parameter is a raw Iterator: queueNotification passes an Iterator<Device> while T is Entity.
+        private IteratorObservable( final Iterator input ) {
+            this.input = input;
+        }
+
+        @Override
+        public void call( final Subscriber<? super T> subscriber ) {
+            try {
+                for ( ; ; ) {
+                    // stop early when nobody is listening any more, or when there is nothing left
+                    if ( subscriber.isUnsubscribed() || !input.hasNext() ) {
+                        break;
+                    }
+                    subscriber.onNext( (T) input.next() );
+                }
+
+                // reached both on exhaustion and on unsubscription
+                subscriber.onCompleted();
+            }
+            catch ( Throwable failure ) {
+                LOG.error("failed on subscriber",failure);
+                subscriber.onError( failure );
+            }
+        }
+    }
+
+    /**
+     * Asks every provider adapter to remove its inactive devices.
+     *
+     * <p>Despite the name, the adapters are called one after another on the calling thread.
+     * The check is best-effort: a failing adapter is logged and the loop continues.
+     */
+    @Override
+    public void asyncCheckForInactiveDevices() throws Exception {
+        for (final ProviderAdapter adapter : getAdapterMap().values()) {
+            if (adapter == null) {
+                continue;
+            }
+            try {
+                LOG.debug("checking notifier {} for inactive devices", adapter.getNotifier());
+                adapter.removeInactiveDevices();
+
+                LOG.debug("finished checking notifier {} for inactive devices",adapter.getNotifier());
+            } catch (Exception e) {
+                // not essential so don't fail, but log
+                LOG.error("checkForInactiveDevices", e);
+            }
+        }
+    }
+
+
+    /**
+     * Decides whether a notification may still be delivered.
+     *
+     * @param notification the notification about to be sent
+     * @return {@code false} when the "sent" plus "errors" statistics already equal the expected
+     *         count, or when the notification is cancelled or expired; {@code true} otherwise
+     */
+    private boolean isOkToSend(Notification notification) {
+        Map<String,Long> stats = notification.getStatistics();
+        if (stats != null) {
+            // A missing counter is treated as 0; unboxing it directly would throw an NPE.
+            Long sent = stats.get("sent");
+            Long errors = stats.get("errors");
+            long processed = (sent != null ? sent : 0L) + (errors != null ? errors : 0L);
+            if (notification.getExpectedCount() == processed) {
+                LOG.info("notification {} already processed. not sending.",
+                        notification.getUuid());
+                return false;
+            }
+        }
+        // equals() instead of ==: if getCanceled() returns a boxed Boolean, a reference comparison
+        // only matches the canonical Boolean.TRUE instance.
+        if (Boolean.TRUE.equals(notification.getCanceled())) {
+            LOG.info("notification {} canceled. not sending.",
+                    notification.getUuid());
+            return false;
+        }
+        if (notification.isExpired()) {
+            LOG.info("notification {} expired. not sending.",
+                    notification.getUuid());
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Expands an entity reference into the device references it stands for.
+     *
+     * <ul>
+     *   <li>"device": the reference itself</li>
+     *   <li>"user": the refs in the user's "devices" collection</li>
+     *   <li>"group": the devices of every user in the group's "users" collection</li>
+     *   <li>any other type: an empty list</li>
+     * </ul>
+     *
+     * @param ref the entity to resolve
+     * @return the resolved device references, never {@code null} for the types handled here
+     * @throws Exception if a collection lookup fails
+     */
+    private List<EntityRef> getDevices(EntityRef ref) throws Exception {
+        final String type = ref.getType();
+        if ("device".equals(type)) {
+            return Collections.singletonList(ref);
+        }
+        if ("user".equals(type)) {
+            return em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
+                    Query.Level.REFS, false).getRefs();
+        }
+        if ("group".equals(type)) {
+            List<EntityRef> devices = new ArrayList<EntityRef>();
+            for (EntityRef r : em.getCollection(ref, "users", null,
+                    Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
+                devices.addAll(getDevices(r));
+            }
+            return devices;
+        }
+        // typed factory instead of the raw Collections.EMPTY_LIST constant (no unchecked assignment)
+        return Collections.<EntityRef>emptyList();
+    }
+
+
+    /**
+     * Looks up the provider-side id a device has registered for a notifier.
+     *
+     * <p>The device property named after the notifier's name is tried first, then the one named
+     * after its UUID (both suffixed with {@code NOTIFIER_ID_POSTFIX}). Lookup failures are logged
+     * and reported as "no id" so the rest of the batch can proceed.
+     *
+     * @return the id as a string, or {@code null} when neither property is set or the lookup failed
+     */
+    private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
+        try {
+            final Object byName = em.getProperty(device, notifier.getName() + NOTIFIER_ID_POSTFIX);
+            if (byName != null) {
+                return byName.toString();
+            }
+            final Object byUuid = em.getProperty(device, notifier.getUuid() + NOTIFIER_ID_POSTFIX);
+            return byUuid == null ? null : byUuid.toString();
+        } catch (Exception e) {
+            LOG.error("Errer getting provider ID, proceding with rest of batch", e);
+            return null;
+        }
+    }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 2a4ec73..02f881d 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -16,10 +16,8 @@
*/
package org.apache.usergrid.services.notifications.apns;
-import com.relayrides.pushy.apns.*;
import com.relayrides.pushy.apns.util.*;
import org.apache.commons.io.IOUtils;
-import org.apache.usergrid.services.ServiceParameter;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.index.query.Query;
@@ -30,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
-import java.net.SocketException;
import java.util.*;
import org.apache.usergrid.services.ServiceAction;
@@ -40,7 +37,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl.NOTIFIER_ID_POSTFIX;
// todo: test reschedule on delivery time change
// todo: test restart of queuing
@@ -68,7 +65,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8f4720db/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 3795be1..ad1c9f2 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -18,22 +18,19 @@ package org.apache.usergrid.services.notifications.gcm;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.entities.*;
-import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.services.ServiceParameter;
import org.apache.usergrid.services.TestQueueManager;
import org.apache.usergrid.services.notifications.*;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
import org.apache.usergrid.services.ServiceAction;
import static org.junit.Assert.*;
-import static org.apache.usergrid.services.notifications.NotificationsService.NOTIFIER_ID_POSTFIX;
+import static org.apache.usergrid.services.notifications.ApplicationQueueManager.NOTIFIER_ID_POSTFIX;
public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@@ -59,7 +56,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
+
}
@Override
@Before