You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/01 22:20:13 UTC

git commit: adding send now options

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o 34a2a6ad1 -> 1530bf7aa


adding send now options


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1530bf7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1530bf7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1530bf7a

Branch: refs/heads/two-dot-o
Commit: 1530bf7aa335627f6871f0ae3d534519ed362ab0
Parents: 34a2a6a
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 1 14:01:19 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 1 14:01:19 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 20 ++++++++++++++++----
 .../services/notifications/TaskManager.java     |  4 +++-
 2 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1530bf7a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index a00d676..e730a6d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -61,6 +61,7 @@ public class ApplicationQueueManager implements QueueManager {
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
     private final String[] queueNames;
+    private boolean sendNow = false;
 
     HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
 
@@ -82,6 +83,7 @@ public class ApplicationQueueManager implements QueueManager {
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
         this.queueNames = getQueueNames(properties);
+        this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
     }
 
 
@@ -114,6 +116,7 @@ public class ApplicationQueueManager implements QueueManager {
 
         final HashMap<Object,Notifier> notifierMap =  getNotifierMap();
         final String queueName = getRandomQueue(queueNames);
+        final List<ApplicationQueueMessage> messages = new ArrayList<>();
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
@@ -128,8 +131,6 @@ public class ApplicationQueueManager implements QueueManager {
             final CountMinSketch sketch = new CountMinSketch(0.0001,.99,7364181); //add probablistic counter to find dups
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
-
-
             final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
                 @Override
                 public Entity call(Entity entity) {
@@ -179,7 +180,11 @@ public class ApplicationQueueManager implements QueueManager {
                                 LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
                             }
                             now = System.currentTimeMillis();
-                            qm.postToQueue(queueName, message);
+                            if(jobExecution == null && sendNow) {
+                                messages.add(message);
+                            }else{
+                                qm.postToQueue(queueName, message);
+                            }
                             LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
                             deviceCount.incrementAndGet();
                             queueMeter.mark();
@@ -207,7 +212,7 @@ public class ApplicationQueueManager implements QueueManager {
                         }
                     });
             o.toBlocking().lastOrDefault(null);
-            LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(),System.currentTimeMillis() - now);
+            LOG.info("ApplicationQueueMessage: notification {} done queueing duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
 
 
         }
@@ -244,6 +249,12 @@ public class ApplicationQueueManager implements QueueManager {
             LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
         }
 
+        now = System.currentTimeMillis();
+        if(sendNow && messages.size()>0){
+            sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
+        }
+        LOG.info("ApplicationQueueMessage: notification {} done sending duration {} ms", notification.getUuid(), System.currentTimeMillis() - now);
+
     }
 
     /**
@@ -285,6 +296,7 @@ public class ApplicationQueueManager implements QueueManager {
      * @param messages
      * @throws Exception
      */
+
     public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
         LOG.info("sending batch of {} notifications.", messages.size());
         final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/1530bf7a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index f163431..07aed57 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -63,7 +63,9 @@ public class TaskManager {
         LOG.debug("REMOVED {}", deviceUUID);
         try {
             LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+            if(queuePath!=null){
+                qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
+            }
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
             if (receipt != null) {