You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 01:38:43 UTC
[4/4] git commit: all compile except tests
Everything compiles except the tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5586b33f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5586b33f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5586b33f
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5586b33f908cd7f44435180a8335560818ec48b3
Parents: 6db623b
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Aug 21 17:30:55 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Aug 21 17:30:55 2014 -0600
----------------------------------------------------------------------
.../notifications/NotificationsQueueManager.java | 8 +++-----
.../notifications/NotificationsService.java | 7 +++++--
.../services/notifications/QueueListener.java | 19 ++++++-------------
.../notifications/apns/APNsNotification.java | 4 ++--
4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 0597728..34d4571 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public class NotificationsQueueManager implements NotificationServiceProxy {
private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
+ public static final String QUEUE_NAME = "notifications/queuelistener";
private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
@@ -153,7 +154,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
public Entity call(Entity entity) {
try {
List<EntityRef> devicesRef = getDevices(entity); // resolve group
- String queueName = getJobQueueName(notification);
+
for (EntityRef deviceRef : devicesRef) {
long hash = MurmurHash.hash(deviceRef.getUuid());
if(sketch.estimateCount(hash)>0){ //look for duplicates
@@ -163,7 +164,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
sketch.add(hash,1);
}
QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
- qm.postToQueue(queueName, message);
+ qm.postToQueue(QUEUE_NAME, message);
if(notification.getQueued() == null){
// update queued time
notification.setQueued(System.currentTimeMillis());
@@ -596,7 +597,4 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}
}
- private String getJobQueueName(EntityRef entityRef) {
- return utils.pluralize(entityRef.getType()) + "/" + entityRef.getUuid();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index ae289ed..1a76b10 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -202,11 +202,14 @@ public class NotificationsService extends AbstractCollectionService {
}
Entity response = super.updateEntity(request, ref, payload);
+ notification = (Notification) response;
- Long deliver = (Long) payload.getProperty("deliver");
+ Long deliver = notification.getDeliver();
if (deliver != null) {
if (!deliver.equals(notification.getDeliver())) {
- notificationQueueManager.processBatchAndReschedule((Notification) response, null);
+ if(!notificationQueueManager.scheduleQueueJob(notification)){
+ notificationQueueManager.queueNotification(notification, null);
+ }
}
}
return response;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index b1dfbd5..18ea0ca 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import rx.*;
import rx.Observable;
import javax.annotation.PostConstruct;
@@ -39,10 +38,9 @@ import java.util.concurrent.atomic.AtomicInteger;
@Component( "notificationsQueueListener" )
public class QueueListener {
public static int BATCH_SIZE = 1000;
-
+ public static int MAX_CONSECUTIVE_FAILS = 10;
public static final long MESSAGE_TRANSACTION_TIMEOUT = 5 * 60 * 1000;
- public static final String queuePath = "notifications/queuelistener";
private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
@Autowired
@@ -82,7 +80,7 @@ public class QueueListener {
// run until there are no more active jobs
while ( true ) {
try {
- QueueResults results = getDeliveryBatch(1000);
+ QueueResults results = getDeliveryBatch();
List<Message> messages = results.getMessages();
HashMap<UUID,List<QueueMessage>> queueMap = new HashMap<>();
for(Message message : messages){
@@ -124,7 +122,7 @@ public class QueueListener {
consecutiveExceptions.set(0);
}catch (Exception ex){
LOG.error("failed to dequeue",ex);
- if(consecutiveExceptions.getAndIncrement() > 10){
+ if(consecutiveExceptions.getAndIncrement() > MAX_CONSECUTIVE_FAILS){
LOG.error("killing message listener; too many failures");
break;
}
@@ -132,16 +130,11 @@ public class QueueListener {
}
}
- public void queueMessage(QueueMessage message){
- queueManager.postToQueue(queuePath, message);
-
- }
-
- private QueueResults getDeliveryBatch(int batchSize) throws Exception {
+ private QueueResults getDeliveryBatch() throws Exception {
QueueQuery qq = new QueueQuery();
- qq.setLimit(batchSize);
+ qq.setLimit(BATCH_SIZE);
qq.setTimeout(this.MESSAGE_TRANSACTION_TIMEOUT);
- QueueResults results = queueManager.getFromQueue(queuePath, qq);
+ QueueResults results = queueManager.getFromQueue(NotificationsQueueManager.QUEUE_NAME, qq);
LOG.debug("got batch of {} devices", results.size());
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
index b580410..101ded2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
@@ -49,7 +49,7 @@ public class APNsNotification extends SimpleApnsPushNotification {
try {
final byte[] token = TokenUtil.tokenStringToByteArray(providerId);
- return new APNsNotification(tracker, date.getTime(), token, payload, notification);
+ return new APNsNotification(tracker, date.getTime(), token, payload);
}catch(MalformedTokenStringException mtse){
throw new RuntimeException("Exception converting token",mtse);
}
@@ -62,7 +62,7 @@ public class APNsNotification extends SimpleApnsPushNotification {
* @param token
* @param payload
*/
- public APNsNotification(TaskTracker tracker, Date expiryTime, byte[] token, String payload,Notification notification) {
+ public APNsNotification(TaskTracker tracker, Date expiryTime, byte[] token, String payload) {
super(token, payload, expiryTime);
this.tracker = tracker;
}