You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 01:38:43 UTC

[4/4] git commit: all compile except tests

all compile except tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5586b33f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5586b33f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5586b33f

Branch: refs/heads/two-dot-o-notifications-queue
Commit: 5586b33f908cd7f44435180a8335560818ec48b3
Parents: 6db623b
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Aug 21 17:30:55 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Aug 21 17:30:55 2014 -0600

----------------------------------------------------------------------
 .../notifications/NotificationsQueueManager.java |  8 +++-----
 .../notifications/NotificationsService.java      |  7 +++++--
 .../services/notifications/QueueListener.java    | 19 ++++++-------------
 .../notifications/apns/APNsNotification.java     |  4 ++--
 4 files changed, 16 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 0597728..34d4571 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class NotificationsQueueManager implements NotificationServiceProxy {
     private static final String NOTIFICATION_CONCURRENT_BATCHES = "notification.concurrent.batches";
+    public static final String QUEUE_NAME = "notifications/queuelistener";
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationsQueueManager.class);
 
@@ -153,7 +154,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                         public Entity call(Entity entity) {
                             try {
                                 List<EntityRef> devicesRef = getDevices(entity); // resolve group
-                                String queueName = getJobQueueName(notification);
+
                                 for (EntityRef deviceRef : devicesRef) {
                                     long hash = MurmurHash.hash(deviceRef.getUuid());
                                     if(sketch.estimateCount(hash)>0){ //look for duplicates
@@ -163,7 +164,7 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
                                         sketch.add(hash,1);
                                     }
                                     QueueMessage message = new QueueMessage(em.getApplicationId(),notification.getUuid(),deviceRef.getUuid());
-                                    qm.postToQueue(queueName, message);
+                                    qm.postToQueue(QUEUE_NAME, message);
                                     if(notification.getQueued() == null){
                                         // update queued time
                                         notification.setQueued(System.currentTimeMillis());
@@ -596,7 +597,4 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
         }
     }
 
-    private String getJobQueueName(EntityRef entityRef) {
-        return utils.pluralize(entityRef.getType()) + "/" + entityRef.getUuid();
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index ae289ed..1a76b10 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -202,11 +202,14 @@ public class NotificationsService extends AbstractCollectionService {
         }
 
         Entity response = super.updateEntity(request, ref, payload);
+        notification = (Notification) response;
 
-        Long deliver = (Long) payload.getProperty("deliver");
+        Long deliver = notification.getDeliver();
         if (deliver != null) {
             if (!deliver.equals(notification.getDeliver())) {
-                notificationQueueManager.processBatchAndReschedule((Notification) response, null);
+                if(!notificationQueueManager.scheduleQueueJob(notification)){
+                    notificationQueueManager.queueNotification(notification, null);
+                }
             }
         }
         return response;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index b1dfbd5..18ea0ca 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
-import rx.*;
 import rx.Observable;
 
 import javax.annotation.PostConstruct;
@@ -39,10 +38,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 @Component( "notificationsQueueListener" )
 public class QueueListener  {
     public static int BATCH_SIZE = 1000;
-
+    public static int MAX_CONSECUTIVE_FAILS = 10;
     public static final long MESSAGE_TRANSACTION_TIMEOUT =  5 * 60 * 1000;
 
-    public static final String queuePath = "notifications/queuelistener";
     private static final Logger LOG = LoggerFactory.getLogger(QueueListener.class);
 
     @Autowired
@@ -82,7 +80,7 @@ public class QueueListener  {
         // run until there are no more active jobs
         while ( true ) {
             try {
-                QueueResults results = getDeliveryBatch(1000);
+                QueueResults results = getDeliveryBatch();
                 List<Message> messages = results.getMessages();
                 HashMap<UUID,List<QueueMessage>> queueMap = new HashMap<>();
                 for(Message message : messages){
@@ -124,7 +122,7 @@ public class QueueListener  {
                 consecutiveExceptions.set(0);
             }catch (Exception ex){
                 LOG.error("failed to dequeue",ex);
-                if(consecutiveExceptions.getAndIncrement() > 10){
+                if(consecutiveExceptions.getAndIncrement() > MAX_CONSECUTIVE_FAILS){
                     LOG.error("killing message listener; too many failures");
                     break;
                 }
@@ -132,16 +130,11 @@ public class QueueListener  {
         }
     }
 
-    public void queueMessage(QueueMessage message){
-        queueManager.postToQueue(queuePath, message);
-
-    }
-
-    private QueueResults getDeliveryBatch(int batchSize) throws Exception {
+    private QueueResults getDeliveryBatch() throws Exception {
         QueueQuery qq = new QueueQuery();
-        qq.setLimit(batchSize);
+        qq.setLimit(BATCH_SIZE);
         qq.setTimeout(this.MESSAGE_TRANSACTION_TIMEOUT);
-        QueueResults results = queueManager.getFromQueue(queuePath, qq);
+        QueueResults results = queueManager.getFromQueue(NotificationsQueueManager.QUEUE_NAME, qq);
         LOG.debug("got batch of {} devices", results.size());
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5586b33f/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
index b580410..101ded2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/apns/APNsNotification.java
@@ -49,7 +49,7 @@ public class APNsNotification extends SimpleApnsPushNotification {
       try {
           final byte[] token = TokenUtil.tokenStringToByteArray(providerId);
 
-          return new APNsNotification(tracker, date.getTime(), token, payload, notification);
+          return new APNsNotification(tracker, date.getTime(), token, payload);
       }catch(MalformedTokenStringException mtse){
           throw new RuntimeException("Exception converting token",mtse);
       }
@@ -62,7 +62,7 @@ public class APNsNotification extends SimpleApnsPushNotification {
      * @param token
      * @param payload
      */
-    public APNsNotification(TaskTracker tracker, Date expiryTime, byte[] token, String payload,Notification notification) {
+    public APNsNotification(TaskTracker tracker, Date expiryTime, byte[] token, String payload) {
         super(token, payload, expiryTime);
         this.tracker = tracker;
     }