You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/02 17:07:43 UTC

[25/54] [abbrv] usergrid git commit: Temporarily disable notification counters and switch back to Schedulers.io()

Temporarily disable notification counters and switch back to Schedulers.io()


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/df9abc4d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/df9abc4d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/df9abc4d

Branch: refs/heads/apm
Commit: df9abc4d0c16af25dbfd75ea544bc9953b7addc7
Parents: 81eb251
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 20 18:01:57 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 20 18:01:57 2016 -0700

----------------------------------------------------------------------
 .../services/notifications/TaskManager.java     |  4 +-
 .../impl/ApplicationQueueManagerImpl.java       | 58 ++++++++++++--------
 2 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index e908c3b..870cae9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -62,7 +62,7 @@ public class TaskManager {
             //random date and time for format
 
 
-            incrementNotificationCounter( "completed" );
+            //incrementNotificationCounter( "completed" );
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
 
@@ -100,7 +100,7 @@ public class TaskManager {
 
         try {
 
-            incrementNotificationCounter( "failed" );
+            //incrementNotificationCounter( "failed" );
 
             if (logger.isDebugEnabled()) {
                 logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5254fd6..778307c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -62,7 +62,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
 
-    private final ExecutorService asyncExecutor;
+    //private final ExecutorService asyncExecutor;
 
 
 
@@ -78,6 +78,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
         this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
 
+        /**
         int maxAsyncThreads;
         int workerQueueSize;
 
@@ -102,7 +103,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
                 TaskExecutorFactory.RejectionAction.CALLERRUNS );
 
-
+        **/
     }
 
     private boolean scheduleQueueJob(Notification notification) throws Exception {
@@ -269,7 +270,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                         return Observable.from(entities);
 
                         })
-                        .distinct( deviceRef -> deviceRef.getUuid())
                         .filter( device -> {
 
                             if(logger.isTraceEnabled()) {
@@ -306,37 +306,47 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                         })
                         .map(sendMessageFunction)
-                        .doOnNext( message -> {
-                            try {
+                        .subscribeOn(Schedulers.io());
 
-                                if(message.isPresent()){
+                }, concurrencyFactor)
+                .distinct( queueMessage -> {
 
-                                    if(logger.isTraceEnabled()) {
-                                        logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
-                                    }
-                                    qm.sendMessage( message.get() );
-                                    queueMeter.mark();
-                                }
+                    if(queueMessage.isPresent()) {
+                        return queueMessage.get().getNotificationId();
+                    }
+                    
+                    return queueMessage; // this will always be distinct, default handling for the Optional.empty() case
 
-                            } catch (IOException e) {
+                } )
+                .doOnNext( message -> {
+                    try {
 
-                                if(message.isPresent()){
-                                    logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
-                                        message.get().getNotificationId(), message.get().getDeviceId());
-                                }
-                                else{
-                                    logger.error("Unable to queue notification as it's not present when trying to send to queue");
-                                }
+                        if(message.isPresent()){
 
+                            if(logger.isTraceEnabled()) {
+                                logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
                             }
+                            qm.sendMessage( message.get() );
+                            queueMeter.mark();
+                        }
 
+                    } catch (IOException e) {
 
-                        }).subscribeOn(Schedulers.from(asyncExecutor));
+                        if(message.isPresent()){
+                            logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+                                message.get().getNotificationId(), message.get().getDeviceId());
+                        }
+                        else{
+                            logger.error("Unable to queue notification as it's not present when trying to send to queue");
+                        }
 
-                }, concurrencyFactor)
+                    }
+
+
+                })
                 .doOnError(throwable -> {
 
-                    logger.error("Error while processing devices for notification : {}", notification.getUuid());
+                    logger.error("Error while processing devices for notification : {}, error: {}", notification.getUuid(), throwable.getMessage());
                     notification.setProcessingFinished(-1L);
                     notification.setDeviceProcessedCount(deviceCount.get());
                     logger.warn("Partial notification. Only {} devices processed for notification {}",
@@ -362,7 +372,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                 });
 
-            processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background
+            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
 
         }