You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/07 19:29:12 UTC
[2/3] git commit: adding metrics
adding metrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9c1e4aba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9c1e4aba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9c1e4aba
Branch: refs/heads/two-dot-o
Commit: 9c1e4aba92b6b1ac960feff031580182d19e5381
Parents: ef8a8c6
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Oct 7 11:22:02 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Oct 7 11:22:02 2014 -0600
----------------------------------------------------------------------
.../usergrid/services/notifications/QueueListener.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9c1e4aba/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index c5419c4..a69afe2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,10 +16,11 @@
*/
package org.apache.usergrid.services.notifications;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
@@ -31,7 +32,6 @@ import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
import rx.Observable;
import javax.annotation.PostConstruct;
import java.util.*;
@@ -126,11 +126,13 @@ public class QueueListener {
}
private void execute(){
-// Thread.currentThread().setDaemon(true);
+ Thread.currentThread().setDaemon(true);
Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
final AtomicInteger consecutiveExceptions = new AtomicInteger();
LOG.info("QueueListener: Starting execute process.");
+ Meter meter = metricsService.getMeter(QueueListener.class, "queue");
+ com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "dequeue");
// run until there are no more active jobs
while ( true ) {
@@ -139,10 +141,12 @@ public class QueueListener {
LOG.info("getting from queue {} ", queueName);
QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
+ Timer.Context timerContext = timer.time();
List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ApplicationQueueMessage.class);
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
+
HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
for (QueueMessage message : messages) {
@@ -183,6 +187,7 @@ public class QueueListener {
merge.toBlocking().lastOrDefault(null);
}
queueManager.commitMessages(messages);
+ meter.mark(messages.size());
LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
if(sleepBetweenRuns > 0) {
@@ -194,6 +199,7 @@ public class QueueListener {
LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
Thread.sleep(sleepWhenNoneFound);
}
+ timerContext.stop();
//send to the providers
consecutiveExceptions.set(0);
}catch (Exception ex){