You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:03:04 UTC
[40/50] [abbrv] usergrid git commit: Introduce a graph iterator for
segmenting push notifications.
Introduce a graph iterator for segmenting push notifications.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/32ab5da0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/32ab5da0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/32ab5da0
Branch: refs/heads/asf-site
Commit: 32ab5da0af66d2dfc4ed9fdb8ddf84a6b0231c03
Parents: f064c49
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Apr 16 00:08:32 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Apr 16 00:08:32 2016 +0200
----------------------------------------------------------------------
.../persistence/NotificationGraphIterator.java | 119 +++++++++++++++++++
.../apache/usergrid/persistence/PathQuery.java | 35 ++++--
.../persistence/entities/Notification.java | 62 +++++++++-
.../notifications/NotificationDeviceFilter.java | 45 +++++++
.../notifications/NotificationsService.java | 6 +
.../impl/ApplicationQueueManagerImpl.java | 101 ++++++++++++++--
6 files changed, 352 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
new file mode 100644
index 0000000..b83f555
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/NotificationGraphIterator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+
+public class NotificationGraphIterator implements ResultsIterator, Iterable {
+
+ private static final Logger logger = LoggerFactory.getLogger(NotificationGraphIterator.class);
+
+
+ EntityManager entityManager;
+
+ private Iterator<EntityRef> source;
+ private Query query;
+ private Iterator currentIterator;
+
+
+ public NotificationGraphIterator(EntityManager entityManager,
+ Iterator<EntityRef> source,
+ Query query) {
+
+ this.entityManager = entityManager;
+ this.source = source;
+ this.query = query;
+
+ }
+
+ @Override
+ public Iterator iterator() {
+ return this;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (source == null) {
+ return false;
+ }
+ if (currentIterator != null && currentIterator.hasNext()) {
+ return true;
+ }
+ while (source.hasNext()) {
+ EntityRef ref = source.next();
+ Results r = getResultsFor(ref);
+ if (r.size() > 0) {
+ currentIterator = new PagingResultsIterator(r, query.getResultsLevel());
+ return currentIterator.hasNext();
+ }
+ }
+ currentIterator = null;
+ source = null;
+ return false;
+ }
+
+
+ @Override
+ public Object next() {
+
+
+
+
+ return (currentIterator != null) ? currentIterator.next() : null;
+ }
+
+ @Override
+ public boolean hasPages() {
+ return currentIterator != null && currentIterator instanceof ResultsIterator && ((ResultsIterator) currentIterator).hasPages();
+ }
+
+
+ private Results getResultsFor(EntityRef ref) {
+
+ try {
+
+ if (query.getCollection() != null) {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Fetching with refType: {}, collection: {} with no query",
+ ref.getType(), query.getCollection());
+ }
+ return entityManager.searchCollection(ref, query.getCollection(), null);
+
+ } else {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Searching target entities with refType: {} for collection: {} with no query",
+ ref.getType(), query.getCollection());
+ }
+
+ query.setQl("select *");
+ return entityManager.searchTargetEntities(ref, query);
+
+ }
+
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
index 55839a6..215f6ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/PathQuery.java
@@ -22,7 +22,6 @@ import java.util.Iterator;
import java.util.UUID;
import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.utils.InflectionUtils;
@@ -52,11 +51,10 @@ public class PathQuery<E> {
/**
* top level
- *
- * @param head the top-level entity
+ * @param head the top-level entity
* @param query the query - must have a collection or connectType value set
*/
- public PathQuery( EntityRef head, Query query ) {
+ public PathQuery(EntityRef head, Query query) {
if ( query.getCollection() == null && query.getConnectionType() == null ) {
throw new IllegalArgumentException( "Query must have a collection or connectionType value" );
}
@@ -93,7 +91,7 @@ public class PathQuery<E> {
return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
}
else {
- return new MultiQueryIterator( em, source.refIterator( em ), query );
+ return new MultiQueryIterator( em, source.refIterator( em, false), query );
}
}
catch ( Exception e ) {
@@ -101,6 +99,24 @@ public class PathQuery<E> {
}
}
+ public Iterator<E> graphIterator( EntityManager em ) {
+ try {
+
+ if ( uuid != null && type != null ) {
+ return new PagingResultsIterator( getHeadResults( em ), query.getResultsLevel() );
+ }else {
+
+ return new NotificationGraphIterator(em, source.refIterator(em, true), query);
+ }
+
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( e );
+ }
+ }
+
+
+
protected Results getHeadResults( EntityManager em ) throws Exception {
@@ -123,7 +139,7 @@ public class PathQuery<E> {
}
- protected Iterator refIterator( EntityManager em ) throws Exception {
+ protected Iterator refIterator(EntityManager em, boolean useGraph) throws Exception {
if ( query.getQl() == null && query.getSingleNameOrEmailIdentifier() != null){
@@ -140,7 +156,12 @@ public class PathQuery<E> {
q = new Query( q );
q.setResultsLevel( Level.REFS );
}
- return new MultiQueryIterator( em, source.refIterator( em ), q );
+ if( useGraph){
+ return new NotificationGraphIterator( em, source.refIterator( em, true), q );
+ }else{
+ return new MultiQueryIterator( em, source.refIterator( em, false ), q );
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 6a6e3fa..aca10cf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -26,6 +26,8 @@ import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.annotations.EntityCollection;
import org.apache.usergrid.persistence.annotations.EntityProperty;
import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.utils.InflectionUtils;
+
/**
* The entity class for representing Notifications.
@@ -77,6 +79,10 @@ public class Notification extends TypedEntity {
@EntityProperty
protected Long expire;
+ /** Stores the number of devices processed */
+ @EntityProperty
+ protected int deviceProcessedCount;
+
/** True if notification is canceled */
@EntityProperty
protected Boolean canceled;
@@ -89,6 +95,10 @@ public class Notification extends TypedEntity {
@EntityProperty
protected String priority;
+ /** Flag to signal Usergrid to use graph traversal + filtering to find devices */
+ @EntityProperty
+ protected boolean useGraph;
+
/** Error messages that may have been encountered by Usergrid when trying to process the notification */
@EntityProperty
protected String errorMessage;
@@ -104,6 +114,9 @@ public class Notification extends TypedEntity {
@EntityProperty
protected Map<String, Long> statistics;
+ @EntityProperty
+ protected Map<String, Object> filters;
+
public Notification() {
pathQuery = new PathTokens();
@@ -173,6 +186,15 @@ public class Notification extends TypedEntity {
}
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ public int getDeviceProcessedCount() {
+ return deviceProcessedCount;
+ }
+
+ public void setDeviceProcessedCount(int deviceProcessedCount) {
+ this.deviceProcessedCount = deviceProcessedCount;
+ }
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public Boolean getCanceled() {
return canceled;
}
@@ -191,6 +213,15 @@ public class Notification extends TypedEntity {
}
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ public boolean getUseGraph() {
+ return useGraph;
+ }
+
+ public void setUseGraph(boolean useGraph) {
+ this.useGraph = useGraph;
+ }
+
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public boolean getDebug() {
return debug;
}
@@ -252,6 +283,15 @@ public class Notification extends TypedEntity {
this.statistics = statistics;
}
+ @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+ public Map<String, Object> getFilters() {
+ return filters;
+ }
+
+ public void setFilters(Map<String, Object> filters) {
+ this.filters = filters;
+ }
+
public void updateStatistics(long sent, long errors) {
if (this.statistics == null) {
this.statistics = new HashMap<String, Long>(2);
@@ -341,7 +381,8 @@ public class Notification extends TypedEntity {
@JsonIgnore
public PathQuery<Device> buildPathQuery() {
PathQuery pathQuery = null;
- for (PathToken pathToken : getPathTokens()) {
+ List<PathToken> pathTokens = getPathTokens();
+ for (PathToken pathToken : pathTokens) {
String collection = pathToken.getCollection();
Query query = new Query();
if(pathToken.getQl() != null){
@@ -365,6 +406,25 @@ public class Notification extends TypedEntity {
if (pathQuery == null) {
pathQuery = new PathQuery(getApplicationRef(), query);
+
+ if ( pathTokens.size() == 1 && collection.equals(InflectionUtils.pluralize(Group.ENTITY_TYPE) )){
+
+ Query usersQuery = new Query();
+ usersQuery.setQl("select *");
+ usersQuery.setCollection("users");
+ usersQuery.setLimit(100);
+
+ Query devicesQuery = new Query();
+ devicesQuery.setQl("select *");
+ devicesQuery.setCollection("devices");
+ usersQuery.setLimit(100);
+
+
+ // build up the chain so the proper iterators can be used later
+ pathQuery = pathQuery.chain( usersQuery ).chain( devicesQuery );
+
+ }
+
} else {
pathQuery = pathQuery.chain(query);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
new file mode 100644
index 0000000..35700ea
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationDeviceFilter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+
+public class NotificationDeviceFilter {
+
+
+ private String key;
+ private Object value;
+
+ public NotificationDeviceFilter(String key, Object value){
+
+ this.key = key;
+ this.value = value;
+
+ }
+
+ public Object getValue(){
+
+ return this.value;
+
+ }
+
+ public String getKey(){
+
+ return this.key;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 05c1cd7..50eb883 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.services.notifications;
import java.util.*;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,6 +85,7 @@ public class NotificationsService extends AbstractCollectionService {
private ServiceManagerFactory smf;
private EntityManagerFactory emf;
private QueueManagerFactory queueManagerFactory;
+ private EntityCollectionManagerFactory ecmf;
public NotificationsService() {
if (logger.isTraceEnabled()) {
@@ -139,9 +141,13 @@ public class NotificationsService extends AbstractCollectionService {
try {
validate(null, context.getPayload());
Notification.PathTokens pathTokens = getPathTokens(context.getRequest().getOriginalParameters());
+
// default saving of receipts
+ context.getProperties().put("filters", context.getProperties().getOrDefault("filters", new HashMap<>()));
+ context.getProperties().put("useGraph", context.getProperties().getOrDefault("useGraph", false));
context.getProperties().put("saveReceipts", context.getProperties().getOrDefault("saveReceipts", true));
context.getProperties().put("processingFinished", 0L); // defaulting processing finished to 0
+ context.getProperties().put("deviceProcessedCount", 0); // defaulting processing finished to 0
context.getProperties().put("state", Notification.State.CREATED);
context.getProperties().put("pathQuery", pathTokens);
context.setOwner(sm.getApplication());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/32ab5da0/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 2466164..487ea1f 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -19,7 +19,10 @@ package org.apache.usergrid.services.notifications.impl;
import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.queue.QueueManager;
@@ -97,14 +100,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues
-
//get devices in querystring, and make sure you have access
if (pathQuery != null) {
final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
if (logger.isTraceEnabled()) {
logger.trace("notification {} start query", notification.getUuid());
}
- final Iterator<Device> iterator = pathQuery.iterator(em);
+
+
+ // the main iterator can use graph traversal or index querying
+ final Iterator<Device> iterator;
+ if( notification.getUseGraph()){
+ iterator = pathQuery.graphIterator(em);
+ }else{
+ iterator = pathQuery.iterator(em);
+ }
//if there are more pages (defined by PAGE_SIZE) you probably want this to be async, also if this is already a job then don't reschedule
if (iterator instanceof ResultsIterator && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
@@ -167,6 +177,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
};
+ final Map<String, Object> filters = notification.getFilters();
Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
.flatMap(entity -> {
@@ -180,10 +191,73 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
})
.distinct(ref -> ref.getUuid())
+ .flatMap( entityRef -> {
+
+ return Observable.just(entityRef).flatMap( ref -> {
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Loading device: {}", ref.getUuid());
+
+ }
+ try {
+ return Observable.just(em.get(ref, Device.class));
+ }
+ catch (Exception e){
+
+ return Observable.empty();
+
+ }
+
+ }).subscribeOn(Schedulers.io());
+
+
+ }, 50)
+ .filter( device -> {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Filtering device: {}", device.getUuid());
+ }
+
+
+ if(notification.getUseGraph() && filters.size() > 0 ) {
+
+ for (Map.Entry<String, Object> entry : filters.entrySet()) {
+
+ if ((device.getDynamicProperties().get(entry.getKey()) != null &&
+ device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||
+
+ (device.getProperties().get(entry.getKey()) != null &&
+ device.getProperties().get(entry.getKey()).equals(entry.getValue()))
+
+ ) {
+
+
+ return true;
+ }
+
+ }
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Push notification filter matched for notification {}, so removing from notification",
+ device.getUuid(), notification.getUuid());
+ }
+ return false;
+
+
+ }
+
+ return true;
+
+ })
.map(sendMessageFunction)
.doOnNext( message -> {
try {
+
if(message.isPresent()){
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
+ }
qm.sendMessage( message.get() );
queueMeter.mark();
}
@@ -206,7 +280,10 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
try {
notification.setProcessingFinished(System.currentTimeMillis());
+ notification.setDeviceProcessedCount(deviceCount.get());
em.update(notification);
+ logger.info("{} devices processed for notification {}", deviceCount.get(), notification.getUuid());
+
} catch (Exception e) {
logger.error("Unable to set processing finished timestamp for notification");
}
@@ -569,9 +646,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
while( initial || resultSize >= LIMIT){
initial = false;
+
final List<EntityRef> myusers = em.getCollection(ref, "users", start,
LIMIT, Query.Level.REFS, true).getRefs();
-
resultSize = myusers.size();
if(myusers.size() > 0){
@@ -579,13 +656,21 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
- // don't allow a single user to have more than 100 devices?
- for (EntityRef user : myusers) {
+ Observable.from(myusers).flatMap( user -> {
+
+ try {
+ devices.addAll(em.getCollection(user, "devices", null, 100,
+ Query.Level.REFS, true).getRefs());
+ }catch (Exception e){
+ logger.error ("Unable to fetch devices for user: {}", user.getUuid());
+ }
+ return Observable.from(Collections.singletonList(user));
+
+ }, 50).toBlocking().lastOrDefault(null);
+
+
- devices.addAll( em.getCollection(user, "devices", null, 100,
- Query.Level.REFS, true).getRefs() );
- }
}