You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/02 17:07:43 UTC
[25/54] [abbrv] usergrid git commit: Temporarily disable notification counters and back to Schedulers.io()
Temporarily disable notification counters and switch back to Schedulers.io()
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/df9abc4d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/df9abc4d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/df9abc4d
Branch: refs/heads/apm
Commit: df9abc4d0c16af25dbfd75ea544bc9953b7addc7
Parents: 81eb251
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 20 18:01:57 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 20 18:01:57 2016 -0700
----------------------------------------------------------------------
.../services/notifications/TaskManager.java | 4 +-
.../impl/ApplicationQueueManagerImpl.java | 58 ++++++++++++--------
2 files changed, 36 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index e908c3b..870cae9 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -62,7 +62,7 @@ public class TaskManager {
//random date and time for format
- incrementNotificationCounter( "completed" );
+ //incrementNotificationCounter( "completed" );
EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
@@ -100,7 +100,7 @@ public class TaskManager {
try {
- incrementNotificationCounter( "failed" );
+ //incrementNotificationCounter( "failed" );
if (logger.isDebugEnabled()) {
logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/df9abc4d/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 5254fd6..778307c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -62,7 +62,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
- private final ExecutorService asyncExecutor;
+ //private final ExecutorService asyncExecutor;
@@ -78,6 +78,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+ /**
int maxAsyncThreads;
int workerQueueSize;
@@ -102,7 +103,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
.createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
TaskExecutorFactory.RejectionAction.CALLERRUNS );
-
+ **/
}
private boolean scheduleQueueJob(Notification notification) throws Exception {
@@ -269,7 +270,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
return Observable.from(entities);
})
- .distinct( deviceRef -> deviceRef.getUuid())
.filter( device -> {
if(logger.isTraceEnabled()) {
@@ -306,37 +306,47 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
})
.map(sendMessageFunction)
- .doOnNext( message -> {
- try {
+ .subscribeOn(Schedulers.io());
- if(message.isPresent()){
+ }, concurrencyFactor)
+ .distinct( queueMessage -> {
- if(logger.isTraceEnabled()) {
- logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
- }
- qm.sendMessage( message.get() );
- queueMeter.mark();
- }
+ if(queueMessage.isPresent()) {
+ return queueMessage.get().getNotificationId();
+ }
+
+ return queueMessage; // this will always be distinct, default handling for the Optional.empty() case
- } catch (IOException e) {
+ } )
+ .doOnNext( message -> {
+ try {
- if(message.isPresent()){
- logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
- message.get().getNotificationId(), message.get().getDeviceId());
- }
- else{
- logger.error("Unable to queue notification as it's not present when trying to send to queue");
- }
+ if(message.isPresent()){
+ if(logger.isTraceEnabled()) {
+ logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
}
+ qm.sendMessage( message.get() );
+ queueMeter.mark();
+ }
+ } catch (IOException e) {
- }).subscribeOn(Schedulers.from(asyncExecutor));
+ if(message.isPresent()){
+ logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+ message.get().getNotificationId(), message.get().getDeviceId());
+ }
+ else{
+ logger.error("Unable to queue notification as it's not present when trying to send to queue");
+ }
- }, concurrencyFactor)
+ }
+
+
+ })
.doOnError(throwable -> {
- logger.error("Error while processing devices for notification : {}", notification.getUuid());
+ logger.error("Error while processing devices for notification : {}, error: {}", notification.getUuid(), throwable.getMessage());
notification.setProcessingFinished(-1L);
notification.setDeviceProcessedCount(deviceCount.get());
logger.warn("Partial notification. Only {} devices processed for notification {}",
@@ -362,7 +372,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
});
- processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background
+ processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
}