You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 19:29:12 UTC

[2/3] git commit: adding metrics

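Adding metrics to QueueListener: a Meter ("queue") marked with the size of each committed message batch, and a Timer ("dequeue") started before each getMessages call and stopped at the end of a successful loop iteration. Also re-enables the previously commented-out Thread.currentThread().setDaemon(true) call and drops the QueueResults and Autowired imports.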


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9c1e4aba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9c1e4aba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9c1e4aba

Branch: refs/heads/two-dot-o
Commit: 9c1e4aba92b6b1ac960feff031580182d19e5381
Parents: ef8a8c6
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 7 11:22:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 7 11:22:02 2014 -0600

----------------------------------------------------------------------
 .../usergrid/services/notifications/QueueListener.java  | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9c1e4aba/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index c5419c4..a69afe2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,10 +16,11 @@
  */
 package org.apache.usergrid.services.notifications;
 
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
 import org.apache.usergrid.corepersistence.CpSetup;
 import org.apache.usergrid.metrics.MetricsFactory;
 
-import org.apache.usergrid.mq.QueueResults;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
@@ -31,7 +32,6 @@ import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
 import rx.Observable;
 import javax.annotation.PostConstruct;
 import java.util.*;
@@ -126,11 +126,13 @@ public class QueueListener  {
     }
 
     private void execute(){
-//        Thread.currentThread().setDaemon(true);
+        Thread.currentThread().setDaemon(true);
         Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
 
         final AtomicInteger consecutiveExceptions = new AtomicInteger();
         LOG.info("QueueListener: Starting execute process.");
+        Meter meter = metricsService.getMeter(QueueListener.class, "queue");
+        com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "dequeue");
 
         // run until there are no more active jobs
         while ( true ) {
@@ -139,10 +141,12 @@ public class QueueListener  {
                 LOG.info("getting from queue {} ", queueName);
                 QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
                 QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
+                Timer.Context timerContext = timer.time();
                 List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ApplicationQueueMessage.class);
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {
+
                     HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
                     //group messages into hash map by app id
                     for (QueueMessage message : messages) {
@@ -183,6 +187,7 @@ public class QueueListener  {
                         merge.toBlocking().lastOrDefault(null);
                     }
                     queueManager.commitMessages(messages);
+                    meter.mark(messages.size());
                     LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
 
                     if(sleepBetweenRuns > 0) {
@@ -194,6 +199,7 @@ public class QueueListener  {
                     LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
                     Thread.sleep(sleepWhenNoneFound);
                 }
+                timerContext.stop();
                 //send to the providers
                 consecutiveExceptions.set(0);
             }catch (Exception ex){