You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/02 17:07:44 UTC
[26/54] [abbrv] usergrid git commit: Fix scheduler.
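Fix scheduler: run notification queue processing on a dedicated Rx Scheduler backed by the bounded "push-device-io" task executor (caller-runs on rejection) instead of Schedulers.io().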
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/938bef0d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/938bef0d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/938bef0d
Branch: refs/heads/apm
Commit: 938bef0d02c6f029319e4ea1632c13c7bf74a447
Parents: df9abc4
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 20 18:24:59 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 20 18:24:59 2016 -0700
----------------------------------------------------------------------
.../impl/ApplicationQueueManagerImpl.java | 17 +++++++++--------
1 file changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/938bef0d/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 778307c..4b2612f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -29,6 +29,7 @@ import org.apache.usergrid.services.notifications.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
+import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -62,7 +63,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
- //private final ExecutorService asyncExecutor;
+ private final Scheduler scheduler;
@@ -78,7 +79,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
- /**
+
int maxAsyncThreads;
int workerQueueSize;
@@ -99,11 +100,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
// create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
- this.asyncExecutor = TaskExecutorFactory
+ this.scheduler = Schedulers.from(TaskExecutorFactory
.createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
- TaskExecutorFactory.RejectionAction.CALLERRUNS );
+ TaskExecutorFactory.RejectionAction.CALLERRUNS ));
+
- **/
}
private boolean scheduleQueueJob(Notification notification) throws Exception {
@@ -306,7 +307,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
})
.map(sendMessageFunction)
- .subscribeOn(Schedulers.io());
+ .subscribeOn(scheduler);
}, concurrencyFactor)
.distinct( queueMessage -> {
@@ -314,7 +315,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
if(queueMessage.isPresent()) {
return queueMessage.get().getNotificationId();
}
-
+
return queueMessage; // this will always be distinct, default handling for the Optional.empty() case
} )
@@ -372,7 +373,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
});
- processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
+ processMessagesObservable.subscribeOn(scheduler).subscribe(); // fire the queuing into the background
}