You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/14 23:09:18 UTC

[1/3] incubator-usergrid git commit: add queue depth

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 35430a59d -> daac07d2b


add queue depth


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bb6ca8ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bb6ca8ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bb6ca8ed

Branch: refs/heads/two-dot-o-dev
Commit: bb6ca8edfed851cd03f23b96bb91bec0ee3b990a
Parents: 35430a5
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 14:45:58 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 14:45:58 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  2 +-
 .../asyncevents/AsyncEventService.java          |  2 ++
 .../persistence/queue/DefaultQueueManager.java  |  5 ++++
 .../persistence/queue/QueueManager.java         |  6 ++++
 .../queue/impl/SNSQueueManagerImpl.java         | 14 +++++++++
 .../queue/impl/SQSQueueManagerImpl.java         | 31 +++++++++-----------
 .../persistence/queue/QueueManagerTest.java     | 18 ++++++++++++
 .../org/apache/usergrid/rest/RootResource.java  | 17 ++++++++++-
 .../services/queues/ImportQueueManager.java     |  5 ++++
 9 files changed, 81 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index b71a549..ed106e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -80,7 +80,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     // SQS maximum receive messages is 10
     private static final int MAX_TAKE = 10;
-    private static final String QUEUE_NAME = "es_queue";
+    public static final String QUEUE_NAME = "es_queue";
 
     private final QueueManager queue;
     private final QueueScope queueScope;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1a5e865..7cce8b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -76,4 +76,6 @@ public interface AsyncEventService extends ReIndexAction {
 
 
 
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
index dc5878c..c72e109 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/DefaultQueueManager.java
@@ -47,6 +47,11 @@ public class DefaultQueueManager implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        return queue.size(); // depth is whatever the backing queue reports as its current size
+    }
+
+    @Override
     public void commitMessage(QueueMessage queueMessage) {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
index 09ae95a..0ec2337 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
@@ -39,6 +39,12 @@ public interface QueueManager {
     Observable<QueueMessage> getMessages(int limit,int transactionTimeout, int waitTime, Class klass);
 
     /**
+     * Get the current depth of the queue, as reported by the implementation.
+     * @return the queue depth; the SQS and SNS implementations return -1 when the depth cannot be read
+     */
+    long getQueueDepth();
+
+    /**
      * Commit the transaction
      * @param queueMessage
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index f41d238..6d0e18b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -381,6 +381,20 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        final String attributeName = "ApproximateNumberOfMessages";
+        try {
+            final String queueUrl = getReadQueue().getUrl();
+            final GetQueueAttributesResult attributes = sqs.getQueueAttributes(queueUrl, Collections.singletonList(attributeName));
+            final String rawDepth = attributes.getAttributes().get(attributeName);
+            return rawDepth == null ? 0 : Long.parseLong(rawDepth); // a missing attribute is reported as 0
+        }catch (Exception e){
+            logger.error("Exception getting queue depth",e); // best effort: failures are logged, not thrown
+            return -1;
+        }
+    }
+
+    @Override
     public void sendMessages(final List bodies) throws IOException {
 
         if (snsAsync == null) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index effa373..075e90c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import com.amazonaws.services.sqs.model.*;
 import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
 import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
 import org.slf4j.Logger;
@@ -36,22 +37,6 @@ import org.apache.usergrid.persistence.queue.QueueScope;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sqs.AmazonSQSClient;
-import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
-import com.amazonaws.services.sqs.model.CreateQueueRequest;
-import com.amazonaws.services.sqs.model.CreateQueueResult;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
-import com.amazonaws.services.sqs.model.DeleteMessageRequest;
-import com.amazonaws.services.sqs.model.GetQueueUrlResult;
-import com.amazonaws.services.sqs.model.Message;
-import com.amazonaws.services.sqs.model.MessageAttributeValue;
-import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
-import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
-import com.amazonaws.services.sqs.model.ReceiveMessageResult;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
-import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
-import com.amazonaws.services.sqs.model.SendMessageRequest;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.base.Preconditions;
@@ -147,7 +132,6 @@ public class SQSQueueManagerImpl implements QueueManager {
             //pretty print, disabling for speed
 //            mapper.enable(SerializationFeature.INDENT_OUTPUT);
             mapper.enableDefaultTypingAsProperty(ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class");
-
             sqs = createClient();
 
         } catch (Exception e) {
@@ -219,6 +203,19 @@ public class SQSQueueManagerImpl implements QueueManager {
     }
 
     @Override
+    public long getQueueDepth() {
+        String key = "ApproximateNumberOfMessages";
+        try {
+            // The request must name the queue. NOTE(review): assumes this class's getQueue() accessor - confirm.
+            GetQueueAttributesResult result = sqs.getQueueAttributes(getQueue().getUrl(), Collections.singletonList(key));
+            String depthString = result.getAttributes().get(key);
+            return depthString != null ? Long.parseLong(depthString) : 0; // a missing attribute is reported as 0
+        }catch (Exception e){
+            logger.error("Exception getting queue depth",e);
+            return -1; // -1 tells the caller the depth could not be read
+        }
+    }
+    @Override
     public void sendMessages(final List bodies) throws IOException {
 
         if (sqs == null) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 3be02e1..e948015 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -105,5 +105,23 @@ public class QueueManagerTest {
 
     }
 
+    @Test
+    public void queueSize() throws IOException,ClassNotFoundException{
+        HashMap<String,String> values = new HashMap<>();
+        values.put("test", "Test");
+
+        List<Map<String,String>> bodies = new ArrayList<>();
+        bodies.add(values);
+        qm.sendMessages(bodies);
+        long depth = qm.getQueueDepth();
+        assertTrue(depth>0);
+        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
+        assertTrue(messageList.size() >= 1);
+        for(QueueMessage message : messageList){
+            assertTrue(message.getBody().equals(values));
+        }
+        qm.commitMessages(messageList);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index 5b5e711..989df26 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -37,8 +37,15 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
 
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import org.apache.usergrid.persistence.index.query.Identifier;
+import org.apache.usergrid.persistence.queue.Queue;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -102,6 +109,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
     @Autowired
     private UsergridSystemMonitor usergridSystemMonitor;
 
+    @Autowired
+    private Injector injector;
+
 
     public RootResource() {
     }
@@ -181,6 +191,10 @@ public class RootResource extends AbstractContextResource implements MetricProce
 
         ApiResponse response = createApiResponse();
 
+        QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class);
+        QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS);
+        QueueManager queue = queueManagerFactory.getQueueManager(queueScope);
+
         if ( !ignoreError ) {
 
             if ( !emf.getEntityStoreHealth().equals( Health.GREEN )) {
@@ -205,8 +219,9 @@ public class RootResource extends AbstractContextResource implements MetricProce
         node.put( "cassandraStatus", emf.getEntityStoreHealth().toString() );
 
         // Core Persistence Query Index module status for Management App Index
-        EntityManager em = emf.getEntityManager(emf.getManagementAppId());
         node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
+        node.put( "queueDepth", queue.getQueueDepth() );
+
 
         dumpMetrics(node);
         response.setProperty( "status", node );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bb6ca8ed/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index 5f42484..d74f688 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -40,6 +40,11 @@ public class ImportQueueManager implements QueueManager {
         return Observable.empty();
     }
 
+    @Override
+    public long getQueueDepth() {
+        return 0; // this queue manager always reports a depth of 0
+    }
+
 
     @Override
     public void commitMessage( final QueueMessage queueMessage ) {


[2/3] incubator-usergrid git commit: add queue depth

Posted by to...@apache.org.
add queue depth


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ee6e087f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ee6e087f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ee6e087f

Branch: refs/heads/two-dot-o-dev
Commit: ee6e087f8b8acde9a27952a2d4a46ae80773559c
Parents: bb6ca8e
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 14:48:32 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 14:48:32 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java            |  5 +++++
 .../corepersistence/asyncevents/AsyncEventService.java  |  5 +++++
 .../asyncevents/InMemoryAsyncEventService.java          | 12 +++++++++---
 .../java/org/apache/usergrid/rest/RootResource.java     | 10 +++++-----
 4 files changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index ed106e2..46c7076 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -373,6 +373,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
         offer( new EntityDeleteEvent( new EntityIdScope( applicationScope, entityId ) ) );
     }
 
+    @Override
+    public long getQueueDepth() {
+        return queue.getQueueDepth(); // delegates to the QueueManager held in the queue field
+    }
+
     public void handleEntityDelete(final QueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 7cce8b3..6d51679 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -75,6 +75,11 @@ public interface AsyncEventService extends ReIndexAction {
     void queueEntityDelete(final ApplicationScope applicationScope, final Id entityId);
 
 
+    /**
+     * Get the current depth of the event queue behind this service, as reported by the implementation.
+     * @return the queue depth; the in-memory implementation always returns 0
+     */
+    long getQueueDepth();
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 67078dc..6a71b3e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -79,13 +79,13 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     @Override
     public void queueNewEdge( final ApplicationScope applicationScope, final Entity entity, final Edge newEdge ) {
-        run( eventBuilder.buildNewEdge( applicationScope, entity, newEdge ) );
+        run( eventBuilder.buildNewEdge(applicationScope, entity, newEdge) );
     }
 
 
     @Override
     public void queueDeleteEdge( final ApplicationScope applicationScope, final Edge edge ) {
-        run( eventBuilder.buildDeleteEdge( applicationScope, edge ) );
+        run( eventBuilder.buildDeleteEdge(applicationScope, edge) );
     }
 
 
@@ -103,7 +103,7 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     public void index( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
         final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, id, updatedSince );
 
-        run(eventBuilder.buildEntityIndex( entityIndexOperation ));
+        run(eventBuilder.buildEntityIndex(entityIndexOperation));
     }
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
@@ -125,4 +125,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
             observable.toBlocking().lastOrDefault(null);
         }
     }
+
+    @Override
+    public long getQueueDepth() {
+        return 0; // the in-memory service always reports a depth of 0
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ee6e087f/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
index 989df26..7c17b7c 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/RootResource.java
@@ -38,6 +38,7 @@ import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.UriInfo;
 
 import com.google.inject.Injector;
+import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.persistence.index.IndexFig;
 import org.apache.usergrid.persistence.index.IndexRefreshCommand;
 import org.apache.usergrid.persistence.index.query.Identifier;
@@ -191,9 +192,8 @@ public class RootResource extends AbstractContextResource implements MetricProce
 
         ApiResponse response = createApiResponse();
 
-        QueueManagerFactory queueManagerFactory = injector.getInstance(QueueManagerFactory.class);
-        QueueScope queueScope = new QueueScopeImpl("es_queue", QueueScope.RegionImplementation.ALLREGIONS);
-        QueueManager queue = queueManagerFactory.getQueueManager(queueScope);
+        AsyncEventService eventService = injector.getInstance(AsyncEventService.class);
+
 
         if ( !ignoreError ) {
 
@@ -210,7 +210,7 @@ public class RootResource extends AbstractContextResource implements MetricProce
         ObjectNode node = JsonNodeFactory.instance.objectNode();
         node.put( "started", started );
         node.put( "uptime", System.currentTimeMillis() - started );
-        node.put( "version", usergridSystemMonitor.getBuildNumber() );
+        node.put( "version", usergridSystemMonitor.getBuildNumber());
 
         // Hector status, for backwards compatibility
         node.put("cassandraAvailable", usergridSystemMonitor.getIsCassandraAlive());
@@ -220,7 +220,7 @@ public class RootResource extends AbstractContextResource implements MetricProce
 
         // Core Persistence Query Index module status for Management App Index
         node.put( "managementAppIndexStatus", emf.getIndexHealth().toString() );
-        node.put( "queueDepth", queue.getQueueDepth() );
+        node.put( "queueDepth", eventService.getQueueDepth() );
 
 
         dumpMetrics(node);


[3/3] incubator-usergrid git commit: test queue depth matches

Posted by to...@apache.org.
test queue depth matches


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/daac07d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/daac07d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/daac07d2

Branch: refs/heads/two-dot-o-dev
Commit: daac07d2b6d5069cb10867ca33daeb3534dd1f6e
Parents: ee6e087
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 14 15:02:51 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 14 15:02:51 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/queue/QueueManagerTest.java    | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/daac07d2/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index e948015..f265620 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
 
 import com.google.inject.Inject;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
@@ -112,15 +113,20 @@ public class QueueManagerTest {
 
         List<Map<String,String>> bodies = new ArrayList<>();
         bodies.add(values);
+        long initialDepth = qm.getQueueDepth();
         qm.sendMessages(bodies);
         long depth = qm.getQueueDepth();
         assertTrue(depth>0);
-        List<QueueMessage> messageList = qm.getMessages(1,5000,5000,values.getClass()).toList().toBlocking().last();
-        assertTrue(messageList.size() >= 1);
+        List<QueueMessage> messageList = qm.getMessages(10,5000,5000,values.getClass()).toList().toBlocking().last();
+        assertTrue(messageList.size() <= 500);
         for(QueueMessage message : messageList){
             assertTrue(message.getBody().equals(values));
         }
-        qm.commitMessages(messageList);
+        if(messageList.size()>0) {
+            qm.commitMessages(messageList);
+        }
+        depth = qm.getQueueDepth();
+        assertEquals(initialDepth, depth);
     }