You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 14:04:46 UTC

[5/5] usergrid git commit: Return to using asynchronous actor for in-memory queue refresh job.

Return to using asynchronous actor for in-memory queue refresh job.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a90a0bbd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a90a0bbd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a90a0bbd

Branch: refs/heads/usergrid-1318-queue
Commit: a90a0bbd2fe341395e4bd084e566881feb98d9bf
Parents: bd5835b
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 10:02:45 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 10:02:45 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |   2 +-
 .../qakka/distributed/actors/QueueActor.java    | 126 ++++++++-----------
 .../distributed/actors/QueueActorHelper.java    | 118 ++++++++++++-----
 .../qakka/core/QueueMessageManagerTest.java     |   3 -
 .../distributed/actors/QueueReaderTest.java     |  25 ++--
 .../queue/src/test/resources/log4j.properties   |   4 +-
 6 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 472c241..7d89187 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How long to wait for response from queue actor before timing out and trying again */
     @Key(QUEUE_GET_TIMEOUT)
-    @Default("2")
+    @Default("4")
     int getGetTimeoutSeconds();
 
     /** Max number of times to retry call to queue writer for queue send operation */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index c706f7d..64f12d4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -129,22 +129,23 @@ public class QueueActor extends UntypedActor {
 
         } else if ( message instanceof QueueRefreshRequest ) {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
-
             queuesSeen.add( request.getQueueName() );
-            queueActorHelper.queueRefresh( request.getQueueName() );
 
-//            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-//
-//                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
-//                    ActorRef readerRef = getContext().actorOf(
-//                        Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
-//                        request.getQueueName() + "_reader");
-//                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
-//                }
-//            }
-//
-//            // hand-off to queue's reader
-//            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+//            // NOT asynchronous
+//            queueActorHelper.queueRefresh( request.getQueueName() );
+
+            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+
+                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+                    ActorRef readerRef = getContext().actorOf(
+                        Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+                        request.getQueueName() + "_reader");
+                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
+                }
+            }
+
+            // hand-off to queue's reader
+            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -158,10 +159,9 @@ public class QueueActor extends UntypedActor {
                 queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
             }
 
-            // hand-off to queue's timeouter
+            // ASYNCHRONOUS -> hand-off to queue's timeouter
             queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
-
         } else if ( message instanceof ShardCheckRequest ) {
             ShardCheckRequest request = (ShardCheckRequest)message;
 
@@ -174,75 +174,59 @@ public class QueueActor extends UntypedActor {
                 shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
             }
 
-            // hand-off to queue's shard allocator
+            // ASYNCHRONOUS -> hand-off to queue's shard allocator
             shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
 
-
         } else if ( message instanceof QueueGetRequest) {
 
-            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
-            try {
-                QueueGetRequest queueGetRequest = (QueueGetRequest) message;
-
-                queuesSeen.add( queueGetRequest.getQueueName() );
+            QueueGetRequest queueGetRequest = (QueueGetRequest) message;
 
-                Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+            String queueName = queueGetRequest.getQueueName();
+            int numRequested = queueGetRequest.getNumRequested();
 
-                while (queueMessages.size() < queueGetRequest.getNumRequested()) {
+            queuesSeen.add( queueName );
 
-                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
+            try {
 
-                    if (queueMessage != null) {
-                        if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
-                            queueMessages.add( queueMessage );
-                        }
-                    } else {
-                        logger.debug("in-memory queue for {} is empty, object is: {}",
-                                queueGetRequest.getQueueName(), inMemoryQueue );
-                        break;
-                    }
-                }
+                Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
 
                 messageCounterSerialization.decrementCounter(
-                    queueGetRequest.getQueueName(),
+                    queueName,
                     DatabaseQueueMessage.Type.DEFAULT,
-                    queueMessages.size());
-
-                logger.debug("{} returning {} for queue {}",
-                    this, queueMessages.size(), queueGetRequest.getQueueName());
+                    messages.size());
 
                 getSender().tell( new QueueGetResponse(
-                        DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
-
-                long runs = runCount.incrementAndGet();
-                long messagesReturned = messageCount.addAndGet( queueMessages.size() );
-
-                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                    final DecimalFormat format = new DecimalFormat("##.###");
-                    final long nano = 1000000000;
-                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
-
-                    logger.debug("QueueActor get stats (queues {}):\n" +
-                            "   Num runs={}\n" +
-                            "   Messages={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                        queuesSeen.toArray(),
-                        t.getCount(),
-                        messagesReturned,
-                        format.format( t.getMeanRate() ),
-                        format.format( t.getOneMinuteRate() ),
-                        format.format( t.getFiveMinuteRate() ),
-                        format.format( t.getSnapshot().getMean() / nano ),
-                        format.format( (double) t.getSnapshot().getMin() / nano ),
-                        format.format( (double) t.getSnapshot().getMax() / nano ) );
-                }
+                        DistributedQueueService.Status.SUCCESS, messages ), getSender() );
 
+//                long runs = runCount.incrementAndGet();
+//                long messagesReturned = messageCount.addAndGet( queueMessages.size() );
+//
+//                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+//
+//                    final DecimalFormat format = new DecimalFormat("##.###");
+//                    final long nano = 1000000000;
+//                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
+//
+//                    logger.debug("QueueActor get stats (queues {}):\n" +
+//                            "   Num runs={}\n" +
+//                            "   Messages={}\n" +
+//                            "   Mean={}\n" +
+//                            "   One min rate={}\n" +
+//                            "   Five min rate={}\n" +
+//                            "   Snapshot mean={}\n" +
+//                            "   Snapshot min={}\n" +
+//                            "   Snapshot max={}",
+//                        queuesSeen.toArray(),
+//                        t.getCount(),
+//                        messagesReturned,
+//                        format.format( t.getMeanRate() ),
+//                        format.format( t.getOneMinuteRate() ),
+//                        format.format( t.getFiveMinuteRate() ),
+//                        format.format( t.getSnapshot().getMean() / nano ),
+//                        format.format( (double) t.getSnapshot().getMin() / nano ),
+//                        format.format( (double) t.getSnapshot().getMax() / nano ) );
+//                }
 
             } finally {
                 timer.close();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 68250df..0b31e16 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -37,7 +37,8 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
@@ -98,12 +99,39 @@ public class QueueActorHelper {
     }
 
 
+    Collection<DatabaseQueueMessage> getMessages(String queueName, int numRequested ) {
+
+        Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+
+        while (queueMessages.size() < numRequested) {
+
+            DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueName );
+
+            if (queueMessage != null) {
+
+                if (putInflight( queueName, queueMessage )) {
+                    queueMessages.add( queueMessage );
+                }
+
+            } else {
+                //logger.debug("in-memory queue for {} is empty, object is: {}", queueName, inMemoryQueue );
+                break;
+            }
+        }
+
+        //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
+        return queueMessages;
+
+    }
+
+
     DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
 
         DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
                 queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
 
         if ( queueMessage == null ) {
+            logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId);
             return DistributedQueueService.Status.NOT_INFLIGHT;
         }
 
@@ -152,10 +180,25 @@ public class QueueActorHelper {
 
         UUID qmid = queueMessage.getQueueMessageId();
         try {
-            queueMessage.setType( DatabaseQueueMessage.Type.INFLIGHT );
-            queueMessage.setShardId( null );
-            queueMessage.setInflightAt( System.currentTimeMillis() );
-            messageSerialization.writeMessage( queueMessage );
+
+            DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
+                queueMessage.getMessageId(),
+                DatabaseQueueMessage.Type.INFLIGHT,
+                queueName,
+                actorSystemFig.getRegionLocal(),
+                null,                         // let serialization select the shard
+                queueMessage.getQueuedAt(),
+                System.currentTimeMillis(),
+                qmid);
+
+            messageSerialization.writeMessage( inflightMessage );
+
+            DatabaseQueueMessage retrieved = loadDatabaseQueueMessage(
+                queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT );
+            if ( retrieved == null ) {
+                logger.error("Failed ot write queue message id {} to inflight table", qmid);
+                return false;
+            }
 
             messageSerialization.deleteMessage(
                     queueName,
@@ -191,6 +234,7 @@ public class QueueActorHelper {
         return true;
     }
 
+
     void queueRefresh( String queueName ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
@@ -199,12 +243,20 @@ public class QueueActorHelper {
 
             if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
 
+                // TODO: need to track the starting shard
+
                 ShardIterator shardIterator = new ShardIterator(
                     cassandraClient, queueName, actorSystemFig.getRegionLocal(),
                     Shard.Type.DEFAULT, Optional.empty() );
 
                 UUID since = inMemoryQueue.getNewest( queueName );
 
+//                if ( since != null ) {
+//                    logger.debug( "Loading queue {} messages newer than {}", queueName, since.timestamp() );
+//                } else {
+//                    logger.debug( "Loading queue {} messages newer than [null]", queueName );
+//                }
+
                 String region = actorSystemFig.getRegionLocal();
                 MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
                     cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
@@ -219,34 +271,34 @@ public class QueueActorHelper {
                     count++;
                 }
 
-                long runs = runCount.incrementAndGet();
-                long readCount = totalRead.addAndGet( count );
-
-                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                    final DecimalFormat format = new DecimalFormat("##.###");
-                    final long nano = 1000000000;
-                    Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
-
-                    logger.debug("QueueRefresher for queue '{}' stats:\n" +
-                            "   Num runs={}\n" +
-                            "   Read count={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                        queueName,
-                        t.getCount(),
-                        readCount,
-                        format.format( t.getMeanRate() ),
-                        format.format( t.getOneMinuteRate() ),
-                        format.format( t.getFiveMinuteRate() ),
-                        format.format( t.getSnapshot().getMean() / nano ),
-                        format.format( (double) t.getSnapshot().getMin() / nano ),
-                        format.format( (double) t.getSnapshot().getMax() / nano ) );
-                }
+//                long runs = runCount.incrementAndGet();
+//                long readCount = totalRead.addAndGet( count );
+//
+//                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+//
+//                    final DecimalFormat format = new DecimalFormat("##.###");
+//                    final long nano = 1000000000;
+//                    Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+//
+//                    logger.debug("QueueRefresher for queue '{}' stats:\n" +
+//                            "   Num runs={}\n" +
+//                            "   Read count={}\n" +
+//                            "   Mean={}\n" +
+//                            "   One min rate={}\n" +
+//                            "   Five min rate={}\n" +
+//                            "   Snapshot mean={}\n" +
+//                            "   Snapshot min={}\n" +
+//                            "   Snapshot max={}",
+//                        queueName,
+//                        t.getCount(),
+//                        readCount,
+//                        format.format( t.getMeanRate() ),
+//                        format.format( t.getOneMinuteRate() ),
+//                        format.format( t.getFiveMinuteRate() ),
+//                        format.format( t.getSnapshot().getMean() / nano ),
+//                        format.format( (double) t.getSnapshot().getMin() / nano ),
+//                        format.format( (double) t.getSnapshot().getMax() / nano ) );
+//                }
 
                 if ( count > 0 ) {
                     logger.debug( "Added {} in-memory for queue {}, new size = {}",

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 9e0a9d8..f77f31b 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferL
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.apache.usergrid.persistence.queue.TestModule;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -240,8 +239,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
         injector.getInstance( App.class ); // init the INJECTOR
 
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index ba3c0f8..5b42184 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -22,22 +22,18 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,14 +53,14 @@ public class QueueReaderTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
-        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
-        ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+        QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
+        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
+        ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
 
         int numMessages = 200;
         // create queue messages, only first lot get queue message data
 
-        QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class );
+        QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class );
         String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
 
         Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(),
@@ -88,21 +84,18 @@ public class QueueReaderTest extends AbstractTest {
             serialization.writeMessage( message );
         }
 
-        InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class );
+        InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class );
         Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
 
         // run the QueueRefresher to fill up the in-memory queue
 
-        ActorSystem system = ActorSystem.create("Test-" + queueName);
-        ActorRef queueReaderRef = system.actorOf(
-            Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), "queueReader");
-        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
+        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
 
         // need to wait for refresh to complete
         int maxRetries = 10;
         int retries = 0;
         while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) {
-            queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately
+            helper.queueRefresh( queueName );
             Thread.sleep(1000);
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index e1cbda4..eb45d2a 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -28,6 +28,6 @@ log4j.logger.org.glassfish=WARN
 #log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
 #log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
 
-log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=DEBUG
-log4j.logger.org.apache.usergrid.persistence.queue=INFO
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
 log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO