You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:02:38 UTC

[14/50] [abbrv] usergrid git commit: First pass to allow more than 1k notifications to be sent for groups/queries targeting users and devices.

First pass to allow more than 1k notifications to be sent for groups/queries targeting users and devices.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8e4d7eef
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8e4d7eef
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8e4d7eef

Branch: refs/heads/asf-site
Commit: 8e4d7eef2ca0967491e9ef863c78880e002575d1
Parents: 74de4bc
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 10 23:20:10 2016 +0300
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 10 23:20:10 2016 +0300

----------------------------------------------------------------------
 .../corepersistence/CpRelationManager.java      |  34 ++++---
 .../services/notifications/TaskManager.java     |   7 +-
 .../impl/ApplicationQueueManagerImpl.java       | 100 ++++++++++++++++---
 3 files changed, 113 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 67e92f8..b5a4107 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -17,14 +17,7 @@
 package org.apache.usergrid.corepersistence;
 
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -318,14 +311,31 @@ public class CpRelationManager implements RelationManager {
 
         final String ql;
 
-        if ( startResult != null ) {
-            ql = "select * where created > " + startResult.timestamp();
-        }
-        else {
+
+        if (startResult != null ) {
+
+            // UUID timestamp is a different measure than 'created' field on entities
+            Calendar uuidEpoch = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+            uuidEpoch.clear();
+            uuidEpoch.set(1582, 9, 15, 0, 0, 0); // 9 = October
+            long epochMillis = uuidEpoch.getTime().getTime();
+
+            long time = (startResult.timestamp() / 10000L) + epochMillis;
+
+            if ( !reversed ) {
+                ql = "select * where created > " + time;
+            } else {
+                ql = "select * where created < " + time;
+            }
+
+        } else {
             ql = "select *";
         }
 
         Query query = Query.fromQL( ql );
+        if(query == null ){
+            throw new RuntimeException("Unable to get data for collection: "+collectionName);
+        }
         query.setLimit( count );
         query.setReversed( reversed );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 3e78210..954724f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -131,13 +131,14 @@ public class TaskManager {
         }
 
         if ( debug || hasError) {
+
+            List<EntityRef> entities = Arrays.asList(notification, device);
+
             if (receipt.getUuid() == null) {
                 Receipt savedReceipt = em.create(receipt);
-                receipt.setUuid(savedReceipt.getUuid());
-                List<EntityRef> entities = Arrays.asList(notification, device);
                 em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
             } else {
-                em.update(receipt);
+                em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 12a47b6..04e60b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -34,6 +34,7 @@ import rx.Observable;
 import rx.Subscriber;
 import rx.functions.Func1;
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -119,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
             final UUID appId = em.getApplication().getUuid();
             final Map<String, Object> payloads = notification.getPayloads();
 
-            final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef -> {
+            final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> {
                 try {
 
                     long now = System.currentTimeMillis();
@@ -143,7 +144,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     }
 
                     if (notifierId == null) {
-                        return deviceRef;
+                        //TODO need to leverage optional here
+                        //return deviceRef;
                     }
 
                     ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
@@ -153,16 +155,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                         notification.setQueued(System.currentTimeMillis());
 
                     }
-                    qm.sendMessage(message);
                     deviceCount.incrementAndGet();
-                    queueMeter.mark();
+
+                    return message;
 
 
                 } catch (Exception deviceLoopException) {
                     logger.error("Failed to add device", deviceLoopException);
                     errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
+
+                    //TODO need an optional here
+                    return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test");
                 }
-                return deviceRef;
+
             };
 
 
@@ -174,9 +179,28 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 }, 10)
                 .distinct(ref -> ref.getUuid())
                 .map(sendMessageFunction)
+                .buffer(100)
+                .doOnNext( applicationQueueMessages -> {
+
+                    applicationQueueMessages.forEach( message -> {
+
+                        try {
+
+                            qm.sendMessage( message );
+                            queueMeter.mark();
+
+                        } catch (IOException e) {
+                           logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+                               message.getNotificationId(), message.getDeviceId());
+                        }
+
+                    });
+
+
+                })
                 .doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
 
-            processMessagesObservable.toBlocking().lastOrDefault(null);
+            processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued
 
         }
 
@@ -487,20 +511,70 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
     private List<EntityRef> getDevices(EntityRef ref) {
 
-        List<EntityRef> devices = Collections.EMPTY_LIST;
+        List<EntityRef> devices = new ArrayList<>();
+
 
         try {
+
             if ("device".equals(ref.getType())) {
+
                 devices = Collections.singletonList(ref);
+
             } else if ("user".equals(ref.getType())) {
-                devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
-                    Query.Level.REFS, false).getRefs();
+
+                UUID start = null;
+                boolean initial = true;
+                int resultSize = 0;
+                while( initial || resultSize >= Query.DEFAULT_LIMIT) {
+
+                    initial = false;
+
+                    final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT,
+                        Query.Level.REFS, true).getRefs();
+
+                    resultSize = mydevices.size();
+                    if(mydevices.size() > 0){
+                        start = mydevices.get(mydevices.size() - 1 ).getUuid();
+                    }
+
+
+                    devices.addAll( mydevices  );
+
+
+                }
+
             } else if ("group".equals(ref.getType())) {
-                devices = new ArrayList<>();
-                for (EntityRef r : em.getCollection(ref, "users", null,
-                    Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
-                    devices.addAll(getDevices(r));
+
+                //devices = new ArrayList<>();
+                UUID start = null;
+                boolean initial = true;
+                int resultSize = 0;
+
+                while( initial || resultSize >= Query.DEFAULT_LIMIT){
+
+                        initial = false;
+                        final List<EntityRef> myusers =  em.getCollection(ref, "users", start,
+                            Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs();
+
+                        resultSize = myusers.size();
+                        if(myusers.size() > 0){
+                            start = myusers.get(myusers.size() - 1 ).getUuid();
+                        }
+
+
+                        // don't allow a single user to have more than 100 devices?
+                        for (EntityRef user : myusers) {
+
+                            devices.addAll( em.getCollection(user, "devices", null, 100,
+                                Query.Level.REFS, true).getRefs() );
+
+
+                        }
+
                 }
+
+
+
             }
         } catch (Exception e) {