You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/10 21:45:20 UTC

[2/6] usergrid git commit: Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation

Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7be8c274
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7be8c274
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7be8c274

Branch: refs/heads/usergrid-1318-queue
Commit: 7be8c274e83e1680221e06c327ad04e697c61ca6
Parents: 71fe06f
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 09:37:32 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 09:37:32 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |   2 +-
 .../persistence/qakka/api/QueueResource.java    |  83 ++++------
 .../qakka/distributed/actors/QueueActor.java    |  40 +----
 .../distributed/actors/QueueActorHelper.java    | 150 +++++------------
 .../distributed/actors/QueueActorRouter.java    |  50 +-----
 .../distributed/actors/QueueRefresher.java      | 103 +++++++++++-
 .../qakka/distributed/actors/QueueSender.java   |  16 +-
 .../distributed/actors/QueueTimeouter.java      |  13 --
 .../qakka/distributed/actors/QueueWriter.java   |  49 ++++--
 .../distributed/actors/QueueWriterRouter.java   |   7 +-
 .../impl/DistributedQueueServiceImpl.java       | 161 +++++++++++--------
 .../impl/QueueActorRouterProducer.java          |   3 +-
 .../impl/QueueWriterRouterProducer.java         |  11 +-
 .../distributed/messages/QakkaMessage.java      |   6 +-
 .../distributed/messages/QueueGetResponse.java  |  14 +-
 .../distributed/messages/QueueSendResponse.java |   9 +-
 .../messages/QueueWriteResponse.java            |   8 +-
 .../MultiShardMessageIterator.java              |   6 +
 .../QueueMessageSerialization.java              |   5 +
 .../impl/QueueMessageSerializationImpl.java     | 151 ++++++++++++-----
 .../distributed/QueueActorServiceTest.java      |   3 -
 .../actors/QueueActorHelperTest.java            |   2 +-
 .../distributed/actors/QueueReaderTest.java     |  10 +-
 .../DatabaseQueueMessageSerializationTest.java  |  22 ++-
 .../queue/src/test/resources/log4j.properties   |   4 +-
 25 files changed, 507 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 7d89187..3093c39 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How long to wait for response from queue actor before timing out and trying again */
     @Key(QUEUE_GET_TIMEOUT)
-    @Default("4")
+    @Default("1")
     int getGetTimeoutSeconds();
 
     /** Max number of times to retry call to queue writer for queue send operation */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index 10dae04..b609de3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -19,13 +19,10 @@
 
 package org.apache.usergrid.persistence.qakka.api;
 
-import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
-import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.*;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +43,6 @@ public class QueueResource {
 
     private final QueueManager        queueManager;
     private final QueueMessageManager queueMessageManager;
-    private final MetricsService      metricsService;
     private final URIStrategy         uriStrategy;
     private final Regions             regions;
 
@@ -55,13 +51,11 @@ public class QueueResource {
     public QueueResource(
             QueueManager              queueManager,
             QueueMessageManager       queueMessageManager,
-            MetricsService            metricsService,
             URIStrategy               uriStrategy,
             Regions                   regions ) {
 
         this.queueManager              = queueManager;
         this.queueMessageManager       = queueMessageManager;
-        this.metricsService            = metricsService;
         this.uriStrategy               = uriStrategy;
         this.regions                   = regions;
     }
@@ -262,8 +256,6 @@ public class QueueResource {
                                    String contentType,
                                    ByteBuffer byteBuffer) {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
-        try {
 
             Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
@@ -285,9 +277,6 @@ public class QueueResource {
             apiResponse.setCount( 1 );
             return Response.ok().entity( apiResponse ).build();
 
-        } finally {
-            timer.close();
-        }
     }
 
 
@@ -297,38 +286,31 @@ public class QueueResource {
     public Response getNextMessages( @PathParam("queueName") String queueName,
                                      @QueryParam("count") @DefaultValue("1") String countParam) throws Exception {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
-        try {
-
-            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+        Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
-            int count = 1;
-            try {
-                count = Integer.parseInt( countParam );
-            } catch (Exception e) {
-                throw new IllegalArgumentException( "Invalid count parameter" );
-            }
-            if (count <= 0) {
-                // invalid count
-                throw new IllegalArgumentException( "Count must be >= 1" );
-            }
-
-            List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
+        int count = 1;
+        try {
+            count = Integer.parseInt( countParam );
+        } catch (Exception e) {
+            throw new IllegalArgumentException( "Invalid count parameter" );
+        }
+        if (count <= 0) {
+            // invalid count
+            throw new IllegalArgumentException( "Count must be >= 1" );
+        }
 
-            ApiResponse apiResponse = new ApiResponse();
+        List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
 
-            if (messages != null && !messages.isEmpty()) {
-                apiResponse.setQueueMessages( messages );
+        ApiResponse apiResponse = new ApiResponse();
 
-            } else { // always return queueMessages field
-                apiResponse.setQueueMessages( Collections.EMPTY_LIST );
-            }
-            apiResponse.setCount( apiResponse.getQueueMessages().size() );
-            return Response.ok().entity( apiResponse ).build();
+        if (messages != null && !messages.isEmpty()) {
+            apiResponse.setQueueMessages( messages );
 
-        } finally {
-            timer.close();
+        } else { // always return queueMessages field
+            apiResponse.setQueueMessages( Collections.EMPTY_LIST );
         }
+        apiResponse.setCount( apiResponse.getQueueMessages().size() );
+        return Response.ok().entity( apiResponse ).build();
     }
 
 
@@ -338,25 +320,18 @@ public class QueueResource {
     public Response ackMessage( @PathParam("queueName") String queueName,
                                 @PathParam("queueMessageId") String queueMessageId) throws Exception {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
-        try {
-
-            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+        Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
-            UUID messageUuid;
-            try {
-                messageUuid = UUID.fromString( queueMessageId );
-            } catch (Exception e) {
-                throw new IllegalArgumentException( "Invalid queue message UUID" );
-            }
-            queueMessageManager.ackMessage( queueName, messageUuid );
-
-            ApiResponse apiResponse = new ApiResponse();
-            return Response.ok().entity( apiResponse ).build();
-
-        } finally {
-            timer.close();
+        UUID messageUuid;
+        try {
+            messageUuid = UUID.fromString( queueMessageId );
+        } catch (Exception e) {
+            throw new IllegalArgumentException( "Invalid queue message UUID" );
         }
+        queueMessageManager.ackMessage( queueName, messageUuid );
+
+        ApiResponse apiResponse = new ApiResponse();
+        return Response.ok().entity( apiResponse ).build();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 9ce38ef..248f9cd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -64,32 +64,23 @@ public class QueueActor extends UntypedActor {
 
     private final Set<String> queuesSeen = new HashSet<>();
 
-    //private final Injector injector;
-
 
     @Inject
     public QueueActor(
-        //Injector         injector,
         QakkaFig         qakkaFig,
         InMemoryQueue    inMemoryQueue,
         QueueActorHelper queueActorHelper,
         MetricsService   metricsService,
         MessageCounterSerialization messageCounterSerialization
     ) {
-        //this.injector = injector;
         this.qakkaFig = qakkaFig;
         this.inMemoryQueue = inMemoryQueue;
         this.queueActorHelper = queueActorHelper;
         this.metricsService = metricsService;
         this.messageCounterSerialization = messageCounterSerialization;
-
-//        qakkaFig         = injector.getInstance( QakkaFig.class );
-//        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
-//        queueActorHelper = injector.getInstance( QueueActorHelper.class );
-//        metricsService   = injector.getInstance( MetricsService.class );
-//        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
+
     @Override
     public void onReceive(Object message) {
 
@@ -134,6 +125,7 @@ public class QueueActor extends UntypedActor {
                 logger.debug("Created shard allocater for queue {}", request.getQueueName() );
             }
 
+
         } else if ( message instanceof QueueRefreshRequest ) {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
             queuesSeen.add( request.getQueueName() );
@@ -154,6 +146,7 @@ public class QueueActor extends UntypedActor {
             // hand-off to queue's reader
             queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
 
@@ -169,6 +162,7 @@ public class QueueActor extends UntypedActor {
             // ASYNCHRONOUS -> hand-off to queue's timeouter
             queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof ShardCheckRequest ) {
             ShardCheckRequest request = (ShardCheckRequest)message;
 
@@ -184,6 +178,7 @@ public class QueueActor extends UntypedActor {
             // ASYNCHRONOUS -> hand-off to queue's shard allocator
             shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof QueueGetRequest) {
 
             QueueGetRequest queueGetRequest = (QueueGetRequest) message;
@@ -199,30 +194,7 @@ public class QueueActor extends UntypedActor {
                 Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
 
                 getSender().tell( new QueueGetResponse(
-                        DistributedQueueService.Status.SUCCESS, messages ), getSender() );
-
-            } finally {
-                timer.close();
-            }
-
-
-        } else if ( message instanceof QueueAckRequest) {
-
-            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
-            try {
-
-                QueueAckRequest queueAckRequest = (QueueAckRequest) message;
-
-                queuesSeen.add( queueAckRequest.getQueueName() );
-
-                DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
-                        queueAckRequest.getQueueName(),
-                        queueAckRequest.getQueueMessageId() );
-
-                getSender().tell( new QueueAckResponse(
-                        queueAckRequest.getQueueName(),
-                        queueAckRequest.getQueueMessageId(),
-                        status ), getSender() );
+                        DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() );
 
             } finally {
                 timer.close();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index e3996c5..fcb3fba 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -41,7 +41,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActorHelper {
@@ -55,9 +54,6 @@ public class QueueActorHelper {
     private final MetricsService            metricsService;
     private final CassandraClient           cassandraClient;
 
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong totalRead = new AtomicLong(0);
-
 
     @Inject
     public QueueActorHelper(
@@ -109,7 +105,7 @@ public class QueueActorHelper {
 
             if (queueMessage != null) {
 
-                if (putInflight( queueName, queueMessage )) {
+                if (putInflight( queueMessage )) {
                     queueMessages.add( queueMessage );
                 }
 
@@ -125,10 +121,48 @@ public class QueueActorHelper {
     }
 
 
+    boolean putInflight( DatabaseQueueMessage queueMessage ) {
+
+        UUID qmid = queueMessage.getQueueMessageId();
+        try {
+
+            messageSerialization.putInflight( queueMessage );
+
+        } catch ( Throwable t ) {
+            logger.error("Error putting inflight queue message "
+                + qmid + " queue name: " + queueMessage.getQueueName(), t);
+
+            auditLogSerialization.recordAuditLog(
+                AuditLog.Action.GET,
+                AuditLog.Status.ERROR,
+                queueMessage.getQueueName(),
+                actorSystemFig.getRegionLocal(),
+                queueMessage.getMessageId(),
+                qmid);
+
+            return false;
+        }
+
+        auditLogSerialization.recordAuditLog(
+            AuditLog.Action.GET,
+            AuditLog.Status.SUCCESS,
+            queueMessage.getQueueName(),
+            actorSystemFig.getRegionLocal(),
+            queueMessage.getMessageId(),
+            qmid);
+
+        return true;
+    }
+
+
     DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
 
-        DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
-                queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
+        DatabaseQueueMessage queueMessage = messageSerialization.loadMessage(
+            queueName,
+            actorSystemFig.getRegionLocal(),
+            null,
+            DatabaseQueueMessage.Type.INFLIGHT,
+            queueMessageId );
 
         if ( queueMessage == null ) {
             logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId);
@@ -174,106 +208,4 @@ public class QueueActorHelper {
             return DistributedQueueService.Status.ERROR;
         }
     }
-
-
-    boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) {
-
-        UUID qmid = queueMessage.getQueueMessageId();
-        try {
-
-            DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
-                queueMessage.getMessageId(),
-                DatabaseQueueMessage.Type.INFLIGHT,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                null,                         // let serialization select the shard
-                queueMessage.getQueuedAt(),
-                System.currentTimeMillis(),
-                qmid);
-
-            messageSerialization.writeMessage( inflightMessage );
-
-            DatabaseQueueMessage retrieved = loadDatabaseQueueMessage(
-                queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT );
-            if ( retrieved == null ) {
-                logger.error("Failed ot write queue message id {} to inflight table", qmid);
-                return false;
-            }
-
-            messageSerialization.deleteMessage(
-                    queueName,
-                    actorSystemFig.getRegionLocal(),
-                    null,
-                    DatabaseQueueMessage.Type.DEFAULT,
-                    qmid);
-
-            //logger.debug("Put message {} inflight for queue name {}", qmid, queueName);
-
-        } catch ( Throwable t ) {
-            logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t);
-
-            auditLogSerialization.recordAuditLog(
-                    AuditLog.Action.GET,
-                    AuditLog.Status.ERROR,
-                    queueName,
-                    actorSystemFig.getRegionLocal(),
-                    queueMessage.getMessageId(),
-                    qmid);
-
-            return false;
-        }
-
-        auditLogSerialization.recordAuditLog(
-                AuditLog.Action.GET,
-                AuditLog.Status.SUCCESS,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                queueMessage.getMessageId(),
-                qmid);
-
-        return true;
-    }
-
-
-    void queueRefresh( String queueName ) {
-
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
-        try {
-
-            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
-                // TODO: need to track the starting shard
-
-                ShardIterator shardIterator = new ShardIterator(
-                    cassandraClient, queueName, actorSystemFig.getRegionLocal(),
-                    Shard.Type.DEFAULT, Optional.empty() );
-
-                UUID since = inMemoryQueue.getNewest( queueName );
-
-                String region = actorSystemFig.getRegionLocal();
-                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
-                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
-                    shardIterator, since);
-
-                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
-                int count = 0;
-
-                while ( multiShardIterator.hasNext() && count < need ) {
-                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
-                    inMemoryQueue.add( queueName, queueMessage );
-                    count++;
-                }
-
-                if ( count > 0 ) {
-                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
-                        count, queueName, inMemoryQueue.size( queueName ) );
-                }
-            }
-
-        } finally {
-            timer.close();
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index c40a3d9..f908e7f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -27,6 +27,7 @@ import akka.routing.FromConfig;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 
 
@@ -36,10 +37,13 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 public class QueueActorRouter extends UntypedActor {
 
     private final ActorRef routerRef;
+    private final QueueActorRouterProducer queueActorRouterProducer;
 
 
     @Inject
-    public QueueActorRouter( Injector injector ) {
+    public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer ) {
+
+        this.queueActorRouterProducer = queueActorRouterProducer;
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
             Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
@@ -48,51 +52,13 @@ public class QueueActorRouter extends UntypedActor {
     @Override
     public void onReceive(Object message) {
 
-        // TODO: can we do something smarter than this if-then-else structure
-        // e.g. if message is recognized as one of ours, then we just pass it on?
-
-        if ( message instanceof QueueGetRequest) {
-            QueueGetRequest qgr = (QueueGetRequest) message;
+        if ( queueActorRouterProducer.getMessageTypes().contains( message.getClass() ) ) {
+            QakkaMessage qakkaMessage = (QakkaMessage) message;
 
             ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() );
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qakkaMessage.getQueueName() );
             routerRef.tell( envelope, getSender() );
 
-        } else if ( message instanceof QueueAckRequest) {
-            QueueAckRequest qar = (QueueAckRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueInitRequest) {
-            QueueInitRequest qar = (QueueInitRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueRefreshRequest) {
-            QueueRefreshRequest qar = (QueueRefreshRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueTimeoutRequest) {
-            QueueTimeoutRequest qar = (QueueTimeoutRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof ShardCheckRequest) {
-            ShardCheckRequest qar = (ShardCheckRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
         } else {
             unhandled(message);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index ae9969c..afd5640 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,20 +20,52 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
 
-    final QueueActorHelper helper;
+    private final ActorSystemFig  actorSystemFig;
+    private final InMemoryQueue   inMemoryQueue;
+    private final QakkaFig        qakkaFig;
+    private final MetricsService  metricsService;
+    private final CassandraClient cassandraClient;
+
 
     @Inject
-    public QueueRefresher( QueueActorHelper helper ) {
-        this.helper = helper;
+    public QueueRefresher(
+        ActorSystemFig  actorSystemFig,
+        InMemoryQueue   inMemoryQueue,
+        QakkaFig        qakkaFig,
+        MetricsService  metricsService,
+        CassandraClient cassandraClient
+    ) {
+        this.actorSystemFig  = actorSystemFig;
+        this.inMemoryQueue   = inMemoryQueue;
+        this.qakkaFig        = qakkaFig;
+        this.metricsService  = metricsService;
+        this.cassandraClient = cassandraClient;
     }
 
 
@@ -44,10 +76,73 @@ public class QueueRefresher extends UntypedActor {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
             String queueName = request.getQueueName();
-            helper.queueRefresh( queueName );
+            queueRefresh( queueName );
 
         } else {
             unhandled( message );
         }
     }
+
+    Map<String, Long> startingShards = new HashMap<>();
+
+
+    void queueRefresh( String queueName ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+        try {
+
+            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+                final Optional shardIdOptional;
+                final String shardKey =
+                    createShardKey( queueName, Shard.Type.DEFAULT, actorSystemFig.getRegionLocal() );
+                Long shardId = startingShards.get( shardKey );
+
+                if ( shardId != null ) {
+                    shardIdOptional = Optional.of( shardId );
+                } else {
+                    shardIdOptional = Optional.empty();
+                }
+
+                ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+                    Shard.Type.DEFAULT, Optional.empty() );
+
+                UUID since = inMemoryQueue.getNewest( queueName );
+
+                String region = actorSystemFig.getRegionLocal();
+                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+                    shardIterator, since);
+
+                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+                int count = 0;
+
+                while ( multiShardIterator.hasNext() && count < need ) {
+                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
+                    inMemoryQueue.add( queueName, queueMessage );
+                    count++;
+                }
+
+                if ( multiShardIterator.getCurrentShard() != null ) {
+                    startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() );
+                }
+
+                if ( count > 0 ) {
+                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
+                        count, queueName, inMemoryQueue.size( queueName ) );
+                }
+            }
+
+        } finally {
+            timer.close();
+        }
+
+    }
+
+    private String createShardKey(String queueName, Shard.Type type, String region ) {
+        return queueName + "_" + type + region;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 3dc695e..ccc39f5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -26,18 +26,13 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -80,13 +75,6 @@ public class QueueSender extends UntypedActor {
         this.actorSystemFig = actorSystemFig;
         this.qakkaFig = qakkaFig;
         this.metricsService = metricsService;
-
-//        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
-//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-//        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
-//        qakkaFig                 = injector.getInstance( QakkaFig.class );
-//        metricsService           = injector.getInstance( MetricsService.class );
     }
 
     @Override
@@ -97,7 +85,7 @@ public class QueueSender extends UntypedActor {
 
             // as far as caller is concerned, we are done.
             getSender().tell( new QueueSendResponse(
-                    DistributedQueueService.Status.SUCCESS ), getSender() );
+                    DistributedQueueService.Status.SUCCESS, qa.getQueueName() ), getSender() );
 
             final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
                     qa.getQueueName(),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 33f1dd9..b7a95df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -22,29 +22,22 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DecimalFormat;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueTimeouter extends UntypedActor {
@@ -70,12 +63,6 @@ public class QueueTimeouter extends UntypedActor {
         this.actorSystemFig = actorSystemFig;
         this.qakkaFig = qakkaFig;
         this.cassandraClient = cassandraClient;
-
-//        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
-//        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
-//        qakkaFig             = injector.getInstance( QakkaFig.class );
-//        metricsService       = injector.getInstance( MetricsService.class );
-//        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e014d59..a7dbbd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -22,17 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckResponse;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -50,24 +49,21 @@ public class QueueWriter extends UntypedActor {
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
-
+    private final QueueActorHelper          queueActorHelper;
 
     @Inject
     public QueueWriter(
         QueueMessageSerialization messageSerialization,
         TransferLogSerialization  transferLogSerialization,
         AuditLogSerialization     auditLogSerialization,
-        MetricsService            metricsService
+        MetricsService            metricsService,
+        QueueActorHelper          queueActorHelper
     ) {
-        this.messageSerialization = messageSerialization;
+        this.messageSerialization     = messageSerialization;
         this.transferLogSerialization = transferLogSerialization;
-        this.auditLogSerialization = auditLogSerialization;
-        this.metricsService = metricsService;
-
-//        messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
-//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-//        metricsService           = injector.getInstance( MetricsService.class );
+        this.auditLogSerialization    = auditLogSerialization;
+        this.metricsService           = metricsService;
+        this.queueActorHelper         = queueActorHelper;
     }
 
     @Override
@@ -86,6 +82,7 @@ public class QueueWriter extends UntypedActor {
 
                 DatabaseQueueMessage dbqm = null;
                 long currentTime = System.currentTimeMillis();
+                String queueName = qa.getQueueName();
 
                 try {
                     dbqm = new DatabaseQueueMessage(
@@ -115,7 +112,7 @@ public class QueueWriter extends UntypedActor {
                             dbqm.getMessageId() );
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.ERROR ), getSender() );
+                            QueueWriter.WriteStatus.ERROR, queueName ), getSender() );
 
                     return;
                 }
@@ -136,7 +133,7 @@ public class QueueWriter extends UntypedActor {
                             qa.getMessageId() );
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED, queueName ), getSender() );
 
                 } catch (Throwable e) {
                     logger.debug( "Unable to delete transfer log for {} {} {} {}",
@@ -147,13 +144,33 @@ public class QueueWriter extends UntypedActor {
                     logger.debug("Error deleting transferlog", e);
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED, queueName ), getSender() );
                 }
 
             } finally {
                 timer.close();
             }
 
+        } else if ( message instanceof QueueAckRequest ){
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
+            try {
+
+                QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+
+                DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
+                    queueAckRequest.getQueueName(),
+                    queueAckRequest.getQueueMessageId() );
+
+                getSender().tell( new QueueAckResponse(
+                    queueAckRequest.getQueueName(),
+                    queueAckRequest.getQueueMessageId(),
+                    status ), getSender() );
+
+            } finally {
+                timer.close();
+            }
+
         } else {
             unhandled( message );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 0e3e981..cb06c1d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -24,8 +24,8 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
 
@@ -37,7 +37,7 @@ public class QueueWriterRouter extends UntypedActor {
     private final ActorRef router;
 
     @Inject
-    public QueueWriterRouter( Injector injector ) {
+    public QueueWriterRouter() {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
             Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
@@ -46,7 +46,8 @@ public class QueueWriterRouter extends UntypedActor {
     @Override
     public void onReceive(Object message) {
 
-        if ( message instanceof QueueWriteRequest) {
+        if (   message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) {
+
             router.tell( message, getSender() );
 
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 9063242..20bf608 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -22,14 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.impl;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -37,7 +39,6 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -56,19 +57,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
-
+    private final MetricsService metricsService;
 
     @Inject
     public DistributedQueueServiceImpl(
             Injector injector,
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig
+            QakkaFig qakkaFig,
+            MetricsService metricsService
             ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
+        this.metricsService = metricsService;
 
         GuiceActorProducer.INJECTOR = injector;
     }
@@ -154,59 +157,66 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             String queueName, String sourceRegion, String destRegion, UUID messageId,
             Long deliveryTime, Long expirationTime ) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
+        try {
 
-        int maxRetries = qakkaFig.getMaxSendRetries();
-        int retries = 0;
+            if ( queueManager.getQueueConfig( queueName ) == null ) {
+                throw new NotFoundException( "Queue not found: " + queueName );
+            }
 
-        QueueSendRequest request = new QueueSendRequest(
-                queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+            int maxRetries = qakkaFig.getMaxSendRetries();
+            int retries = 0;
 
-        while ( retries++ < maxRetries ) {
-            try {
-                Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+            QueueSendRequest request = new QueueSendRequest(
+                    queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
 
-                // send to current region via local clientActor
-                ActorRef clientActor = actorSystemManager.getClientActor();
-                Future<Object> fut = Patterns.ask( clientActor, request, t );
+            while ( retries++ < maxRetries ) {
+                try {
+                    Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
 
-                // wait for response...
-                final Object response = Await.result( fut, t.duration() );
+                    // send to current region via local clientActor
+                    ActorRef clientActor = actorSystemManager.getClientActor();
+                    Future<Object> fut = Patterns.ask( clientActor, request, t );
 
-                if ( response != null && response instanceof QueueSendResponse) {
-                    QueueSendResponse qarm = (QueueSendResponse)response;
+                    // wait for response...
+                    final Object response = Await.result( fut, t.duration() );
 
-                    if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
+                    if ( response != null && response instanceof QueueSendResponse) {
+                        QueueSendResponse qarm = (QueueSendResponse)response;
 
-                        if ( retries > 1 ) {
-                            logger.debug("SUCCESS after {} retries", retries );
-                        }
+                        if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
 
-                        // send refresh-queue-if-empty message
-                        QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
-                        clientActor.tell( qrr, null );
+                            if ( retries > 1 ) {
+                                logger.debug("SUCCESS after {} retries", retries );
+                            }
+
+                            // send refresh-queue-if-empty message
+                            QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+                            clientActor.tell( qrr, null );
+
+                            return qarm.getSendStatus();
+
+                        } else {
+                            logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+                        }
 
-                        return qarm.getSendStatus();
+                    } else if ( response != null  ) {
+                        logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
 
                     } else {
-                        logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+                        logger.debug("TIMEOUT sending to queue, retrying {}", retries );
                     }
 
-                } else if ( response != null  ) {
-                    logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
-
-                } else {
-                    logger.debug("TIMEOUT sending to queue, retrying {}", retries );
+                } catch ( Exception e ) {
+                    logger.debug("ERROR sending to queue, retrying " + retries, e );
                 }
-
-            } catch ( Exception e ) {
-                logger.debug("ERROR sending to queue, retrying " + retries, e );
             }
-        }
 
-        throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+            throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+
+        } finally {
+            timer.close();
+        }
     }
 
 
@@ -214,19 +224,32 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
         List<DatabaseQueueMessage> ret = new ArrayList<>();
 
-        long startTime = System.currentTimeMillis();
+        com.codahale.metrics.Timer.Context timer =
+            metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
+
+        try {
 
-        while ( ret.size() < count
-            && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
+            long startTime = System.currentTimeMillis();
 
-            ret.addAll( getNextMessagesInternal( queueName, count ));
+            while ( ret.size() < count
+                && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
 
-            if ( ret.size() < count ) {
-                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+                ret.addAll( getNextMessagesInternal( queueName, count ));
+
+                if ( ret.size() < count ) {
+                    try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+                }
             }
+
+            if ( ret.isEmpty() ) {
+                logger.info( "Requested {} but queue '{}' is empty", count, queueName);
+            }
+            return ret;
+
+        } finally {
+            timer.close();
         }
 
-        return ret;
     }
 
 
@@ -242,10 +265,10 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         }
 
         int maxRetries = qakkaFig.getMaxGetRetries();
-        int retries = 0;
+        int tries = 0;
 
         QueueGetRequest request = new QueueGetRequest( queueName, count );
-        while ( ++retries < maxRetries ) {
+        while ( ++tries < maxRetries ) {
             try {
                 Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
 
@@ -261,8 +284,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                     if ( response != null && response instanceof QueueGetResponse) {
                         QueueGetResponse qprm = (QueueGetResponse)response;
                         if ( qprm.isSuccess() ) {
-                            if (retries > 1) {
-                                logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries );
+                            if (tries > 1) {
+                                logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
                             }
                         }
                         logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
@@ -270,41 +293,49 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
 
                     } else if ( response != null  ) {
-                        logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries );
+                        logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, tries );
 
                     } else {
-                        logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries );
+                        logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, tries );
                     }
 
                 } else if ( responseObject instanceof ClientActor.ErrorResponse ) {
 
                     final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject;
                     logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}",
-                        errorResponse.getMessage(), retries );
+                        errorResponse.getMessage(), tries );
 
                 } else {
-                    logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries );
+                    logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, tries );
                 }
 
             } catch ( Exception e ) {
-                logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e );
+                logger.error("ERROR popping to queue " + queueName + " retrying " + tries, e );
             }
         }
 
         throw new QakkaRuntimeException(
-                "Error getting from queue " + queueName + " after " + retries + " tries");
+                "Error getting from queue " + queueName + " after " + tries + " tries");
     }
 
 
     @Override
     public Status ackMessage(String queueName, UUID queueMessageId ) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
+        try {
 
-        QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
-        return sendMessageToLocalQueueActors( message );
+            if ( queueManager.getQueueConfig( queueName ) == null ) {
+                throw new NotFoundException( "Queue not found: " + queueName );
+            }
+
+            QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
+            return sendMessageToLocalRouters( message );
+
+
+        } finally {
+            timer.close();
+        }
     }
 
 
@@ -316,7 +347,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, messageId );
-        return sendMessageToLocalQueueActors( message );
+        return sendMessageToLocalRouters( message );
     }
 
 
@@ -332,7 +363,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     }
 
 
-    private Status sendMessageToLocalQueueActors( QakkaMessage message ) {
+    private Status sendMessageToLocalRouters( QakkaMessage message ) {
 
         int maxRetries = 5;
         int retries = 0;
@@ -367,6 +398,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     }
 
     public void shutdown() {
-        actorSystemManager.shutdownAll();
+        // no op
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index d74936b..3bf6180 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -28,6 +28,7 @@ import akka.cluster.singleton.ClusterSingletonProxy;
 import akka.cluster.singleton.ClusterSingletonProxySettings;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.actorsystem.RouterProducer;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 
+@Singleton
 public class QueueActorRouterProducer implements RouterProducer {
 
     static Injector injector;
@@ -131,7 +133,6 @@ public class QueueActorRouterProducer implements RouterProducer {
     public Collection<Class> getMessageTypes() {
         return new ArrayList() {{
             add( QueueGetRequest.class );
-            add( QueueAckRequest.class );
             add( QueueInitRequest.class );
             add( QueueRefreshRequest.class );
             add( QueueTimeoutRequest.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index 006f1a7..c4e7acc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -28,19 +28,22 @@ import akka.cluster.singleton.ClusterSingletonProxy;
 import akka.cluster.singleton.ClusterSingletonProxySettings;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.actorsystem.RouterProducer;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 
+@Singleton
 public class QueueWriterRouterProducer implements RouterProducer {
 
     static Injector injector;
@@ -128,7 +131,11 @@ public class QueueWriterRouterProducer implements RouterProducer {
 
     @Override
     public Collection<Class> getMessageTypes() {
-        return Collections.singletonList( QueueWriteRequest.class );
+        // ack requests are routed through the writer router alongside write requests
+        final Collection<Class> messageTypes = new ArrayList<Class>( 2 );
+        messageTypes.add( QueueAckRequest.class );
+        messageTypes.add( QueueWriteRequest.class );
+        return messageTypes;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
index a1bbf14..a3919ea 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
@@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.qakka.distributed.messages;
 
 import java.io.Serializable;
 
-/**
- * Marker interface
- */
+
 public interface QakkaMessage extends Serializable {
+
+    String getQueueName();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
index c8004fb..1776360 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
@@ -30,16 +30,19 @@ import java.util.Collections;
 public class QueueGetResponse implements QakkaMessage {
     private final Collection<DatabaseQueueMessage> queueMessages;
     private final DistributedQueueService.Status status;
+    private final String queueName;
 
-
-    public QueueGetResponse(DistributedQueueService.Status status ) {
+    public QueueGetResponse(DistributedQueueService.Status status, String queueName ) {
         this.status = status;
         this.queueMessages = Collections.emptyList();
+        this.queueName = queueName;
     }
 
-    public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) {
+    public QueueGetResponse(
+        DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages, String queueName) {
         this.status = status;
         this.queueMessages = queueMessages;
+        this.queueName = queueName;
     }
 
     public DistributedQueueService.Status getStatus() {
@@ -60,4 +63,9 @@ public class QueueGetResponse implements QakkaMessage {
                 .append( "status", status )
                 .toString();
     }
+
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
index 0c295a0..9b065af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
@@ -25,9 +25,12 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
 
 public class QueueSendResponse implements QakkaMessage {
     private final DistributedQueueService.Status status;
+    private final String queueName;
 
-    public QueueSendResponse(DistributedQueueService.Status status) {
+
+    public QueueSendResponse( DistributedQueueService.Status status, String queueName ) {
         this.status = status;
+        this.queueName = queueName;
     }
 
     public DistributedQueueService.Status getSendStatus() {
@@ -40,4 +43,8 @@ public class QueueSendResponse implements QakkaMessage {
                 .toString();
     }
 
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
index 1eb513c..463b4df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
@@ -25,9 +25,11 @@ import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter;
 
 public class QueueWriteResponse implements QakkaMessage {
     private final QueueWriter.WriteStatus status;
+    private final String queueName; // set once in the constructor
 
-    public QueueWriteResponse(QueueWriter.WriteStatus status) {
+    public QueueWriteResponse(QueueWriter.WriteStatus status, String queueName ) {
         this.status = status;
+        this.queueName = queueName;
     }
 
     public QueueWriter.WriteStatus getSendStatus() {
@@ -40,4 +42,8 @@ public class QueueWriteResponse implements QakkaMessage {
                 .toString();
     }
 
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 6ec0774..29327e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -49,6 +49,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
     private final Iterator<Shard> shardIterator;
 
     private Iterator<DatabaseQueueMessage> currentIterator;
+
     private Shard currentShard;
     private UUID nextStart;
 
@@ -184,4 +185,9 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
 
     }
 
+
+    public Shard getCurrentShard() {
+        return currentShard;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 3ebe735..86c50a5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -51,4 +51,9 @@ public interface QueueMessageSerialization extends Migration {
     DatabaseQueueMessageBody loadMessageData(final UUID messageId);
 
     void deleteMessageData(final UUID messageId);
+
+    /**
+     * Write message to inflight table and remove from available table
+     */
+    void putInflight( DatabaseQueueMessage queueMessage );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d9a2543..33de7bc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
@@ -137,12 +138,6 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         final UUID queueMessageId =  message.getQueueMessageId() == null ?
                 QakkaUtils.getTimeUuid() : message.getQueueMessageId();
 
-        long queuedAt = message.getQueuedAt() == null ?
-                System.currentTimeMillis() : message.getQueuedAt();
-
-        long inflightAt = message.getInflightAt() == null ?
-                message.getQueuedAt() : message.getInflightAt();
-
         Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ?
                 Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
 
@@ -152,16 +147,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             message.setShardId( shard.getShardId() );
         }
 
-        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
-                .value( COLUMN_QUEUE_NAME,       message.getQueueName())
-                .value( COLUMN_REGION,           message.getRegion())
-                .value( COLUMN_SHARD_ID,         message.getShardId())
-                .value( COLUMN_MESSAGE_ID,       message.getMessageId())
-                .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
-                .value( COLUMN_INFLIGHT_AT,      inflightAt )
-                .value( COLUMN_QUEUED_AT,        queuedAt)
-            .using( QueryBuilder.ttl( maxTtl ) );
-
+        Statement insert = createWriteMessageStatement( message );
         cassandraClient.getQueueMessageSession().execute(insert);
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
@@ -233,28 +219,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             final DatabaseQueueMessage.Type type,
             final UUID queueMessageId ) {
 
-        final long shardId;
-        if ( shardIdOrNull == null ) {
-            Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
-                    Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
-            Shard shard = shardStrategy.selectShard(
-                    queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
-            shardId = shard.getShardId();
-        } else {
-            shardId = shardIdOrNull;
-        }
-
-        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
-        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
-        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
-        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
-
-        Statement delete = QueryBuilder.delete().from(getTableName( type ))
-                .where(queueNameClause)
-                .and(regionClause)
-                .and(shardIdClause)
-                .and(queueMessageIdClause);
-
+        Statement delete = createDeleteMessageStatement( queueName, region, shardIdOrNull, type, queueMessageId );
         cassandraClient.getQueueMessageSession().execute( delete );
 
         messageCounterSerialization.decrementCounter( queueName, type, 1L );
@@ -297,14 +262,118 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     public void deleteMessageData( final UUID messageId ) {
 
         Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
-
-        Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
-                .where(messageIdClause);
+        Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause);
 
         cassandraClient.getApplicationSession().execute(delete);
     }
 
 
+    @Override
+    public void putInflight( DatabaseQueueMessage message ) {
+        // writes the message to the inflight table and removes it from the available table in one batch
+        // create statement to write new queue message to inflight table
+
+        Shard.Type shardType = Shard.Type.INFLIGHT;
+        Shard shard = shardStrategy.selectShard(
+            message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() );
+
+        DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
+            message.getMessageId(),
+            DatabaseQueueMessage.Type.INFLIGHT,
+            message.getQueueName(),
+            message.getRegion(),
+            shard.getShardId(),
+            message.getQueuedAt(),
+            System.currentTimeMillis(),
+            message.getQueueMessageId() );
+
+        Statement insert = createWriteMessageStatement( inflightMessage );
+
+        // create statement to delete queue message from available table
+        // (null shard id: createDeleteMessageStatement selects the shard via the shard strategy)
+        Statement delete = createDeleteMessageStatement(
+            message.getQueueName(),
+            message.getRegion(),
+            null,
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueMessageId());
+
+        // execute statements as a batch
+
+        BatchStatement batchStatement = new BatchStatement();
+        batchStatement.add( insert );
+        batchStatement.add( delete );
+        cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+        // bump counters; the shard counter is keyed by the inflight shard the row was written to,
+        // not by message.getShardId(), which is the shard id carried by the incoming message
+        shardCounterSerialization.incrementCounter(
+            message.getQueueName(), Shard.Type.INFLIGHT, shard.getShardId(), 1 );
+
+        messageCounterSerialization.incrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+
+        messageCounterSerialization.decrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+    }
+
+
+    private Statement createDeleteMessageStatement( final String queueName,
+                                                    final String region,
+                                                    final Long shardIdOrNull,
+                                                    final DatabaseQueueMessage.Type type,
+                                                    final UUID queueMessageId ) {
+        final long shardId;
+        if ( shardIdOrNull == null ) {
+            Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
+                Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+            Shard shard = shardStrategy.selectShard(
+                queueName, region, shardType, queueMessageId );
+            shardId = shard.getShardId();
+        } else {
+            shardId = shardIdOrNull;
+        }
+
+        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
+        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
+        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
+        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+        Statement delete = QueryBuilder.delete().from(getTableName( type ))
+            .where(queueNameClause)
+            .and(regionClause)
+            .and(shardIdClause)
+            .and(queueMessageIdClause);
+
+        return delete;
+    }
+
+
+    private Statement createWriteMessageStatement( DatabaseQueueMessage message ) {
+        // builds the INSERT for the table matching the message type, defaulting a missing id and timestamps
+        final UUID queueMessageId =  message.getQueueMessageId() == null ?
+            QakkaUtils.getTimeUuid() : message.getQueueMessageId();
+
+        long queuedAt = message.getQueuedAt() == null ?
+            System.currentTimeMillis() : message.getQueuedAt();
+        // fall back to the resolved queuedAt: message.getQueuedAt() may be null and would fail to unbox
+        long inflightAt = message.getInflightAt() == null ?
+            queuedAt : message.getInflightAt();
+
+        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
+            .value( COLUMN_QUEUE_NAME,       message.getQueueName())
+            .value( COLUMN_REGION,           message.getRegion())
+            .value( COLUMN_SHARD_ID,         message.getShardId())
+            .value( COLUMN_MESSAGE_ID,       message.getMessageId())
+            .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
+            .value( COLUMN_INFLIGHT_AT,      inflightAt )
+            .value( COLUMN_QUEUED_AT,        queuedAt)
+            .using( QueryBuilder.ttl( maxTtl ) );
+
+        return insert;
+    }
+
+
     public static String getTableName(DatabaseQueueMessage.Type messageType){
 
         String table;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index a5c95bd..5bd2b05 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -102,9 +102,6 @@ public class QueueActorServiceTest extends AbstractTest {
             ByteBuffer blob = dqmb.getBlob();
 
             String returnedData = new String( blob.array(), "UTF-8" );
-//        ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
-//        ObjectInputStream ios = new ObjectInputStream( bais );
-//        String returnedData = (String)ios.readObject();
 
             Assert.assertEquals( data, returnedData );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 791650e..0f5b46f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -177,7 +177,7 @@ public class QueueActorHelperTest extends AbstractTest {
             // put message inflight
 
             QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-            helper.putInflight( queueName, message );
+            helper.putInflight( message );
 
             // message must be gone from messages_available table
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 5b42184..9fb8f29 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
@@ -53,6 +54,8 @@ public class QueueReaderTest extends AbstractTest {
 
         Injector injector = getInjector();
 
+        injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
+
         QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
         ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
@@ -89,13 +92,16 @@ public class QueueReaderTest extends AbstractTest {
 
         // run the QueueRefresher to fill up the in-memory queue
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+        ActorSystem system = ActorSystem.create("Test-" + queueName);
+        ActorRef queueReaderRef = system.actorOf(
+            Props.create( GuiceActorProducer.class, QueueRefresher.class ), "queueReader");
+        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
 
         // need to wait for refresh to complete
         int maxRetries = 10;
         int retries = 0;
         while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) {
-            helper.queueRefresh( queueName );
+            queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately
             Thread.sleep(1000);
         }