You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/02 17:07:37 UTC

[19/54] [abbrv] usergrid git commit: Add a separate executor pool for async processing instead of unbounded Schedulers.io()

Add a separate executor pool for async processing instead of unbounded Schedulers.io()


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f272af2f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f272af2f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f272af2f

Branch: refs/heads/apm
Commit: f272af2f3b41ce6ded649ffcd7410f23e33587fb
Parents: 6488b05
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 20 19:50:41 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 20 19:50:41 2016 +0100

----------------------------------------------------------------------
 .../services/notifications/TaskManager.java     | 96 +++++++++-----------
 .../impl/ApplicationQueueManagerImpl.java       | 86 +++++++++++++-----
 2 files changed, 105 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index ce2b82c..531ca7c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -37,12 +37,10 @@ public class TaskManager {
     private AtomicLong successes = new AtomicLong();
     private AtomicLong failures = new AtomicLong();
     private EntityManager em;
-    private boolean hasFinished;
 
     public TaskManager(EntityManager em, Notification notification) {
         this.em = em;
         this.notification = notification;
-        hasFinished = false;
     }
 
     public long getSuccesses(){return successes.get();}
@@ -53,77 +51,69 @@ public class TaskManager {
         completed(notifier,null,deviceUUID,null);
     }
     public void completed(Notifier notifier, Receipt receipt, UUID deviceUUID, String newProviderId) throws Exception {
-        if (logger.isTraceEnabled()) {
-            logger.trace("REMOVED {}", deviceUUID);
-        }
+
+        successes.incrementAndGet();
+
         try {
-            if (logger.isTraceEnabled()) {
-                logger.trace("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
-            }
 
             EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
+
             if (receipt != null) {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} sent to device {}. saving receipt.", notification.getUuid(), deviceUUID);
-                }
+
                 receipt.setSent(System.currentTimeMillis());
                 this.saveReceipt(notification, deviceRef, receipt,false);
                 if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
+                    logger.trace("Notification {} receipt saved for device {}", notification.getUuid(), deviceUUID);
                 }
-                successes.incrementAndGet();
+
             }
 
             if (newProviderId != null) {
                 if (logger.isTraceEnabled()) {
-                    logger.trace("notification {} replacing device {} notifierId", notification.getUuid(), deviceUUID);
+                    logger.trace("Notification {} replacing notifier id for device {} ", notification.getUuid(), deviceUUID);
                 }
                 replaceProviderId(deviceRef, notifier, newProviderId);
             }
 
             if (logger.isTraceEnabled()) {
-                logger.trace("notification {} completed device {}", notification.getUuid(), deviceUUID);
+                logger.trace("Notification {} sending completed for device {}", notification.getUuid(), deviceUUID);
             }
 
-        } finally {
-            if (logger.isTraceEnabled()) {
-                logger.trace("COUNT is: {}", successes.get());
-            }
-//            if (hasFinished) { //process has finished but notifications are still coming in
-//                finishedBatch();
-//
-//            }
+        } catch(Exception e) {
+
+            logger.error("Unable to mark notification {} as completed due to: {}", notification.getUuid(), e);
+
         }
     }
 
     public void failed(Notifier notifier, Receipt receipt, UUID deviceUUID, Object code, String message) throws Exception {
 
+        failures.incrementAndGet();
+
         try {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
+                logger.debug("Notification {} for device {} got error {}", notification.getUuid(), deviceUUID, code);
             }
 
-            failures.incrementAndGet();
-            if(receipt!=null) {
-                if ( receipt.getUuid() != null ) {
-                    successes.decrementAndGet();
-                }
+            if(receipt != null) {
                 receipt.setErrorCode( code );
                 receipt.setErrorMessage( message );
                 this.saveReceipt( notification, new SimpleEntityRef( Device.ENTITY_TYPE, deviceUUID ), receipt, true );
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "notification {} receipt saved for device {}", notification.getUuid(), deviceUUID );
-                }
             }
-        } finally {
+
             completed(notifier, deviceUUID);
             finishedBatch();
+
+        } catch (Exception e){
+
+            logger.error("Unable to finish marking notification {} as failed due to error: ", notification.getUuid(), e);
+
         }
     }
 
-    /*
-    * called from TaskManager - creates a persistent receipt and updates the
-    * passed one w/ the UUID
+    /**
+    * Called from TaskManager - Creates a persistent receipt
+    *
     */
     private void saveReceipt(EntityRef notification, EntityRef device, Receipt receipt, boolean hasError) throws Exception {
 
@@ -142,11 +132,16 @@ public class TaskManager {
             } else {
                 em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
             }
+
+            if ( logger.isDebugEnabled() ) {
+                logger.debug( "Notification {} receipt saved for device {}", notification.getUuid(), device.getUuid() );
+            }
+
         }
 
     }
 
-    protected void replaceProviderId(EntityRef device, Notifier notifier,
+    private void replaceProviderId(EntityRef device, Notifier notifier,
                                      String newProviderId) throws Exception {
         Object value = em.getProperty(device, notifier.getName()
                 + ApplicationQueueManager.NOTIFIER_ID_POSTFIX);
@@ -161,33 +156,24 @@ public class TaskManager {
         }
     }
 
-    public void finishedBatch() throws Exception {
-        finishedBatch(true);
-    }
 
-    public void finishedBatch(boolean refreshNotification) throws Exception {
-
-        long successes = this.successes.get(); //reset counters
-        long failures = this.failures.get(); //reset counters
+    public void finishedBatch() throws Exception {
 
-        for (int i = 0; i < successes; i++) {
-            this.successes.decrementAndGet();
-        }
-        for (int i = 0; i < failures; i++) {
-            this.failures.decrementAndGet();
-        }
+        long successes = this.successes.get();
+        long failures = this.failures.get();
 
-        this.hasFinished = true;
+        // reset the counters
+        this.successes.set(0);
+        this.failures.set(0);
 
-        // force refresh notification by fetching it
-        if (refreshNotification) {
-            notification = em.get(this.notification.getUuid(), Notification.class);
-        }
+        // get the latest notification info
+        notification = em.get(this.notification.getUuid(), Notification.class);
 
         notification.updateStatistics(successes, failures);
         notification.setModified(System.currentTimeMillis());
         notification.setFinished(notification.getModified());
 
         em.update(notification);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f272af2f/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 2f39ae4..1bb92b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
@@ -52,11 +53,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     private final Meter queueMeter;
     private final Meter sendMeter;
 
+    private final static String PUSH_PROCESSING_MAXTHREADS_PROP = "usergrid.push.async.processing.threads";
+    private final static String PUSH_PROCESSING_QUEUESIZE_PROP = "usergrid.push.async.processing.queue.size";
     private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
 
     HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
 
 
+
+    private final ExecutorService asyncExecutor;
+
+
+
+
     public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
                                         QueueManager queueManager, MetricsFactory metricsFactory,
                                         Properties properties) {
@@ -65,8 +74,31 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
         this.queueName = getQueueNames(properties);
-        queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
-        sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+        this.queueMeter = metricsFactory.getMeter(ApplicationQueueManagerImpl.class, "notification.queue");
+        this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "queue.send");
+
+        int maxAsyncThreads;
+        int workerQueueSize;
+
+        try {
+
+            maxAsyncThreads = Integer.valueOf(System.getProperty(PUSH_PROCESSING_MAXTHREADS_PROP, "200"));
+            workerQueueSize = Integer.valueOf(System.getProperty(PUSH_PROCESSING_QUEUESIZE_PROP, "2000"));
+
+        } catch (Exception e){
+
+            // if junk is passed into the property, just default the values
+            maxAsyncThreads = 200;
+            workerQueueSize = 2000;
+
+        }
+
+
+        // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
+        this.asyncExecutor = TaskExecutorFactory
+            .createTaskExecutor( "push-device-io", maxAsyncThreads, workerQueueSize,
+                TaskExecutorFactory.RejectionAction.CALLERRUNS );
+
 
     }
 
@@ -296,7 +328,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                             }
 
 
-                        }).subscribeOn(Schedulers.io());
+                        }).subscribeOn(Schedulers.from(asyncExecutor));
 
                 }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
                 .doOnError(throwable -> {
@@ -327,7 +359,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                 });
 
-            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
+            processMessagesObservable.subscribeOn(Schedulers.from(asyncExecutor)).subscribe(); // fire the queuing into the background
 
         }
 
@@ -348,7 +380,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         // if no devices, go ahead and mark the batch finished
         if (deviceCount.get() <= 0 ) {
             TaskManager taskManager = new TaskManager(em, notification);
-            taskManager.finishedBatch(true);
+            taskManager.finishedBatch();
         }
 
 
@@ -540,32 +572,43 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
     /**
-     * Validates that a notifier and adapter exists to send notifications to;
-     * {"winphone":"mymessage","apple":"mymessage"}
-     * TODO: document this method better
+     *  Validates that a notifier and adapter exists to send notifications to. For the example payload
+     *
+     *  { "payloads" : {"winphone":"mymessage","apple":"mymessage"} }
+     *
+     *  Notifiers with name "winphone" and "apple" must exist.
      */
-    private Map<String, Object> translatePayloads(Map<String, Object> payloads, Map<Object, ProviderAdapter>
-        notifierMap) throws Exception {
-        Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+    private Map<String, Object> translatePayloads(Map<String, Object> payloads,
+                                                  Map<Object, ProviderAdapter> notifierMap) throws Exception {
+
+        final Map<String, Object> translatedPayloads = new HashMap<String, Object>(payloads.size());
+
         for (Map.Entry<String, Object> entry : payloads.entrySet()) {
+
             String payloadKey = entry.getKey().toLowerCase();
             Object payloadValue = entry.getValue();
+
             //look for adapter from payload map
             ProviderAdapter providerAdapter = notifierMap.get(payloadKey);
             if (providerAdapter != null) {
+
                 //translate payload to usable information
                 Object translatedPayload = payloadValue != null ? providerAdapter.translatePayload(payloadValue) : null;
                 if (translatedPayload != null) {
                     translatedPayloads.put(payloadKey, translatedPayload);
                 }
+
             }
         }
         return translatedPayloads;
     }
 
     public static String getQueueNames(Properties properties) {
-        String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY, ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
+
+        String name = properties.getProperty(ApplicationQueueManagerImpl.DEFAULT_QUEUE_PROPERTY,
+            ApplicationQueueManagerImpl.DEFAULT_QUEUE_NAME);
         return name;
+
     }
 
     private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
@@ -585,15 +628,15 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
             try {
                 while (!subscriber.isUnsubscribed() && input.hasNext()) {
+
                     //send our input to the next
-                    //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
                     subscriber.onNext((T) input.next());
+
                 }
 
                 //tell the subscriber we don't have any more data
-                //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
-
                 subscriber.onCompleted();
+
             } catch (Throwable t) {
                 logger.error("failed on subscriber", t);
                 subscriber.onError(t);
@@ -617,10 +660,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     }
                 }
             } catch (Exception e) {
-                logger.error("checkForInactiveDevices", e); // not
-                // essential so
-                // don't fail,
-                // but log
+                // not essential so don't fail, but log
+                logger.error("checkForInactiveDevices", e);
+
             }
         }
     }
@@ -630,14 +672,14 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
         if (notification.getCanceled() == Boolean.TRUE) {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} canceled. not sending.",
+                logger.debug("Notification {} canceled. Not sending.",
                     notification.getUuid());
             }
             return false;
         }
         if (notification.isExpired()) {
             if (logger.isDebugEnabled()) {
-                logger.debug("notification {} expired. not sending.",
+                logger.debug("Notification {} expired. Not sending.",
                     notification.getUuid());
             }
             return false;
@@ -654,7 +696,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             }
             return value != null ? value.toString() : null;
         } catch (Exception e) {
-            logger.error("Error getting provider ID, proceeding with rest of batch", e);
+            logger.error("Error getting notifier for device {}, proceeding with rest of batch", device, e);
             return null;
         }
     }