You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/19 18:51:43 UTC
[46/50] usergrid git commit: Final changes to enhance parallel
loading of devices for push notifications.
Final changes to enhance parallel loading of devices for push notifications.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/06caa250
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/06caa250
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/06caa250
Branch: refs/heads/master
Commit: 06caa2509407322498c025b1b3d39135d82777cc
Parents: cc3cbfe
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Apr 17 18:02:32 2016 +0100
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Apr 17 18:02:32 2016 +0100
----------------------------------------------------------------------
.../pipeline/builder/IdBuilder.java | 2 +-
.../persistence/MultiQueryIterator.java | 2 +-
.../persistence/NotificationGraphIterator.java | 59 ++----
.../persistence/PagingResultsIterator.java | 25 ++-
.../apache/usergrid/persistence/PathQuery.java | 14 +-
.../apache/usergrid/persistence/Results.java | 4 +
.../impl/ApplicationQueueManagerImpl.java | 211 +++++--------------
7 files changed, 102 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
index 65cf7c1..781d7d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/builder/IdBuilder.java
@@ -151,7 +151,7 @@ public class IdBuilder {
public Observable<ResultsPage<Id>> build(){
//we must add our resume filter so we drop our previous page first element if it's present
- return pipeline.withFilter( new IdFilter() ).withFilter(new ResultsPageCollector<>()).execute();
+ return pipeline.withFilter( new IdResumeFilter() ).withFilter(new ResultsPageCollector<>()).execute();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
index c5de5c1..9e28204 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/MultiQueryIterator.java
@@ -62,7 +62,7 @@ public class MultiQueryIterator implements ResultsIterator {
EntityRef ref = source.next();
Results r = getResultsFor( ref );
if ( r.size() > 0 ) {
- currentIterator = new PagingResultsIterator( r, query.getResultsLevel() );
+ currentIterator = new PagingResultsIterator( r, query.getResultsLevel(), null);
return currentIterator.hasNext();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
index a1f3246..a1b162d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -23,10 +23,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
public class NotificationGraphIterator implements ResultsIterator, Iterable {
@@ -67,18 +63,20 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
Object next = source.next();
Results r;
-// if(next instanceof UUID){
-//
-// UUID id = (UUID) next;
-// r = getResultsForId(id, "user");
-//
-// }else {
- EntityRef ref = (EntityRef) next;
- r = getResultsFor(ref);
- // }
+ EntityRef ref = (EntityRef) next;
+ r = getResultsFor(ref);
if (r.size() > 0) {
- currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
+
+
+ if(ref.getType().equals(Group.ENTITY_TYPE)) {
+
+ currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), Query.Level.REFS);
+ }else{
+ currentIterator = new PagingResultsIterator(r, query.getResultsLevel(), null);
+
+ }
+
return currentIterator.hasNext();
}
}
@@ -122,26 +120,13 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
// if we're fetching devices through groups->users->devices, get only the IDs and don't load the entities
if( ref.getType().equals(Group.ENTITY_TYPE)){
- // query users using IDs as we don't need to load the full entities just to find their devices
- Query usersQuery = new Query();
- usersQuery.setCollection("users");
- usersQuery.setResultsLevel(Query.Level.IDS);
- usersQuery.setLimit(1000);
+ // groups->users is a passthrough to devices, load our max limit
+ query.setLimit(Query.MAX_LIMIT);
-
- // set the query level for the iterator temporarily to IDS
+ // set the query level for the when fetching users to IDS, we don't need the full entity
query.setResultsLevel(Query.Level.IDS);
- return entityManager.searchCollection(ref, usersQuery.getCollection(), usersQuery);
-
-
-// List<EntityRef> refs =
-// results.getIds().stream()
-// .map( uuid -> new SimpleEntityRef( "user", uuid) ).collect(Collectors.toList());
-//
-// // set the query level for the iterator back to REFS after mapping our IDS
-// query.setResultsLevel(Query.Level.REFS);
-// return Results.fromRefList(refs);
+ return entityManager.searchCollection(ref, "users", query);
}
@@ -151,8 +136,6 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
devicesQuery.setCollection("devices");
devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
- //query.setCollection("devices");
- //query.setResultsLevel(Query.Level.CORE_PROPERTIES);
return entityManager.searchCollection(ref, devicesQuery.getCollection(), devicesQuery);
}
@@ -177,14 +160,4 @@ public class NotificationGraphIterator implements ResultsIterator, Iterable {
}
}
-
- private Results getResultsForId(UUID uuid, String type) {
-
- EntityRef ref = new SimpleEntityRef(type, uuid);
- return getResultsFor(ref);
-
-
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
index a883e1b..640ee06 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PagingResultsIterator.java
@@ -19,6 +19,8 @@ package org.apache.usergrid.persistence;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.usergrid.persistence.Query.Level;
@@ -28,20 +30,23 @@ public class PagingResultsIterator implements ResultsIterator, Iterable {
private Results results;
private Iterator currentPageIterator;
private Level level;
+ private Level overrideLevel;
public PagingResultsIterator( Results results ) {
- this( results, results.level );
+ this( results, results.level, null);
}
/**
* @param level overrides the default level from the Results - in case you want to return, say, UUIDs where the
* Query was set for Entities
+ * @param overrideLevel
*/
- public PagingResultsIterator( Results results, Level level ) {
+ public PagingResultsIterator(Results results, Level level, Level overrideLevel) {
this.results = results;
this.level = level;
+ this.overrideLevel = overrideLevel;
initCurrentPageIterator();
}
@@ -86,16 +91,32 @@ public class PagingResultsIterator implements ResultsIterator, Iterable {
*/
private boolean initCurrentPageIterator() {
List currentPage;
+ Level origLevel = level;
+ if(overrideLevel != null){
+ level=overrideLevel;
+ if(results.getIds()!=null){
+
+ List<EntityRef> userRefs = results.getIds().stream()
+ .map( uuid -> new SimpleEntityRef("user", uuid)).collect(Collectors.toList());
+
+ results.setRefs(userRefs);
+
+ }
+ }
+
if ( results != null ) {
switch ( level ) {
case IDS:
currentPage = results.getIds();
+ level = origLevel;
break;
case REFS:
currentPage = results.getRefs();
+ level = origLevel;
break;
default:
currentPage = results.getEntities();
+ level = origLevel;
}
if ( currentPage.size() > 0 ) {
currentPageIterator = currentPage.iterator();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index 215f6ac..30636ab 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -88,7 +88,7 @@ public class PathQuery<E> {
try {
if ( uuid != null && type != null ) {
- return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+ return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null);
}
else {
return new MultiQueryIterator( em, source.refIterator( em, false), query );
@@ -103,7 +103,7 @@ public class PathQuery<E> {
try {
if ( uuid != null && type != null ) {
- return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+ return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel(), null);
}else {
return new NotificationGraphIterator(em, source.refIterator(em, true), query);
@@ -130,6 +130,12 @@ public class PathQuery<E> {
UUID entityId = em.getUniqueIdFromAlias( entityType, name );
+ if( entityId == null){
+ throw new
+ IllegalArgumentException("Entity with name "+name+" not found. Unable to send push notification");
+ }
+
+
return em.getEntities(Collections.singletonList(entityId), entityType);
}
@@ -143,12 +149,12 @@ public class PathQuery<E> {
if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){
- return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+ return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
}
if ( type != null && uuid != null) {
- return new PagingResultsIterator( getHeadResults( em ), Level.REFS );
+ return new PagingResultsIterator( getHeadResults( em ), Level.REFS, null);
}
else {
Query q = query;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
index 2a84622..3502581 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/Results.java
@@ -436,6 +436,10 @@ public class Results implements Iterable<Entity> {
level = Level.REFS;
}
+ public void setRefsOnly( List<EntityRef> resultsRefs ) {
+ refs = resultsRefs;
+ }
+
public Results withRefs( List<EntityRef> resultsRefs ) {
setRefs( resultsRefs );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/06caa250/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 1cbb2c6..2f39ae4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -52,6 +52,8 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
private final Meter queueMeter;
private final Meter sendMeter;
+ private final static String PUSH_PROCESSING_CONCURRENCY_PROP = "usergrid.push.async.processing.concurrency";
+
HashMap<Object, ProviderAdapter> notifierHashMap; // only retrieve notifiers once
@@ -91,25 +93,22 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
return;
}
- if (logger.isTraceEnabled()) {
- logger.trace("notification {} start queuing", notification.getUuid());
- }
-
final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query
final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues
- //get devices in querystring, and make sure you have access
+ // Get devices in querystring, and make sure you have access
if (pathQuery != null) {
final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
if (logger.isTraceEnabled()) {
logger.trace("notification {} start query", notification.getUuid());
}
- logger.info("notification {} start query", notification.getUuid());
+
+ logger.info("Notification {} started processing", notification.getUuid());
- // the main iterator can use graph traversal or index querying
+ // The main iterator can use graph traversal or index querying based on payload property. Default is Index.
final Iterator<Device> iterator;
if( notification.getUseGraph()){
iterator = pathQuery.graphIterator(em);
@@ -117,15 +116,24 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
iterator = pathQuery.iterator(em);
}
-// //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
-// if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
-// if(logger.isTraceEnabled()){
-// logger.trace("Scheduling notification job as it has multiple pages of devices.");
-// }
-// jobScheduler.scheduleQueueJob(notification, true);
-// em.update(notification);
-// return;
-// }
+ /**** Old code to scheduler large sets of data, but now the processing is fired off async in the background.
+ Leaving this only a reference of how to do it, if needed in future.
+
+ //if there are more pages (defined by PAGE_SIZE) you probably want this to be async,
+ //also if this is already a job then don't reschedule
+
+ if (iterator instanceof ResultsIterator
+ && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Scheduling notification job as it has multiple pages of devices.");
+ }
+ jobScheduler.scheduleQueueJob(notification, true);
+ em.update(notification);
+ return;
+ }
+ ****/
+
final UUID appId = em.getApplication().getUuid();
final Map<String, Object> payloads = notification.getPayloads();
@@ -182,87 +190,57 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
};
- final Map<String, Object> filters = notification.getFilters();
+ final Map<String, Object> filters = notification.getFilters();
+ Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator))
- Observable processMessagesObservable = Observable.create(new IteratorObservable<UUID>(iterator))
-// .flatMap(entity -> {
-//
-// if(entity.getType().equals(Device.ENTITY_TYPE)){
-// return Observable.from(Collections.singletonList(entity));
-// }
-//
-// // if it's not a device, drill down and get them
-// return Observable.from(getDevices(entity));
-//
-// })
- .distinct()
.flatMap( entityRef -> {
return Observable.just(entityRef).flatMap(ref->{
List<Entity> entities = new ArrayList<>();
+ if( ref.getType().equals(User.ENTITY_TYPE)){
+
Query devicesQuery = new Query();
devicesQuery.setCollection("devices");
devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
try {
- entities = em.searchCollection(new SimpleEntityRef("user", ref), devicesQuery.getCollection(), devicesQuery).getEntities();
+ entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
}catch (Exception e){
- logger.error("Unable to load devices for user: {}", ref);
+ logger.error("Unable to load devices for user: {}", ref.getUuid());
return Observable.empty();
}
+ }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
+ try{
+ entities.add(em.get(ref));
-// if( ref.getType().equals(User.ENTITY_TYPE)){
-//
-// Query devicesQuery = new Query();
-// devicesQuery.setCollection("devices");
-// devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
-//
-// try {
-//
-// entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();
-//
-// }catch (Exception e){
-//
-// logger.error("Unable to load devices for user: {}", ref.getUuid());
-// return Observable.empty();
-// }
-//
-//
-// }else if ( ref.getType().equals(Device.ENTITY_TYPE)){
-//
-// try{
-// entities.add(em.get(ref));
-//
-// }catch(Exception e){
-//
-// logger.error("Unable to load device: {}", ref.getUuid());
-// return Observable.empty();
-//
-// }
-//
-// }
+ }catch(Exception e){
+
+ logger.error("Unable to load device: {}", ref.getUuid());
+ return Observable.empty();
+
+ }
+
+ }
return Observable.from(entities);
})
+ .distinct( deviceRef -> deviceRef.getUuid())
.filter( device -> {
- logger.info("Filtering device: {}", device.getUuid());
-
if(logger.isTraceEnabled()) {
logger.trace("Filtering device: {}", device.getUuid());
}
-
if(notification.getUseGraph() && filters.size() > 0 ) {
for (Map.Entry<String, Object> entry : filters.entrySet()) {
@@ -280,7 +258,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
}
-
if(logger.isTraceEnabled()) {
logger.trace("Push notification filter did not match for notification {}, so removing from notification",
device.getUuid(), notification.getUuid());
@@ -321,20 +298,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}).subscribeOn(Schedulers.io());
- }, 100)
- //.map( entityRef -> entityRef.getUuid() )
- //.buffer(10)
-// .flatMap( uuids -> {
-//
-// if(logger.isTraceEnabled()) {
-// logger.trace("Processing batch of {} device(s)", uuids.size());
-// }
-//
-//
-// return Observable.from(em.getEntities(uuids, "device")).subscribeOn(Schedulers.io());
-//
-// }, 10)
-
+ }, Integer.valueOf(System.getProperty(PUSH_PROCESSING_CONCURRENCY_PROP, "50")))
.doOnError(throwable -> {
logger.error("Error while processing devices for notification : {}", notification.getUuid());
@@ -355,7 +319,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
notification.setProcessingFinished(System.currentTimeMillis());
notification.setDeviceProcessedCount(deviceCount.get());
em.update(notification);
- logger.info("{} device(s) processed for notification {}", deviceCount.get(), notification.getUuid());
+ logger.info("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get());
} catch (Exception e) {
logger.error("Unable to set processing finished timestamp for notification");
@@ -622,10 +586,13 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
try {
while (!subscriber.isUnsubscribed() && input.hasNext()) {
//send our input to the next
+ //logger.debug("calling next on iterator: {}", input.getClass().getSimpleName());
subscriber.onNext((T) input.next());
}
//tell the subscriber we don't have any more data
+ //logger.debug("finished iterator: {}", input.getClass().getSimpleName());
+
subscriber.onCompleted();
} catch (Throwable t) {
logger.error("failed on subscriber", t);
@@ -678,90 +645,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
return true;
}
- private List<EntityRef> getDevices(EntityRef ref) {
-
- List<EntityRef> devices = new ArrayList<>();
-
- final int LIMIT = Query.MID_LIMIT;
-
- try {
-
- if (User.ENTITY_TYPE.equals(ref.getType())) {
-
- UUID start = null;
- boolean initial = true;
- int resultSize = 0;
- while( initial || resultSize >= Query.DEFAULT_LIMIT) {
-
- initial = false;
-
- final List<EntityRef> mydevices = em.getCollection(ref, "devices", start, LIMIT,
- Query.Level.REFS, true).getRefs();
-
- resultSize = mydevices.size();
-
- if(mydevices.size() > 0){
- start = mydevices.get(mydevices.size() - 1 ).getUuid();
- }
-
- devices.addAll( mydevices );
-
- }
-
- } else if (Group.ENTITY_TYPE.equals(ref.getType())) {
-
- UUID start = null;
- boolean initial = true;
- int resultSize = 0;
-
- while( initial || resultSize >= LIMIT){
-
- initial = false;
-
- final List<EntityRef> myusers = em.getCollection(ref, "users", start,
- LIMIT, Query.Level.REFS, true).getRefs();
- resultSize = myusers.size();
-
- if(myusers.size() > 0){
- start = myusers.get(myusers.size() - 1 ).getUuid();
- }
-
-
- Observable.from(myusers).flatMap( user -> {
-
- try {
- devices.addAll(em.getCollection(user, "devices", null, 100,
- Query.Level.REFS, true).getRefs());
- }catch (Exception e){
- logger.error ("Unable to fetch devices for user: {}", user.getUuid());
- }
- return Observable.from(Collections.singletonList(user));
-
- }, 50).toBlocking().lastOrDefault(null);
-
-
-
-
-
- }
-
- }
- } catch (Exception e) {
-
- if (ref != null){
- logger.error("Error while retrieving devices for entity type {} and uuid {}. Error: {}",
- ref.getType(), ref.getUuid(), e);
- }else{
- logger.error("Error while retrieving devices. Entity ref was null.");
- }
-
- throw new RuntimeException("Unable to retrieve devices for EntityRef", e);
-
- }
-
- return devices;
- }
-
private String getProviderId(EntityRef device, Notifier notifier) throws Exception {
try {