You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/14 23:09:18 UTC
[1/3] incubator-usergrid git commit: add queue depth
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 35430a59d -> daac07d2b
add queue depth
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bb6ca8ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bb6ca8ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bb6ca8ed
Branch: refs/heads/two-dot-o-dev
Commit: bb6ca8edfed851cd03f23b96bb91bec0ee3b990a
Parents: 35430a5
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 14:45:58 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 14:45:58 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 2 +-
.../asyncevents/AsyncEventService.java | 2 ++
.../persistence/queue/DefaultQueueManager.java | 5 ++++
.../persistence/queue/QueueManager.java | 6 ++++
.../queue/impl/SNSQueueManagerImpl.java | 14 +++++++++
.../queue/impl/SQSQueueManagerImpl.java | 31 +++++++++-----------
.../persistence/queue/QueueManagerTest.java | 18 ++++++++++++
.../org/apache/usergrid/rest/RootResource.java | 17 ++++++++++-
.../services/queues/ImportQueueManager.java | 5 ++++
9 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b71a549..ed106e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -80,7 +80,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
// SQS maximum receive messages is 10
private static final int MAX_TAKE = 10;
- private static final String QUEUE_NAME = "es_queue";
+ public static final String QUEUE_NAME = "es_queue";
private final QueueManager queue;
private final QueueScope queueScope;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1a5e865..7cce8b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -76,4 +76,6 @@ public interface AsyncEventService extends ReIndexAction {
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index dc5878c..c72e109 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -47,6 +47,11 @@ public class DefaultQueueManager implements QueueManager {
}
@Override
+ public long getQueueDepth() {
+ return queue.size();
+ }
+
+ @Override
public void commitMessage(QueueMessage queueMessage) {
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 09ae95a..0ec2337 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -39,6 +39,12 @@ public interface QueueManager {
Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
/**
+ * get the queue depth
+ * @return
+ */
+ long getQueueDepth();
+
+ /**
* Commit the transaction
* @param queueMessage
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index f41d238..6d0e18b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -381,6 +381,20 @@ public class SNSQueueManagerImpl implements QueueManager {
}
@Override
+ public long getQueueDepth() {
+ String key = "ApproximateNumberOfMessages";
+ try {
+ GetQueueAttributesResult result = sqs.getQueueAttributes(getReadQueue().getUrl(), Collections.singletonList(key));
+ String depthString = result.getAttributes().get(key);
+ return depthString != null ? Long.parseLong(depthString) : 0;
+ }catch (Exception e){
+ logger.error("Exception getting queue depth",e);
+ return -1;
+
+ }
+ }
+
+ @Override
public void sendMessages(final List bodies) throws IOException {
if (snsAsync == null) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index effa373..075e90c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import com.amazonaws.services.sqs.model.*;
import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
import org.slf4j.Logger;
@@ -36,22 +37,6 @@ import org.apache.usergrid.persistence.queue.QueueScope;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Preconditions;
@@ -147,7 +132,6 @@ public class SQSQueueManagerImpl implements QueueManager {
//pretty print, disabling for speed
// mapper.enable(SerializationFeature.INDENT_OUTPUT);
mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
-
sqs = createClient();
} catch (Exception e) {
@@ -219,6 +203,19 @@ public class SQSQueueManagerImpl implements QueueManager {
}
@Override
+ public long getQueueDepth() {
+ String key = "ApproximateNumberOfMessages";
+ try {
+ GetQueueAttributesResult result = sqs.getQueueAttributes(new GetQueueAttributesRequest().withAttributeNames(key));
+ String depthString = result.getAttributes().get(key);
+ return depthString != null ? Long.parseLong(depthString) : 0;
+ }catch (Exception e){
+ logger.error("Exception getting queue depth",e);
+ return -1;
+
+ }
+ }
+ @Override
public void sendMessages(final List bodies) throws IOException {
if (sqs == null) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 3be02e1..e948015 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -105,5 +105,23 @@ public class QueueManagerTest {
}
+ @Test
+ public void queueSize() throws IOException,ClassNotFoundException{
+ HashMap<String,String> values = new HashMap<>();
+ values.put("test", "Test");
+
+ List<Map<String,String>> bodies = new ArrayList<>();
+ bodies.add(values);
+ qm.sendMessages(bodies);
+ long depth = qm.getQueueDepth();
+ assertTrue(depth>0);
+ List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+ assertTrue(messageList.size() >= 1);
+ for(QueueMessage message : messageList){
+ assertTrue(message.getBody().equals(values));
+ }
+ qm.commitMessages(messageList);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index 5b5e711..989df26 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -37,8 +37,15 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexRefreshCommand;
import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -102,6 +109,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
@Autowired
private UsergridSystemMonitor usergridSystemMonitor;
+ @Autowired
+ private Injector injector;
+
public RootResource() {
}
@@ -181,6 +191,10 @@ public class RootResource extends AbstractContextResource implements MetricProce
ApiResponse response = createApiResponse();
+ QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class);
+ QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS);
+ QueueManager queue = queueManagerFactory.getQueueManager(queueScope);
+
if ( !ignoreError ) {
if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) {
@@ -205,8 +219,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
node.put( "cassandraStatus", emf.getEntityStoreHealth().toString() );
// Core Persistence Query Index module status for Management App Index
- EntityManager em = emf.getEntityManager(emf.getManagementAppId());
node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
+ node.put( "queueDepth", queue.getQueueDepth() );
+
dumpMetrics(node);
response.setProperty( "status", node );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index 5f42484..d74f688 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -40,6 +40,11 @@ public class ImportQueueManager implements QueueManager {
return Observable.empty();
}
+ @Override
+ public long getQueueDepth() {
+ return 0;
+ }
+
@Override
public void commitMessage( final QueueMessage queueMessage ) {
[2/3] incubator-usergrid git commit: add queue depth
Posted by to...@apache.org.
add queue depth
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ee6e087f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ee6e087f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ee6e087f
Branch: refs/heads/two-dot-o-dev
Commit: ee6e087f8b8acde9a27952a2d4a46ae80773559c
Parents: bb6ca8e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 14:48:32 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 14:48:32 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 5 +++++
.../corepersistence/asyncevents/AsyncEventService.java | 5 +++++
.../asyncevents/InMemoryAsyncEventService.java | 12 +++++++++---
.../java/org/apache/usergrid/rest/RootResource.java | 10 +++++-----
4 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ed106e2..46c7076 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -373,6 +373,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
}
+ @Override
+ public long getQueueDepth() {
+ return queue.getQueueDepth();
+ }
+
public void handleEntityDelete(final QueueMessage message) {
Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 7cce8b3..6d51679 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -75,6 +75,11 @@ public interface AsyncEventService extends ReIndexAction {
void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
+ /**
+ * current queue depth
+ * @return
+ */
+ long getQueueDepth();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 67078dc..6a71b3e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -79,13 +79,13 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
- run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
+ run( eventBuilder.buildNewEdge(applicationScope, entity, newEdge) );
}
@Override
public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
- run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
+ run( eventBuilder.buildDeleteEdge(applicationScope, edge) );
}
@@ -103,7 +103,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
- run(eventBuilder.buildEntityIndex( entityIndexOperation ));
+ run(eventBuilder.buildEntityIndex(entityIndexOperation));
}
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -125,4 +125,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
observable.toBlocking().lastOrDefault(null);
}
}
+
+ @Override
+ public long getQueueDepth() {
+ return 0;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index 989df26..7c17b7c 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import com.google.inject.Injector;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.persistence.index.IndexFig;
import org.apache.usergrid.persistence.index.IndexRefreshCommand;
import org.apache.usergrid.persistence.index.query.Identifier;
@@ -191,9 +192,8 @@ public class RootResource extends AbstractContextResource implements MetricProce
ApiResponse response = createApiResponse();
- QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class);
- QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS);
- QueueManager queue = queueManagerFactory.getQueueManager(queueScope);
+ AsyncEventService eventService = injector.getInstance(AsyncEventService.class);
+
if ( !ignoreError ) {
@@ -210,7 +210,7 @@ public class RootResource extends AbstractContextResource implements MetricProce
ObjectNode node = JsonNodeFactory.instance.objectNode();
node.put( "started", started );
node.put( "uptime", System.currentTimeMillis() - started );
- node.put( "version", usergridSystemMonitor.getBuildNumber() );
+ node.put( "version", usergridSystemMonitor.getBuildNumber());
// Hector status, for backwards compatibility
node.put("cassandraAvailable", usergridSystemMonitor.getIsCassandraAlive());
@@ -220,7 +220,7 @@ public class RootResource extends AbstractContextResource implements MetricProce
// Core Persistence Query Index module status for Management App Index
node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
- node.put( "queueDepth", queue.getQueueDepth() );
+ node.put( "queueDepth", eventService.getQueueDepth() );
dumpMetrics(node);
[3/3] incubator-usergrid git commit: test queue depth matches
Posted by to...@apache.org.
test queue depth matches
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/daac07d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/daac07d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/daac07d2
Branch: refs/heads/two-dot-o-dev
Commit: daac07d2b6d5069cb10867ca33daeb3534dd1f6e
Parents: ee6e087
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 15:02:51 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 15:02:51 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/queue/QueueManagerTest.java | 12 +++++++++---
1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daac07d2/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index e948015..f265620 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import com.google.inject.Inject;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
@@ -112,15 +113,20 @@ public class QueueManagerTest {
List<Map<String,String>> bodies = new ArrayList<>();
bodies.add(values);
+ long initialDepth = qm.getQueueDepth();
qm.sendMessages(bodies);
long depth = qm.getQueueDepth();
assertTrue(depth>0);
- List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
- assertTrue(messageList.size() >= 1);
+ List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
+ assertTrue(messageList.size() <= 500);
for(QueueMessage message : messageList){
assertTrue(message.getBody().equals(values));
}
- qm.commitMessages(messageList);
+ if(messageList.size()>0) {
+ qm.commitMessages(messageList);
+ }
+ depth = qm.getQueueDepth();
+ assertEquals(initialDepth, depth);
}