You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:55 UTC

[10/10] usergrid git commit: Change to use proper Guice injection instead of static injector kludge.

Change to use proper Guice injection instead of static injector kludge.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a19ba9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a19ba9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a19ba9a

Branch: refs/heads/usergrid-1318-queue
Commit: 5a19ba9a748c5d96f5356d77df1e7845aa92fc0f
Parents: 2cd8ecb
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 30 17:18:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 30 17:18:24 2016 -0400

----------------------------------------------------------------------
 build.log                                       |  15 ++
 .../apache/usergrid/persistence/qakka/App.java  |  17 +-
 .../qakka/core/impl/InMemoryQueue.java          |  11 +-
 .../core/impl/QueueMessageManagerImpl.java      |  18 +-
 .../distributed/DistributedQueueService.java    |   2 +-
 .../qakka/distributed/actors/QueueActor.java    |  56 +++---
 .../distributed/actors/QueueActorHelper.java    | 105 ++++++++++-
 .../distributed/actors/QueueActorRouter.java    |  11 +-
 .../distributed/actors/QueueRefresher.java      | 120 +-----------
 .../qakka/distributed/actors/QueueSender.java   |   5 +-
 .../distributed/actors/QueueSenderRouter.java   |  10 +-
 .../distributed/actors/QueueTimeouter.java      |  26 +--
 .../qakka/distributed/actors/QueueWriter.java   |   6 +-
 .../distributed/actors/QueueWriterRouter.java   |  11 +-
 .../distributed/actors/ShardAllocator.java      |  22 +--
 .../impl/DistributedQueueServiceImpl.java       |  26 ++-
 .../qakka/core/QueueMessageManagerTest.java     | 178 +++++++++---------
 .../distributed/QueueActorServiceTest.java      | 112 ++++++-----
 .../actors/QueueActorHelperTest.java            | 186 +++++++++++--------
 .../distributed/actors/QueueReaderTest.java     |  11 +-
 .../distributed/actors/QueueTimeouterTest.java  |   7 +-
 .../distributed/actors/ShardAllocatorTest.java  |  30 +--
 .../queue/src/test/resources/log4j.properties   |   6 +-
 23 files changed, 531 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/build.log
----------------------------------------------------------------------
diff --git a/build.log b/build.log
new file mode 100644
index 0000000..43ffacd
--- /dev/null
+++ b/build.log
@@ -0,0 +1,15 @@
+[INFO] Scanning for projects...
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD FAILURE
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time: 0.086 s
+[INFO] Finished at: 2016-09-30T07:54:28-04:00
+[INFO] Final Memory: 46M/6710M
+[INFO] ------------------------------------------------------------------------
+[ERROR] The goal you specified requires a project to execute but there is no POM in this directory (/Users/ApigeeCorporation/src/usergrid-snoopdave). Please verify you invoked Maven from the correct directory. -> [Help 1]
+[ERROR] 
+[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
+[ERROR] Re-run Maven using the -X switch to enable full debug logging.
+[ERROR] 
+[ERROR] For more information about the errors and possible solutions, please read the following articles:
+[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MissingProjectException

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index abbf3da..35fdb20 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
 public class App implements MetricsService {
     private static final Logger logger = LoggerFactory.getLogger( App.class );
 
-    // TODO: can we avoid this kludge with better Akka-Guice integration?
-    static public Injector INJECTOR;
-
     private final ActorSystemFig          actorSystemFig;
     private final ActorSystemManager      actorSystemManager;
     private final DistributedQueueService distributedQueueService;
@@ -55,14 +52,16 @@ public class App implements MetricsService {
 
     @Inject
     public App(
-            Injector                  injector,
             QakkaFig                  qakkaFig,
             ActorSystemFig            actorSystemFig,
             ActorSystemManager        actorSystemManager,
             DistributedQueueService   distributedQueueService,
-            MigrationManager          migrationManager) {
+            MigrationManager          migrationManager,
+            QueueActorRouterProducer  queueActorRouterProducer,
+            QueueWriterRouterProducer queueWriterRouterProducer,
+            QueueSenderRouterProducer queueSenderRouterProducer
+            ) {
 
-        this.INJECTOR = injector;
         this.actorSystemFig = actorSystemFig;
         this.actorSystemManager = actorSystemManager;
         this.distributedQueueService = distributedQueueService;
@@ -74,9 +73,9 @@ public class App implements MetricsService {
             } catch (MigrationException e) {
                 throw new QakkaRuntimeException( "Error running migration", e );
             }
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+            actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+            actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 27de079..1f6fe6e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -19,8 +19,10 @@
 
 package org.apache.usergrid.persistence.qakka.core.impl;
 
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -71,7 +73,14 @@ public class InMemoryQueue {
     }
 
     public UUID getNewest( String queueName ) {
-        return newestByQueueName.get( queueName );
+        UUID newest = newestByQueueName.get( queueName );
+//        if ( newest == null ) {
+//            // Create oldest UUID from a UNIX timestamp via DataStax utility
+//            // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+//            newest = UUIDs.startOf( 0L );
+//            newestByQueueName.put( queueName, newest );
+//        }
+        return newest;
     }
 
     public DatabaseQueueMessage poll( String queueName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 691c1a6..59e0ce0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -138,10 +138,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public List<QueueMessage> getNextMessages(String queueName, int count) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
 
         List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
@@ -210,15 +206,14 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public void ackMessage(String queueName, UUID queueMessageId) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
 
-        if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+        if ( DistributedQueueService.Status.NOT_INFLIGHT.equals( status )) {
             throw new BadRequestException( "Message not inflight" );
 
+        } else if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+            throw new BadRequestException( "Bad request" );
+
         } else if ( DistributedQueueService.Status.ERROR.equals( status )) {
             throw new QakkaRuntimeException( "Unable to ack message due to error" );
         }
@@ -228,10 +223,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         // TODO: implement requeueMessage
 
         throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
@@ -268,7 +259,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
 
         // first look in INFLIGHT storage
 
-
         DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
                 queueName, actorSystemFig.getRegionLocal(), null,
                 DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
index b02a623..b11dcff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -30,7 +30,7 @@ import java.util.UUID;
  */
 public interface DistributedQueueService {
 
-    enum Status { SUCCESS, ERROR, BAD_REQUEST };
+    enum Status { SUCCESS, ERROR, BAD_REQUEST, NOT_INFLIGHT };
 
     void init();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 87342ad..5ebba3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -24,7 +24,9 @@ import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -65,10 +67,13 @@ public class QueueActor extends UntypedActor {
     private final AtomicLong messageCount = new AtomicLong(0);
     private final Set<String> queuesSeen = new HashSet<>();
 
+    private final Injector injector;
 
-    public QueueActor() {
 
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueActor( Injector injector ) {
+
+        this.injector = injector;
 
         qakkaFig         = injector.getInstance( QakkaFig.class );
         inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
@@ -107,7 +112,7 @@ public class QueueActor extends UntypedActor {
                         getContext().dispatcher(),
                         getSelf());
                 timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
-                logger.debug("Created scheduler for queue {}", request.getQueueName() );
+                logger.debug("Created timeouter for queue {}", request.getQueueName() );
             }
 
             if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
@@ -126,18 +131,20 @@ public class QueueActor extends UntypedActor {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
 
             queuesSeen.add( request.getQueueName() );
-
-            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-
-                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
-                    ActorRef readerRef = getContext().actorOf( Props.create(
-                        QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
-                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
-                }
-            }
-
-            // hand-off to queue's reader
-            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+            queueActorHelper.queueRefresh( request.getQueueName() );
+
+//            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+//
+//                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+//                    ActorRef readerRef = getContext().actorOf(
+//                        Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+//                        request.getQueueName() + "_reader");
+//                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
+//                }
+//            }
+//
+//            // hand-off to queue's reader
+//            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -145,8 +152,9 @@ public class QueueActor extends UntypedActor {
             queuesSeen.add( request.getQueueName() );
 
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                    QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+                ActorRef readerRef = getContext().actorOf(
+                    Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+                    request.getQueueName() + "_timeouter");
                 queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
             }
 
@@ -160,8 +168,9 @@ public class QueueActor extends UntypedActor {
             queuesSeen.add( request.getQueueName() );
 
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                        ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+                ActorRef readerRef = getContext().actorOf(
+                    Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+                    request.getQueueName() + "_shard_allocator");
                 shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
             }
 
@@ -181,15 +190,15 @@ public class QueueActor extends UntypedActor {
 
                 while (queueMessages.size() < queueGetRequest.getNumRequested()) {
 
-                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+                    DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
 
                     if (queueMessage != null) {
                         if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
                             queueMessages.add( queueMessage );
                         }
                     } else {
-//                        logger.debug("in-memory queue for {} is empty, object is: {}",
-//                                queueGetRequest.getQueueName(), inMemoryQueue );
+                        logger.debug("in-memory queue for {} is empty, object is: {}",
+                                queueGetRequest.getQueueName(), inMemoryQueue );
                         break;
                     }
                 }
@@ -199,6 +208,9 @@ public class QueueActor extends UntypedActor {
                     DatabaseQueueMessage.Type.DEFAULT,
                     queueMessages.size());
 
+                logger.debug("{} returning {} for queue {}",
+                    this, queueMessages.size(), queueGetRequest.getQueueName());
+
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 26db903..68250df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -19,17 +19,28 @@
 
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.DecimalFormat;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActorHelper {
@@ -38,18 +49,33 @@ public class QueueActorHelper {
     private final ActorSystemFig            actorSystemFig;
     private final QueueMessageSerialization messageSerialization;
     private final AuditLogSerialization     auditLogSerialization;
+    private final InMemoryQueue             inMemoryQueue;
+    private final QakkaFig                  qakkaFig;
+    private final MetricsService            metricsService;
+    private final CassandraClient           cassandraClient;
+
+    private final AtomicLong runCount = new AtomicLong(0);
+    private final AtomicLong totalRead = new AtomicLong(0);
 
 
     @Inject
     public QueueActorHelper(
-            ActorSystemFig actorSystemFig,
+            QakkaFig                  qakkaFig,
+            ActorSystemFig            actorSystemFig,
             QueueMessageSerialization messageSerialization,
-            AuditLogSerialization auditLogSerialization
+            AuditLogSerialization     auditLogSerialization,
+            InMemoryQueue             inMemoryQueue,
+            MetricsService            metricsService,
+            CassandraClient           cassandraClient
             ) {
 
-        this.actorSystemFig = actorSystemFig;
-        this.messageSerialization = messageSerialization;
+        this.actorSystemFig        = actorSystemFig;
+        this.messageSerialization  = messageSerialization;
         this.auditLogSerialization = auditLogSerialization;
+        this.inMemoryQueue         = inMemoryQueue;
+        this.qakkaFig              = qakkaFig;
+        this.metricsService        = metricsService;
+        this.cassandraClient       = cassandraClient;
     }
 
 
@@ -78,7 +104,7 @@ public class QueueActorHelper {
                 queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
 
         if ( queueMessage == null ) {
-            return DistributedQueueService.Status.BAD_REQUEST;
+            return DistributedQueueService.Status.NOT_INFLIGHT;
         }
 
         boolean error = false;
@@ -164,4 +190,73 @@ public class QueueActorHelper {
 
         return true;
     }
+
+    void queueRefresh( String queueName ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+        try {
+
+            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+                ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+                    Shard.Type.DEFAULT, Optional.empty() );
+
+                UUID since = inMemoryQueue.getNewest( queueName );
+
+                String region = actorSystemFig.getRegionLocal();
+                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+                    shardIterator, since);
+
+                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+                int count = 0;
+
+                while ( multiShardIterator.hasNext() && count < need ) {
+                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
+                    inMemoryQueue.add( queueName, queueMessage );
+                    count++;
+                }
+
+                long runs = runCount.incrementAndGet();
+                long readCount = totalRead.addAndGet( count );
+
+                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+                    final DecimalFormat format = new DecimalFormat("##.###");
+                    final long nano = 1000000000;
+                    Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+
+                    logger.debug("QueueRefresher for queue '{}' stats:\n" +
+                            "   Num runs={}\n" +
+                            "   Read count={}\n" +
+                            "   Mean={}\n" +
+                            "   One min rate={}\n" +
+                            "   Five min rate={}\n" +
+                            "   Snapshot mean={}\n" +
+                            "   Snapshot min={}\n" +
+                            "   Snapshot max={}",
+                        queueName,
+                        t.getCount(),
+                        readCount,
+                        format.format( t.getMeanRate() ),
+                        format.format( t.getOneMinuteRate() ),
+                        format.format( t.getFiveMinuteRate() ),
+                        format.format( t.getSnapshot().getMean() / nano ),
+                        format.format( (double) t.getSnapshot().getMin() / nano ),
+                        format.format( (double) t.getSnapshot().getMax() / nano ) );
+                }
+
+                if ( count > 0 ) {
+                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
+                        count, queueName, inMemoryQueue.size( queueName ) );
+                }
+            }
+
+        } finally {
+            timer.close();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 97e591c..9257a0d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -24,6 +24,9 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.ConsistentHashingRouter;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 
 
@@ -35,9 +38,11 @@ public class QueueActorRouter extends UntypedActor {
     private final ActorRef routerRef;
 
 
-    public QueueActorRouter() {
-        routerRef = getContext().actorOf(
-                FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+    @Inject
+    public QueueActorRouter( Injector injector ) {
+
+        this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index dbd5235..2f70088 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,55 +20,20 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import com.google.inject.Inject;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DecimalFormat;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
 
-    private final String          queueName;
-    private final InMemoryQueue   inMemoryQueue;
-    private final QakkaFig        qakkaFig;
-    private final ActorSystemFig  actorSystemFig;
-    private final MetricsService  metricsService;
-    private final CassandraClient cassandraClient;
-
-    private final AtomicLong      runCount = new AtomicLong(0);
-    private final AtomicLong      totalRead = new AtomicLong(0);
-
+    final QueueActorHelper helper;
 
-    public QueueRefresher(String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
-
-        inMemoryQueue  = injector.getInstance( InMemoryQueue.class );
-        qakkaFig       = injector.getInstance( QakkaFig.class );
-        actorSystemFig = injector.getInstance( ActorSystemFig.class );
-        metricsService = injector.getInstance( MetricsService.class );
-        cassandraClient = injector.getInstance( CassandraClientImpl.class );
+    @Inject
+    public QueueRefresher( QueueActorHelper helper ) {
+        this.helper = helper;
     }
 
 
@@ -78,78 +43,9 @@ public class QueueRefresher extends UntypedActor {
         if ( message instanceof QueueRefreshRequest ) {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
-
-            //logger.debug( "running for queue {}", queueName );
-
-            if (!request.getQueueName().equals( queueName )) {
-                throw new QakkaRuntimeException(
-                        "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-            }
-
-            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
-            try {
-
-                if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
-                    ShardIterator shardIterator = new ShardIterator(
-                            cassandraClient, queueName, actorSystemFig.getRegionLocal(),
-                            Shard.Type.DEFAULT, Optional.empty() );
-
-                    UUID since = inMemoryQueue.getNewest( queueName );
-
-                    String region = actorSystemFig.getRegionLocal();
-                    MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
-                            cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
-                            shardIterator, since);
-
-                    int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
-                    int count = 0;
-
-                    while ( multiShardIterator.hasNext() && count < need ) {
-                        DatabaseQueueMessage queueMessage = multiShardIterator.next();
-                        inMemoryQueue.add( queueName, queueMessage );
-                        count++;
-                    }
-
-                    long runs = runCount.incrementAndGet();
-                    long readCount = totalRead.addAndGet( count );
-
-                    if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                        final DecimalFormat format = new DecimalFormat("##.###");
-                        final long nano = 1000000000;
-                        Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
-
-                        logger.debug("QueueRefresher for queue '{}' stats:\n" +
-                            "   Num runs={}\n" +
-                            "   Read count={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                            queueName,
-                            t.getCount(),
-                            readCount,
-                            format.format( t.getMeanRate() ),
-                            format.format( t.getOneMinuteRate() ),
-                            format.format( t.getFiveMinuteRate() ),
-                            format.format( t.getSnapshot().getMean() / nano ),
-                            format.format( (double) t.getSnapshot().getMin() / nano ),
-                            format.format( (double) t.getSnapshot().getMax() / nano ) );
-                    }
-
-//                    if ( count > 0 ) {
-//                        logger.debug( "Added {} in-memory for queue {}, new size = {}",
-//                                count, queueName, inMemoryQueue.size( queueName ) );
-//                    }
-                }
-
-            } finally {
-                timer.close();
-            }
+            logger.debug( "running for queue {}", request.getQueueName() );
+            String queueName = request.getQueueName();
+            helper.queueRefresh( queueName );
 
         } else {
             unhandled( message );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 03d1216..739e1c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -25,6 +25,7 @@ import akka.cluster.client.ClusterClient;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
@@ -63,9 +64,9 @@ public class QueueSender extends UntypedActor {
     private final QakkaFig qakkaFig;
     private final MetricsService            metricsService;
 
-    public QueueSender() {
 
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueSender( Injector injector ) {
 
         actorSystemManager       = injector.getInstance( ActorSystemManager.class );
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 20603a5..92d0785 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
 
 
@@ -34,10 +37,11 @@ public class QueueSenderRouter extends UntypedActor {
     private final ActorRef router;
 
 
-    public QueueSenderRouter() {
+    @Inject
+    public QueueSenderRouter( Injector injector ) {
 
-        router = getContext().actorOf(
-                FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+        this.router = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b47aac6..9b11277 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
 public class QueueTimeouter extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
 
-    private final String                    queueName;
-
     private final QueueMessageSerialization messageSerialization;
     private final MetricsService            metricsService;
     private final ActorSystemFig            actorSystemFig;
@@ -62,10 +61,8 @@ public class QueueTimeouter extends UntypedActor {
     private final AtomicLong totalTimedout = new AtomicLong(0);
 
 
-    public QueueTimeouter(String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueTimeouter( Injector injector) {
 
         messageSerialization = injector.getInstance( QueueMessageSerialization.class );
         actorSystemFig       = injector.getInstance( ActorSystemFig.class );
@@ -88,10 +85,7 @@ public class QueueTimeouter extends UntypedActor {
 
                 QueueTimeoutRequest request = (QueueTimeoutRequest) message;
 
-                if (!request.getQueueName().equals( queueName )) {
-                    throw new QakkaRuntimeException(
-                            "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-                }
+                String queueName = request.getQueueName();
 
                 //logger.debug("Processing timeouts for queue {} ", queueName );
 
@@ -171,12 +165,12 @@ public class QueueTimeouter extends UntypedActor {
                         format.format( (double) t.getSnapshot().getMax() / nano ) );
                 }
 
-//                if (count > 0) {
-//                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
-//
-//                    messageCounterSerialization.decrementCounter(
-//                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
-//                }
+                if (count > 0) {
+                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+                    messageCounterSerialization.decrementCounter(
+                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+                }
 
             } finally {
                 timer.close();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e54d916..273f0b2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -54,9 +55,8 @@ public class QueueWriter extends UntypedActor {
     private final MessageCounterSerialization messageCounterSerialization;
 
 
-    public QueueWriter() {
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueWriter( Injector injector ) {
 
         messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 9cf06d9..f0540af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
 
@@ -33,11 +36,11 @@ public class QueueWriterRouter extends UntypedActor {
 
     private final ActorRef router;
 
+    @Inject
+    public QueueWriterRouter( Injector injector ) {
 
-    public QueueWriterRouter() {
-
-        router = getContext().actorOf(
-                FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+        this.router = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 46e4906..65c3370 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.UUID;
 public class ShardAllocator extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
 
-    private final String queueName;
-
     private final QakkaFig qakkaFig;
     private final ActorSystemFig            actorSystemFig;
     private final ShardSerialization        shardSerialization;
@@ -59,10 +58,8 @@ public class ShardAllocator extends UntypedActor {
     private final CassandraClient           cassandraClient;
 
 
-    public ShardAllocator( String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public ShardAllocator( Injector injector ) {
 
         this.qakkaFig                  = injector.getInstance( QakkaFig.class );
         this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
@@ -70,8 +67,6 @@ public class ShardAllocator extends UntypedActor {
         this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
         this.metricsService            = injector.getInstance( MetricsService.class );
         this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
-
-        logger.debug( "Created shard allocator for queue {}", queueName );
     }
 
 
@@ -82,14 +77,9 @@ public class ShardAllocator extends UntypedActor {
 
             ShardCheckRequest request = (ShardCheckRequest) message;
 
-            if (!request.getQueueName().equals( queueName )) {
-                throw new QakkaRuntimeException(
-                        "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-            }
-
             // check both types of shard
-            checkLatestShard( Shard.Type.DEFAULT );
-            checkLatestShard( Shard.Type.INFLIGHT );
+            checkLatestShard( request.getQueueName(), Shard.Type.DEFAULT );
+            checkLatestShard( request.getQueueName(), Shard.Type.INFLIGHT );
 
         } else {
             unhandled( message );
@@ -97,7 +87,7 @@ public class ShardAllocator extends UntypedActor {
 
     }
 
-    private void checkLatestShard( Shard.Type type ) {
+    private void checkLatestShard( String queueName, Shard.Type type ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index bcb6b79..e24bdb4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
@@ -134,9 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             String queueName, String sourceRegion, String destRegion, UUID messageId,
             Long deliveryTime, Long expirationTime ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         int maxRetries = qakkaFig.getMaxSendRetries();
@@ -213,9 +213,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
@@ -280,9 +279,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status ackMessage(String queueName, UUID queueMessageId ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            return Status.BAD_REQUEST;
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
@@ -293,9 +291,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status requeueMessage(String queueName, UUID messageId) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, messageId );
@@ -306,9 +303,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status clearMessages(String queueName) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         // TODO: implement clear queue

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index c10d1f5..8ce9822 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -70,9 +70,9 @@ public class QueueMessageManagerTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        Injector injector = getInjector();
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+        Injector injector = getInjector();
 
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -82,54 +82,60 @@ public class QueueMessageManagerTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         // create queue and send one message to it
-        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
-        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
-        String jsonData = "{}";
-        qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
-                "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
 
-        distributedQueueService.refresh();
-        Thread.sleep(1000);
+        try {
 
-        // get message from the queue
-        List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
-        Assert.assertEquals( 1, messages.size() );
-        QueueMessage message = messages.get(0);
+            QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+            queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+            String jsonData = "{}";
+            qmm.sendMessages( queueName, Collections.singletonList( region ), null, null,
+                "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) );
 
-        // test that queue message data is present and correct
-        QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
-        DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
-        Assert.assertNotNull( data );
-        Assert.assertEquals( "application/json", data.getContentType() );
-        String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
-        Assert.assertEquals( jsonData, jsonDataReturned );
-
-        // test that transfer log is empty for our queue
-        TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
-        Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
-        List<TransferLog> logs = all.getEntities().stream()
+            distributedQueueService.refresh();
+            Thread.sleep( 1000 );
+
+            // get message from the queue
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+            Assert.assertEquals( 1, messages.size() );
+            QueueMessage message = messages.get( 0 );
+
+            // test that queue message data is present and correct
+            QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+            DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+            Assert.assertNotNull( data );
+            Assert.assertEquals( "application/json", data.getContentType() );
+            String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) );
+            Assert.assertEquals( jsonData, jsonDataReturned );
+
+            // test that transfer log is empty for our queue
+            TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
+            Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+            List<TransferLog> logs = all.getEntities().stream()
                 .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
-        Assert.assertTrue( logs.isEmpty() );
+            Assert.assertTrue( logs.isEmpty() );
 
-        // ack the message
-        qmm.ackMessage( queueName, message.getQueueMessageId() );
+            // ack the message
+            qmm.ackMessage( queueName, message.getQueueMessageId() );
 
-        // test that message is no longer stored in non-replicated keyspace
+            // test that message is no longer stored in non-replicated keyspace
 
-        Assert.assertNull( qms.loadMessage( queueName, region, null,
-                DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+            Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) );
 
-        Assert.assertNull( qms.loadMessage( queueName, region, null,
-                DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+            Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) );
 
-        // test that audit log entry was written
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 3, auditLogs.getEntities().size() );
+            // test that audit log entry was written
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 3, auditLogs.getEntities().size() );
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -138,8 +144,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         QakkaFig qakkaFig             = injector.getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -152,74 +156,82 @@ public class QueueMessageManagerTest extends AbstractTest {
         // create some number of queue messages
 
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        QueueMessageManager qmm   = injector.getInstance( QueueMessageManager.class );
-        String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric(15);
-        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        int numMessages = 40;
+        String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 );
 
-        for ( int i=0; i<numMessages; i++ ) {
-            qmm.sendMessages(
+        try {
+
+            QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+            queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+
+            int numMessages = 40;
+
+            for (int i = 0; i < numMessages; i++) {
+                qmm.sendMessages(
                     queueName,
                     Collections.singletonList( region ),
                     null, // delay
                     null, // expiration
                     "application/json",
                     DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
-        }
+            }
 
-        int maxRetries = 15;
-        int retries = 0;
-        while ( retries++ < maxRetries ) {
-            distributedQueueService.refresh();
-            if (inMemoryQueue.size( queueName ) == 40) {
-                break;
+            int maxRetries = 15;
+            int retries = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                if (inMemoryQueue.size( queueName ) == 40) {
+                    break;
+                }
+                Thread.sleep( 500 );
             }
-            Thread.sleep( 500 );
-        }
 
-        Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+            Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
 
-        // get all messages from queue
+            // get all messages from queue
 
-        List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
-        Assert.assertEquals( numMessages, messages.size() );
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+            Assert.assertEquals( numMessages, messages.size() );
 
-        // ack half of the messages
+            // ack half of the messages
 
-        List<QueueMessage> remove = new ArrayList<>();
+            List<QueueMessage> remove = new ArrayList<>();
 
-        for ( int i=0; i<numMessages/2; i++ ) {
-            QueueMessage queueMessage = messages.get( i );
-            qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
-            remove.add( queueMessage );
-        }
+            for (int i = 0; i < numMessages / 2; i++) {
+                QueueMessage queueMessage = messages.get( i );
+                qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
+                remove.add( queueMessage );
+            }
 
-        for ( QueueMessage message : remove ) {
-            messages.remove( message );
-        }
+            for (QueueMessage message : remove) {
+                messages.remove( message );
+            }
 
-        // wait for twice timeout period
+            // wait for twice timeout period
 
-        Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 );
+            Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        distributedQueueService.processTimeouts();
+            distributedQueueService.processTimeouts();
 
-        Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 );
+            Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        // attempt to ack other half of messages
+            // attempt to ack other half of messages
 
-        for ( QueueMessage message : messages ) {
-            try {
-                qmm.ackMessage( queueName, message.getQueueMessageId() );
-                Assert.fail("Message should have timed out by now");
+            for (QueueMessage message : messages) {
+                try {
+                    qmm.ackMessage( queueName, message.getQueueMessageId() );
+                    Assert.fail( "Message should have timed out by now" );
 
-            } catch ( QakkaRuntimeException expected ) {
-                // keep on going...
+                } catch (QakkaRuntimeException expected) {
+                    // keep on going...
+                }
             }
-        }
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7423424..53f9224 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -78,35 +78,42 @@ public class QueueActorServiceTest extends AbstractTest {
         QueueManager queueManager = injector.getInstance( QueueManager.class );
         queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        // send 1 queue message, get back one queue message
-        UUID messageId = UUIDGen.getTimeUUID();
+        try {
 
-        final String data = "my test data";
-        final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+            // send 1 queue message, get back one queue message
+            UUID messageId = UUIDGen.getTimeUUID();
+
+            final String data = "my test data";
+            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                 DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
-        serialization.writeMessageData( messageId, messageBody );
+            serialization.writeMessageData( messageId, messageBody );
 
-        distributedQueueService.sendMessageToRegion(
-                queueName, region, region, messageId, null, null);
+            distributedQueueService.sendMessageToRegion(
+                queueName, region, region, messageId, null, null );
 
-        distributedQueueService.refresh();
-        Thread.sleep(1000);
+            distributedQueueService.refresh();
+            Thread.sleep( 1000 );
 
-        Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
-        Assert.assertEquals( 1, qmReturned.size() );
+            Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
+            Assert.assertEquals( 1, qmReturned.size() );
 
-        DatabaseQueueMessage dqm = qmReturned.iterator().next();
-        DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
-        ByteBuffer blob = dqmb.getBlob();
+            DatabaseQueueMessage dqm = qmReturned.iterator().next();
+            DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
+            ByteBuffer blob = dqmb.getBlob();
 
-        String returnedData = new String( blob.array(), "UTF-8");
+            String returnedData = new String( blob.array(), "UTF-8" );
 //        ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
 //        ObjectInputStream ios = new ObjectInputStream( bais );
 //        String returnedData = (String)ios.readObject();
 
-        Assert.assertEquals( data, returnedData );
+            Assert.assertEquals( data, returnedData );
+
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
 
-        distributedQueueService.shutdown();
     }
 
 
@@ -128,51 +135,58 @@ public class QueueActorServiceTest extends AbstractTest {
 
         String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        queueManager.createQueue(
-                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        for ( int i=0; i<100; i++ ) {
+        try {
 
-            UUID messageId = UUIDGen.getTimeUUID();
+            queueManager.createQueue(
+                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
 
-            final String data = "my test data";
-            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+            for (int i = 0; i < 100; i++) {
+
+                UUID messageId = UUIDGen.getTimeUUID();
+
+                final String data = "my test data";
+                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                     DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
-            serialization.writeMessageData( messageId, messageBody );
+                serialization.writeMessageData( messageId, messageBody );
 
-            xferLogSerialization.recordTransferLog(
-                queueName, actorSystemFig.getRegionLocal(), region, messageId );
+                xferLogSerialization.recordTransferLog(
+                    queueName, actorSystemFig.getRegionLocal(), region, messageId );
 
-            distributedQueueService.sendMessageToRegion(
-                    queueName, region, region, messageId , null, null);
-        }
+                distributedQueueService.sendMessageToRegion(
+                    queueName, region, region, messageId, null, null );
+            }
 
-        int maxRetries = 25;
-        int retries = 0;
-        int count = 0;
-        while ( retries++ < maxRetries ) {
-            distributedQueueService.refresh();
-            if (inMemoryQueue.size( queueName ) == 100) {
-                count = 100;
-                break;
+            int maxRetries = 25;
+            int retries = 0;
+            int count = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                if (inMemoryQueue.size( queueName ) == 100) {
+                    count = 100;
+                    break;
+                }
+                Thread.sleep( 1000 );
             }
-            Thread.sleep(1000);
-        }
 
-        Assert.assertEquals( 100, count );
+            Assert.assertEquals( 100, count );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 3bf352f..791650e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -53,7 +53,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void loadDatabaseQueueMessage() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
 
         injector.getInstance( App.class ); // init the INJECTOR
 
@@ -66,33 +65,39 @@ public class QueueActorHelperTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
-        queueManager.createQueue( new Queue( queueName ) );
 
-        UUID queueMessageId = QakkaUtils.getTimeUuid();
+        try {
+            queueManager.createQueue( new Queue( queueName ) );
 
-        // write message
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
-                QakkaUtils.getTimeUuid(),
-                DatabaseQueueMessage.Type.DEFAULT,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                null,
-                System.currentTimeMillis(),
-                null,
-                queueMessageId);
-        qms.writeMessage( message );
+            // write message
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
+                    QakkaUtils.getTimeUuid(),
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    System.currentTimeMillis(),
+                    null,
+                    queueMessageId);
+            qms.writeMessage( message );
 
-        // load message
+            // load message
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
-                queueName, message.getQueueMessageId(), message.getType() );
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+                    queueName, message.getQueueMessageId(), message.getType() );
 
-        Assert.assertNotNull( queueMessage );
+            Assert.assertNotNull( queueMessage );
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -100,8 +105,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void loadDatabaseQueueMessageNotFound() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
         QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -112,20 +115,27 @@ public class QueueActorHelperTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
         queueManager.createQueue( new Queue( queueName ) );
 
-        // don't write any message
+        try {
+
+            // don't write any message
 
-        // load message
+            // load message
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
                 queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
 
-        Assert.assertNull( queueMessage );
+            Assert.assertNull( queueMessage );
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -133,8 +143,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void putInflight() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
 
@@ -153,7 +161,9 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
+        try {
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
                 QakkaUtils.getTimeUuid(),
                 DatabaseQueueMessage.Type.DEFAULT,
                 queueName,
@@ -161,42 +171,46 @@ public class QueueActorHelperTest extends AbstractTest {
                 null,
                 System.currentTimeMillis(),
                 null,
-                queueMessageId);
-        qms.writeMessage( message );
+                queueMessageId );
+            qms.writeMessage( message );
 
-        // put message inflight
+            // put message inflight
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        helper.putInflight( queueName, message );
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            helper.putInflight( queueName, message );
 
-        // message must be gone from messages_available table
+            // message must be gone from messages_available table
 
-        Assert.assertNull( qms.loadMessage(
+            Assert.assertNull( qms.loadMessage(
                 queueName,
                 actorSystemFig.getRegionLocal(),
                 null,
                 DatabaseQueueMessage.Type.DEFAULT,
                 message.getQueueMessageId() ) );
 
-        // message must be present in messages_inflight table
+            // message must be present in messages_inflight table
 
-        Assert.assertNotNull( qms.loadMessage(
+            Assert.assertNotNull( qms.loadMessage(
                 queueName,
                 actorSystemFig.getRegionLocal(),
                 null,
                 DatabaseQueueMessage.Type.INFLIGHT,
                 message.getQueueMessageId() ) );
 
-        // there must be an audit log record of the successful get operation
+            // there must be an audit log record of the successful get operation
+
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 1, auditLogs.getEntities().size() );
+            Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+            Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get( 0 ).getAction() );
 
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 1, auditLogs.getEntities().size() );
-        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
-        Assert.assertEquals( AuditLog.Action.GET,     auditLogs.getEntities().get(0).getAction()  );
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -222,9 +236,11 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        // write message to messages_inflight table
+        try {
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
+            // write message to messages_inflight table
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
                 QakkaUtils.getTimeUuid(),
                 DatabaseQueueMessage.Type.INFLIGHT,
                 queueName,
@@ -232,34 +248,38 @@ public class QueueActorHelperTest extends AbstractTest {
                 null,
                 System.currentTimeMillis(),
                 null,
-                queueMessageId);
-        qms.writeMessage( message );
+                queueMessageId );
+            qms.writeMessage( message );
+
+            // ack message
 
-        // ack message
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            helper.ackQueueMessage( queueName, message.getQueueMessageId() );
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        helper.ackQueueMessage( queueName, message.getQueueMessageId() );
+            // message must be gone from messages_available table
 
-        // message must be gone from messages_available table
+            Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ) );
 
-        Assert.assertNull( helper.loadDatabaseQueueMessage(
-                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ));
+            // message must be gone from messages_inflight table
 
-        // message must be gone from messages_inflight table
+            Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ) );
 
-        Assert.assertNull( helper.loadDatabaseQueueMessage(
-                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ));
+            // there should be an audit log record of the successful ack operation
 
-        // there should be an audit log record of the successful ack operation
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 1, auditLogs.getEntities().size() );
+            Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+            Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get( 0 ).getAction() );
 
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 1, auditLogs.getEntities().size() );
-        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
-        Assert.assertEquals( AuditLog.Action.ACK,     auditLogs.getEntities().get(0).getAction()  );
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -267,8 +287,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void ackQueueMessageNotFound() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
         QueueManager queueManager     = injector.getInstance( QueueManager.class );
@@ -281,17 +299,23 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        // don't write message, just make up some bogus IDs
+        try {
 
-        UUID queueMessageId = QakkaUtils.getTimeUuid();
+            // don't write message, just make up some bogus IDs
+
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+            // ack message must fail
 
-        // ack message must fail
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            Assert.assertEquals( DistributedQueueService.Status.NOT_INFLIGHT,
+                helper.ackQueueMessage( queueName, queueMessageId ) );
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
-            helper.ackQueueMessage( queueName, queueMessageId ));
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 }