You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/19 18:51:23 UTC

[26/50] usergrid git commit: Push notification queuing to be async.

Push notification queuing to be async.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11274801
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11274801
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11274801

Branch: refs/heads/master
Commit: 11274801059723b4a2264e01f45141080a47511a
Parents: 17e9b36
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 13 15:51:08 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 13 15:51:08 2016 +0200

----------------------------------------------------------------------
 .../persistence/entities/Notification.java      | 10 -----
 .../impl/ApplicationQueueManagerImpl.java       | 45 +++++++-------------
 2 files changed, 16 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 5c3ee89..dc7c989 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,11 +41,6 @@ public class Notification extends TypedEntity {
         NORMAL, HIGH
     }
 
-
-    /** Total count of notifications sent based on the API path/query */
-    @EntityProperty
-    protected int expectedCount;
-
     /** The pathQuery/query that Usergrid used to idenitfy the devices to send the notification to */
     @EntityProperty
     private PathTokens pathQuery;
@@ -107,11 +102,6 @@ public class Notification extends TypedEntity {
     }
 
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-    public int getExpectedCount() {  return expectedCount;  }
-
-    public void setExpectedCount(int expectedCount) {  this.expectedCount = expectedCount;  }
-
-    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
     public PathTokens getPathQuery(){
         return pathQuery;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index fb4d64c..f819e39 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -20,10 +20,7 @@ import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.entities.Device;
-import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -33,6 +30,7 @@ import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Func1;
+import rx.schedulers.Schedulers;
 
 import java.io.IOException;
 import java.util.*;
@@ -174,15 +172,18 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             //process up to 10 concurrently
             Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
                 .flatMap(entity -> {
+
+                    if(entity.getType().equals(Device.ENTITY_TYPE)){
+                        return Observable.from(Collections.singletonList(entity));
+                    }
+
+                    // if it's not a device, drill down and get them
                     return Observable.from(getDevices(entity));
-                }, 10)
+
+                }, 50)
                 .distinct(ref -> ref.getUuid())
                 .map(sendMessageFunction)
-                .buffer(100)
-                .doOnNext( applicationQueueMessages -> {
-
-                    applicationQueueMessages.forEach( message -> {
-
+                .doOnNext( message -> {
                         try {
                             if(message.isPresent()){
                                 qm.sendMessage( message.get() );
@@ -201,13 +202,12 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                         }
 
-                    });
-
 
                 })
                 .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
 
-            processMessagesObservable.toBlocking().lastOrDefault(null); // let this run and block the async thread, messages are queued
+            //TODO verify error handling here
+            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
 
         }
 
@@ -221,7 +221,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             }
         }
 
-        notification.setExpectedCount(deviceCount.get());
         notification.addProperties(properties);
         em.update(notification);
 
@@ -491,14 +490,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
     private boolean isOkToSend(Notification notification) {
-        Map<String, Long> stats = notification.getStatistics();
-        if (stats != null && notification.getExpectedCount() == (stats.get("sent") + stats.get("errors"))) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("notification {} already processed. not sending.",
-                    notification.getUuid());
-            }
-            return false;
-        }
+
         if (notification.getCanceled() == Boolean.TRUE) {
             if (logger.isDebugEnabled()) {
                 logger.debug("notification {} canceled. not sending.",
@@ -522,14 +514,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
         final int LIMIT = Query.MID_LIMIT;
 
-
         try {
 
-            if ("device".equals(ref.getType())) {
-
-                devices = Collections.singletonList(ref);
-
-            } else if ("user".equals(ref.getType())) {
+           if (User.ENTITY_TYPE.equals(ref.getType())) {
 
                 UUID start = null;
                 boolean initial = true;
@@ -551,7 +538,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                 }
 
-            } else if ("group".equals(ref.getType())) {
+            } else if (Group.ENTITY_TYPE.equals(ref.getType())) {
 
                 UUID start = null;
                 boolean initial = true;