You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 21:36:43 UTC

[1/2] incubator-usergrid git commit: observable conversion

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev e45728e40 -> e70f0472e


observable conversion


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8aa793b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8aa793b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8aa793b4

Branch: refs/heads/two-dot-o-dev
Commit: 8aa793b4e29666f6474b0198c804337f2a57bd00
Parents: ceadc6c
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 13:35:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 13:35:35 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  4 +-
 .../persistence/queue/DefaultQueueManager.java  |  6 +-
 .../persistence/queue/QueueManager.java         |  4 +-
 .../queue/impl/SNSQueueManagerImpl.java         |  6 +-
 .../queue/impl/SQSQueueManagerImpl.java         |  6 +-
 .../persistence/queue/QueueManagerTest.java     |  8 +--
 .../services/notifications/QueueListener.java   |  2 +-
 .../services/queues/ImportQueueManager.java     |  5 +-
 .../usergrid/services/queues/QueueListener.java | 75 ++++++++++----------
 9 files changed, 62 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f3602f3..fc13d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -144,7 +144,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     /**
      * Take message from SQS
      */
-    public List<QueueMessage> take() {
+    private Observable<QueueMessage> take() {
 
         //SQS doesn't support more than 10
         final Timer.Context timer = this.readTimer.time();
@@ -376,7 +376,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
                             Timer.Context timer = readTimer.time();
 
                             try {
-                                drainList = take();
+                                drainList = take().toList().toBlocking().last();
 
                                 //emit our list in it's entity to hand off to a worker pool
                                 subscriber.onNext(drainList);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index 6917803..dc5878c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -20,6 +20,8 @@
 
 package org.apache.usergrid.persistence.queue;
 
+import rx.Observable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 public class DefaultQueueManager implements QueueManager {
     public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
     @Override
-    public synchronized List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+    public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
         List<QueueMessage> returnQueue = new ArrayList<>();
         for(int i=0;i<limit;i++){
             if(!queue.isEmpty()){
@@ -41,7 +43,7 @@ public class DefaultQueueManager implements QueueManager {
                 break;
             }
         }
-        return returnQueue;
+        return Observable.from( returnQueue);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dd044d2..09ae95a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,6 +17,8 @@
  */
 package org.apache.usergrid.persistence.queue;
 
+import rx.Observable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
@@ -34,7 +36,7 @@ public interface QueueManager {
      * @param klass class to cast the return from
      * @return List of Queue Messages
      */
-    List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+    Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
 
     /**
      * Commit the transaction

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 0f1661d..802c2ce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -212,14 +212,14 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public List<QueueMessage> getMessages(final int limit,
+    public rx.Observable<QueueMessage> getMessages(final int limit,
                                           final int transactionTimeout,
                                           final int waitTime,
                                           final Class klass) {
 
         if (sqs == null) {
             logger.error("SQS is null - was not initialized properly");
-            return new ArrayList<>();
+            return rx.Observable.empty();
         }
 
 
@@ -251,7 +251,7 @@ public class SNSQueueManagerImpl implements QueueManager {
             QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
             queueMessages.add(queueMessage);
         }
-        return queueMessages;
+        return rx.Observable.from( queueMessages);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index e28e805..e6f16c8 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -173,14 +173,14 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public List<QueueMessage> getMessages(final int limit,
+    public rx.Observable<QueueMessage> getMessages(final int limit,
                                           final int transactionTimeout,
                                           final int waitTime,
                                           final Class klass) {
 
         if (sqs == null) {
             logger.error("Sqs is null");
-            return new ArrayList<>();
+            return rx.Observable.empty();
         }
 
         String url = getQueue().getUrl();
@@ -212,7 +212,7 @@ public class SQSQueueManagerImpl implements QueueManager {
             queueMessages.add(queueMessage);
         }
 
-        return queueMessages;
+        return rx.Observable.from(queueMessages);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 452d328..33fa1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -74,13 +74,13 @@ public class QueueManagerTest {
     public void send() throws IOException,ClassNotFoundException{
         String value = "bodytest";
         qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(value));
             qm.commitMessage(message);
         }
-        messageList = qm.getMessages(1,5000,5000,String.class);
+        messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
         assertTrue(messageList.size() <= 0);
 
     }
@@ -93,14 +93,14 @@ public class QueueManagerTest {
         List<Map<String,String>> bodies = new ArrayList<>();
         bodies.add(values);
         qm.sendMessages(bodies);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
         assertTrue(messageList.size() >= 1);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));
         }
         qm.commitMessages(messageList);
 
-        messageList = qm.getMessages(1,5000,5000,values.getClass());
+        messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
         assertTrue(messageList.size() <= 0);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index e9f64b4..91b3e00 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -156,7 +156,7 @@ public class QueueListener  {
             try {
 
                 Timer.Context timerContext = timer.time();
-                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class);
+                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class).toList().toBlocking().last();
                 LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
 
                 if (messages.size() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index f23262e..5f42484 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.usergrid.persistence.queue.QueueManager;
 import org.apache.usergrid.persistence.queue.QueueMessage;
+import rx.Observable;
 
 
 /**
@@ -34,9 +35,9 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
 public class ImportQueueManager implements QueueManager {
 
     @Override
-    public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+    public Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
                                            final Class klass ) {
-        return null;
+        return Observable.empty();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index b87904e..2d8dd7a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -171,52 +171,55 @@ public abstract class QueueListener  {
 
 
         while ( true ) {
-            try {
+
 
                 Timer.Context timerContext = timer.time();
                 //Get the messages out of the queue.
                 //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
-                List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class);
-                LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
+                queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class)
+                    .buffer(getBatchSize())
+                    .doOnNext(messages -> {
+                        try {
+                            LOG.info("retrieved batch of {} messages from queue {} ", messages.size(), queueName);
 
-                if (messages.size() > 0) {
+                            if (messages.size() > 0) {
 
-                    long now = System.currentTimeMillis();
-                    //TODO: make sure this has a way to determine which QueueListener needs to be used
-                    // ideally this is done by checking which type the messages have then
-                    // asking for a onMessage call.
-                    onMessage( messages );
+                                long now = System.currentTimeMillis();
+                                //TODO: make sure this has a way to determine which QueueListener needs to be used
+                                // ideally this is done by checking which type the messages have then
+                                // asking for a onMessage call.
+                                onMessage(messages);
 
-                    queueManager.commitMessages(messages);
+                                queueManager.commitMessages(messages);
 
-                    meter.mark(messages.size());
-                    LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
+                                meter.mark(messages.size());
+                                LOG.info("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now);
 
-                    if(sleepBetweenRuns > 0) {
-                        LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
-                        Thread.sleep(sleepBetweenRuns);
-                    }
+                                if (sleepBetweenRuns > 0) {
+                                    LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
+                                    Thread.sleep(sleepBetweenRuns);
+                                }
 
-                }
-                else{
-                    LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
-                    Thread.sleep(sleepWhenNoneFound);
-                }
-                timerContext.stop();
-                //send to the providers
-                consecutiveExceptions.set(0);
-            }catch (Exception ex){
-                LOG.error("failed to dequeue",ex);
-                try {
-                    long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet();
-                    long maxSleep = 15000;
-                    sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime ;
-                    LOG.info("sleeping due to failures {} ms", sleeptime);
-                    Thread.sleep(sleeptime);
-                }catch (InterruptedException ie){
-                    LOG.info("sleep interrupted");
-                }
-            }
+                            } else {
+                                LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
+                                Thread.sleep(sleepWhenNoneFound);
+                            }
+                            timerContext.stop();
+                            //send to the providers
+                            consecutiveExceptions.set(0);
+                        } catch (Exception ex) {
+                            LOG.error("failed to dequeue", ex);
+                            try {
+                                long sleeptime = sleepWhenNoneFound * consecutiveExceptions.incrementAndGet();
+                                long maxSleep = 15000;
+                                sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime;
+                                LOG.info("sleeping due to failures {} ms", sleeptime);
+                                Thread.sleep(sleeptime);
+                            } catch (InterruptedException ie) {
+                                LOG.info("sleep interrupted");
+                            }
+                        }
+                    }).toBlocking().last();
         }
     }
 


[2/2] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev

Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e70f0472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e70f0472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e70f0472

Branch: refs/heads/two-dot-o-dev
Commit: e70f0472e2a1ae2391ca4954d2acd48c1eb1248e
Parents: 8aa793b e45728e
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 13:36:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 13:36:34 2015 -0600

----------------------------------------------------------------------
 .../src/main/java/org/apache/usergrid/persistence/Query.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------