You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/02 17:07:37 UTC
[19/54] [abbrv] usergrid git commit: Add a separate executor pool for async processing instead of unbounded Schedulers.io()
Add a separate executor pool for async processing instead of unbounded Schedulers.io()
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f272af2f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f272af2f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f272af2f
Branch: refs/heads/apm
Commit: f272af2f3b41ce6ded649ffcd7410f23e33587fb
Parents: 6488b05
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 20 19:50:41 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 20 19:50:41 2016 +0100
----------------------------------------------------------------------
.../services/notifications/TaskManager.java | 96 +++++++++-----------
.../impl/ApplicationQueueManagerImpl.java | 86 +++++++++++++-----
2 files changed, 105 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index ce2b82c..531ca7c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -37,12 +37,10 @@ public class TaskManager {
private AtomicLong successes = new AtomicLong();
private AtomicLong failures = new AtomicLong();
private EntityManager em;
- private boolean hasFinished;
public TaskManager(EntityManager em, Notification notification) {
this.em = em;
this.notification = notification;
- hasFinished = false;
}
public long getSuccesses(){return successes.get();}
@@ -53,77 +51,69 @@ public class TaskManager {
completed(notifier,null,deviceUUID,null);
}
public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception {
- if (logger.isTraceEnabled()) {
- logger.trace("REMOVED {}", deviceUUID);
- }
+
+ successes.incrementAndGet();
+
try {
- if (logger.isTraceEnabled()) {
- logger.trace("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
- }
EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
+
if (receipt != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("notification {} sent to device {}. saving receipt.", notification.getUuid(), deviceUUID);
- }
+
receipt.setSent(System.currentTimeMillis());
this.saveReceipt(notification, deviceRef, receipt,false);
if (logger.isTraceEnabled()) {
- logger.trace("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
+ logger.trace("Notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
}
- successes.incrementAndGet();
+
}
if (newProviderId != null) {
if (logger.isTraceEnabled()) {
- logger.trace("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
+ logger.trace("Notification {} replacing notifier id for device {} ", notification.getUuid(), deviceUUID);
}
replaceProviderId(deviceRef, notifier, newProviderId);
}
if (logger.isTraceEnabled()) {
- logger.trace("notification {} completed device {}", notification.getUuid(), deviceUUID);
+ logger.trace("Notification {} sending completed for device {}", notification.getUuid(), deviceUUID);
}
- } finally {
- if (logger.isTraceEnabled()) {
- logger.trace("COUNT is: {}", successes.get());
- }
-// if (hasFinished) { //process has finished but notifications are still coming in
-// finishedBatch();
-//
-// }
+ } catch(Exception e) {
+
+ logger.error("Unable to mark notification {} as completed due to: {}", notification.getUuid(), e);
+
}
}
public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception {
+ failures.incrementAndGet();
+
try {
if (logger.isDebugEnabled()) {
- logger.debug("notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
+ logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
}
- failures.incrementAndGet();
- if(receipt!=null) {
- if ( receipt.getUuid() != null ) {
- successes.decrementAndGet();
- }
+ if(receipt != null) {
receipt.setErrorCode( code );
receipt.setErrorMessage( message );
this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID ), receipt, true );
- if ( logger.isDebugEnabled() ) {
- logger.debug( "notification {} receipt saved for device {}", notification.getUuid(), deviceUUID );
- }
}
- } finally {
+
completed(notifier, deviceUUID);
finishedBatch();
+
+ } catch (Exception e){
+
+ logger.error("Unable to finish marking notification {} as failed due to error: ", notification.getUuid(), e);
+
}
}
- /*
- * called from TaskManager - creates a persistent receipt and updates the
- * passed one w/ the UUID
+ /**
+ * Called from TaskManager - Creates a persistent receipt
+ *
*/
private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt, boolean hasError) throws Exception {
@@ -142,11 +132,16 @@ public class TaskManager {
} else {
em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
}
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Notification {} receipt saved for device {}", notification.getUuid(), device.getUuid() );
+ }
+
}
}
- protected void replaceProviderId(EntityRef device, Notifier notifier,
+ private void replaceProviderId(EntityRef device, Notifier notifier,
String newProviderId) throws Exception {
Object value = em.getProperty(device, notifier.getName()
+ ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
@@ -161,33 +156,24 @@ public class TaskManager {
}
}
- public void finishedBatch() throws Exception {
- finishedBatch(true);
- }
- public void finishedBatch(boolean refreshNotification) throws Exception {
-
- long successes = this.successes.get(); //reset counters
- long failures = this.failures.get(); //reset counters
+ public void finishedBatch() throws Exception {
- for (int i = 0; i < successes; i++) {
- this.successes.decrementAndGet();
- }
- for (int i = 0; i < failures; i++) {
- this.failures.decrementAndGet();
- }
+ long successes = this.successes.get();
+ long failures = this.failures.get();
- this.hasFinished = true;
+ // reset the counters
+ this.successes.set(0);
+ this.failures.set(0);
- // force refresh notification by fetching it
- if (refreshNotification) {
- notification = em.get(this.notification.getUuid(), Notification.class);
- }
+ // get the latest notification info
+ notification = em.get(this.notification.getUuid(), Notification.class);
notification.updateStatistics(successes, failures);
notification.setModified(System.currentTimeMillis());
notification.setFinished(notification.getModified());
em.update(notification);
+
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 2f39ae4..1bb92b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.Query;
@@ -52,11 +53,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
private final Meter queueMeter;
private final Meter sendMeter;
+ private final static String PUSH_PROCESSING_MAXTHREADS_PROP = "usergrid.push.async.processing.threads";
+ private final static String PUSH_PROCESSING_QUEUESIZE_PROP = "usergrid.push.async.processing.queue.size";
private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
+
+ private final ExecutorService asyncExecutor;
+
+
+
+
public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
QueueManager queueManager, MetricsFactory metricsFactory,
Properties properties) {
@@ -65,8 +74,31 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueName = getQueueNames(properties);
- queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
- sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+ this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
+ this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+
+ int maxAsyncThreads;
+ int workerQueueSize;
+
+ try {
+
+ maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200"));
+ workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000"));
+
+ } catch (Exception e){
+
+ // if junk is passed into the property, just default the values
+ maxAsyncThreads = 200;
+ workerQueueSize = 2000;
+
+ }
+
+
+ // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+ this.asyncExecutor = TaskExecutorFactory
+ .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
+ TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
}
@@ -296,7 +328,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
- }).subscribeOn(Schedulers.io());
+ }).subscribeOn(Schedulers.from(asyncExecutor));
}, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
.doOnError(throwable -> {
@@ -327,7 +359,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
});
- processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
+ processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background
}
@@ -348,7 +380,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
// if no devices, go ahead and mark the batch finished
if (deviceCount.get() <= 0 ) {
TaskManager taskManager = new TaskManager(em, notification);
- taskManager.finishedBatch(true);
+ taskManager.finishedBatch();
}
@@ -540,32 +572,43 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
/**
- * Validates that a notifier and adapter exists to send notifications to;
- * {"winphone":"mymessage","apple":"mymessage"}
- * TODO: document this method better
+ * Validates that a notifier and adapter exists to send notifications to. For the example payload
+ *
+ * { "payloads" : {"winphone":"mymessage","apple":"mymessage"} }
+ *
+ * Notifiers with name "winphone" and "apple" must exist.
*/
- private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter>
- notifierMap) throws Exception {
- Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+ private Map<String, Object> translatePayloads(Map<String, Object> payloads,
+ Map<Object, ProviderAdapter> notifierMap) throws Exception {
+
+ final Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+
for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+
String payloadKey = entry.getKey().toLowerCase();
Object payloadValue = entry.getValue();
+
//look for adapter from payload map
ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
if (providerAdapter != null) {
+
//translate payload to usable information
Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
if (translatedPayload != null) {
translatedPayloads.put(payloadKey, translatedPayload);
}
+
}
}
return translatedPayloads;
}
public static String getQueueNames(Properties properties) {
- String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
+
+ String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY,
+ ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
return name;
+
}
private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
@@ -585,15 +628,15 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
try {
while (!subscriber.isUnsubscribed() && input.hasNext()) {
+
//send our input to the next
- //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
subscriber.onNext((T) input.next());
+
}
//tell the subscriber we don't have any more data
- //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
-
subscriber.onCompleted();
+
} catch (Throwable t) {
logger.error("failed on subscriber", t);
subscriber.onError(t);
@@ -617,10 +660,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
}
} catch (Exception e) {
- logger.error("checkForInactiveDevices", e); // not
- // essential so
- // don't fail,
- // but log
+ // not essential so don't fail, but log
+ logger.error("checkForInactiveDevices", e);
+
}
}
}
@@ -630,14 +672,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
if (notification.getCanceled() == Boolean.TRUE) {
if (logger.isDebugEnabled()) {
- logger.debug("notification {} canceled. not sending.",
+ logger.debug("Notification {} canceled. Not sending.",
notification.getUuid());
}
return false;
}
if (notification.isExpired()) {
if (logger.isDebugEnabled()) {
- logger.debug("notification {} expired. not sending.",
+ logger.debug("Notification {} expired. Not sending.",
notification.getUuid());
}
return false;
@@ -654,7 +696,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
return value != null ? value.toString() : null;
} catch (Exception e) {
- logger.error("Error getting provider ID, proceeding with rest of batch", e);
+ logger.error("Error getting notifier for device {}, proceeding with rest of batch", device, e);
return null;
}
}