You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/28 21:36:43 UTC
[1/2] incubator-usergrid git commit: observable conversion
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev e45728e40 -> e70f0472e
observable conversion
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8aa793b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8aa793b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8aa793b4
Branch: refs/heads/two-dot-o-dev
Commit: 8aa793b4e29666f6474b0198c804337f2a57bd00
Parents: ceadc6c
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 13:35:35 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 13:35:35 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 4 +-
.../persistence/queue/DefaultQueueManager.java | 6 +-
.../persistence/queue/QueueManager.java | 4 +-
.../queue/impl/SNSQueueManagerImpl.java | 6 +-
.../queue/impl/SQSQueueManagerImpl.java | 6 +-
.../persistence/queue/QueueManagerTest.java | 8 +--
.../services/notifications/QueueListener.java | 2 +-
.../services/queues/ImportQueueManager.java | 5 +-
.../usergrid/services/queues/QueueListener.java | 75 ++++++++++----------
9 files changed, 62 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index f3602f3..fc13d85 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -144,7 +144,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
/**
* Take message from SQS
*/
- public List<QueueMessage> take() {
+ private Observable<QueueMessage> take() {
//SQS doesn't support more than 10
final Timer.Context timer = this.readTimer.time();
@@ -376,7 +376,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
Timer.Context timer = readTimer.time();
try {
- drainList = take();
+ drainList = take().toList().toBlocking().last();
//emit our list in it's entity to hand off to a worker pool
subscriber.onNext(drainList);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index 6917803..dc5878c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.queue;
+import rx.Observable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -32,7 +34,7 @@ import java.util.concurrent.ArrayBlockingQueue;
public class DefaultQueueManager implements QueueManager {
public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
@Override
- public synchronized List<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
+ public synchronized Observable<QueueMessage> getMessages(int limit, int transactionTimeout, int waitTime, Class klass) {
List<QueueMessage> returnQueue = new ArrayList<>();
for(int i=0;i<limit;i++){
if(!queue.isEmpty()){
@@ -41,7 +43,7 @@ public class DefaultQueueManager implements QueueManager {
break;
}
}
- return returnQueue;
+ return Observable.from( returnQueue);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index dd044d2..09ae95a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -17,6 +17,8 @@
*/
package org.apache.usergrid.persistence.queue;
+import rx.Observable;
+
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
@@ -34,7 +36,7 @@ public interface QueueManager {
* @param klass class to cast the return from
* @return List of Queue Messages
*/
- List<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
+ Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
/**
* Commit the transaction
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 0f1661d..802c2ce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -212,14 +212,14 @@ public class SNSQueueManagerImpl implements QueueManager {
}
@Override
- public List<QueueMessage> getMessages(final int limit,
+ public rx.Observable<QueueMessage> getMessages(final int limit,
final int transactionTimeout,
final int waitTime,
final Class klass) {
if (sqs == null) {
logger.error("SQS is null - was not initialized properly");
- return new ArrayList<>();
+ return rx.Observable.empty();
}
@@ -251,7 +251,7 @@ public class SNSQueueManagerImpl implements QueueManager {
QueueMessage queueMessage = new QueueMessage(message.getMessageId(), message.getReceiptHandle(), body, message.getAttributes().get("type"));
queueMessages.add(queueMessage);
}
- return queueMessages;
+ return rx.Observable.from( queueMessages);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index e28e805..e6f16c8 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -173,14 +173,14 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
- public List<QueueMessage> getMessages(final int limit,
+ public rx.Observable<QueueMessage> getMessages(final int limit,
final int transactionTimeout,
final int waitTime,
final Class klass) {
if (sqs == null) {
logger.error("Sqs is null");
- return new ArrayList<>();
+ return rx.Observable.empty();
}
String url = getQueue().getUrl();
@@ -212,7 +212,7 @@ public class SQSQueueManagerImpl implements QueueManager {
queueMessages.add(queueMessage);
}
- return queueMessages;
+ return rx.Observable.from(queueMessages);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 452d328..33fa1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -74,13 +74,13 @@ public class QueueManagerTest {
public void send() throws IOException,ClassNotFoundException{
String value = "bodytest";
qm.sendMessage(value);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class);
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(value));
qm.commitMessage(message);
}
- messageList = qm.getMessages(1,5000,5000,String.class);
+ messageList = qm.getMessages(1,5000,5000,String.class).toList().toBlocking().last();
assertTrue(messageList.size() <= 0);
}
@@ -93,14 +93,14 @@ public class QueueManagerTest {
List<Map<String,String>> bodies = new ArrayList<>();
bodies.add(values);
qm.sendMessages(bodies);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass());
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
assertTrue(messageList.size() >= 1);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
}
qm.commitMessages(messageList);
- messageList = qm.getMessages(1,5000,5000,values.getClass());
+ messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
assertTrue(messageList.size() <= 0);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index e9f64b4..91b3e00 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -156,7 +156,7 @@ public class QueueListener {
try {
Timer.Context timerContext = timer.time();
- List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class);
+ List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 10000, ApplicationQueueMessage.class).toList().toBlocking().last();
LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
if (messages.size() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index f23262e..5f42484 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -26,6 +26,7 @@ import java.util.List;
import org.apache.usergrid.persistence.queue.QueueManager;
import org.apache.usergrid.persistence.queue.QueueMessage;
+import rx.Observable;
/**
@@ -34,9 +35,9 @@ import org.apache.usergrid.persistence.queue.QueueMessage;
public class ImportQueueManager implements QueueManager {
@Override
- public List<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
+ public Observable<QueueMessage> getMessages( final int limit, final int transactionTimeout, final int waitTime,
final Class klass ) {
- return null;
+ return Observable.empty();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8aa793b4/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index b87904e..2d8dd7a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -171,52 +171,55 @@ public abstract class QueueListener {
while ( true ) {
- try {
+
Timer.Context timerContext = timer.time();
//Get the messages out of the queue.
//TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
- List<QueueMessage> messages = queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class);
- LOG.info("retrieved batch of {} messages from queue {} ", messages.size(),queueName);
+ queueManager.getMessages(getBatchSize(), MESSAGE_TRANSACTION_TIMEOUT, 5000, ImportQueueMessage.class)
+ .buffer(getBatchSize())
+ .doOnNext(messages -> {
+ try {
+ LOG.info("retrieved batch of {} messages from queue {} ", messages.size(), queueName);
- if (messages.size() > 0) {
+ if (messages.size() > 0) {
- long now = System.currentTimeMillis();
- //TODO: make sure this has a way to determine which QueueListener needs to be used
- // ideally this is done by checking which type the messages have then
- // asking for a onMessage call.
- onMessage( messages );
+ long now = System.currentTimeMillis();
+ //TODO: make sure this has a way to determine which QueueListener needs to be used
+ // ideally this is done by checking which type the messages have then
+ // asking for a onMessage call.
+ onMessage(messages);
- queueManager.commitMessages(messages);
+ queueManager.commitMessages(messages);
- meter.mark(messages.size());
- LOG.info("sent batch {} messages duration {} ms", messages.size(),System.currentTimeMillis() - now);
+ meter.mark(messages.size());
+ LOG.info("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now);
- if(sleepBetweenRuns > 0) {
- LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
- Thread.sleep(sleepBetweenRuns);
- }
+ if (sleepBetweenRuns > 0) {
+ LOG.info("sleep between rounds...sleep...{}", sleepBetweenRuns);
+ Thread.sleep(sleepBetweenRuns);
+ }
- }
- else{
- LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
- Thread.sleep(sleepWhenNoneFound);
- }
- timerContext.stop();
- //send to the providers
- consecutiveExceptions.set(0);
- }catch (Exception ex){
- LOG.error("failed to dequeue",ex);
- try {
- long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet();
- long maxSleep = 15000;
- sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime ;
- LOG.info("sleeping due to failures {} ms", sleeptime);
- Thread.sleep(sleeptime);
- }catch (InterruptedException ie){
- LOG.info("sleep interrupted");
- }
- }
+ } else {
+ LOG.info("no messages...sleep...{}", sleepWhenNoneFound);
+ Thread.sleep(sleepWhenNoneFound);
+ }
+ timerContext.stop();
+ //send to the providers
+ consecutiveExceptions.set(0);
+ } catch (Exception ex) {
+ LOG.error("failed to dequeue", ex);
+ try {
+ long sleeptime = sleepWhenNoneFound * consecutiveExceptions.incrementAndGet();
+ long maxSleep = 15000;
+ sleeptime = sleeptime > maxSleep ? maxSleep : sleeptime;
+ LOG.info("sleeping due to failures {} ms", sleeptime);
+ Thread.sleep(sleeptime);
+ } catch (InterruptedException ie) {
+ LOG.info("sleep interrupted");
+ }
+ }
+ }).toBlocking().last();
}
}
[2/2] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev
Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e70f0472
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e70f0472
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e70f0472
Branch: refs/heads/two-dot-o-dev
Commit: e70f0472e2a1ae2391ca4954d2acd48c1eb1248e
Parents: 8aa793b e45728e
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu May 28 13:36:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu May 28 13:36:34 2015 -0600
----------------------------------------------------------------------
.../src/main/java/org/apache/usergrid/persistence/Query.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------