You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:49 UTC

[04/10] usergrid git commit: Complete message counter and add support for getQueueDepth() in QakkaQueueManager

Complete message counter and add support for getQueueDepth() in QakkaQueueManager


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9306f12e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9306f12e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9306f12e

Branch: refs/heads/usergrid-1318-queue
Commit: 9306f12eea8cd8f0ad0b2ec4751cd8a9b9ba5382
Parents: 8b79fb8
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 08:34:28 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 08:34:28 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |   9 +-
 .../usergrid/persistence/qakka/QakkaModule.java |  29 ++--
 .../qakka/core/QueueMessageManager.java         |   4 +
 .../core/impl/QueueMessageManagerImpl.java      |  13 +-
 .../qakka/distributed/actors/QueueActor.java    |  14 +-
 .../distributed/actors/QueueTimeouter.java      |   8 +
 .../qakka/distributed/actors/QueueWriter.java   | 138 ++++++++--------
 .../impl/DistributedQueueServiceImpl.java       |   6 +-
 .../MessageCounterSerialization.java            |   4 +-
 .../impl/MessageCounterSerializationImpl.java   | 159 +++++++++++++------
 .../queue/impl/QakkaQueueManager.java           |   4 +-
 .../qakka/core/QueueMessageManagerTest.java     |   2 +
 .../impl/MessageCounterSerializationTest.java   |  90 +++++++++++
 .../sharding/ShardCounterSerializationTest.java |   3 -
 .../queue/src/test/resources/qakka.properties   |   1 +
 15 files changed, 347 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c66001d..c3f4189 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,7 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_GET_TIMEOUT                      = "queue.get.timeout.seconds";
 
-    String QUEUE_MAX_SHARD_COUNTER                = "queue.max.inmemory.shard.counter";
+    String QUEUE_MAX_SHARD_COUNTER                = "queue.max.inmemory.max.shard.counter";
+
+    String QUEUE_MAX_MESSAGE_CHANGES              = "queue.max.inmemory.max.message.changes";
 
     String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
 
@@ -125,6 +127,11 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Default("100")
     long getMaxInMemoryShardCounter();
 
+    /** Once counter reaches this value, write it to permanent storage */
+    @Key(QUEUE_MAX_MESSAGE_CHANGES)
+    @Default("100")
+    long getMaxInMemoryMessageCounter();
+
     /** How often to check whether new shard is needed for each queue */
     @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
     @Default("5000")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index d1d8d7e..e3113e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -37,7 +37,9 @@ import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterP
 import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.MessageCounterSerializationImpl;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
 import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl;
@@ -76,23 +78,24 @@ public class QakkaModule extends AbstractModule {
 
         bind( App.class );
 
-        bind( CassandraClient.class ).to(           CassandraClientImpl.class );
-        bind( MetricsService.class ).to(            App.class );
+        bind( CassandraClient.class ).to(             CassandraClientImpl.class );
+        bind( MetricsService.class ).to(              App.class );
 
-        bind( QueueManager.class ).to(              QueueManagerImpl.class );
-        bind( QueueSerialization.class ).to(        QueueSerializationImpl.class );
+        bind( QueueManager.class ).to(                QueueManagerImpl.class );
+        bind( QueueSerialization.class ).to(          QueueSerializationImpl.class );
 
-        bind( QueueMessageManager.class ).to(       QueueMessageManagerImpl.class );
-        bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
+        bind( QueueMessageManager.class ).to(         QueueMessageManagerImpl.class );
+        bind( QueueMessageSerialization.class ).to(   QueueMessageSerializationImpl.class );
 
-        bind( ShardSerialization.class ).to(        ShardSerializationImpl.class );
-        bind( ShardStrategy.class ).to(             ShardStrategyImpl.class );
+        bind( ShardSerialization.class ).to(          ShardSerializationImpl.class );
+        bind( ShardStrategy.class ).to(               ShardStrategyImpl.class );
 
-        bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+        bind( ShardCounterSerialization.class ).to(   ShardCounterSerializationImpl.class );
+        bind( MessageCounterSerialization.class ).to( MessageCounterSerializationImpl.class );
 
-        bind( TransferLogSerialization.class ).to(  TransferLogSerializationImpl.class );
-        bind( AuditLogSerialization.class ).to(     AuditLogSerializationImpl.class );
-        bind( DistributedQueueService.class ).to(   DistributedQueueServiceImpl.class );
+        bind( TransferLogSerialization.class ).to(    TransferLogSerializationImpl.class );
+        bind( AuditLogSerialization.class ).to(       AuditLogSerializationImpl.class );
+        bind( DistributedQueueService.class ).to(     DistributedQueueServiceImpl.class );
 
         bind( QueueActorRouterProducer.class );
         bind( QueueWriterRouterProducer.class );
@@ -110,6 +113,6 @@ public class QakkaModule extends AbstractModule {
         migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) );
-        //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
index 15203d8..b540fce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.usergrid.persistence.qakka.core;
 
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
@@ -80,4 +82,6 @@ public interface QueueMessageManager {
      * Get message from messages available or messages inflight storage.
      */
     QueueMessage getMessage(String queueName, UUID queueMessageId);
+
+    long getQueueDepth(String queueName);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index bcd0f58..691c1a6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     private final DistributedQueueService   distributedQueueService;
     private final TransferLogSerialization  transferLogSerialization;
     private final URIStrategy uriStrategy;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
@@ -67,8 +69,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
             QueueMessageSerialization queueMessageSerialization,
             DistributedQueueService   distributedQueueService,
             TransferLogSerialization  transferLogSerialization,
-            URIStrategy               uriStrategy
-            ) {
+            URIStrategy               uriStrategy,
+            MessageCounterSerialization messageCounterSerialization ) {
 
         this.actorSystemFig            = actorSystemFig;
         this.queueManager              = queueManager;
@@ -76,6 +78,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
         this.distributedQueueService   = distributedQueueService;
         this.transferLogSerialization  = transferLogSerialization;
         this.uriStrategy               = uriStrategy;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
 
@@ -296,4 +299,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
         return queueMessage;
     }
 
+
+    @Override
+    public long getQueueDepth(String queueName) {
+        return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 6fed13b..3b50711 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -46,11 +47,13 @@ import java.util.concurrent.TimeUnit;
 public class QueueActor extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
 
-    private final QakkaFig qakkaFig;
-    private final InMemoryQueue inMemoryQueue;
+    private final QakkaFig         qakkaFig;
+    private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
     private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
     private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
     private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
@@ -68,6 +71,8 @@ public class QueueActor extends UntypedActor {
         inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
         queueActorHelper = injector.getInstance( QueueActorHelper.class );
         metricsService   = injector.getInstance( MetricsService.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
     @Override
@@ -173,6 +178,11 @@ public class QueueActor extends UntypedActor {
                     }
                 }
 
+                messageCounterSerialization.decrementCounter(
+                    queueGetRequest.getQueueName(),
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueMessages.size());
+
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index fcd2161..7806d30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRe
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
@@ -54,6 +55,8 @@ public class QueueTimeouter extends UntypedActor {
     private final QakkaFig qakkaFig;
     private final CassandraClient           cassandraClient;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
 
     public QueueTimeouter(String queueName ) {
         this.queueName = queueName;
@@ -65,6 +68,8 @@ public class QueueTimeouter extends UntypedActor {
         qakkaFig             = injector.getInstance( QakkaFig.class );
         metricsService       = injector.getInstance( MetricsService.class );
         cassandraClient      = injector.getInstance( CassandraClientImpl.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
 
@@ -134,6 +139,9 @@ public class QueueTimeouter extends UntypedActor {
 
                 if (count > 0) {
                     logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+                    messageCounterSerialization.decrementCounter(
+                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 8657370..7166ef1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResp
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -48,6 +49,8 @@ public class QueueWriter extends UntypedActor {
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
 
     public QueueWriter() {
 
@@ -57,96 +60,101 @@ public class QueueWriter extends UntypedActor {
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
         auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
         metricsService           = injector.getInstance( MetricsService.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
     @Override
     public void onReceive(Object message) {
 
-            if (message instanceof QueueWriteRequest) {
-
-                Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+        if (message instanceof QueueWriteRequest) {
 
-                try {
-                    QueueWriteRequest qa = (QueueWriteRequest) message;
-
-                    UUID queueMessageId = QakkaUtils.getTimeUuid();
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
 
-                    // TODO: implement deliveryTime and expirationTime
+            try {
+                QueueWriteRequest qa = (QueueWriteRequest) message;
 
-                    DatabaseQueueMessage dbqm = null;
-                    long currentTime = System.currentTimeMillis();
+                UUID queueMessageId = QakkaUtils.getTimeUuid();
 
-                    try {
-                        dbqm = new DatabaseQueueMessage(
-                                qa.getMessageId(),
-                                DatabaseQueueMessage.Type.DEFAULT,
-                                qa.getQueueName(),
-                                qa.getDestRegion(),
-                                null,
-                                currentTime,
-                                currentTime,
-                                queueMessageId );
+                // TODO: implement deliveryTime and expirationTime
 
-                        messageSerialization.writeMessage( dbqm );
+                DatabaseQueueMessage dbqm = null;
+                long currentTime = System.currentTimeMillis();
 
-                        //logger.debug("Wrote queue message id {} to queue name {}",
-                        //        dbqm.getQueueMessageId(), dbqm.getQueueName());
+                try {
+                    dbqm = new DatabaseQueueMessage(
+                            qa.getMessageId(),
+                            DatabaseQueueMessage.Type.DEFAULT,
+                            qa.getQueueName(),
+                            qa.getDestRegion(),
+                            null,
+                            currentTime,
+                            currentTime,
+                            queueMessageId );
 
-                    } catch (Throwable t) {
-                        logger.debug("Error creating database queue message", t);
+                    messageSerialization.writeMessage( dbqm );
 
-                        auditLogSerialization.recordAuditLog(
-                                AuditLog.Action.SEND,
-                                AuditLog.Status.ERROR,
-                                qa.getQueueName(),
-                                qa.getDestRegion(),
-                                qa.getMessageId(),
-                                dbqm.getMessageId() );
+                    messageCounterSerialization.incrementCounter(
+                        qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
 
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.ERROR ), getSender() );
+                    //logger.debug("Wrote queue message id {} to queue name {}",
+                    //        dbqm.getQueueMessageId(), dbqm.getQueueName());
 
-                        return;
-                    }
+                } catch (Throwable t) {
+                    logger.debug("Error creating database queue message", t);
 
                     auditLogSerialization.recordAuditLog(
                             AuditLog.Action.SEND,
-                            AuditLog.Status.SUCCESS,
+                            AuditLog.Status.ERROR,
                             qa.getQueueName(),
                             qa.getDestRegion(),
                             qa.getMessageId(),
-                            dbqm.getQueueMessageId() );
-
-                    try {
-                        transferLogSerialization.removeTransferLog(
-                                qa.getQueueName(),
-                                qa.getSourceRegion(),
-                                qa.getDestRegion(),
-                                qa.getMessageId() );
-
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
-
-                    } catch (Throwable e) {
-                        logger.debug( "Unable to delete transfer log for {} {} {} {}",
-                                qa.getQueueName(),
-                                qa.getSourceRegion(),
-                                qa.getDestRegion(),
-                                qa.getMessageId() );
-                        logger.debug("Error deleting transferlog", e);
-
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
-                    }
-
-                } finally {
-                    timer.close();
+                            dbqm.getMessageId() );
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.ERROR ), getSender() );
+
+                    return;
                 }
 
-            } else {
-                unhandled( message );
+                auditLogSerialization.recordAuditLog(
+                        AuditLog.Action.SEND,
+                        AuditLog.Status.SUCCESS,
+                        qa.getQueueName(),
+                        qa.getDestRegion(),
+                        qa.getMessageId(),
+                        dbqm.getQueueMessageId() );
+
+                try {
+                    transferLogSerialization.removeTransferLog(
+                            qa.getQueueName(),
+                            qa.getSourceRegion(),
+                            qa.getDestRegion(),
+                            qa.getMessageId() );
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+                } catch (Throwable e) {
+                    logger.debug( "Unable to delete transfer log for {} {} {} {}",
+                            qa.getQueueName(),
+                            qa.getSourceRegion(),
+                            qa.getDestRegion(),
+                            qa.getMessageId() );
+                    logger.debug("Error deleting transferlog", e);
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+                }
+
+            } finally {
+                timer.close();
             }
 
+        } else {
+            unhandled( message );
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index be20cde..3d6a808 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -54,17 +55,20 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public DistributedQueueServiceImpl(
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig ) {
+            QakkaFig qakkaFig,
+            MessageCounterSerialization messageCounterSerialization ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
index cbbf11f..6c81863 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 
-public interface MessageCounterSerialization  extends Migration {
+public interface MessageCounterSerialization extends Migration {
 
     void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
 
-    void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+    void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement);
 
     long getCounterValue(String name, DatabaseQueueMessage.Type type);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index f198d05..0fdb47e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,49 +43,56 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 @Singleton
-public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+public class MessageCounterSerializationImpl implements MessageCounterSerialization {
     private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
 
-    final static String TABLE_SHARD_COUNTERS       = "counters";
-    final static String COLUMN_QUEUE_NAME    = "queue_name";
-    final static String COLUMN_SHARD_ID      = "shard_id";
-    final static String COLUMN_COUNTER_VALUE = "counter_value";
-    final static String COLUMN_SHARD_TYPE    = "shard_type";
+    final static String TABLE_MESSAGE_COUNTERS = "message_counters";
+    final static String COLUMN_QUEUE_NAME      = "queue_name";
+    final static String COLUMN_COUNTER_VALUE   = "counter_value";
+    final static String COLUMN_MESSAGE_TYPE    = "message_type";
 
     // design note: counters based on DataStax example here:
     // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
 
     static final String CQL =
-        "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+        "CREATE TABLE IF NOT EXISTS message_counters ( " +
                 "counter_value counter, " +
                 "queue_name    varchar, " +
-                "shard_type    varchar, " +
-                "shard_id      bigint, " +
-                "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+                "message_type  varchar, " +
+                "PRIMARY KEY (queue_name, message_type) " +
         ");";
 
 
-    final long maxInMemoryIncrement;
+    /** number of changes since last save to database */
+    final AtomicInteger numChanges = new AtomicInteger( 0 );
+
+    final long maxChangesBeforeSave;
 
     class InMemoryCount {
         long baseCount;
         final AtomicLong increment = new AtomicLong( 0L );
+        final AtomicLong decrement = new AtomicLong( 0L );
+
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
-        public long value() {
-            return baseCount + increment.get();
-        }
         public AtomicLong getIncrement() {
             return increment;
         }
+        public AtomicLong getDecrement() {
+            return decrement;
+        }
+        public long value() {
+            return baseCount + increment.get() - decrement.get();
+        }
         void setBaseCount( long baseCount ) {
             this.baseCount = baseCount;
         }
@@ -95,64 +102,89 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
 
     @Inject
-    public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public MessageCounterSerializationImpl(
+        CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+
         this.cassandraConfig = cassandraConfig;
-        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
         this.cassandraClient = cassandraClient;
     }
 
 
+    private String buildKey( String queueName, DatabaseQueueMessage.Type type ) {
+        return queueName + "_" + type;
+    }
+
+
     @Override
-    public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+    public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+        String key = buildKey( queueName, type );
 
-        String key = queueName + type + shardId;
         synchronized ( inMemoryCounters ) {
 
             if ( inMemoryCounters.get( key ) == null ) {
 
-                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+                Long value = retrieveCounterFromStorage( queueName, type );
 
                 if ( value == null ) {
-                    incrementCounterInStorage( queueName, type, shardId, 0L );
+                    incrementCounterInStorage( queueName, type, 0L );
                     inMemoryCounters.put( key, new InMemoryCount( 0L ));
                 } else {
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
-                inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
-                return;
             }
         }
 
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        inMemoryCount.getIncrement().addAndGet( increment );
 
-        synchronized ( inMemoryCount ) {
-            long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+        saveIfNeeded( queueName, type );
+    }
 
-            if (totalIncrement > maxInMemoryIncrement) {
-                incrementCounterInStorage( queueName, type, shardId, totalIncrement );
-                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
-                inMemoryCount.getIncrement().set( 0L );
+
+    @Override
+    public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement) {
+
+        String key = buildKey( queueName, type );
+
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type );
+
+                if ( value == null ) {
+                    decrementCounterInStorage( queueName, type, 0L );
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
             }
         }
 
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        inMemoryCount.getDecrement().addAndGet( decrement );
+
+        saveIfNeeded( queueName, type );
     }
 
 
     @Override
-    public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+    public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) {
 
-        String key = queueName + type + shardId;
+        String key = buildKey( queueName, type );
 
         synchronized ( inMemoryCounters ) {
 
             if ( inMemoryCounters.get( key ) == null ) {
 
-                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+                Long value = retrieveCounterFromStorage( queueName, type );
 
                 if ( value == null ) {
                     throw new NotFoundException(
-                            MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
-                                    queueName, type, shardId ));
+                            MessageFormat.format( "No counter found for queue {0} type {1}",
+                                    queueName, type ));
                 } else {
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
@@ -162,30 +194,39 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
         return inMemoryCounters.get( key ).value();
     }
 
-    void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
 
-        Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+    void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+        Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
-                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
-                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, type.toString() ) )
                 .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
         cassandraClient.getQueueMessageSession().execute( update );
     }
 
 
-    Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+    void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) {
 
-        Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+        Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+            .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+            .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, type.toString() ) )
+            .with(  QueryBuilder.decr( COLUMN_COUNTER_VALUE, decrement ) );
+        cassandraClient.getQueueMessageSession().execute( update );
+    }
+
+
+    Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) {
+
+        Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
-                .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
-                .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+                .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString()) );
 
         ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
         List<Row> all = resultSet.all();
 
         if ( all.size() > 1 ) {
             throw new QakkaRuntimeException(
-                    "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+                    "Multiple rows for counter " + queueName + " type " + type );
         }
         if ( all.isEmpty() ) {
             return null;
@@ -194,6 +235,32 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
     }
 
 
+    private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {
+
+        String key = buildKey( queueName, type );
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+
+            if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+
+                long totalIncrement = inMemoryCount.getIncrement().get();
+                incrementCounterInStorage( queueName, type, totalIncrement );
+
+                long totalDecrement = inMemoryCount.getDecrement().get();
+                decrementCounterInStorage( queueName, type, totalDecrement );
+
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+                inMemoryCount.getIncrement().set( 0L );
+                inMemoryCount.getDecrement().set( 0L );
+
+                numChanges.set( 0 );
+            }
+        }
+    }
+
+
     @Override
     public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.EMPTY_LIST;
@@ -201,8 +268,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+        return Collections.singletonList( new TableDefinitionStringImpl(
+            cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGE_COUNTERS, CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 0eb609d..f3cae86 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -139,10 +140,9 @@ public class QakkaQueueManager implements LegacyQueueManager {
         return messages;
     }
 
-
     @Override
     public long getQueueDepth() {
-        return 0;
+        return queueMessageManager.getQueueDepth( scope.getName() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 0413f81..3225a66 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -177,6 +177,8 @@ public class QueueMessageManagerTest extends AbstractTest {
             }
         }
 
+        Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+
         // get all messages from queue
 
         List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
new file mode 100644
index 0000000..a4ea0f1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/20/16.
+ */
+public class MessageCounterSerializationTest extends AbstractTest {
+
+    @Test
+    public void testBasicOperation() {
+
+        Injector injector = getInjector();
+        MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+
+        String queueName = "mcst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+
+        try {
+            mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+            fail("Should have throw NotFoundException");
+        } catch ( NotFoundException expected ) {
+            // pass
+        }
+
+        for ( int i=0; i<10; i++ ) {
+            mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+            Assert.assertEquals( i+1, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+        }
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 10, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 20, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 30, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 40, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 50, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 50 );
+        Assert.assertEquals( 100, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 90, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 80, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index f9c2951..8dc16bb 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -36,9 +36,6 @@ public class ShardCounterSerializationTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
         ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
 
         String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index aacc187..fb46f3d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -44,6 +44,7 @@ queue.shard.allocation.check.frequency.millis=100
 queue.shard.allocation.advance.time.millis=200
 
 queue.max.inmemory.shard.counter = 100
+queue.max.inmemory.max.message.changes=3
 
 queue.long.polling.time.millis=2000