You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:02:38 UTC
[14/50] [abbrv] usergrid git commit: First pass to allow more than 1k
notifications to be sent for groups/queries targeting users and devices.
First pass to allow more than 1k notifications to be sent for groups/queries targeting users and devices.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8e4d7eef
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8e4d7eef
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8e4d7eef
Branch: refs/heads/asf-site
Commit: 8e4d7eef2ca0967491e9ef863c78880e002575d1
Parents: 74de4bc
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 10 23:20:10 2016 +0300
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 10 23:20:10 2016 +0300
----------------------------------------------------------------------
.../corepersistence/CpRelationManager.java | 34 ++++---
.../services/notifications/TaskManager.java | 7 +-
.../impl/ApplicationQueueManagerImpl.java | 100 ++++++++++++++++---
3 files changed, 113 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 67e92f8..b5a4107 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -17,14 +17,7 @@
package org.apache.usergrid.corepersistence;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -318,14 +311,31 @@ public class CpRelationManager implements RelationManager {
final String ql;
- if ( startResult != null ) {
- ql = "select * where created > " + startResult.timestamp();
- }
- else {
+
+ if (startResult != null ) {
+
+ // UUID timestamp is a different measure than 'created' field on entities
+ Calendar uuidEpoch = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ uuidEpoch.clear();
+ uuidEpoch.set(1582, 9, 15, 0, 0, 0); // 9 = October
+ long epochMillis = uuidEpoch.getTime().getTime();
+
+ long time = (startResult.timestamp() / 10000L) + epochMillis;
+
+ if ( !reversed ) {
+ ql = "select * where created > " + time;
+ } else {
+ ql = "select * where created < " + time;
+ }
+
+ } else {
ql = "select *";
}
Query query = Query.fromQL( ql );
+ if(query == null ){
+ throw new RuntimeException("Unable to get data for collection: "+collectionName);
+ }
query.setLimit( count );
query.setReversed( reversed );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 3e78210..954724f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -131,13 +131,14 @@ public class TaskManager {
}
if ( debug || hasError) {
+
+ List<EntityRef> entities = Arrays.asList(notification, device);
+
if (receipt.getUuid() == null) {
Receipt savedReceipt = em.create(receipt);
- receipt.setUuid(savedReceipt.getUuid());
- List<EntityRef> entities = Arrays.asList(notification, device);
em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, savedReceipt);
} else {
- em.update(receipt);
+ em.addToCollections(entities, Notification.RECEIPTS_COLLECTION, receipt);
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8e4d7eef/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 12a47b6..04e60b7 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -34,6 +34,7 @@ import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
+import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@@ -119,7 +120,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final UUID appId = em.getApplication().getUuid();
final Map<String, Object> payloads = notification.getPayloads();
- final Func1<EntityRef, EntityRef> sendMessageFunction = deviceRef -> {
+ final Func1<EntityRef, ApplicationQueueMessage> sendMessageFunction = deviceRef -> {
try {
long now = System.currentTimeMillis();
@@ -143,7 +144,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
if (notifierId == null) {
- return deviceRef;
+ //TODO need to leverage optional here
+ //return deviceRef;
}
ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
@@ -153,16 +155,19 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
notification.setQueued(System.currentTimeMillis());
}
- qm.sendMessage(message);
deviceCount.incrementAndGet();
- queueMeter.mark();
+
+ return message;
} catch (Exception deviceLoopException) {
logger.error("Failed to add device", deviceLoopException);
errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
+
+ //TODO need an optional here
+ return new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), "test", "test");
}
- return deviceRef;
+
};
@@ -174,9 +179,28 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}, 10)
.distinct(ref -> ref.getUuid())
.map(sendMessageFunction)
+ .buffer(100)
+ .doOnNext( applicationQueueMessages -> {
+
+ applicationQueueMessages.forEach( message -> {
+
+ try {
+
+ qm.sendMessage( message );
+ queueMeter.mark();
+
+ } catch (IOException e) {
+ logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
+ message.getNotificationId(), message.getDeviceId());
+ }
+
+ });
+
+
+ })
.doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
- processMessagesObservable.toBlocking().lastOrDefault(null);
+ processMessagesObservable.toBlocking(); // let this run and block the async thread, messages are queued
}
@@ -487,20 +511,70 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
private List<EntityRef> getDevices(EntityRef ref) {
- List<EntityRef> devices = Collections.EMPTY_LIST;
+ List<EntityRef> devices = new ArrayList<>();
+
try {
+
if ("device".equals(ref.getType())) {
+
devices = Collections.singletonList(ref);
+
} else if ("user".equals(ref.getType())) {
- devices = em.getCollection(ref, "devices", null, Query.MAX_LIMIT,
- Query.Level.REFS, false).getRefs();
+
+ UUID start = null;
+ boolean initial = true;
+ int resultSize = 0;
+ while( initial || resultSize >= Query.DEFAULT_LIMIT) {
+
+ initial = false;
+
+ final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, Query.DEFAULT_LIMIT,
+ Query.Level.REFS, true).getRefs();
+
+ resultSize = mydevices.size();
+ if(mydevices.size() > 0){
+ start = mydevices.get(mydevices.size() - 1 ).getUuid();
+ }
+
+
+ devices.addAll( mydevices );
+
+
+ }
+
} else if ("group".equals(ref.getType())) {
- devices = new ArrayList<>();
- for (EntityRef r : em.getCollection(ref, "users", null,
- Query.MAX_LIMIT, Query.Level.REFS, false).getRefs()) {
- devices.addAll(getDevices(r));
+
+ //devices = new ArrayList<>();
+ UUID start = null;
+ boolean initial = true;
+ int resultSize = 0;
+
+ while( initial || resultSize >= Query.DEFAULT_LIMIT){
+
+ initial = false;
+ final List<EntityRef> myusers = em.getCollection(ref, "users", start,
+ Query.DEFAULT_LIMIT, Query.Level.REFS, true).getRefs();
+
+ resultSize = myusers.size();
+ if(myusers.size() > 0){
+ start = myusers.get(myusers.size() - 1 ).getUuid();
+ }
+
+
+ // don't allow a single user to have more than 100 devices?
+ for (EntityRef user : myusers) {
+
+ devices.addAll( em.getCollection(user, "devices", null, 100,
+ Query.Level.REFS, true).getRefs() );
+
+
+ }
+
}
+
+
+
}
} catch (Exception e) {