You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/04/26 17:02:47 UTC
[23/50] [abbrv] usergrid git commit: Push notification queuing to be async.
Push notification queuing to be async.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11274801
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11274801
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11274801
Branch: refs/heads/asf-site
Commit: 11274801059723b4a2264e01f45141080a47511a
Parents: 17e9b36
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Apr 13 15:51:08 2016 +0200
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Apr 13 15:51:08 2016 +0200
----------------------------------------------------------------------
.../persistence/entities/Notification.java | 10 -----
.../impl/ApplicationQueueManagerImpl.java | 45 +++++++-------------
2 files changed, 16 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
index 5c3ee89..dc7c989 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/entities/Notification.java
@@ -41,11 +41,6 @@ public class Notification extends TypedEntity {
NORMAL, HIGH
}
-
- /** Total count of notifications sent based on the API path/query */
- @EntityProperty
- protected int expectedCount;
-
/** The pathQuery/query that Usergrid used to idenitfy the devices to send the notification to */
@EntityProperty
private PathTokens pathQuery;
@@ -107,11 +102,6 @@ public class Notification extends TypedEntity {
}
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
- public int getExpectedCount() { return expectedCount; }
-
- public void setExpectedCount(int expectedCount) { this.expectedCount = expectedCount; }
-
- @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public PathTokens getPathQuery(){
return pathQuery;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11274801/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index fb4d64c..f819e39 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -20,10 +20,7 @@ import com.codahale.metrics.Meter;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.persistence.*;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.entities.Device;
-import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.entities.Notifier;
-import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.entities.*;
import org.apache.usergrid.persistence.Query;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
@@ -33,6 +30,7 @@ import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
import java.io.IOException;
import java.util.*;
@@ -174,15 +172,18 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
//process up to 10 concurrently
Observable processMessagesObservable = Observable.create(new IteratorObservable<Entity>(iterator))
.flatMap(entity -> {
+
+ if(entity.getType().equals(Device.ENTITY_TYPE)){
+ return Observable.from(Collections.singletonList(entity));
+ }
+
+ // if it's not a device, drill down and get them
return Observable.from(getDevices(entity));
- }, 10)
+
+ }, 50)
.distinct(ref -> ref.getUuid())
.map(sendMessageFunction)
- .buffer(100)
- .doOnNext( applicationQueueMessages -> {
-
- applicationQueueMessages.forEach( message -> {
-
+ .doOnNext( message -> {
try {
if(message.isPresent()){
qm.sendMessage( message.get() );
@@ -201,13 +202,12 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
- });
-
})
.doOnError(throwable -> logger.error("Failed while trying to send notification", throwable));
- processMessagesObservable.toBlocking().lastOrDefault(null); // let this run and block the async thread, messages are queued
+ //TODO verify error handling here
+ processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background
}
@@ -221,7 +221,6 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
}
- notification.setExpectedCount(deviceCount.get());
notification.addProperties(properties);
em.update(notification);
@@ -491,14 +490,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
private boolean isOkToSend(Notification notification) {
- Map<String, Long> stats = notification.getStatistics();
- if (stats != null && notification.getExpectedCount() == (stats.get("sent") + stats.get("errors"))) {
- if (logger.isDebugEnabled()) {
- logger.debug("notification {} already processed. not sending.",
- notification.getUuid());
- }
- return false;
- }
+
if (notification.getCanceled() == Boolean.TRUE) {
if (logger.isDebugEnabled()) {
logger.debug("notification {} canceled. not sending.",
@@ -522,14 +514,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final int LIMIT = Query.MID_LIMIT;
-
try {
- if ("device".equals(ref.getType())) {
-
- devices = Collections.singletonList(ref);
-
- } else if ("user".equals(ref.getType())) {
+ if (User.ENTITY_TYPE.equals(ref.getType())) {
UUID start = null;
boolean initial = true;
@@ -551,7 +538,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
}
- } else if ("group".equals(ref.getType())) {
+ } else if (Group.ENTITY_TYPE.equals(ref.getType())) {
UUID start = null;
boolean initial = true;