You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 14:04:46 UTC
[5/5] usergrid git commit: Return to using asynchronous actor for
in-memory queue refresh job.
Return to using asynchronous actor for in-memory queue refresh job.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a90a0bbd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a90a0bbd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a90a0bbd
Branch: refs/heads/usergrid-1318-queue
Commit: a90a0bbd2fe341395e4bd084e566881feb98d9bf
Parents: bd5835b
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 10:02:45 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 10:02:45 2016 -0400
----------------------------------------------------------------------
.../usergrid/persistence/qakka/QakkaFig.java | 2 +-
.../qakka/distributed/actors/QueueActor.java | 126 ++++++++-----------
.../distributed/actors/QueueActorHelper.java | 118 ++++++++++++-----
.../qakka/core/QueueMessageManagerTest.java | 3 -
.../distributed/actors/QueueReaderTest.java | 25 ++--
.../queue/src/test/resources/log4j.properties | 4 +-
6 files changed, 152 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 472c241..7d89187 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -109,7 +109,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
/** How long to wait for response from queue actor before timing out and trying again */
@Key(QUEUE_GET_TIMEOUT)
- @Default("2")
+ @Default("4")
int getGetTimeoutSeconds();
/** Max number of times to retry call to queue writer for queue send operation */
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index c706f7d..64f12d4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -129,22 +129,23 @@ public class QueueActor extends UntypedActor {
} else if ( message instanceof QueueRefreshRequest ) {
QueueRefreshRequest request = (QueueRefreshRequest)message;
-
queuesSeen.add( request.getQueueName() );
- queueActorHelper.queueRefresh( request.getQueueName() );
-// if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-//
-// if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
-// ActorRef readerRef = getContext().actorOf(
-// Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
-// request.getQueueName() + "_reader");
-// queueReadersByQueueName.put( request.getQueueName(), readerRef );
-// }
-// }
-//
-// // hand-off to queue's reader
-// queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+// // NOT asynchronous
+// queueActorHelper.queueRefresh( request.getQueueName() );
+
+ if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+
+ if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+ ActorRef readerRef = getContext().actorOf(
+ Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+ request.getQueueName() + "_reader");
+ queueReadersByQueueName.put( request.getQueueName(), readerRef );
+ }
+ }
+
+ // hand-off to queue's reader
+ queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
} else if ( message instanceof QueueTimeoutRequest ) {
QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -158,10 +159,9 @@ public class QueueActor extends UntypedActor {
queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
}
- // hand-off to queue's timeouter
+ // ASYNCHRONOUS -> hand-off to queue's timeouter
queueTimeoutersByQueueName.get( request.getQueueName() ).tell( request, self() );
-
} else if ( message instanceof ShardCheckRequest ) {
ShardCheckRequest request = (ShardCheckRequest)message;
@@ -174,75 +174,59 @@ public class QueueActor extends UntypedActor {
shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
}
- // hand-off to queue's shard allocator
+ // ASYNCHRONOUS -> hand-off to queue's shard allocator
shardAllocatorsByQueueName.get( request.getQueueName() ).tell( request, self() );
-
} else if ( message instanceof QueueGetRequest) {
- Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
- try {
- QueueGetRequest queueGetRequest = (QueueGetRequest) message;
-
- queuesSeen.add( queueGetRequest.getQueueName() );
+ QueueGetRequest queueGetRequest = (QueueGetRequest) message;
- Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+ String queueName = queueGetRequest.getQueueName();
+ int numRequested = queueGetRequest.getNumRequested();
- while (queueMessages.size() < queueGetRequest.getNumRequested()) {
+ queuesSeen.add( queueName );
- DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+ Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.GET_TIME_GET ).time();
+ try {
- if (queueMessage != null) {
- if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
- queueMessages.add( queueMessage );
- }
- } else {
- logger.debug("in-memory queue for {} is empty, object is: {}",
- queueGetRequest.getQueueName(), inMemoryQueue );
- break;
- }
- }
+ Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
messageCounterSerialization.decrementCounter(
- queueGetRequest.getQueueName(),
+ queueName,
DatabaseQueueMessage.Type.DEFAULT,
- queueMessages.size());
-
- logger.debug("{} returning {} for queue {}",
- this, queueMessages.size(), queueGetRequest.getQueueName());
+ messages.size());
getSender().tell( new QueueGetResponse(
- DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
-
- long runs = runCount.incrementAndGet();
- long messagesReturned = messageCount.addAndGet( queueMessages.size() );
-
- if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
- final DecimalFormat format = new DecimalFormat("##.###");
- final long nano = 1000000000;
- Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
-
- logger.debug("QueueActor get stats (queues {}):\n" +
- " Num runs={}\n" +
- " Messages={}\n" +
- " Mean={}\n" +
- " One min rate={}\n" +
- " Five min rate={}\n" +
- " Snapshot mean={}\n" +
- " Snapshot min={}\n" +
- " Snapshot max={}",
- queuesSeen.toArray(),
- t.getCount(),
- messagesReturned,
- format.format( t.getMeanRate() ),
- format.format( t.getOneMinuteRate() ),
- format.format( t.getFiveMinuteRate() ),
- format.format( t.getSnapshot().getMean() / nano ),
- format.format( (double) t.getSnapshot().getMin() / nano ),
- format.format( (double) t.getSnapshot().getMax() / nano ) );
- }
+ DistributedQueueService.Status.SUCCESS, messages ), getSender() );
+// long runs = runCount.incrementAndGet();
+// long messagesReturned = messageCount.addAndGet( queueMessages.size() );
+//
+// if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+//
+// final DecimalFormat format = new DecimalFormat("##.###");
+// final long nano = 1000000000;
+// Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
+//
+// logger.debug("QueueActor get stats (queues {}):\n" +
+// " Num runs={}\n" +
+// " Messages={}\n" +
+// " Mean={}\n" +
+// " One min rate={}\n" +
+// " Five min rate={}\n" +
+// " Snapshot mean={}\n" +
+// " Snapshot min={}\n" +
+// " Snapshot max={}",
+// queuesSeen.toArray(),
+// t.getCount(),
+// messagesReturned,
+// format.format( t.getMeanRate() ),
+// format.format( t.getOneMinuteRate() ),
+// format.format( t.getFiveMinuteRate() ),
+// format.format( t.getSnapshot().getMean() / nano ),
+// format.format( (double) t.getSnapshot().getMin() / nano ),
+// format.format( (double) t.getSnapshot().getMax() / nano ) );
+// }
} finally {
timer.close();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 68250df..0b31e16 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -37,7 +37,8 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
@@ -98,12 +99,39 @@ public class QueueActorHelper {
}
+ Collection<DatabaseQueueMessage> getMessages(String queueName, int numRequested ) { // gathers up to numRequested messages for queueName
+ // Polls the in-memory queue and keeps only the messages for which putInflight() returns true.
+ Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
+ // Loop until enough messages are collected; it also ends early through the break below.
+ while (queueMessages.size() < numRequested) {
+
+ DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueName );
+
+ if (queueMessage != null) {
+ // A polled message is added to the result only when putInflight() reports true; otherwise it is left out.
+ if (putInflight( queueName, queueMessage )) {
+ queueMessages.add( queueMessage );
+ }
+
+ } else {
+ //logger.debug("in-memory queue for {} is empty, object is: {}", queueName, inMemoryQueue );
+ break; // poll() returned null -- presumably the in-memory queue is empty, so stop asking
+ }
+ }
+
+ //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
+ return queueMessages; // may contain fewer than numRequested entries, possibly none
+
+ }
+
+
DistributedQueueService.Status ackQueueMessage(String queueName, UUID queueMessageId ) {
DatabaseQueueMessage queueMessage = loadDatabaseQueueMessage(
queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
if ( queueMessage == null ) {
+ logger.error("Queue {} queue message id {} not found in inflight table", queueName, queueMessageId);
return DistributedQueueService.Status.NOT_INFLIGHT;
}
@@ -152,10 +180,25 @@ public class QueueActorHelper {
UUID qmid = queueMessage.getQueueMessageId();
try {
- queueMessage.setType( DatabaseQueueMessage.Type.INFLIGHT );
- queueMessage.setShardId( null );
- queueMessage.setInflightAt( System.currentTimeMillis() );
- messageSerialization.writeMessage( queueMessage );
+
+ DatabaseQueueMessage inflightMessage = new DatabaseQueueMessage(
+ queueMessage.getMessageId(),
+ DatabaseQueueMessage.Type.INFLIGHT,
+ queueName,
+ actorSystemFig.getRegionLocal(),
+ null, // let serialization select the shard
+ queueMessage.getQueuedAt(),
+ System.currentTimeMillis(),
+ qmid);
+
+ messageSerialization.writeMessage( inflightMessage );
+
+ DatabaseQueueMessage retrieved = loadDatabaseQueueMessage(
+ queueName, qmid, DatabaseQueueMessage.Type.INFLIGHT );
+ if ( retrieved == null ) {
+ logger.error("Failed ot write queue message id {} to inflight table", qmid);
+ return false;
+ }
messageSerialization.deleteMessage(
queueName,
@@ -191,6 +234,7 @@ public class QueueActorHelper {
return true;
}
+
void queueRefresh( String queueName ) {
Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
@@ -199,12 +243,20 @@ public class QueueActorHelper {
if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+ // TODO: need to track the starting shard
+
ShardIterator shardIterator = new ShardIterator(
cassandraClient, queueName, actorSystemFig.getRegionLocal(),
Shard.Type.DEFAULT, Optional.empty() );
UUID since = inMemoryQueue.getNewest( queueName );
+// if ( since != null ) {
+// logger.debug( "Loading queue {} messages newer than {}", queueName, since.timestamp() );
+// } else {
+// logger.debug( "Loading queue {} messages newer than [null]", queueName );
+// }
+
String region = actorSystemFig.getRegionLocal();
MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
@@ -219,34 +271,34 @@ public class QueueActorHelper {
count++;
}
- long runs = runCount.incrementAndGet();
- long readCount = totalRead.addAndGet( count );
-
- if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
- final DecimalFormat format = new DecimalFormat("##.###");
- final long nano = 1000000000;
- Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
-
- logger.debug("QueueRefresher for queue '{}' stats:\n" +
- " Num runs={}\n" +
- " Read count={}\n" +
- " Mean={}\n" +
- " One min rate={}\n" +
- " Five min rate={}\n" +
- " Snapshot mean={}\n" +
- " Snapshot min={}\n" +
- " Snapshot max={}",
- queueName,
- t.getCount(),
- readCount,
- format.format( t.getMeanRate() ),
- format.format( t.getOneMinuteRate() ),
- format.format( t.getFiveMinuteRate() ),
- format.format( t.getSnapshot().getMean() / nano ),
- format.format( (double) t.getSnapshot().getMin() / nano ),
- format.format( (double) t.getSnapshot().getMax() / nano ) );
- }
+// long runs = runCount.incrementAndGet();
+// long readCount = totalRead.addAndGet( count );
+//
+// if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+//
+// final DecimalFormat format = new DecimalFormat("##.###");
+// final long nano = 1000000000;
+// Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+//
+// logger.debug("QueueRefresher for queue '{}' stats:\n" +
+// " Num runs={}\n" +
+// " Read count={}\n" +
+// " Mean={}\n" +
+// " One min rate={}\n" +
+// " Five min rate={}\n" +
+// " Snapshot mean={}\n" +
+// " Snapshot min={}\n" +
+// " Snapshot max={}",
+// queueName,
+// t.getCount(),
+// readCount,
+// format.format( t.getMeanRate() ),
+// format.format( t.getOneMinuteRate() ),
+// format.format( t.getFiveMinuteRate() ),
+// format.format( t.getSnapshot().getMean() / nano ),
+// format.format( (double) t.getSnapshot().getMin() / nano ),
+// format.format( (double) t.getSnapshot().getMax() / nano ) );
+// }
if ( count > 0 ) {
logger.debug( "Added {} in-memory for queue {}, new size = {}",
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 9e0a9d8..f77f31b 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferL
import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
import org.apache.usergrid.persistence.queue.TestModule;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,8 +239,6 @@ public class QueueMessageManagerTest extends AbstractTest {
Injector injector = getInjector();
- CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
injector.getInstance( App.class ); // init the INJECTOR
ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index ba3c0f8..5b42184 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -22,22 +22,18 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardSerialization;
import org.junit.Assert;
import org.junit.Test;
@@ -57,14 +53,14 @@ public class QueueReaderTest extends AbstractTest {
Injector injector = getInjector();
- QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
- ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
- ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+ QakkaFig qakkaFig = injector.getInstance( QakkaFig.class );
+ ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
+ ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
int numMessages = 200;
// create queue messages, only first lot get queue message data
- QueueMessageSerialization serialization = getInjector().getInstance( QueueMessageSerialization.class );
+ QueueMessageSerialization serialization = injector.getInstance( QueueMessageSerialization.class );
String queueName = "qrt_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
Shard newShard = new Shard( queueName, actorSystemFig.getRegionLocal(),
@@ -88,21 +84,18 @@ public class QueueReaderTest extends AbstractTest {
serialization.writeMessage( message );
}
- InMemoryQueue inMemoryQueue = getInjector().getInstance( InMemoryQueue.class );
+ InMemoryQueue inMemoryQueue = injector.getInstance( InMemoryQueue.class );
Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
// run the QueueRefresher to fill up the in-memory queue
- ActorSystem system = ActorSystem.create("Test-" + queueName);
- ActorRef queueReaderRef = system.actorOf(
- Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), "queueReader");
- QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
+ QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
// need to wait for refresh to complete
int maxRetries = 10;
int retries = 0;
while ( inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize() && retries++ < maxRetries ) {
- queueReaderRef.tell( refreshRequest, null ); // tell sends message, returns immediately
+ helper.queueRefresh( queueName );
Thread.sleep(1000);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a90a0bbd/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index e1cbda4..eb45d2a 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -28,6 +28,6 @@ log4j.logger.org.glassfish=WARN
#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=DEBUG
-log4j.logger.org.apache.usergrid.persistence.queue=INFO
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO