You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/01 22:20:13 UTC
git commit: adding send now options
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o 34a2a6ad1 -> 1530bf7aa
adding send now options
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1530bf7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1530bf7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1530bf7a
Branch: refs/heads/two-dot-o
Commit: 1530bf7aa335627f6871f0ae3d534519ed362ab0
Parents: 34a2a6a
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 1 14:01:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 1 14:01:19 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 20 ++++++++++++++++----
.../services/notifications/TaskManager.java | 4 +++-
2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1530bf7a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index a00d676..e730a6d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -61,6 +61,7 @@ public class ApplicationQueueManager implements QueueManager {
private final JobScheduler jobScheduler;
private final MetricsFactory metricsFactory;
private final String[] queueNames;
+ private boolean sendNow = false;
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
@@ -82,6 +83,7 @@ public class ApplicationQueueManager implements QueueManager {
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueNames = getQueueNames(properties);
+ this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
}
@@ -114,6 +116,7 @@ public class ApplicationQueueManager implements QueueManager {
final HashMap<Object,Notifier> notifierMap = getNotifierMap();
final String queueName = getRandomQueue(queueNames);
+ final List<ApplicationQueueMessage> messages = new ArrayList<>();
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
@@ -128,8 +131,6 @@ public class ApplicationQueueManager implements QueueManager {
final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
final UUID appId = em.getApplication().getUuid();
final Map<String,Object> payloads = notification.getPayloads();
-
-
final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
@Override
public Entity call(Entity entity) {
@@ -179,7 +180,11 @@ public class ApplicationQueueManager implements QueueManager {
LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
}
now = System.currentTimeMillis();
- qm.postToQueue(queueName, message);
+ if(jobExecution == null && sendNow) {
+ messages.add(message);
+ }else{
+ qm.postToQueue(queueName, message);
+ }
LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
deviceCount.incrementAndGet();
queueMeter.mark();
@@ -207,7 +212,7 @@ public class ApplicationQueueManager implements QueueManager {
}
});
o.toBlocking().lastOrDefault(null);
- LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
+ LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
}
@@ -244,6 +249,12 @@ public class ApplicationQueueManager implements QueueManager {
LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
}
+ now = System.currentTimeMillis();
+ if(sendNow && messages.size()>0){
+ sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
+ }
+ LOG.info("ApplicationQueueMessage: notification {} done sending duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
}
/**
@@ -285,6 +296,7 @@ public class ApplicationQueueManager implements QueueManager {
* @param messages
* @throws Exception
*/
+
public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
LOG.info("sending batch of {} notifications.", messages.size());
final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1530bf7a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index f163431..07aed57 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -63,7 +63,9 @@ public class TaskManager {
LOG.debug("REMOVED {}", deviceUUID);
try {
LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
- qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+ if(queuePath!=null){
+ qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+ }
EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
if (receipt != null) {