You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/04/16 00:08:44 UTC

usergrid git commit: Introduce a graph iterator for segmenting push notifications.

Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 f064c499c -> 32ab5da0a


Introduce a graph iterator for segmenting push notifications.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32ab5da0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32ab5da0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32ab5da0

Branch: refs/heads/release-2.1.1
Commit: 32ab5da0af66d2dfc4ed9fdb8ddf84a6b0231c03
Parents: f064c49
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Apr 16 00:08:32 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Apr 16 00:08:32 2016 +0200

----------------------------------------------------------------------
 .../persistence/NotificationGraphIterator.java  | 119 +++++++++++++++++++
 .../apache/usergrid/persistence/PathQuery.java  |  35 ++++--
 .../persistence/entities/Notification.java      |  62 +++++++++-
 .../notifications/NotificationDeviceFilter.java |  45 +++++++
 .../notifications/NotificationsService.java     |   6 +
 .../impl/ApplicationQueueManagerImpl.java       | 101 ++++++++++++++--
 6 files changed, 352 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
new file mode 100644
index 0000000..b83f555
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+/**
+ * Walks a sequence of source entity refs and iterates the search results found for each one: a
+ * collection search when the query names a collection, otherwise a target-entity search.
+ */
+public class NotificationGraphIterator implements ResultsIterator, Iterable {
+
+    private static final Logger logger = LoggerFactory.getLogger(NotificationGraphIterator.class);
+
+    EntityManager entityManager;
+    private Iterator<EntityRef> source;
+    private Query query;
+    private Iterator currentIterator;
+
+    public NotificationGraphIterator(EntityManager entityManager,
+                                     Iterator<EntityRef> source,
+                                     Query query) {
+        this.entityManager = entityManager;
+        this.source = source;
+        this.query = query;
+    }
+
+    /** Returns this instance, so every caller shares one single-pass iteration state. */
+    @Override
+    public Iterator iterator() {
+        return this;
+    }
+
+    /** Advances through source refs until one has a pending element; false once all are used up. */
+    @Override
+    public boolean hasNext() {
+        if (source == null) {
+            return false;
+        }
+        if (currentIterator != null && currentIterator.hasNext()) {
+            return true;
+        }
+        while (source.hasNext()) {
+            Results r = getResultsFor(source.next());
+            if (r.size() > 0) {
+                currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
+                // keep scanning rather than report exhaustion while source refs remain
+                if (currentIterator.hasNext()) {
+                    return true;
+                }
+            }
+        }
+        currentIterator = null;
+        source = null;
+        return false;
+    }
+
+    /**
+     * Returns the next element, first advancing to the next source ref when needed so callers
+     * do not have to call {@link #hasNext()} beforehand.
+     * @return the next element, or null once the iteration is exhausted
+     */
+    @Override
+    public Object next() {
+        return hasNext() ? currentIterator.next() : null;
+    }
+
+    /** Delegates to the current iterator when it is a {@link ResultsIterator}; false otherwise. */
+    @Override
+    public boolean hasPages() {
+        return currentIterator instanceof ResultsIterator && ((ResultsIterator) currentIterator).hasPages();
+    }
+
+    /**
+     * Runs the search for a single source ref.
+     * @param ref the source entity to search from
+     * @return the results of the collection or target-entity search
+     * @throws RuntimeException wrapping any exception thrown by the search
+     */
+    private Results getResultsFor(EntityRef ref) {
+        try {
+            if (query.getCollection() != null) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("Fetching with refType: {}, collection: {} with no query",
+                        ref.getType(), query.getCollection());
+                }
+                return entityManager.searchCollection(ref, query.getCollection(), null);
+            }
+
+            if (logger.isTraceEnabled()) {
+                // the collection is always null on this path, so log the connection type instead
+                logger.trace("Searching target entities with refType: {} for connectionType: {}  with no query",
+                    ref.getType(), query.getConnectionType());
+            }
+            query.setQl("select *");
+            return entityManager.searchTargetEntities(ref, query);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index 55839a6..215f6ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import java.util.UUID;
 
 import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.utils.InflectionUtils;
 
 
@@ -52,11 +51,10 @@ public class PathQuery<E> {
 
     /**
      * top level
-     *
-     * @param head the top-level entity
+     *  @param head the top-level entity
      * @param query the query - must have a collection or connectType value set
      */
-    public PathQuery( EntityRef head, Query query ) {
+    public PathQuery(EntityRef head, Query query) {
         if ( query.getCollection() == null && query.getConnectionType() == null ) {
             throw new IllegalArgumentException( "Query must have a collection or connectionType value" );
         }
@@ -93,7 +91,7 @@ public class PathQuery<E> {
                 return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
             }
             else {
-                return new MultiQueryIterator( em, source.refIterator( em ), query );
+                return new MultiQueryIterator( em, source.refIterator( em, false), query );
             }
         }
         catch ( Exception e ) {
@@ -101,6 +99,24 @@ public class PathQuery<E> {
         }
     }
 
+    /**
+     * Pages the head results when both uuid and type are set; otherwise wraps the source's graph
+     * ref iterator in a NotificationGraphIterator. Failures are rethrown as RuntimeException.
+     */
+    public Iterator<E> graphIterator( EntityManager em ) {
+        try {
+            if ( uuid == null || type == null ) {
+                return new NotificationGraphIterator( em, source.refIterator( em, true ), query );
+            }
+            return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( e );
+        }
+    }
+
+
+
 
     protected Results getHeadResults( EntityManager em ) throws Exception {
 
@@ -123,7 +139,7 @@ public class PathQuery<E> {
     }
 
 
-    protected Iterator refIterator( EntityManager em ) throws Exception {
+    protected Iterator refIterator(EntityManager em, boolean useGraph) throws Exception {
 
         if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){
 
@@ -140,7 +156,12 @@ public class PathQuery<E> {
                 q = new Query( q );
                 q.setResultsLevel( Level.REFS );
             }
-            return new MultiQueryIterator( em, source.refIterator( em ), q );
+            if( useGraph){
+                return new NotificationGraphIterator( em, source.refIterator( em, true), q );
+            }else{
+                return new MultiQueryIterator( em, source.refIterator( em, false ), q );
+
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 6a6e3fa..aca10cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -26,6 +26,8 @@ import org.apache.usergrid.persistence.*;
 import org.apache.usergrid.persistence.annotations.EntityCollection;
 import org.apache.usergrid.persistence.annotations.EntityProperty;
 import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.utils.InflectionUtils;
+
 
 /**
  * The entity class for representing Notifications.
@@ -77,6 +79,10 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected Long expire;
 
+    /** Stores the number of devices processed */
+    @EntityProperty
+    protected int deviceProcessedCount;
+
     /** True if notification is canceled */
     @EntityProperty
     protected Boolean canceled;
@@ -89,6 +95,10 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected String priority;
 
+    /** Flag to signal Usergrid to use graph traversal + filtering to find devices  */
+    @EntityProperty
+    protected boolean useGraph;
+
     /** Error messages that may have been encountered by Usergrid when trying to process the notification */
     @EntityProperty
     protected String errorMessage;
@@ -104,6 +114,9 @@ public class Notification extends TypedEntity {
     @EntityProperty
     protected Map<String, Long> statistics;
 
+    @EntityProperty
+    protected Map<String, Object> filters;
+
 
     public Notification() {
         pathQuery = new PathTokens();
@@ -173,6 +186,15 @@ public class Notification extends TypedEntity {
     }
 
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    public int getDeviceProcessedCount() {
+        return deviceProcessedCount;
+    }
+
+    public void setDeviceProcessedCount(int deviceProcessedCount) {
+        this.deviceProcessedCount = deviceProcessedCount;
+    }
+
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
     public Boolean getCanceled() {
         return canceled;
     }
@@ -191,6 +213,15 @@ public class Notification extends TypedEntity {
     }
 
     @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    public boolean getUseGraph() {
+        return useGraph;
+    }
+
+    public void setUseGraph(boolean useGraph) {
+        this.useGraph = useGraph;
+    }
+
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
     public boolean getDebug() {
         return debug;
     }
@@ -252,6 +283,15 @@ public class Notification extends TypedEntity {
         this.statistics = statistics;
     }
 
+    @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+    public Map<String, Object> getFilters() {
+        return filters;
+    }
+
+    public void setFilters(Map<String, Object> filters) {
+        this.filters = filters;
+    }
+
     public void updateStatistics(long sent, long errors) {
         if (this.statistics == null) {
             this.statistics = new HashMap<String, Long>(2);
@@ -341,7 +381,8 @@ public class Notification extends TypedEntity {
         @JsonIgnore
         public PathQuery<Device> buildPathQuery() {
             PathQuery pathQuery = null;
-            for (PathToken pathToken : getPathTokens()) {
+            List<PathToken> pathTokens = getPathTokens();
+            for (PathToken pathToken : pathTokens) {
                 String collection = pathToken.getCollection();
                 Query query = new Query();
                 if(pathToken.getQl() != null){
@@ -365,6 +406,25 @@ public class Notification extends TypedEntity {
 
                 if (pathQuery == null) {
                     pathQuery = new PathQuery(getApplicationRef(), query);
+
+                    // a lone token for the pluralized Group type is expanded to users, then devices
+                    if ( pathTokens.size() == 1 && InflectionUtils.pluralize(Group.ENTITY_TYPE).equals(collection) ){
+                        Query usersQuery = new Query();
+                        usersQuery.setQl("select *");
+                        usersQuery.setCollection("users");
+                        usersQuery.setLimit(100);
+
+                        Query devicesQuery = new Query();
+                        devicesQuery.setQl("select *");
+                        devicesQuery.setCollection("devices");
+                        // limit the devices query itself; the users limit is already set above
+                        devicesQuery.setLimit(100);
+
+                        // build up the chain so the proper iterators can be used later
+                        pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery );
+
+                    }
+
                 } else {
                     pathQuery = pathQuery.chain(query);
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
new file mode 100644
index 0000000..35700ea
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+
+/** Simple holder for a filter key and its value; both references are fixed at construction. */
+public class NotificationDeviceFilter {
+
+    private final String key;
+    private final Object value;
+
+    /**
+     * @param key the filter's key
+     * @param value the filter's value
+     */
+    public NotificationDeviceFilter(String key, Object value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /** @return the value supplied at construction */
+    public Object getValue() {
+        return this.value;
+    }
+
+    /** @return the key supplied at construction */
+    public String getKey() {
+        return this.key;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 05c1cd7..50eb883 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications;
 
 import java.util.*;
 
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,6 +85,7 @@ public class NotificationsService extends AbstractCollectionService {
     private ServiceManagerFactory smf;
     private EntityManagerFactory emf;
     private QueueManagerFactory queueManagerFactory;
+    private EntityCollectionManagerFactory ecmf;
 
     public NotificationsService() {
         if (logger.isTraceEnabled()) {
@@ -139,9 +141,13 @@ public class NotificationsService extends AbstractCollectionService {
         try {
             validate(null, context.getPayload());
             Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters());
+
             // default saving of receipts
+            context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>()));
+            context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false));
             context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true));
             context.getProperties().put("processingFinished", 0L); // defaulting processing finished to 0
+            context.getProperties().put("deviceProcessedCount", 0); // defaulting device processed count to 0
             context.getProperties().put("state", Notification.State.CREATED);
             context.getProperties().put("pathQuery", pathTokens);
             context.setOwner(sm.getApplication());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 2466164..487ea1f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,7 +19,10 @@ package org.apache.usergrid.services.notifications.impl;
 import com.codahale.metrics.Meter;
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.entities.*;
 import org.apache.usergrid.persistence.Query;
 import org.apache.usergrid.persistence.queue.QueueManager;
@@ -97,14 +100,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
         final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues
 
-
         //get devices in querystring, and make sure you have access
         if (pathQuery != null) {
             final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
             if (logger.isTraceEnabled()) {
                 logger.trace("notification {} start query", notification.getUuid());
             }
-            final Iterator<Device> iterator = pathQuery.iterator(em);
+
+
+            // the main iterator can use graph traversal or index querying
+            final Iterator<Device> iterator;
+            if( notification.getUseGraph()){
+                iterator = pathQuery.graphIterator(em);
+            }else{
+                iterator = pathQuery.iterator(em);
+            }
 
             //if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
             if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -167,6 +177,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
             };
 
+            final Map<String, Object> filters = notification.getFilters();
 
             Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
                 .flatMap(entity -> {
@@ -180,10 +191,73 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                 })
                 .distinct(ref -> ref.getUuid())
+                .flatMap( entityRef -> {
+
+                    return Observable.just(entityRef).flatMap( ref -> {
+
+                        if(logger.isTraceEnabled()){
+                            logger.trace("Loading device: {}", ref.getUuid());
+
+                        }
+                            try {
+                                return Observable.just(em.get(ref, Device.class));
+                            }
+                            catch (Exception e){
+
+                                return Observable.empty();
+
+                            }
+
+                        }).subscribeOn(Schedulers.io());
+
+
+                }, 50)
+                .filter( device -> {
+
+                    if(logger.isTraceEnabled()) {
+                        logger.trace("Filtering device: {}", device.getUuid());
+                    }
+
+
+                    if(notification.getUseGraph() && filters.size() > 0 ) {
+
+                        for (Map.Entry<String, Object> entry : filters.entrySet()) {
+
+                            if ((device.getDynamicProperties().get(entry.getKey()) != null &&
+                                device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
+
+                                (device.getProperties().get(entry.getKey()) != null &&
+                                    device.getProperties().get(entry.getKey()).equals(entry.getValue()))
+
+                                ) {
+
+
+                                return true;
+                            }
+
+                        }
+
+                        if(logger.isTraceEnabled()) {
+                            logger.trace("Push notification filter matched for notification {}, so removing from notification",
+                                device.getUuid(), notification.getUuid());
+                        }
+                        return false;
+
+
+                    }
+
+                    return true;
+
+                })
                 .map(sendMessageFunction)
                 .doOnNext( message -> {
                         try {
+
                             if(message.isPresent()){
+
+                                if(logger.isTraceEnabled()) {
+                                    logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+                                }
                                 qm.sendMessage( message.get() );
                                 queueMeter.mark();
                             }
@@ -206,7 +280,10 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
                     try {
                         notification.setProcessingFinished(System.currentTimeMillis());
+                        notification.setDeviceProcessedCount(deviceCount.get());
                         em.update(notification);
+                        logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid());
+
                     } catch (Exception e) {
                         logger.error("Unable to set processing finished timestamp for notification");
                     }
@@ -569,9 +646,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                 while( initial || resultSize >= LIMIT){
 
                     initial = false;
+
                     final List<EntityRef> myusers =  em.getCollection(ref, "users", start,
                         LIMIT, Query.Level.REFS, true).getRefs();
-
                     resultSize = myusers.size();
 
                     if(myusers.size() > 0){
@@ -579,13 +656,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
                     }
 
 
-                    // don't allow a single user to have more than 100 devices?
-                    for (EntityRef user : myusers) {
+                    Observable.from(myusers).flatMap( user -> {
+
+                        try {
+                            devices.addAll(em.getCollection(user, "devices", null, 100,
+                                Query.Level.REFS, true).getRefs());
+                        }catch (Exception e){
+                            logger.error ("Unable to fetch devices for user: {}", user.getUuid());
+                        }
+                        return Observable.from(Collections.singletonList(user));
+
+                    }, 50).toBlocking().lastOrDefault(null);
+
+
 
-                        devices.addAll( em.getCollection(user, "devices", null, 100,
-                            Query.Level.REFS, true).getRefs() );
 
-                    }
 
                 }