You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/10 21:45:19 UTC

[1/6] usergrid git commit: Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 71fe06fec -> 9f2863fd6


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
index 4ea6de3..88d89de 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessageSerializationTest.java
@@ -21,11 +21,13 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
 
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.ProtocolVersion;
+import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
 import org.junit.Test;
 
 import java.io.*;
@@ -64,19 +66,27 @@ public class DatabaseQueueMessageSerializationTest extends AbstractTest {
     @Test
     public void deleteMessage(){
 
+        Injector injector = getInjector();
+
         QueueMessageSerialization queueMessageSerialization =
-                getInjector().getInstance( QueueMessageSerialization.class );
+            injector.getInstance( QueueMessageSerialization.class );
 
-        Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
+        ShardSerialization shardSerialization =
+            injector.getInstance( ShardSerialization.class );
 
-        UUID messageId = QakkaUtils.getTimeUuid();
         String queueName = "dqmst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+        String region = "dummy_region";
+
+        Shard shard1 = new Shard(queueName, region, Shard.Type.DEFAULT, 1L, null);
+        shardSerialization.createShard( shard1 );
+
+        UUID messageId = QakkaUtils.getTimeUuid();
 
         DatabaseQueueMessage message = new DatabaseQueueMessage(
                 messageId,
                 DatabaseQueueMessage.Type.DEFAULT,
                 queueName,
-                "dummy_region",
+                region,
                 shard1.getShardId(),
                 System.currentTimeMillis(),
                 null, null );
@@ -85,14 +95,14 @@ public class DatabaseQueueMessageSerializationTest extends AbstractTest {
 
         queueMessageSerialization.deleteMessage(
             queueName,
-            "dummy_region",
+            region,
             shard1.getShardId(),
             DatabaseQueueMessage.Type.DEFAULT,
             queueMessageId );
 
         assertNull( queueMessageSerialization.loadMessage(
             queueName,
-            "dummy_region",
+            region,
             shard1.getShardId(),
             DatabaseQueueMessage.Type.DEFAULT,
             queueMessageId

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index 2653fd6..c7d53a3 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -25,6 +25,6 @@ log4j.logger.org.apache.cassandra=WARN
 log4j.logger.org.glassfish=WARN
 
 log4j.logger.org.apache.usergrid=INFO
-log4j.logger.org.apache.usergrid.persistence.qakka=INFO
-log4j.logger.org.apache.usergrid.persistence.queue=INFO
+#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
 log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO


[5/6] usergrid git commit: Minor test improvements

Posted by sn...@apache.org.
Minor test improvements


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/63561ee0
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/63561ee0
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/63561ee0

Branch: refs/heads/usergrid-1318-queue
Commit: 63561ee03d078f738e1dba33a26555b7cbd27064
Parents: 7f3b2da
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 16:35:16 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 16:35:16 2016 -0400

----------------------------------------------------------------------
 .../persistence/qakka/core/QueueMessageManagerTest.java      | 2 --
 .../persistence/qakka/distributed/QueueActorServiceTest.java | 8 +++-----
 .../qakka/distributed/actors/ShardAllocatorTest.java         | 8 ++++----
 3 files changed, 7 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/63561ee0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index f77f31b..5f0216f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -212,8 +212,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
             distributedQueueService.processTimeouts();
 
-            Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 );
-
             // attempt to ack other half of messages
 
             for (QueueMessage message : messages) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/63561ee0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 5bd2b05..7fe8b16 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -29,10 +29,7 @@ import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.QakkaModule;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.core.Queue;
-import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.*;
 import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
@@ -129,6 +126,7 @@ public class QueueActorServiceTest extends AbstractTest {
         QueueMessageSerialization serialization         = injector.getInstance( QueueMessageSerialization.class );
         TransferLogSerialization xferLogSerialization   = injector.getInstance( TransferLogSerialization.class );
         InMemoryQueue inMemoryQueue                     = injector.getInstance( InMemoryQueue.class );
+        QueueMessageManager queueMessageManager         = injector.getInstance( QueueMessageManager.class );
 
         String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
         QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -159,7 +157,7 @@ public class QueueActorServiceTest extends AbstractTest {
             int count = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if (inMemoryQueue.size( queueName ) == 100) {
+                if ( queueMessageManager.getQueueDepth(  queueName ) == 100 ) {
                     count = 100;
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/63561ee0/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index aae5f44..ecacccc 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -180,7 +180,7 @@ public class ShardAllocatorTest extends AbstractTest {
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         ShardCounterSerialization shardCounterSer = injector.getInstance( ShardCounterSerialization.class );
 
-        Assert.assertEquals( "test assumes 'queue.shard.max.size' is 15 ", 15, qakkaFig.getMaxShardSize() );
+        Assert.assertEquals( "test assumes 'queue.shard.max.size' is 10 ", 10, qakkaFig.getMaxShardSize() );
 
         String region = actorSystemFig.getRegionLocal();
         App app = injector.getInstance( App.class );
@@ -210,10 +210,10 @@ public class ShardAllocatorTest extends AbstractTest {
 
             distributedQueueService.refresh();
 
-            // Test that right number of shards created
+            // Test that approximately the right number of shards was created
             int shardCount = countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT );
-            Assert.assertTrue( "shards > 10", shardCount > 10 );
-            Assert.assertTrue( "shards < 20", shardCount < 20 );
+            Assert.assertTrue( "shards > 7", shardCount > 7 );
+            Assert.assertTrue( "shards < 17", shardCount < 17 );
 
         } finally {
             queueManager.deleteQueue( queueName );


[2/6] usergrid git commit: Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation

Posted by sn...@apache.org.
Add total-time metrics, handle Ack requests via WriterRouter, use C* batch-statement for Ack operation


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7be8c274
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7be8c274
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7be8c274

Branch: refs/heads/usergrid-1318-queue
Commit: 7be8c274e83e1680221e06c327ad04e697c61ca6
Parents: 71fe06f
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 09:37:32 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 09:37:32 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |   2 +-
 .../persistence/qakka/api/QueueResource.java    |  83 ++++------
 .../qakka/distributed/actors/QueueActor.java    |  40 +----
 .../distributed/actors/QueueActorHelper.java    | 150 +++++------------
 .../distributed/actors/QueueActorRouter.java    |  50 +-----
 .../distributed/actors/QueueRefresher.java      | 103 +++++++++++-
 .../qakka/distributed/actors/QueueSender.java   |  16 +-
 .../distributed/actors/QueueTimeouter.java      |  13 --
 .../qakka/distributed/actors/QueueWriter.java   |  49 ++++--
 .../distributed/actors/QueueWriterRouter.java   |   7 +-
 .../impl/DistributedQueueServiceImpl.java       | 161 +++++++++++--------
 .../impl/QueueActorRouterProducer.java          |   3 +-
 .../impl/QueueWriterRouterProducer.java         |  11 +-
 .../distributed/messages/QakkaMessage.java      |   6 +-
 .../distributed/messages/QueueGetResponse.java  |  14 +-
 .../distributed/messages/QueueSendResponse.java |   9 +-
 .../messages/QueueWriteResponse.java            |   8 +-
 .../MultiShardMessageIterator.java              |   6 +
 .../QueueMessageSerialization.java              |   5 +
 .../impl/QueueMessageSerializationImpl.java     | 151 ++++++++++++-----
 .../distributed/QueueActorServiceTest.java      |   3 -
 .../actors/QueueActorHelperTest.java            |   2 +-
 .../distributed/actors/QueueReaderTest.java     |  10 +-
 .../DatabaseQueueMessageSerializationTest.java  |  22 ++-
 .../queue/src/test/resources/log4j.properties   |   4 +-
 25 files changed, 507 insertions(+), 421 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 7d89187..3093c39 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How long to wait for response from queue actor before timing out and trying again */
     @Key(QUEUE_GET_TIMEOUT)
-    @Default("4")
+    @Default("1")
     int getGetTimeoutSeconds();
 
     /** Max number of times to retry call to queue writer for queue send operation */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
index 10dae04..b609de3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/api/QueueResource.java
@@ -19,13 +19,10 @@
 
 package org.apache.usergrid.persistence.qakka.api;
 
-import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
 import com.google.common.base.Preconditions;
 import com.google.common.io.ByteStreams;
-import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.*;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +43,6 @@ public class QueueResource {
 
     private final QueueManager        queueManager;
     private final QueueMessageManager queueMessageManager;
-    private final MetricsService      metricsService;
     private final URIStrategy         uriStrategy;
     private final Regions             regions;
 
@@ -55,13 +51,11 @@ public class QueueResource {
     public QueueResource(
             QueueManager              queueManager,
             QueueMessageManager       queueMessageManager,
-            MetricsService            metricsService,
             URIStrategy               uriStrategy,
             Regions                   regions ) {
 
         this.queueManager              = queueManager;
         this.queueMessageManager       = queueMessageManager;
-        this.metricsService            = metricsService;
         this.uriStrategy               = uriStrategy;
         this.regions                   = regions;
     }
@@ -262,8 +256,6 @@ public class QueueResource {
                                    String contentType,
                                    ByteBuffer byteBuffer) {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
-        try {
 
             Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
@@ -285,9 +277,6 @@ public class QueueResource {
             apiResponse.setCount( 1 );
             return Response.ok().entity( apiResponse ).build();
 
-        } finally {
-            timer.close();
-        }
     }
 
 
@@ -297,38 +286,31 @@ public class QueueResource {
     public Response getNextMessages( @PathParam("queueName") String queueName,
                                      @QueryParam("count") @DefaultValue("1") String countParam) throws Exception {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
-        try {
-
-            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+        Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
-            int count = 1;
-            try {
-                count = Integer.parseInt( countParam );
-            } catch (Exception e) {
-                throw new IllegalArgumentException( "Invalid count parameter" );
-            }
-            if (count <= 0) {
-                // invalid count
-                throw new IllegalArgumentException( "Count must be >= 1" );
-            }
-
-            List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
+        int count = 1;
+        try {
+            count = Integer.parseInt( countParam );
+        } catch (Exception e) {
+            throw new IllegalArgumentException( "Invalid count parameter" );
+        }
+        if (count <= 0) {
+            // invalid count
+            throw new IllegalArgumentException( "Count must be >= 1" );
+        }
 
-            ApiResponse apiResponse = new ApiResponse();
+        List<QueueMessage> messages = queueMessageManager.getNextMessages( queueName, count );
 
-            if (messages != null && !messages.isEmpty()) {
-                apiResponse.setQueueMessages( messages );
+        ApiResponse apiResponse = new ApiResponse();
 
-            } else { // always return queueMessages field
-                apiResponse.setQueueMessages( Collections.EMPTY_LIST );
-            }
-            apiResponse.setCount( apiResponse.getQueueMessages().size() );
-            return Response.ok().entity( apiResponse ).build();
+        if (messages != null && !messages.isEmpty()) {
+            apiResponse.setQueueMessages( messages );
 
-        } finally {
-            timer.close();
+        } else { // always return queueMessages field
+            apiResponse.setQueueMessages( Collections.EMPTY_LIST );
         }
+        apiResponse.setCount( apiResponse.getQueueMessages().size() );
+        return Response.ok().entity( apiResponse ).build();
     }
 
 
@@ -338,25 +320,18 @@ public class QueueResource {
     public Response ackMessage( @PathParam("queueName") String queueName,
                                 @PathParam("queueMessageId") String queueMessageId) throws Exception {
 
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
-        try {
-
-            Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
+        Preconditions.checkArgument( !QakkaUtils.isNullOrEmpty( queueName ), "Queue name is required" );
 
-            UUID messageUuid;
-            try {
-                messageUuid = UUID.fromString( queueMessageId );
-            } catch (Exception e) {
-                throw new IllegalArgumentException( "Invalid queue message UUID" );
-            }
-            queueMessageManager.ackMessage( queueName, messageUuid );
-
-            ApiResponse apiResponse = new ApiResponse();
-            return Response.ok().entity( apiResponse ).build();
-
-        } finally {
-            timer.close();
+        UUID messageUuid;
+        try {
+            messageUuid = UUID.fromString( queueMessageId );
+        } catch (Exception e) {
+            throw new IllegalArgumentException( "Invalid queue message UUID" );
         }
+        queueMessageManager.ackMessage( queueName, messageUuid );
+
+        ApiResponse apiResponse = new ApiResponse();
+        return Response.ok().entity( apiResponse ).build();
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 9ce38ef..248f9cd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -64,32 +64,23 @@ public class QueueActor extends UntypedActor {
 
     private final Set<String> queuesSeen = new HashSet<>();
 
-    //private final Injector injector;
-
 
     @Inject
     public QueueActor(
-        //Injector         injector,
         QakkaFig         qakkaFig,
         InMemoryQueue    inMemoryQueue,
         QueueActorHelper queueActorHelper,
         MetricsService   metricsService,
         MessageCounterSerialization messageCounterSerialization
     ) {
-        //this.injector = injector;
         this.qakkaFig = qakkaFig;
         this.inMemoryQueue = inMemoryQueue;
         this.queueActorHelper = queueActorHelper;
         this.metricsService = metricsService;
         this.messageCounterSerialization = messageCounterSerialization;
-
-//        qakkaFig         = injector.getInstance( QakkaFig.class );
-//        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
-//        queueActorHelper = injector.getInstance( QueueActorHelper.class );
-//        metricsService   = injector.getInstance( MetricsService.class );
-//        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
+
     @Override
     public void onReceive(Object message) {
 
@@ -134,6 +125,7 @@ public class QueueActor extends UntypedActor {
                 logger.debug("Created shard allocater for queue {}", request.getQueueName() );
             }
 
+
         } else if ( message instanceof QueueRefreshRequest ) {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
             queuesSeen.add( request.getQueueName() );
@@ -154,6 +146,7 @@ public class QueueActor extends UntypedActor {
             // hand-off to queue's reader
             queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
 
@@ -169,6 +162,7 @@ public class QueueActor extends UntypedActor {
             // ASYNCHRONOUS -> hand-off to queue's timeouter
             queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof ShardCheckRequest ) {
             ShardCheckRequest request = (ShardCheckRequest)message;
 
@@ -184,6 +178,7 @@ public class QueueActor extends UntypedActor {
             // ASYNCHRONOUS -> hand-off to queue's shard allocator
             shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
 
+
         } else if ( message instanceof QueueGetRequest) {
 
             QueueGetRequest queueGetRequest = (QueueGetRequest) message;
@@ -199,30 +194,7 @@ public class QueueActor extends UntypedActor {
                 Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
 
                 getSender().tell( new QueueGetResponse(
-                        DistributedQueueService.Status.SUCCESS, messages ), getSender() );
-
-            } finally {
-                timer.close();
-            }
-
-
-        } else if ( message instanceof QueueAckRequest) {
-
-            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
-            try {
-
-                QueueAckRequest queueAckRequest = (QueueAckRequest) message;
-
-                queuesSeen.add( queueAckRequest.getQueueName() );
-
-                DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
-                        queueAckRequest.getQueueName(),
-                        queueAckRequest.getQueueMessageId() );
-
-                getSender().tell( new QueueAckResponse(
-                        queueAckRequest.getQueueName(),
-                        queueAckRequest.getQueueMessageId(),
-                        status ), getSender() );
+                        DistributedQueueService.Status.SUCCESS, messages, queueName ), getSender() );
 
             } finally {
                 timer.close();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index e3996c5..fcb3fba 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -41,7 +41,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActorHelper {
@@ -55,9 +54,6 @@ public class QueueActorHelper {
     private final MetricsService            metricsService;
     private final CassandraClient           cassandraClient;
 
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong totalRead = new AtomicLong(0);
-
 
     @Inject
     public QueueActorHelper(
@@ -109,7 +105,7 @@ public class QueueActorHelper {
 
             if (queueMessage != null) {
 
-                if (putInflight( queueName, queueMessage )) {
+                if (putInflight( queueMessage )) {
                     queueMessages.add( queueMessage );
                 }
 
@@ -125,10 +121,48 @@ public class QueueActorHelper {
     }
 
 
+    boolean putInflight( DatabaseQueueMessage queueMessage ) {
+
+        UUID qmid = queueMessage.getQueueMessageId();
+        try {
+
+            messageSerialization.putInflight( queueMessage );
+
+        } catch ( Throwable t ) {
+            logger.error("Error putting inflight queue message "
+                + qmid + " queue name: " + queueMessage.getQueueName(), t);
+
+            auditLogSerialization.recordAuditLog(
+                AuditLog.Action.GET,
+                AuditLog.Status.ERROR,
+                queueMessage.getQueueName(),
+                actorSystemFig.getRegionLocal(),
+                queueMessage.getMessageId(),
+                qmid);
+
+            return false;
+        }
+
+        auditLogSerialization.recordAuditLog(
+            AuditLog.Action.GET,
+            AuditLog.Status.SUCCESS,
+            queueMessage.getQueueName(),
+            actorSystemFig.getRegionLocal(),
+            queueMessage.getMessageId(),
+            qmid);
+
+        return true;
+    }
+
+
     DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
 
-        DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
-                queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
+        DatabaseQueueMessage queueMessage = messageSerialization.loadMessage(
+            queueName,
+            actorSystemFig.getRegionLocal(),
+            null,
+            DatabaseQueueMessage.Type.INFLIGHT,
+            queueMessageId );
 
         if ( queueMessage == null ) {
             logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId);
@@ -174,106 +208,4 @@ public class QueueActorHelper {
             return DistributedQueueService.Status.ERROR;
         }
     }
-
-
-    boolean putInflight( String queueName, DatabaseQueueMessage queueMessage ) {
-
-        UUID qmid = queueMessage.getQueueMessageId();
-        try {
-
-            DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
-                queueMessage.getMessageId(),
-                DatabaseQueueMessage.Type.INFLIGHT,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                null,                         // let serialization select the shard
-                queueMessage.getQueuedAt(),
-                System.currentTimeMillis(),
-                qmid);
-
-            messageSerialization.writeMessage( inflightMessage );
-
-            DatabaseQueueMessage retrieved = loadDatabaseQueueMessage(
-                queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT );
-            if ( retrieved == null ) {
-                logger.error("Failed ot write queue message id {} to inflight table", qmid);
-                return false;
-            }
-
-            messageSerialization.deleteMessage(
-                    queueName,
-                    actorSystemFig.getRegionLocal(),
-                    null,
-                    DatabaseQueueMessage.Type.DEFAULT,
-                    qmid);
-
-            //logger.debug("Put message {} inflight for queue name {}", qmid, queueName);
-
-        } catch ( Throwable t ) {
-            logger.error("Error putting inflight queue message " + qmid + " queue name: " + queueName, t);
-
-            auditLogSerialization.recordAuditLog(
-                    AuditLog.Action.GET,
-                    AuditLog.Status.ERROR,
-                    queueName,
-                    actorSystemFig.getRegionLocal(),
-                    queueMessage.getMessageId(),
-                    qmid);
-
-            return false;
-        }
-
-        auditLogSerialization.recordAuditLog(
-                AuditLog.Action.GET,
-                AuditLog.Status.SUCCESS,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                queueMessage.getMessageId(),
-                qmid);
-
-        return true;
-    }
-
-
-    void queueRefresh( String queueName ) {
-
-        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
-        try {
-
-            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
-                // TODO: need to track the starting shard
-
-                ShardIterator shardIterator = new ShardIterator(
-                    cassandraClient, queueName, actorSystemFig.getRegionLocal(),
-                    Shard.Type.DEFAULT, Optional.empty() );
-
-                UUID since = inMemoryQueue.getNewest( queueName );
-
-                String region = actorSystemFig.getRegionLocal();
-                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
-                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
-                    shardIterator, since);
-
-                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
-                int count = 0;
-
-                while ( multiShardIterator.hasNext() && count < need ) {
-                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
-                    inMemoryQueue.add( queueName, queueMessage );
-                    count++;
-                }
-
-                if ( count > 0 ) {
-                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
-                        count, queueName, inMemoryQueue.size( queueName ) );
-                }
-            }
-
-        } finally {
-            timer.close();
-        }
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index c40a3d9..f908e7f 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -27,6 +27,7 @@ import akka.routing.FromConfig;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 
 
@@ -36,10 +37,13 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 public class QueueActorRouter extends UntypedActor {
 
     private final ActorRef routerRef;
+    private final QueueActorRouterProducer queueActorRouterProducer;
 
 
     @Inject
-    public QueueActorRouter( Injector injector ) {
+    public QueueActorRouter( QueueActorRouterProducer queueActorRouterProducer ) {
+
+        this.queueActorRouterProducer = queueActorRouterProducer;
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
             Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
@@ -48,51 +52,13 @@ public class QueueActorRouter extends UntypedActor {
     @Override
     public void onReceive(Object message) {
 
-        // TODO: can we do something smarter than this if-then-else structure
-        // e.g. if message is recognized as one of ours, then we just pass it on?
-
-        if ( message instanceof QueueGetRequest) {
-            QueueGetRequest qgr = (QueueGetRequest) message;
+        if ( queueActorRouterProducer.getMessageTypes().contains( message.getClass() ) ) {
+            QakkaMessage qakkaMessage = (QakkaMessage) message;
 
             ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qgr.getQueueName() );
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qakkaMessage.getQueueName() );
             routerRef.tell( envelope, getSender() );
 
-        } else if ( message instanceof QueueAckRequest) {
-            QueueAckRequest qar = (QueueAckRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueInitRequest) {
-            QueueInitRequest qar = (QueueInitRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueRefreshRequest) {
-            QueueRefreshRequest qar = (QueueRefreshRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof QueueTimeoutRequest) {
-            QueueTimeoutRequest qar = (QueueTimeoutRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
-        } else if ( message instanceof ShardCheckRequest) {
-            ShardCheckRequest qar = (ShardCheckRequest)message;
-
-            ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, qar.getQueueName() );
-            routerRef.tell( envelope, getSender());
-
         } else {
             unhandled(message);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index ae9969c..afd5640 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,20 +20,52 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
+import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
+import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
 
-    final QueueActorHelper helper;
+    private final ActorSystemFig  actorSystemFig;
+    private final InMemoryQueue   inMemoryQueue;
+    private final QakkaFig        qakkaFig;
+    private final MetricsService  metricsService;
+    private final CassandraClient cassandraClient;
+
 
     @Inject
-    public QueueRefresher( QueueActorHelper helper ) {
-        this.helper = helper;
+    public QueueRefresher(
+        ActorSystemFig  actorSystemFig,
+        InMemoryQueue   inMemoryQueue,
+        QakkaFig        qakkaFig,
+        MetricsService  metricsService,
+        CassandraClient cassandraClient
+    ) {
+        this.actorSystemFig  = actorSystemFig;
+        this.inMemoryQueue   = inMemoryQueue;
+        this.qakkaFig        = qakkaFig;
+        this.metricsService  = metricsService;
+        this.cassandraClient = cassandraClient;
     }
 
 
@@ -44,10 +76,73 @@ public class QueueRefresher extends UntypedActor {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
             String queueName = request.getQueueName();
-            helper.queueRefresh( queueName );
+            queueRefresh( queueName );
 
         } else {
             unhandled( message );
         }
     }
+
+    Map<String, Long> startingShards = new HashMap<>();
+
+
+    /**
+     * Tops up the in-memory queue for queueName with DEFAULT-type messages from the local region.
+     * Does nothing when the in-memory queue already holds qakkaFig.getQueueInMemorySize() entries.
+     * The whole call is timed under MetricsService.REFRESH_TIME.
+     */
+    void queueRefresh( String queueName ) {
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+        try {
+
+            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+                final String region = actorSystemFig.getRegionLocal();
+                final String shardKey = createShardKey( queueName, Shard.Type.DEFAULT, region );
+
+                // Resume from the shard remembered by the previous refresh, if any. The earlier code
+                // built this Optional but still passed Optional.empty(), so the saved shard was ignored.
+                // NOTE(review): confirm whether ShardIterator treats the starting shard id as inclusive.
+                final Optional<Long> shardIdOptional = Optional.ofNullable( startingShards.get( shardKey ) );
+
+                ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, region, Shard.Type.DEFAULT, shardIdOptional );
+
+                UUID since = inMemoryQueue.getNewest( queueName );
+
+                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+                    shardIterator, since);
+
+                // Load only as many messages as are needed to reach the configured in-memory size.
+                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+                int count = 0;
+
+                while ( multiShardIterator.hasNext() && count < need ) {
+                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
+                    inMemoryQueue.add( queueName, queueMessage );
+                    count++;
+                }
+
+                // Remember the iterator's current shard so the next refresh can start from it.
+                if ( multiShardIterator.getCurrentShard() != null ) {
+                    startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() );
+                }
+
+                if ( count > 0 ) {
+                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
+                        count, queueName, inMemoryQueue.size( queueName ) );
+                }
+            }
+
+        } finally {
+            timer.close();
+        }
+    }
+
+    // Builds the startingShards map key; every component is "_"-delimited so type and region cannot run together.
+    private String createShardKey(String queueName, Shard.Type type, String region ) {
+        return queueName + "_" + type + "_" + region;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 3dc695e..ccc39f5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -26,18 +26,13 @@ import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendResponse;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
-import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
+import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -80,13 +75,6 @@ public class QueueSender extends UntypedActor {
         this.actorSystemFig = actorSystemFig;
         this.qakkaFig = qakkaFig;
         this.metricsService = metricsService;
-
-//        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
-//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-//        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
-//        qakkaFig                 = injector.getInstance( QakkaFig.class );
-//        metricsService           = injector.getInstance( MetricsService.class );
     }
 
     @Override
@@ -97,7 +85,7 @@ public class QueueSender extends UntypedActor {
 
             // as far as caller is concerned, we are done.
             getSender().tell( new QueueSendResponse(
-                    DistributedQueueService.Status.SUCCESS ), getSender() );
+                    DistributedQueueService.Status.SUCCESS, qa.getQueueName() ), getSender() );
 
             final QueueWriter.WriteStatus writeStatus = sendMessageToRegion(
                     qa.getQueueName(),

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 33f1dd9..b7a95df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -22,29 +22,22 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DecimalFormat;
 import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueTimeouter extends UntypedActor {
@@ -70,12 +63,6 @@ public class QueueTimeouter extends UntypedActor {
         this.actorSystemFig = actorSystemFig;
         this.qakkaFig = qakkaFig;
         this.cassandraClient = cassandraClient;
-
-//        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
-//        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
-//        qakkaFig             = injector.getInstance( QakkaFig.class );
-//        metricsService       = injector.getInstance( MetricsService.class );
-//        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e014d59..a7dbbd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -22,17 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckResponse;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -50,24 +49,21 @@ public class QueueWriter extends UntypedActor {
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
-
+    private final QueueActorHelper          queueActorHelper;
 
     @Inject
     public QueueWriter(
         QueueMessageSerialization messageSerialization,
         TransferLogSerialization  transferLogSerialization,
         AuditLogSerialization     auditLogSerialization,
-        MetricsService            metricsService
+        MetricsService            metricsService,
+        QueueActorHelper          queueActorHelper
     ) {
-        this.messageSerialization = messageSerialization;
+        this.messageSerialization     = messageSerialization;
         this.transferLogSerialization = transferLogSerialization;
-        this.auditLogSerialization = auditLogSerialization;
-        this.metricsService = metricsService;
-
-//        messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
-//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-//        metricsService           = injector.getInstance( MetricsService.class );
+        this.auditLogSerialization    = auditLogSerialization;
+        this.metricsService           = metricsService;
+        this.queueActorHelper         = queueActorHelper;
     }
 
     @Override
@@ -86,6 +82,7 @@ public class QueueWriter extends UntypedActor {
 
                 DatabaseQueueMessage dbqm = null;
                 long currentTime = System.currentTimeMillis();
+                String queueName = qa.getQueueName();
 
                 try {
                     dbqm = new DatabaseQueueMessage(
@@ -115,7 +112,7 @@ public class QueueWriter extends UntypedActor {
                             dbqm.getMessageId() );
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.ERROR ), getSender() );
+                            QueueWriter.WriteStatus.ERROR, queueName ), getSender() );
 
                     return;
                 }
@@ -136,7 +133,7 @@ public class QueueWriter extends UntypedActor {
                             qa.getMessageId() );
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED, queueName ), getSender() );
 
                 } catch (Throwable e) {
                     logger.debug( "Unable to delete transfer log for {} {} {} {}",
@@ -147,13 +144,33 @@ public class QueueWriter extends UntypedActor {
                     logger.debug("Error deleting transferlog", e);
 
                     getSender().tell( new QueueWriteResponse(
-                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED, queueName ), getSender() );
                 }
 
             } finally {
                 timer.close();
             }
 
+        } else if ( message instanceof QueueAckRequest ){
+
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_ACK ).time();
+            try {
+
+                QueueAckRequest queueAckRequest = (QueueAckRequest) message;
+
+                DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
+                    queueAckRequest.getQueueName(),
+                    queueAckRequest.getQueueMessageId() );
+
+                getSender().tell( new QueueAckResponse(
+                    queueAckRequest.getQueueName(),
+                    queueAckRequest.getQueueMessageId(),
+                    status ), getSender() );
+
+            } finally {
+                timer.close();
+            }
+
         } else {
             unhandled( message );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 0e3e981..cb06c1d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -24,8 +24,8 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
 
@@ -37,7 +37,7 @@ public class QueueWriterRouter extends UntypedActor {
     private final ActorRef router;
 
     @Inject
-    public QueueWriterRouter( Injector injector ) {
+    public QueueWriterRouter() {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
             Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
@@ -46,7 +46,8 @@ public class QueueWriterRouter extends UntypedActor {
     @Override
     public void onReceive(Object message) {
 
-        if ( message instanceof QueueWriteRequest) {
+        if (   message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) {
+
             router.tell( message, getSender() );
 
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 9063242..20bf608 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -22,14 +22,16 @@ package org.apache.usergrid.persistence.qakka.distributed.impl;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.codahale.metrics.*;
+import com.codahale.metrics.Timer;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
+import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -37,7 +39,6 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -56,19 +57,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
-
+    private final MetricsService metricsService;
 
     @Inject
     public DistributedQueueServiceImpl(
             Injector injector,
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig
+            QakkaFig qakkaFig,
+            MetricsService metricsService
             ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
+        this.metricsService = metricsService;
 
         GuiceActorProducer.INJECTOR = injector;
     }
@@ -154,59 +157,66 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             String queueName, String sourceRegion, String destRegion, UUID messageId,
             Long deliveryTime, Long expirationTime ) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_TOTAL ).time();
+        try {
 
-        int maxRetries = qakkaFig.getMaxSendRetries();
-        int retries = 0;
+            if ( queueManager.getQueueConfig( queueName ) == null ) {
+                throw new NotFoundException( "Queue not found: " + queueName );
+            }
 
-        QueueSendRequest request = new QueueSendRequest(
-                queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
+            int maxRetries = qakkaFig.getMaxSendRetries();
+            int retries = 0;
 
-        while ( retries++ < maxRetries ) {
-            try {
-                Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
+            QueueSendRequest request = new QueueSendRequest(
+                    queueName, sourceRegion, destRegion, messageId, deliveryTime, expirationTime );
 
-                // send to current region via local clientActor
-                ActorRef clientActor = actorSystemManager.getClientActor();
-                Future<Object> fut = Patterns.ask( clientActor, request, t );
+            while ( retries++ < maxRetries ) {
+                try {
+                    Timeout t = new Timeout( qakkaFig.getSendTimeoutSeconds(), TimeUnit.SECONDS );
 
-                // wait for response...
-                final Object response = Await.result( fut, t.duration() );
+                    // send to current region via local clientActor
+                    ActorRef clientActor = actorSystemManager.getClientActor();
+                    Future<Object> fut = Patterns.ask( clientActor, request, t );
 
-                if ( response != null && response instanceof QueueSendResponse) {
-                    QueueSendResponse qarm = (QueueSendResponse)response;
+                    // wait for response...
+                    final Object response = Await.result( fut, t.duration() );
 
-                    if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
+                    if ( response != null && response instanceof QueueSendResponse) {
+                        QueueSendResponse qarm = (QueueSendResponse)response;
 
-                        if ( retries > 1 ) {
-                            logger.debug("SUCCESS after {} retries", retries );
-                        }
+                        if ( !DistributedQueueService.Status.ERROR.equals( qarm.getSendStatus() )) {
 
-                        // send refresh-queue-if-empty message
-                        QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
-                        clientActor.tell( qrr, null );
+                            if ( retries > 1 ) {
+                                logger.debug("SUCCESS after {} retries", retries );
+                            }
+
+                            // send refresh-queue-if-empty message
+                            QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+                            clientActor.tell( qrr, null );
+
+                            return qarm.getSendStatus();
+
+                        } else {
+                            logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+                        }
 
-                        return qarm.getSendStatus();
+                    } else if ( response != null  ) {
+                        logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
 
                     } else {
-                        logger.debug("ERROR STATUS sending to queue, retrying {}", retries );
+                        logger.debug("TIMEOUT sending to queue, retrying {}", retries );
                     }
 
-                } else if ( response != null  ) {
-                    logger.debug("NULL RESPONSE sending to queue, retrying {}", retries );
-
-                } else {
-                    logger.debug("TIMEOUT sending to queue, retrying {}", retries );
+                } catch ( Exception e ) {
+                    logger.debug("ERROR sending to queue, retrying " + retries, e );
                 }
-
-            } catch ( Exception e ) {
-                logger.debug("ERROR sending to queue, retrying " + retries, e );
             }
-        }
 
-        throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+            throw new QakkaRuntimeException( "Error sending to queue after " + retries );
+
+        } finally {
+            timer.close();
+        }
     }
 
 
@@ -214,19 +224,32 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
         List<DatabaseQueueMessage> ret = new ArrayList<>();
 
-        long startTime = System.currentTimeMillis();
+        com.codahale.metrics.Timer.Context timer =
+            metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_TOTAL ).time();
+
+        try {
 
-        while ( ret.size() < count
-            && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
+            long startTime = System.currentTimeMillis();
 
-            ret.addAll( getNextMessagesInternal( queueName, count ));
+            while ( ret.size() < count
+                && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
 
-            if ( ret.size() < count ) {
-                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+                ret.addAll( getNextMessagesInternal( queueName, count ));
+
+                if ( ret.size() < count ) {
+                    try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
+                }
             }
+
+            if ( ret.isEmpty() ) {
+                logger.info( "Requested {} but queue '{}' is empty", count, queueName);
+            }
+            return ret;
+
+        } finally {
+            timer.close();
         }
 
-        return ret;
     }
 
 
@@ -242,10 +265,10 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         }
 
         int maxRetries = qakkaFig.getMaxGetRetries();
-        int retries = 0;
+        int tries = 0;
 
         QueueGetRequest request = new QueueGetRequest( queueName, count );
-        while ( ++retries < maxRetries ) {
+        while ( ++tries < maxRetries ) {
             try {
                 Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
 
@@ -261,8 +284,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                     if ( response != null && response instanceof QueueGetResponse) {
                         QueueGetResponse qprm = (QueueGetResponse)response;
                         if ( qprm.isSuccess() ) {
-                            if (retries > 1) {
-                                logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries );
+                            if (tries > 1) {
+                                logger.warn( "getNextMessage {} SUCCESS after {} tries", queueName, tries );
                             }
                         }
                         logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
@@ -270,41 +293,49 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
 
                     } else if ( response != null  ) {
-                        logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries );
+                        logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, tries );
 
                     } else {
-                        logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries );
+                        logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, tries );
                     }
 
                 } else if ( responseObject instanceof ClientActor.ErrorResponse ) {
 
                     final ClientActor.ErrorResponse errorResponse = (ClientActor.ErrorResponse)responseObject;
                     logger.debug("ACTORSYSTEM ERROR popping queue: {}, retrying {}",
-                        errorResponse.getMessage(), retries );
+                        errorResponse.getMessage(), tries );
 
                 } else {
-                    logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries );
+                    logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, tries );
                 }
 
             } catch ( Exception e ) {
-                logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e );
+                logger.error("ERROR popping to queue " + queueName + " retrying " + tries, e );
             }
         }
 
         throw new QakkaRuntimeException(
-                "Error getting from queue " + queueName + " after " + retries + " tries");
+                "Error getting from queue " + queueName + " after " + tries + " tries");
     }
 
 
     @Override
     public Status ackMessage(String queueName, UUID queueMessageId ) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ACK_TIME_TOTAL ).time();
+        try {
 
-        QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
-        return sendMessageToLocalQueueActors( message );
+            if ( queueManager.getQueueConfig( queueName ) == null ) {
+                throw new NotFoundException( "Queue not found: " + queueName );
+            }
+
+            QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
+            return sendMessageToLocalRouters( message );
+
+
+        } finally {
+            timer.close();
+        }
     }
 
 
@@ -316,7 +347,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, messageId );
-        return sendMessageToLocalQueueActors( message );
+        return sendMessageToLocalRouters( message );
     }
 
 
@@ -332,7 +363,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     }
 
 
-    private Status sendMessageToLocalQueueActors( QakkaMessage message ) {
+    private Status sendMessageToLocalRouters( QakkaMessage message ) {
 
         int maxRetries = 5;
         int retries = 0;
@@ -367,6 +398,6 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     }
 
     public void shutdown() {
-        actorSystemManager.shutdownAll();
+        // no op
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index d74936b..3bf6180 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -28,6 +28,7 @@ import akka.cluster.singleton.ClusterSingletonProxy;
 import akka.cluster.singleton.ClusterSingletonProxySettings;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.actorsystem.RouterProducer;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 
+@Singleton
 public class QueueActorRouterProducer implements RouterProducer {
 
     static Injector injector;
@@ -131,7 +133,6 @@ public class QueueActorRouterProducer implements RouterProducer {
     public Collection<Class> getMessageTypes() {
         return new ArrayList() {{
             add( QueueGetRequest.class );
-            add( QueueAckRequest.class );
             add( QueueInitRequest.class );
             add( QueueRefreshRequest.class );
             add( QueueTimeoutRequest.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index 006f1a7..c4e7acc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -28,19 +28,22 @@ import akka.cluster.singleton.ClusterSingletonProxy;
 import akka.cluster.singleton.ClusterSingletonProxySettings;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.actorsystem.RouterProducer;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriterRouter;
+import org.apache.usergrid.persistence.qakka.distributed.messages.QueueAckRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
 
+@Singleton
 public class QueueWriterRouterProducer implements RouterProducer {
 
     static Injector injector;
@@ -128,7 +131,11 @@ public class QueueWriterRouterProducer implements RouterProducer {
 
     @Override
     public Collection<Class> getMessageTypes() {
-        return Collections.singletonList( QueueWriteRequest.class );
+        return new ArrayList() {{
+            add( QueueAckRequest.class );
+            add( QueueWriteRequest.class );
+        }};
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
index a1bbf14..a3919ea 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QakkaMessage.java
@@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.qakka.distributed.messages;
 
 import java.io.Serializable;
 
-/**
- * Marker interface
- */
+
 public interface QakkaMessage extends Serializable {
+
+    String getQueueName();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
index c8004fb..1776360 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueGetResponse.java
@@ -30,16 +30,19 @@ import java.util.Collections;
 public class QueueGetResponse implements QakkaMessage {
     private final Collection<DatabaseQueueMessage> queueMessages;
     private final DistributedQueueService.Status status;
+    private final String queueName;
 
-
-    public QueueGetResponse(DistributedQueueService.Status status ) {
+    public QueueGetResponse(DistributedQueueService.Status status, String queueName ) {
         this.status = status;
         this.queueMessages = Collections.emptyList();
+        this.queueName = queueName;
     }
 
-    public QueueGetResponse(DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages) {
+    public QueueGetResponse(
+        DistributedQueueService.Status status, Collection<DatabaseQueueMessage> queueMessages, String queueName) {
         this.status = status;
         this.queueMessages = queueMessages;
+        this.queueName = queueName;
     }
 
     public DistributedQueueService.Status getStatus() {
@@ -60,4 +63,9 @@ public class QueueGetResponse implements QakkaMessage {
                 .append( "status", status )
                 .toString();
     }
+
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
index 0c295a0..9b065af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueSendResponse.java
@@ -25,9 +25,12 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
 
 public class QueueSendResponse implements QakkaMessage {
     private final DistributedQueueService.Status status;
+    private final String queueName;
 
-    public QueueSendResponse(DistributedQueueService.Status status) {
+
+    public QueueSendResponse( DistributedQueueService.Status status, String queueName ) {
         this.status = status;
+        this.queueName = queueName;
     }
 
     public DistributedQueueService.Status getSendStatus() {
@@ -40,4 +43,8 @@ public class QueueSendResponse implements QakkaMessage {
                 .toString();
     }
 
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
index 1eb513c..463b4df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueWriteResponse.java
@@ -25,9 +25,11 @@ import org.apache.usergrid.persistence.qakka.distributed.actors.QueueWriter;
 
 public class QueueWriteResponse implements QakkaMessage {
     private final QueueWriter.WriteStatus status;
+    private String queueName;
 
-    public QueueWriteResponse(QueueWriter.WriteStatus status) {
+    public QueueWriteResponse(QueueWriter.WriteStatus status, String queueName ) {
         this.status = status;
+        this.queueName = queueName;
     }
 
     public QueueWriter.WriteStatus getSendStatus() {
@@ -40,4 +42,8 @@ public class QueueWriteResponse implements QakkaMessage {
                 .toString();
     }
 
+    @Override
+    public String getQueueName() {
+        return queueName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 6ec0774..29327e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -49,6 +49,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
     private final Iterator<Shard> shardIterator;
 
     private Iterator<DatabaseQueueMessage> currentIterator;
+
     private Shard currentShard;
     private UUID nextStart;
 
@@ -184,4 +185,9 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
 
     }
 
+
+    public Shard getCurrentShard() {
+        return currentShard;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
index 3ebe735..86c50a5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/QueueMessageSerialization.java
@@ -51,4 +51,9 @@ public interface QueueMessageSerialization extends Migration {
     DatabaseQueueMessageBody loadMessageData(final UUID messageId);
 
     void deleteMessageData(final UUID messageId);
+
+    /**
+     * Write message to inflight table and remove from available table
+     */
+    void putInflight( DatabaseQueueMessage queueMessage );
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d9a2543..33de7bc 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
 
+import com.datastax.driver.core.BatchStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
@@ -137,12 +138,6 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         final UUID queueMessageId =  message.getQueueMessageId() == null ?
                 QakkaUtils.getTimeUuid() : message.getQueueMessageId();
 
-        long queuedAt = message.getQueuedAt() == null ?
-                System.currentTimeMillis() : message.getQueuedAt();
-
-        long inflightAt = message.getInflightAt() == null ?
-                message.getQueuedAt() : message.getInflightAt();
-
         Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( message.getType() ) ?
                 Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
 
@@ -152,16 +147,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             message.setShardId( shard.getShardId() );
         }
 
-        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
-                .value( COLUMN_QUEUE_NAME,       message.getQueueName())
-                .value( COLUMN_REGION,           message.getRegion())
-                .value( COLUMN_SHARD_ID,         message.getShardId())
-                .value( COLUMN_MESSAGE_ID,       message.getMessageId())
-                .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
-                .value( COLUMN_INFLIGHT_AT,      inflightAt )
-                .value( COLUMN_QUEUED_AT,        queuedAt)
-            .using( QueryBuilder.ttl( maxTtl ) );
-
+        Statement insert = createWriteMessageStatement( message );
         cassandraClient.getQueueMessageSession().execute(insert);
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
@@ -233,28 +219,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             final DatabaseQueueMessage.Type type,
             final UUID queueMessageId ) {
 
-        final long shardId;
-        if ( shardIdOrNull == null ) {
-            Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
-                    Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
-            Shard shard = shardStrategy.selectShard(
-                    queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
-            shardId = shard.getShardId();
-        } else {
-            shardId = shardIdOrNull;
-        }
-
-        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
-        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
-        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
-        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
-
-        Statement delete = QueryBuilder.delete().from(getTableName( type ))
-                .where(queueNameClause)
-                .and(regionClause)
-                .and(shardIdClause)
-                .and(queueMessageIdClause);
-
+        Statement delete = createDeleteMessageStatement( queueName, region, null, type,queueMessageId);
         cassandraClient.getQueueMessageSession().execute( delete );
 
         messageCounterSerialization.decrementCounter( queueName, type, 1L );
@@ -297,14 +262,118 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     public void deleteMessageData( final UUID messageId ) {
 
         Clause messageIdClause = QueryBuilder.eq(COLUMN_MESSAGE_ID, messageId);
-
-        Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA)
-                .where(messageIdClause);
+        Statement delete = QueryBuilder.delete().from(TABLE_MESSAGE_DATA).where(messageIdClause);
 
         cassandraClient.getApplicationSession().execute(delete);
     }
 
 
+    @Override
+    public void putInflight( DatabaseQueueMessage message ) {
+
+        // create statement to write new queue message to inflight table
+
+        Shard.Type shardType = Shard.Type.INFLIGHT;
+        Shard shard = shardStrategy.selectShard(
+            message.getQueueName(), message.getRegion(), shardType, message.getQueueMessageId() );
+
+        DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
+            message.getMessageId(),
+            DatabaseQueueMessage.Type.INFLIGHT,
+            message.getQueueName(),
+            message.getRegion(),
+            shard.getShardId(),
+            message.getQueuedAt(),
+            System.currentTimeMillis(),
+            message.getQueueMessageId() );
+
+        Statement insert = createWriteMessageStatement( inflightMessage );
+
+        // create statement to delete queue message from available table
+
+        Statement delete = createDeleteMessageStatement(
+            message.getQueueName(),
+            message.getRegion(),
+            null,
+            DatabaseQueueMessage.Type.DEFAULT,
+            message.getQueueMessageId());
+
+        // execute statements as a batch
+
+        BatchStatement batchStatement = new BatchStatement();
+        batchStatement.add( insert );
+        batchStatement.add( delete );
+        cassandraClient.getQueueMessageSession().execute( batchStatement );
+
+        // bump counters
+
+        shardCounterSerialization.incrementCounter(
+            message.getQueueName(), Shard.Type.INFLIGHT, message.getShardId(), 1 );
+
+        messageCounterSerialization.incrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.INFLIGHT, 1L );
+
+        messageCounterSerialization.decrementCounter(
+            message.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1L );
+    }
+
+
+    private Statement createDeleteMessageStatement( final String queueName,
+                                                    final String region,
+                                                    final Long shardIdOrNull,
+                                                    final DatabaseQueueMessage.Type type,
+                                                    final UUID queueMessageId ) {
+        final long shardId;
+        if ( shardIdOrNull == null ) {
+            Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
+                Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
+            Shard shard = shardStrategy.selectShard(
+                queueName, region, shardType, queueMessageId );
+            shardId = shard.getShardId();
+        } else {
+            shardId = shardIdOrNull;
+        }
+
+        Clause queueNameClause = QueryBuilder.eq(      COLUMN_QUEUE_NAME, queueName );
+        Clause regionClause = QueryBuilder.eq(         COLUMN_REGION, region );
+        Clause shardIdClause = QueryBuilder.eq(        COLUMN_SHARD_ID, shardId );
+        Clause queueMessageIdClause = QueryBuilder.eq( COLUMN_QUEUE_MESSAGE_ID, queueMessageId);
+
+        Statement delete = QueryBuilder.delete().from(getTableName( type ))
+            .where(queueNameClause)
+            .and(regionClause)
+            .and(shardIdClause)
+            .and(queueMessageIdClause);
+
+        return delete;
+    }
+
+
+    private Statement createWriteMessageStatement( DatabaseQueueMessage message ) {
+
+        final UUID queueMessageId =  message.getQueueMessageId() == null ?
+            QakkaUtils.getTimeUuid() : message.getQueueMessageId();
+
+        long queuedAt = message.getQueuedAt() == null ?
+            System.currentTimeMillis() : message.getQueuedAt();
+
+        long inflightAt = message.getInflightAt() == null ?
+            message.getQueuedAt() : message.getInflightAt();
+
+        Statement insert = QueryBuilder.insertInto(getTableName(message.getType()))
+            .value( COLUMN_QUEUE_NAME,       message.getQueueName())
+            .value( COLUMN_REGION,           message.getRegion())
+            .value( COLUMN_SHARD_ID,         message.getShardId())
+            .value( COLUMN_MESSAGE_ID,       message.getMessageId())
+            .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
+            .value( COLUMN_INFLIGHT_AT,      inflightAt )
+            .value( COLUMN_QUEUED_AT,        queuedAt)
+            .using( QueryBuilder.ttl( maxTtl ) );
+
+        return insert;
+    }
+
+
     public static String getTableName(DatabaseQueueMessage.Type messageType){
 
         String table;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index a5c95bd..5bd2b05 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -102,9 +102,6 @@ public class QueueActorServiceTest extends AbstractTest {
             ByteBuffer blob = dqmb.getBlob();
 
             String returnedData = new String( blob.array(), "UTF-8" );
-//        ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
-//        ObjectInputStream ios = new ObjectInputStream( bais );
-//        String returnedData = (String)ios.readObject();
 
             Assert.assertEquals( data, returnedData );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 791650e..0f5b46f 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -177,7 +177,7 @@ public class QueueActorHelperTest extends AbstractTest {
             // put message inflight
 
             QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-            helper.putInflight( queueName, message );
+            helper.putInflight( message );
 
             // message must be gone from messages_available table
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7be8c274/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 5b42184..9fb8f29 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
@@ -53,6 +54,8 @@ public class QueueReaderTest extends AbstractTest {
 
         Injector injector = getInjector();
 
+        injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
+
         QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
         ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
@@ -89,13 +92,16 @@ public class QueueReaderTest extends AbstractTest {
 
         // run the QueueRefresher to fill up the in-memory queue
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+        ActorSystem system = ActorSystem.create("Test-" + queueName);
+        ActorRef queueReaderRef = system.actorOf(
+            Props.create( GuiceActorProducer.class, QueueRefresher.class ), "queueReader");
+        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
 
         // need to wait for refresh to complete
         int maxRetries = 10;
         int retries = 0;
         while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) {
-            helper.queueRefresh( queueName );
+            queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately
             Thread.sleep(1000);
         }
 


[3/6] usergrid git commit: Formatting, debug log statements, etc.

Posted by sn...@apache.org.
Formatting, debug log statements, etc.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5d37ed58
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5d37ed58
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5d37ed58

Branch: refs/heads/usergrid-1318-queue
Commit: 5d37ed58d8649e9498e8a0fe7fe858f52762978c
Parents: 7be8c27
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 13:51:53 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 13:51:53 2016 -0400

----------------------------------------------------------------------
 .../qakka/distributed/actors/QueueActorHelper.java            | 2 +-
 .../persistence/qakka/distributed/actors/ShardAllocator.java  | 7 -------
 .../qakka/serialization/MultiShardMessageIterator.java        | 4 ++++
 .../queuemessages/impl/QueueMessageSerializationImpl.java     | 3 +++
 .../usergrid/persistence/queue/LegacyQueueManagerTest.java    | 2 --
 5 files changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5d37ed58/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index fcb3fba..6382661 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -115,7 +115,7 @@ public class QueueActorHelper {
             }
         }
 
-        logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
+        //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
         return queueMessages;
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5d37ed58/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 46dc0ed..1863472 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -74,13 +74,6 @@ public class ShardAllocator extends UntypedActor {
         this.shardCounterSerialization = shardCounterSerialization;
         this.metricsService = metricsService;
         this.cassandraClient = cassandraClient;
-
-//        this.qakkaFig                  = injector.getInstance( QakkaFig.class );
-//        this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
-//        this.shardSerialization        = injector.getInstance( ShardSerializationImpl.class );
-//        this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
-//        this.metricsService            = injector.getInstance( MetricsService.class );
-//        this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5d37ed58/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
index 29327e2..6a066e9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardMessageIterator.java
@@ -133,6 +133,7 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
                     .and(regionClause)
                     .and(shardIdClause)
                     .limit(PAGE_SIZE);
+
         } else {
 
             Clause messageIdClause = QueryBuilder.gt( COLUMN_QUEUE_MESSAGE_ID, nextStart);
@@ -144,8 +145,11 @@ public class MultiShardMessageIterator implements Iterator<DatabaseQueueMessage>
                     .limit(PAGE_SIZE);
         }
 
+
         List<Row> rows = cassandraClient.getQueueMessageSession().execute(query).all();
 
+        //logger.debug("Query got {}: {}", rows.size(), query);
+
         if ( (rows == null || rows.size() == 0) && shardIterator.hasNext()) {
 
             currentShard = shardIterator.next();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5d37ed58/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 33de7bc..fba2bed 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -150,6 +150,9 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         Statement insert = createWriteMessageStatement( message );
         cassandraClient.getQueueMessageSession().execute(insert);
 
+//        logger.debug("Wrote queue {} queue message {} shardId {}",
+//            message.getQueueName(), message.getQueueMessageId(), message.getShardId() );
+
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
 
         messageCounterSerialization.incrementCounter( message.getQueueName(), message.getType(), 1L );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5d37ed58/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 92075b6..13fb195 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -92,8 +92,6 @@ public class LegacyQueueManagerTest extends AbstractTest {
 
         Injector myInjector = getInjector();
 
-        CassandraClient cassandraClient = myInjector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = myInjector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 


[4/6] usergrid git commit: Fix QueueRefresher & concurrency problem in MessageCounterSerialization.

Posted by sn...@apache.org.
Fix QueueRefresher & concurrency problem in MessageCounterSerialization.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/7f3b2dae
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/7f3b2dae
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/7f3b2dae

Branch: refs/heads/usergrid-1318-queue
Commit: 7f3b2dae42d47c0422c16def1b6f2518bfc82e57
Parents: 5d37ed5
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 15:32:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 15:32:33 2016 -0400

----------------------------------------------------------------------
 .../distributed/actors/QueueRefresher.java      | 14 +++++------
 .../impl/MessageCounterSerializationImpl.java   | 16 ++++++++-----
 .../serialization/sharding/ShardIterator.java   | 25 +++++++++++++-------
 .../impl/ShardCounterSerializationImpl.java     |  3 ++-
 .../distributed/actors/ShardAllocatorTest.java  | 14 ++++++-----
 .../queue/src/test/resources/qakka.properties   |  4 ++--
 6 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index afd5640..d8faeb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -107,15 +107,17 @@ public class QueueRefresher extends UntypedActor {
 
                 ShardIterator shardIterator = new ShardIterator(
                     cassandraClient, queueName, actorSystemFig.getRegionLocal(),
-                    Shard.Type.DEFAULT, Optional.empty() );
+                    Shard.Type.DEFAULT, shardIdOptional );
 
                 UUID since = inMemoryQueue.getNewest( queueName );
 
                 String region = actorSystemFig.getRegionLocal();
+
                 MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
                     cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
                     shardIterator, since);
 
+
                 int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
                 int count = 0;
 
@@ -125,14 +127,10 @@ public class QueueRefresher extends UntypedActor {
                     count++;
                 }
 
-                if ( multiShardIterator.getCurrentShard() != null ) {
-                    startingShards.put( shardKey, multiShardIterator.getCurrentShard().getShardId() );
-                }
+                startingShards.put( shardKey, shardId );
 
-                if ( count > 0 ) {
-                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
-                        count, queueName, inMemoryQueue.size( queueName ) );
-                }
+//                logger.debug("Refreshed queue {} region {} shard {} since {} found {}",
+//                    queueName, region, shardId, since, count );
             }
 
         } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 0fdb47e..2eb482a 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -134,11 +134,13 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
             }
-        }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
-        inMemoryCount.getIncrement().addAndGet( increment );
+            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            inMemoryCount.getIncrement().addAndGet( increment );
 
+//            logger.info("Incremented Count for queue {} type {} = {}",
+//                queueName, type, getCounterValue( queueName, type ));
+        }
         saveIfNeeded( queueName, type );
     }
 
@@ -161,11 +163,13 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
             }
-        }
 
-        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
-        inMemoryCount.getDecrement().addAndGet( decrement );
+            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+            inMemoryCount.getDecrement().addAndGet( decrement );
 
+//            logger.info("Decremented Count for queue {} type {} = {}",
+//                queueName, type, getCounterValue( queueName, type ));
+        }
         saveIfNeeded( queueName, type );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
index 402d429..5f46a7e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIterator.java
@@ -40,9 +40,10 @@ public class ShardIterator implements Iterator<Shard> {
     private final String queueName;
     private final String region;
     private final Shard.Type shardType;
-    private final Optional<Long> shardId;
+    private final Optional<Long> lastShardId;
 
     private Iterator<Shard> currentIterator;
+    private long currentShardId = 0L;
 
     private long nextStart = 0L;
 
@@ -57,8 +58,10 @@ public class ShardIterator implements Iterator<Shard> {
         this.queueName = queueName;
         this.region = region;
         this.shardType = shardtype;
-        this.shardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L);
+        this.lastShardId = lastShardId.isPresent() ? lastShardId : Optional.of(0L);
         this.cassandraClient = cassandraClient;
+
+        this.currentShardId = this.lastShardId.get();
     }
 
     @Override
@@ -79,7 +82,9 @@ public class ShardIterator implements Iterator<Shard> {
             throw new NoSuchElementException( "No next shard exists" );
         }
 
-        return currentIterator.next();
+        Shard next = currentIterator.next();
+        currentShardId = next.getShardId();
+        return next;
 
     }
 
@@ -90,13 +95,15 @@ public class ShardIterator implements Iterator<Shard> {
         Clause regionClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_REGION, region);
         Clause activeClause = QueryBuilder.eq( ShardSerializationImpl.COLUMN_ACTIVE, 1);
         Clause shardIdClause;
-        if(nextStart == 0L && shardId.isPresent()){
-            shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, shardId.get());
-        }else if( nextStart == 0L && !shardId.isPresent()){
-            shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L);
 
-        }else{
-            shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart);
+        if (nextStart == 0L && lastShardId.isPresent()) {
+            shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, lastShardId.get() );
+
+        } else if (nextStart == 0L && !lastShardId.isPresent()) {
+            shardIdClause = QueryBuilder.gte( ShardSerializationImpl.COLUMN_SHARD_ID, 0L );
+
+        } else {
+            shardIdClause = QueryBuilder.gt( ShardSerializationImpl.COLUMN_SHARD_ID, nextStart );
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index bcfb74d..f303f43 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -91,7 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
 
 
     @Inject
-    public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public ShardCounterSerializationImpl(
+        CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
         this.cassandraConfig = cassandraConfig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 919673c..aae5f44 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -173,12 +173,14 @@ public class ShardAllocatorTest extends AbstractTest {
 
         injector.getInstance( App.class ); // init the INJECTOR
 
+        QakkaFig            qakkaFig              = injector.getInstance( QakkaFig.class );
         ActorSystemFig      actorSystemFig        = injector.getInstance( ActorSystemFig.class );
         QueueManager        queueManager          = injector.getInstance( QueueManager.class );
         QueueMessageManager queueMessageManager   = injector.getInstance( QueueMessageManager.class );
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         ShardCounterSerialization shardCounterSer = injector.getInstance( ShardCounterSerialization.class );
 
+        Assert.assertEquals( "test assumes 'queue.shard.max.size' is 15 ", 15, qakkaFig.getMaxShardSize() );
 
         String region = actorSystemFig.getRegionLocal();
         App app = injector.getInstance( App.class );
@@ -191,9 +193,9 @@ public class ShardAllocatorTest extends AbstractTest {
 
         try {
 
-            // Create 4000 messages
+            // Create number of messages
 
-            int numMessages = 4000;
+            int numMessages = 400;
 
             for (int i = 0; i < numMessages; i++) {
                 queueMessageManager.sendMessages(
@@ -208,10 +210,10 @@ public class ShardAllocatorTest extends AbstractTest {
 
             distributedQueueService.refresh();
 
-            // Test that 8 shards were created
-
-            Assert.assertTrue( "num shards >= 7",
-                countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 );
+            // Test that right number of shards created
+            int shardCount = countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT );
+            Assert.assertTrue( "shards > 10", shardCount > 10 );
+            Assert.assertTrue( "shards < 20", shardCount < 20 );
 
         } finally {
             queueManager.deleteQueue( queueName );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/7f3b2dae/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index ef1be80..142138d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -34,14 +34,14 @@ usergrid.cluster.region.local=us-east
 usergrid.cluster.seeds=us-east:localhost
 
 # Port used for cluster communications.
-usergrid.cluster.port=2551
+usergrid.cluster.port=3545
 
 queue.num.actors=50
 queue.sender.num.actors=100
 queue.writer.num.actors=100
 
 # set shard size and times low for testing purposes
-queue.shard.max.size=500
+queue.shard.max.size=10
 queue.shard.allocation.check.frequency.millis=100
 queue.shard.allocation.advance.time.millis=200
 


[6/6] usergrid git commit: Use akka blocking io dispatcher in routers

Posted by sn...@apache.org.
Use akka blocking io dispatcher in routers


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9f2863fd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9f2863fd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9f2863fd

Branch: refs/heads/usergrid-1318-queue
Commit: 9f2863fd6e28977551a6cd98ac44b869ac337608
Parents: 63561ee
Author: Dave Johnson <sn...@apache.org>
Authored: Mon Oct 10 17:24:26 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Mon Oct 10 17:24:26 2016 -0400

----------------------------------------------------------------------
 .../uniquevalues/UniqueValuesRouter.java           | 17 ++++-------------
 .../qakka/distributed/actors/QueueActorRouter.java |  3 ++-
 .../distributed/actors/QueueSenderRouter.java      |  7 ++++---
 .../distributed/actors/QueueWriterRouter.java      |  7 ++++---
 4 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 47db3a5..355320b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -34,25 +34,16 @@ import org.slf4j.LoggerFactory;
 public class UniqueValuesRouter extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValueActor.class );
 
-    private final String name = RandomStringUtils.randomAlphanumeric( 4 );
-
     private final ActorRef router;
 
+
     @Inject
-    public UniqueValuesRouter(Injector injector ) {
+    public UniqueValuesRouter() {
 
         router = getContext().actorOf(
-            FromConfig.getInstance()
-                .props(Props.create(UniqueValueActor.class)
+            FromConfig.getInstance().props(
+                Props.create( UniqueValueActor.class)
                     .withDispatcher("akka.blocking-io-dispatcher")), "router");
-
-        // TODO: is there some way to pass the injector here without getting this exception:
-        // NotSerializableException: No configured serialization-bindings for class [InjectorImpl]
-        //router = getContext().actorOf(
-            //FromConfig.getInstance().props( Props.create( GuiceActorProducer.class, injector, UniqueValueActor.class)),
-            //"router" );
-
-        //logger.info("UniqueValuesRouter {} is live with injector {}", name, injector);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index f908e7f..b5b9c30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -46,7 +46,8 @@ public class QueueActorRouter extends UntypedActor {
         this.queueActorRouterProducer = queueActorRouterProducer;
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
+            Props.create( GuiceActorProducer.class, QueueActor.class)
+                .withDispatcher("akka.blocking-io-dispatcher")), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index a205d71..88c5a4b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -38,16 +38,17 @@ public class QueueSenderRouter extends UntypedActor {
 
 
     @Inject
-    public QueueSenderRouter( Injector injector ) {
+    public QueueSenderRouter() {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, QueueSender.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueSender.class )
+                .withDispatcher("akka.blocking-io-dispatcher")), "router");
     }
 
     @Override
     public void onReceive(Object message) {
 
-        if ( message instanceof QueueSendRequest) {
+        if ( message instanceof QueueSendRequest ) {
             router.tell( message, getSender() );
 
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f2863fd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index cb06c1d..c3436eb 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -36,18 +36,19 @@ public class QueueWriterRouter extends UntypedActor {
 
     private final ActorRef router;
 
+
     @Inject
     public QueueWriterRouter() {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueWriter.class )
+                .withDispatcher("akka.blocking-io-dispatcher")), "router");
     }
 
     @Override
     public void onReceive(Object message) {
 
-        if (   message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) {
-
+        if ( message instanceof QueueWriteRequest || message instanceof QueueAckRequest ) {
             router.tell( message, getSender() );
 
         } else {