You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/09/16 16:16:17 UTC
git commit: add meter; change logging
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o 16848a82b -> 3187097dd
add meter; change logging
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3187097d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3187097d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3187097d
Branch: refs/heads/two-dot-o
Commit: 3187097dd7401d77bda194b296488c4a21cbb9fd
Parents: 16848a8
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Sep 16 08:06:34 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Sep 16 08:06:34 2014 -0600
----------------------------------------------------------------------
.../notifications/ApplicationQueueManager.java | 31 +++++++++++---------
1 file changed, 17 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3187097d/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 1033d16..d25c9d9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -105,8 +105,10 @@ public class ApplicationQueueManager implements QueueManager {
}
public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
+ final Meter queueMeter = metricsFactory.getMeter(ApplicationQueueManager.class,"queue");
+
if (notification.getCanceled() == Boolean.TRUE) {
- LOG.info("ApplicationsQueueMessage: notification " + notification.getUuid() + " canceled");
+ LOG.info("ApplicationQueueMessage: notification " + notification.getUuid() + " canceled");
if (jobExecution != null) {
jobExecution.killed();
}
@@ -114,7 +116,7 @@ public class ApplicationQueueManager implements QueueManager {
}
long startTime = System.currentTimeMillis();
- LOG.info("ApplicationsQueueMessage: notification {} start queuing", notification.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} start queuing", notification.getUuid());
final PathQuery<Device> pathQuery = notification.getPathQuery() ; //devices query
final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
@@ -123,7 +125,7 @@ public class ApplicationQueueManager implements QueueManager {
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
- LOG.info("ApplicationsQueueMessage: notification {} start query", notification.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} start query", notification.getUuid());
final Iterator<Device> iterator = pathQuery.iterator(em);
//if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -134,7 +136,7 @@ public class ApplicationQueueManager implements QueueManager {
final UUID appId = em.getApplication().getUuid();
final Map<String,Object> payloads = notification.getPayloads();
- LOG.info("ApplicationsQueueMessage: notification {} start threading", notification.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} start threading", notification.getUuid());
rx.Observable.create(new IteratorObservable<Entity>(iterator)).parallel(new Func1<Observable<Entity>, Observable<Entity>>() {
@Override
public rx.Observable<Entity> call(rx.Observable<Entity> deviceObservable) {
@@ -142,19 +144,19 @@ public class ApplicationQueueManager implements QueueManager {
@Override
public Entity call(Entity entity) {
try {
- LOG.info("ApplicationsQueueMessage: notification {} send to entity {}", notification.getUuid(), entity.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} send to entity {}", notification.getUuid(), entity.getUuid());
List<EntityRef> devicesRef = getDevices(entity); // resolve group
- LOG.info("ApplicationsQueueMessage: notification {} send to {} devices", notification.getUuid(), devicesRef.size());
+ LOG.info("ApplicationQueueMessage: notification {} send to {} devices", notification.getUuid(), devicesRef.size());
for (EntityRef deviceRef : devicesRef) {
if(LOG.isDebugEnabled()){
- LOG.info("ApplicationsQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} starting to queue device {} ", notification.getUuid(), deviceRef.getUuid());
}
long hash = MurmurHash.hash(deviceRef.getUuid());
if (sketch.estimateCount(hash) > 0) { //look for duplicates
- LOG.debug("ApplicationsQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
+ LOG.debug("ApplicationQueueMessage: Maybe Found duplicate device: {}", deviceRef.getUuid());
continue;
} else {
sketch.add(hash, 1);
@@ -174,25 +176,26 @@ public class ApplicationQueueManager implements QueueManager {
}
if (notifierId == null) {
- LOG.debug("ApplicationsQueueMessage: Notifier did not match for device {} ", deviceRef);
+ LOG.debug("ApplicationQueueMessage: Notifier did not match for device {} ", deviceRef);
continue;
}
ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
if(LOG.isDebugEnabled()){
- LOG.info("ApplicationsQueueMessage: notification {} pre-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} pre-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
}
qm.postToQueue(QUEUE_NAME, message);
if(LOG.isDebugEnabled()){
- LOG.info("ApplicationsQueueMessage: notification {} post-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} ", notification.getUuid(), deviceRef.getUuid());
}
if (notification.getQueued() == null) {
// update queued time
notification.setQueued(System.currentTimeMillis());
em.update(notification);
- LOG.info("ApplicationsQueueMessage: notification {} queue time set.", notification.getUuid(), deviceRef.getUuid());
+ LOG.info("ApplicationQueueMessage: notification {} queue time set.", notification.getUuid(), deviceRef.getUuid());
}
deviceCount.incrementAndGet();
+ queueMeter.mark();
}
} catch (Exception deviceLoopException) {
@@ -239,8 +242,8 @@ public class ApplicationQueueManager implements QueueManager {
if (LOG.isInfoEnabled()) {
long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
- LOG.info("ApplicationsQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
- LOG.info("ApplicationsQueueMessage: notification {} finished in {} ms",notification.getUuid().toString(),elapsed);
+ LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
+ LOG.info("ApplicationQueueMessage: notification {} finished in {} ms",notification.getUuid().toString(),elapsed);
}