You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 21:57:46 UTC
[4/5] git commit: move notifier matching logic up
move notifier matching logic up
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a40594d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a40594d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a40594d
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 8a40594d8ec92dd1cd980614a339cc6b597fe674
Parents: 5586b33
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 13:57:05 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 13:57:05 2014 -0600
----------------------------------------------------------------------
.../persistence/entities/Notification.java | 11 ++
.../NotificationsQueueManager.java | 125 ++++++++++---------
.../services/notifications/QueueMessage.java | 31 ++++-
.../apns/NotificationsServiceIT.java | 1 -
4 files changed, 106 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 7a643ac..f070ee6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,6 +41,8 @@ public class Notification extends TypedEntity {
public static final String RECEIPTS_COLLECTION = "receipts";
+
+
public static enum State {
CREATED, FAILED, SCHEDULED, STARTED, FINISHED, CANCELED, EXPIRED
}
@@ -49,6 +51,10 @@ public class Notification extends TypedEntity {
@EntityProperty
protected Map<String, Object> payloads;
+ /** Total count */
+ @EntityProperty
+ private int expectedCount;
+
/** Time processed */
@EntityProperty
protected Long queued;
@@ -237,4 +243,9 @@ public class Notification extends TypedEntity {
public void setQueued(Long queued) {
this.queued = queued;
}
+
+ public void setExpectedCount(int expectedCount) { this.expectedCount = expectedCount; }
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ public int getExpectedCount() { return expectedCount; }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 34d4571..93543bd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -135,6 +135,8 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final int numCurrentBatchesConfig = getNumConcurrentBatches();
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
+ final HashMap<Object,Notifier> notifierMap = getNotifierMap();
+ final Map<String,Object> payloads = notification.getPayloads();
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
@@ -163,7 +165,26 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}else {
sketch.add(hash,1);
}
- QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
+ String notifierId = null;
+ String notifierName = null;
+
+ //find the device notifier info, match it to the payload
+ for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+ Notifier notifier = notifierMap.get(entry.getKey());
+ String providerId = getProviderId(deviceRef, notifier);
+ if (providerId != null) {
+ notifierId = providerId;
+ notifierName = notifier.getName();
+ break;
+ }
+ }
+
+ if(notifierId == null){
+ LOG.debug("Notifier did not match for device {} ", deviceRef);
+ continue;
+ }
+
+ QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid(),notifierName,notifierId);
qm.postToQueue(QUEUE_NAME, message);
if(notification.getQueued() == null){
// update queued time
@@ -202,10 +223,12 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
if(errorMessages.size()>0){
properties.put("deliveryErrors", errorMessages.toArray());
- if(notification.getErrorMessage()==null){
+ if (notification.getErrorMessage() == null) {
notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
}
}
+
+ notification.setExpectedCount(deviceCount.get());
notification.addProperties(properties);
em.update(notification);
@@ -254,7 +277,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
map.put("notification",notification);
final Map<String, Object> payloads = notification.getPayloads();
final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
- map.put("payloads",payloads);
map.put("translatedPayloads",translatedPayloads);
LOG.info("sending batch of {} devices for Notification: {}", messages.size(), notification.getUuid());
return map;
@@ -274,7 +296,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
try {
UUID deviceUUID = message.getUuid();
HashMap<String, Object> notificationMap = notificationCache.get(message.getNotificationId());
- Map<String, Object> payloads = (Map<String, Object>) notificationMap.get("payloads");
Map<String, Object> translatedPayloads = (Map<String, Object>) notificationMap.get("translatedPayloads");
TaskManager taskManager = (TaskManager) notificationMap.get("taskManager");
Notification notification = (Notification) notificationMap.get("notification");
@@ -282,59 +303,35 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
return message;
}
boolean foundNotifier = false;
- for (Map.Entry<String, Object> entry : payloads.entrySet()) {
- try {
- String payloadKey = entry.getKey();
- Notifier notifier = notifierMap.get(payloadKey.toLowerCase());
- EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
-
- String providerId;
+ try {
+ String notifierName = message.getNotifierName();
+ Notifier notifier = notifierMap.get(notifierName.toLowerCase());
+ Object payload = translatedPayloads.get(notifierName);
+ Receipt receipt = new Receipt(notification.getUuid(), message.getNotifierId(), payload, deviceUUID, message);
+ TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
+
+ if (payload == null) {
+ LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
try {
- providerId = getProviderId(deviceRef, notifier);
- if (providerId == null) {
- LOG.debug("Provider not found.{} {}", deviceRef, notifier.getName());
- continue;
- }
- } catch (Exception providerException) {
- LOG.error("Exception getting provider.", providerException);
- continue;
- }
- Object payload = translatedPayloads.get(payloadKey);
-
- Receipt receipt = new Receipt(notification.getUuid(), providerId, payload, deviceUUID,message);
- TaskTracker tracker = new TaskTracker(notifier, taskManager, receipt, deviceUUID);
- if (payload == null) {
- LOG.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
- try {
- tracker.failed(0, "failed to match payload to " + payloadKey + " notifier");
- } catch (Exception e) {
- LOG.debug("failed to mark device failed" + e);
- }
- continue;
- }
-
- if (LOG.isDebugEnabled()) {
- StringBuilder sb = new StringBuilder();
- sb.append("sending notification ").append(notification.getUuid());
- sb.append(" to device ").append(deviceUUID);
- LOG.debug(sb.toString());
+ tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
+ } catch (Exception e) {
+ LOG.debug("failed to mark device failed" + e);
}
+ }
+ try {
+ ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
+ providerAdapter.sendNotification(message.getNotifierId(), notifier, payload, notification, tracker);
+ } catch (Exception e) {
try {
- ProviderAdapter providerAdapter = providerAdapters.get(notifier.getProvider());
- providerAdapter.sendNotification(providerId, notifier, payload, notification, tracker);
-
- } catch (Exception e) {
- try {
- tracker.failed(0, e.getMessage());
- } catch (Exception trackerException) {
- LOG.error("tracker failed", trackerException);
- }
+ tracker.failed(0, e.getMessage());
+ } catch (Exception trackerException) {
+ LOG.error("tracker failed", trackerException);
}
- foundNotifier = true;
- } finally {
- sendMeter.mark();
}
+ foundNotifier = true;
+ } finally {
+ sendMeter.mark();
}
if (!foundNotifier) {
try {
@@ -344,14 +341,14 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}
}
} catch (Exception x) {
-
+ LOG.error("Failure unknown",x);
}
return message;
}
});
}
}, Schedulers.io())
- .buffer(1000)
+ .buffer(QueueListener.BATCH_SIZE)
.map(new Func1<List<QueueMessage>, Object>() {
@Override
public Object call(List<QueueMessage> queueMessages) {
@@ -363,6 +360,19 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
LOG.error("providerAdapter.doneSendingNotifications: ", e);
}
}
+ //TODO: check if a notification is done and mark it
+ HashMap<UUID, Notification> notifications = new HashMap<UUID, Notification>();
+ for (QueueMessage message : queueMessages) {
+ if (notifications.get(message.getNotificationId()) == null) {
+ try {
+ final Notification notification = em.get(message.getNotificationId(), Notification.class);
+ finishedBatch(notification, 0, 0);
+ } catch (Exception e) {
+ LOG.error("Failed to finish batch", e);
+ }
+ }
+
+ }
notificationCache.cleanUp();
return null;
}
@@ -386,15 +396,16 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
Map<String, Object> properties = new HashMap<String, Object>(4);
properties.put("statistics", notification.getStatistics());
properties.put("modified", notification.getModified());
-
+ long sent = notification.getStatistics().get("sent");
+ long errors = notification.getStatistics().get("errors");
//none of this is known and should you ever do this
+ if(notification.getExpectedCount() == (errors + sent )) {
notification.setFinished(notification.getModified());
properties.put("finished", notification.getModified());
properties.put("state", notification.getState());
long elapsed = notification.getFinished()
- notification.getStarted();
- long sent = notification.getStatistics().get("sent");
- long errors = notification.getStatistics().get("errors");
+
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
@@ -403,7 +414,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
sb.append(" devices in ").append(elapsed).append(" ms");
LOG.info(sb.toString());
}
-
+ }
LOG.info("notification finished batch: {}",
notification.getUuid());
em.updateProperties(notification, properties);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
index e6301ed..b8b5c6f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueMessage.java
@@ -25,25 +25,32 @@ public class QueueMessage extends Message {
static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
+ static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
+ static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
+ static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
public QueueMessage() {
}
- public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId){
+ public QueueMessage(UUID applicationId,UUID notificationId,UUID deviceId,String notifierName,String notifierId){
setApplicationId(applicationId);
setDeviceId(deviceId);
+ setNotificationId(notificationId);
+ setNotifierName(notifierName);
+ setNotifierId(notifierId);
}
public static QueueMessage generate(Message message){
- return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty("notificationId"),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID));
+ return new QueueMessage((UUID) message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID),(UUID) message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME),message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
}
public UUID getApplicationId() {
return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
}
+
public void setApplicationId(UUID applicationId){
this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID,applicationId);
}
@@ -56,10 +63,26 @@ public class QueueMessage extends Message {
}
public UUID getNotificationId(){
- return (UUID) this.getObjectProperty("notificationId");
+ return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
}
public void setNotificationId(UUID notificationId){
- this.setProperty("notificationdId",notificationId);
+ this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID,notificationId);
+ }
+
+ public String getNotifierId() {
+ return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
}
+ public void setNotifierId(String notifierId){
+ this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID,notifierId);
+ }
+
+ public String getNotifierName() {
+ return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+ }
+ public void setNotifierName(String name){
+ this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME,name);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a40594d/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index ed15eeb..456df69 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -77,7 +77,6 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
public void before() throws Exception {
super.before();
// create apns notifier //
- NotificationsQueueManager.IS_TEST = true;
app.clear();
app.put("name", "apns");