You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/16 16:16:17 UTC

git commit: add meter; change logging

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o 16848a82b -> 3187097dd


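Add a "queue" meter that is marked for each device queued; rename the log message prefix from "ApplicationsQueueMessage" to "ApplicationQueueMessage"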


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3187097d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3187097d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3187097d

Branch: refs/heads/two-dot-o
Commit: 3187097dd7401d77bda194b296488c4a21cbb9fd
Parents: 16848a8
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 16 08:06:34 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 16 08:06:34 2014 -0600

----------------------------------------------------------------------
 .../notifications/ApplicationQueueManager.java  | 31 +++++++++++---------
 1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3187097d/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 1033d16..d25c9d9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -105,8 +105,10 @@ public class ApplicationQueueManager implements QueueManager {
     }
 
     public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+        final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManager.class,"queue");
+
         if (notification.getCanceled() == Boolean.TRUE) {
-            LOG.info("ApplicationsQueueMessage: notification " + notification.getUuid() + " canceled");
+            LOG.info("ApplicationQueueMessage: notification " + notification.getUuid() + " canceled");
             if (jobExecution != null) {
                 jobExecution.killed();
             }
@@ -114,7 +116,7 @@ public class ApplicationQueueManager implements QueueManager {
         }
 
         long startTime = System.currentTimeMillis();
-        LOG.info("ApplicationsQueueMessage: notification {} start queuing", notification.getUuid());
+        LOG.info("ApplicationQueueMessage: notification {} start queuing", notification.getUuid());
         final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
@@ -123,7 +125,7 @@ public class ApplicationQueueManager implements QueueManager {
 
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
-            LOG.info("ApplicationsQueueMessage: notification {} start query", notification.getUuid());
+            LOG.info("ApplicationQueueMessage: notification {} start query", notification.getUuid());
             final Iterator<Device> iterator = pathQuery.iterator(em);
             //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -134,7 +136,7 @@ public class ApplicationQueueManager implements QueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String,Object> payloads = notification.getPayloads();
 
-            LOG.info("ApplicationsQueueMessage: notification {} start threading", notification.getUuid());
+            LOG.info("ApplicationQueueMessage: notification {} start threading", notification.getUuid());
             rx.Observable.create(new IteratorObservable<Entity>(iterator)).parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
                 @Override
                 public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
@@ -142,19 +144,19 @@ public class ApplicationQueueManager implements QueueManager {
                         @Override
                         public Entity call(Entity entity) {
                             try {
-                                LOG.info("ApplicationsQueueMessage: notification {} send to entity {}", notification.getUuid(), entity.getUuid());
+                                LOG.info("ApplicationQueueMessage: notification {} send to entity {}", notification.getUuid(), entity.getUuid());
 
                                 List<EntityRef> devicesRef = getDevices(entity); // resolve group
 
-                                LOG.info("ApplicationsQueueMessage: notification {} send to {} devices", notification.getUuid(), devicesRef.size());
+                                LOG.info("ApplicationQueueMessage: notification {} send to {} devices", notification.getUuid(), devicesRef.size());
 
                                 for (EntityRef deviceRef : devicesRef) {
                                     if(LOG.isDebugEnabled()){
-                                        LOG.info("ApplicationsQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+                                        LOG.info("ApplicationQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
                                     }
                                     long hash = MurmurHash.hash(deviceRef.getUuid());
                                     if (sketch.estimateCount(hash) > 0) { //look for duplicates
-                                        LOG.debug("ApplicationsQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
+                                        LOG.debug("ApplicationQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
                                         continue;
                                     } else {
                                         sketch.add(hash, 1);
@@ -174,25 +176,26 @@ public class ApplicationQueueManager implements QueueManager {
                                     }
 
                                     if (notifierId == null) {
-                                        LOG.debug("ApplicationsQueueMessage: Notifier did not match for device {} ", deviceRef);
+                                        LOG.debug("ApplicationQueueMessage: Notifier did not match for device {} ", deviceRef);
                                         continue;
                                     }
 
                                     ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
                                     if(LOG.isDebugEnabled()){
-                                        LOG.info("ApplicationsQueueMessage: notification {} pre-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
+                                        LOG.info("ApplicationQueueMessage: notification {} pre-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
                                     }
                                     qm.postToQueue(QUEUE_NAME, message);
                                     if(LOG.isDebugEnabled()){
-                                        LOG.info("ApplicationsQueueMessage: notification {} post-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
+                                        LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
                                     }
                                     if (notification.getQueued() == null) {
                                         // update queued time
                                         notification.setQueued(System.currentTimeMillis());
                                         em.update(notification);
-                                        LOG.info("ApplicationsQueueMessage: notification {} queue time set.", notification.getUuid(), deviceRef.getUuid());
+                                        LOG.info("ApplicationQueueMessage: notification {} queue time set.", notification.getUuid(), deviceRef.getUuid());
                                     }
                                     deviceCount.incrementAndGet();
+                                    queueMeter.mark();
                                 }
 
                             } catch (Exception deviceLoopException) {
@@ -239,8 +242,8 @@ public class ApplicationQueueManager implements QueueManager {
 
         if (LOG.isInfoEnabled()) {
             long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
-            LOG.info("ApplicationsQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
-            LOG.info("ApplicationsQueueMessage: notification {} finished in {} ms",notification.getUuid().toString(),elapsed);
+            LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
+            LOG.info("ApplicationQueueMessage: notification {} finished in {} ms",notification.getUuid().toString(),elapsed);
 
         }