You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 21:57:46 UTC

[4/5] git commit: move notifier matching logic up

move notifier matching logic up


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a40594d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a40594d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a40594d

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 8a40594d8ec92dd1cd980614a339cc6b597fe674
Parents: 5586b33
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:05 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:05 2014 -0600

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      |  11 ++
 .../NotificationsQueueManager.java              | 125 ++++++++++---------
 .../services/notifications/QueueMessage.java    |  31 ++++-
 .../apns/NotificationsServiceIT.java            |   1 -
 4 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 7a643ac..f070ee6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,6 +41,8 @@ public class Notification extends TypedEntity {
 
     public static final String RECEIPTS_COLLECTION = "receipts";
 
+
+
     public static enum State {
         CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
     }
@@ -49,6 +51,10 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected Map<String, Object> payloads;
 
+    /** Total count */
+    @EntityProperty
+    private int expectedCount;
+
     /** Time processed */
     @EntityProperty
     protected Long queued;
@@ -237,4 +243,9 @@ public class Notification extends TypedEntity {
     public void setQueued(Long queued) {
         this.queued = queued;
     }
+
+    public void setExpectedCount(int expectedCount) {  this.expectedCount = expectedCount;  }
+
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    public int getExpectedCount() {  return expectedCount;  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 34d4571..93543bd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -135,6 +135,8 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final int numCurrentBatchesConfig = getNumConcurrentBatches();
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+        final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
+        final Map<String,Object> payloads = notification.getPayloads();
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
@@ -163,7 +165,26 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     }else {
                                         sketch.add(hash,1);
                                     }
-                                    QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
+                                    String notifierId = null;
+                                    String notifierName = null;
+
+                                    //find the device notifier info, match it to the payload
+                                    for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+                                        Notifier notifier = notifierMap.get(entry.getKey());
+                                        String providerId = getProviderId(deviceRef, notifier);
+                                        if (providerId != null) {
+                                            notifierId = providerId;
+                                            notifierName = notifier.getName();
+                                            break;
+                                        }
+                                    }
+
+                                    if(notifierId == null){
+                                        LOG.debug("Notifier did not match for device {} ", deviceRef);
+                                        continue;
+                                    }
+
+                                    QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid(),notifierName,notifierId);
                                     qm.postToQueue(QUEUE_NAME, message);
                                     if(notification.getQueued() == null){
                                         // update queued time
@@ -202,10 +223,12 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
 
         if(errorMessages.size()>0){
             properties.put("deliveryErrors", errorMessages.toArray());
-            if(notification.getErrorMessage()==null){
+            if (notification.getErrorMessage() == null) {
                 notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
             }
         }
+
+        notification.setExpectedCount(deviceCount.get());
         notification.addProperties(properties);
         em.update(notification);
 
@@ -254,7 +277,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                         map.put("notification",notification);
                         final Map<String, Object> payloads = notification.getPayloads();
                         final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
-                        map.put("payloads",payloads);
                         map.put("translatedPayloads",translatedPayloads);
                         LOG.info("sending batch of {} devices for Notification: {}", messages.size(), notification.getUuid());
                         return map;
@@ -274,7 +296,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     try {
                                         UUID deviceUUID = message.getUuid();
                                         HashMap<String, Object> notificationMap = notificationCache.get(message.getNotificationId());
-                                        Map<String, Object> payloads = (Map<String, Object>) notificationMap.get("payloads");
                                         Map<String, Object> translatedPayloads = (Map<String, Object>) notificationMap.get("translatedPayloads");
                                         TaskManager taskManager = (TaskManager) notificationMap.get("taskManager");
                                         Notification notification = (Notification) notificationMap.get("notification");
@@ -282,59 +303,35 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                             return message;
                                         }
                                         boolean foundNotifier = false;
-                                        for (Map.Entry<String, Object> entry : payloads.entrySet()) {
-                                            try {
-                                                String payloadKey = entry.getKey();
-                                                Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
-                                                EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
-
-                                                String providerId;
+                                        try {
+                                            String notifierName = message.getNotifierName();
+                                            Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+                                            Object payload = translatedPayloads.get(notifierName);
+                                            Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID, message);
+                                            TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+
+                                            if (payload == null) {
+                                                LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
                                                 try {
-                                                    providerId = getProviderId(deviceRef, notifier);
-                                                    if (providerId == null) {
-                                                        LOG.debug("Provider not found.{} {}", deviceRef, notifier.getName());
-                                                        continue;
-                                                    }
-                                                } catch (Exception providerException) {
-                                                    LOG.error("Exception getting provider.", providerException);
-                                                    continue;
-                                                }
-                                                Object payload = translatedPayloads.get(payloadKey);
-
-                                                Receipt receipt = new Receipt(notification.getUuid(), providerId, payload, deviceUUID,message);
-                                                TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
-                                                if (payload == null) {
-                                                    LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
-                                                    try {
-                                                        tracker.failed(0, "failed to match payload to " + payloadKey + " notifier");
-                                                    } catch (Exception e) {
-                                                        LOG.debug("failed to mark device failed" + e);
-                                                    }
-                                                    continue;
-                                                }
-
-                                                if (LOG.isDebugEnabled()) {
-                                                    StringBuilder sb = new StringBuilder();
-                                                    sb.append("sending notification ").append(notification.getUuid());
-                                                    sb.append(" to device ").append(deviceUUID);
-                                                    LOG.debug(sb.toString());
+                                                    tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+                                                } catch (Exception e) {
+                                                    LOG.debug("failed to mark device failed" + e);
                                                 }
+                                            }
 
+                                            try {
+                                                ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+                                                providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+                                            } catch (Exception e) {
                                                 try {
-                                                    ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
-                                                    providerAdapter.sendNotification(providerId, notifier, payload, notification, tracker);
-
-                                                } catch (Exception e) {
-                                                    try {
-                                                        tracker.failed(0, e.getMessage());
-                                                    } catch (Exception trackerException) {
-                                                        LOG.error("tracker failed", trackerException);
-                                                    }
+                                                    tracker.failed(0, e.getMessage());
+                                                } catch (Exception trackerException) {
+                                                    LOG.error("tracker failed", trackerException);
                                                 }
-                                                foundNotifier = true;
-                                            } finally {
-                                                sendMeter.mark();
                                             }
+                                            foundNotifier = true;
+                                        } finally {
+                                            sendMeter.mark();
                                         }
                                         if (!foundNotifier) {
                                             try {
@@ -344,14 +341,14 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                             }
                                         }
                                     } catch (Exception x) {
-
+                                        LOG.error("Failure unknown",x);
                                     }
                                     return message;
                                 }
                             });
                         }
                     }, Schedulers.io())
-                    .buffer(1000)
+                    .buffer(QueueListener.BATCH_SIZE)
                     .map(new Func1<List<QueueMessage>, Object>() {
                         @Override
                         public Object call(List<QueueMessage> queueMessages) {
@@ -363,6 +360,19 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                     LOG.error("providerAdapter.doneSendingNotifications: ", e);
                                 }
                             }
+                            //TODO: check if a notification is done and mark it
+                            HashMap<UUID, Notification> notifications = new HashMap<UUID, Notification>();
+                            for (QueueMessage message : queueMessages) {
+                                if (notifications.get(message.getNotificationId()) == null) {
+                                    try {
+                                        final Notification notification = em.get(message.getNotificationId(), Notification.class);
+                                        finishedBatch(notification, 0, 0);
+                                    } catch (Exception e) {
+                                        LOG.error("Failed to finish batch", e);
+                                    }
+                                }
+
+                            }
                             notificationCache.cleanUp();
                             return null;
                         }
@@ -386,15 +396,16 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         Map<String, Object> properties = new HashMap<String, Object>(4);
         properties.put("statistics", notification.getStatistics());
         properties.put("modified", notification.getModified());
-
+        long sent = notification.getStatistics().get("sent");
+        long errors = notification.getStatistics().get("errors");
         //none of this is known and should you ever do this
+        if(notification.getExpectedCount() == (errors  + sent )) {
             notification.setFinished(notification.getModified());
             properties.put("finished", notification.getModified());
             properties.put("state", notification.getState());
             long elapsed = notification.getFinished()
                     - notification.getStarted();
-            long sent = notification.getStatistics().get("sent");
-            long errors = notification.getStatistics().get("errors");
+
 
             if (LOG.isInfoEnabled()) {
                 StringBuilder sb = new StringBuilder();
@@ -403,7 +414,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                 sb.append(" devices in ").append(elapsed).append(" ms");
                 LOG.info(sb.toString());
             }
-
+        }
         LOG.info("notification finished batch: {}",
                 notification.getUuid());
         em.updateProperties(notification, properties);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
index e6301ed..b8b5c6f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
@@ -25,25 +25,32 @@ public class QueueMessage extends Message {
 
     static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
     static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+    static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
+    static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
+    static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
 
 
     public QueueMessage() {
     }
 
-    public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId){
+    public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId,String notifierName,String notifierId){
         setApplicationId(applicationId);
         setDeviceId(deviceId);
+        setNotificationId(notificationId);
+        setNotifierName(notifierName);
+        setNotifierId(notifierId);
     }
 
 
 
     public static QueueMessage generate(Message message){
-        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty("notificationId"),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+        return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
     }
 
     public UUID getApplicationId() {
         return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
     }
+
     public void setApplicationId(UUID applicationId){
         this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID,applicationId);
     }
@@ -56,10 +63,26 @@ public class QueueMessage extends Message {
     }
 
     public UUID getNotificationId(){
-        return (UUID) this.getObjectProperty("notificationId");
+        return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
     }
 
     public void setNotificationId(UUID notificationId){
-        this.setProperty("notificationdId",notificationId);
+        this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID,notificationId);
+    }
+
+    public String getNotifierId() {
+        return  this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
     }
+    public void setNotifierId(String notifierId){
+        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID,notifierId);
+    }
+
+    public String getNotifierName() {
+        return  this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+    }
+    public void setNotifierName(String name){
+        this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME,name);
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ed15eeb..456df69 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -77,7 +77,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
     public void before() throws Exception {
         super.before();
         // create apns notifier //
-        NotificationsQueueManager.IS_TEST = true;
 
         app.clear();
         app.put("name", "apns");