You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 21:44:26 UTC

[1/5] usergrid git commit: Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue a90a0bbd2 -> fbce160a1


Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fe08fa72
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fe08fa72
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fe08fa72

Branch: refs/heads/usergrid-1318-queue
Commit: fe08fa72f8e807565141bac871796479f28aa3f1
Parents: a90a0bb
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:38:32 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:38:32 2016 -0400

----------------------------------------------------------------------
 .../qakka/distributed/actors/QueueActor.java    | 75 +++++++-------------
 .../distributed/actors/QueueActorRouter.java    |  2 +-
 .../distributed/actors/QueueRefresher.java      |  1 -
 .../qakka/distributed/actors/QueueSender.java   | 34 +++++----
 .../distributed/actors/QueueSenderRouter.java   |  2 +-
 .../distributed/actors/QueueTimeouter.java      | 66 +++++------------
 .../qakka/distributed/actors/QueueWriter.java   | 30 ++++----
 .../distributed/actors/QueueWriterRouter.java   |  2 +-
 .../distributed/actors/ShardAllocator.java      | 35 ++++++---
 .../impl/DistributedQueueServiceImpl.java       | 13 ++--
 .../impl/QueueActorRouterProducer.java          |  2 +-
 .../impl/QueueSenderRouterProducer.java         |  2 +-
 .../impl/QueueWriterRouterProducer.java         |  2 +-
 .../impl/QueueMessageSerializationImpl.java     | 18 +++--
 14 files changed, 132 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 64f12d4..9ce38ef 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -52,7 +52,6 @@ public class QueueActor extends UntypedActor {
     private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
-
     private final MessageCounterSerialization messageCounterSerialization;
 
     private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
@@ -63,24 +62,32 @@ public class QueueActor extends UntypedActor {
     private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
     private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
 
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong messageCount = new AtomicLong(0);
     private final Set<String> queuesSeen = new HashSet<>();
 
-    private final Injector injector;
+    //private final Injector injector;
 
 
     @Inject
-    public QueueActor( Injector injector ) {
-
-        this.injector = injector;
-
-        qakkaFig         = injector.getInstance( QakkaFig.class );
-        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
-        queueActorHelper = injector.getInstance( QueueActorHelper.class );
-        metricsService   = injector.getInstance( MetricsService.class );
-
-        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+    public QueueActor(
+        //Injector         injector,
+        QakkaFig         qakkaFig,
+        InMemoryQueue    inMemoryQueue,
+        QueueActorHelper queueActorHelper,
+        MetricsService   metricsService,
+        MessageCounterSerialization messageCounterSerialization
+    ) {
+        //this.injector = injector;
+        this.qakkaFig = qakkaFig;
+        this.inMemoryQueue = inMemoryQueue;
+        this.queueActorHelper = queueActorHelper;
+        this.metricsService = metricsService;
+        this.messageCounterSerialization = messageCounterSerialization;
+
+//        qakkaFig         = injector.getInstance( QakkaFig.class );
+//        inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
+//        queueActorHelper = injector.getInstance( QueueActorHelper.class );
+//        metricsService   = injector.getInstance( MetricsService.class );
+//        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
     @Override
@@ -138,7 +145,7 @@ public class QueueActor extends UntypedActor {
 
                 if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
                     ActorRef readerRef = getContext().actorOf(
-                        Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+                        Props.create( GuiceActorProducer.class, QueueRefresher.class ),
                         request.getQueueName() + "_reader");
                     queueReadersByQueueName.put( request.getQueueName(), readerRef );
                 }
@@ -154,7 +161,7 @@ public class QueueActor extends UntypedActor {
 
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
                 ActorRef readerRef = getContext().actorOf(
-                    Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+                    Props.create( GuiceActorProducer.class, QueueTimeouter.class),
                     request.getQueueName() + "_timeouter");
                 queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
             }
@@ -169,7 +176,7 @@ public class QueueActor extends UntypedActor {
 
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
                 ActorRef readerRef = getContext().actorOf(
-                    Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+                    Props.create( GuiceActorProducer.class, ShardAllocator.class),
                     request.getQueueName() + "_shard_allocator");
                 shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
             }
@@ -191,43 +198,9 @@ public class QueueActor extends UntypedActor {
 
                 Collection<DatabaseQueueMessage> messages = queueActorHelper.getMessages( queueName, numRequested);
 
-                messageCounterSerialization.decrementCounter(
-                    queueName,
-                    DatabaseQueueMessage.Type.DEFAULT,
-                    messages.size());
-
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, messages ), getSender() );
 
-//                long runs = runCount.incrementAndGet();
-//                long messagesReturned = messageCount.addAndGet( queueMessages.size() );
-//
-//                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-//
-//                    final DecimalFormat format = new DecimalFormat("##.###");
-//                    final long nano = 1000000000;
-//                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
-//
-//                    logger.debug("QueueActor get stats (queues {}):\n" +
-//                            "   Num runs={}\n" +
-//                            "   Messages={}\n" +
-//                            "   Mean={}\n" +
-//                            "   One min rate={}\n" +
-//                            "   Five min rate={}\n" +
-//                            "   Snapshot mean={}\n" +
-//                            "   Snapshot min={}\n" +
-//                            "   Snapshot max={}",
-//                        queuesSeen.toArray(),
-//                        t.getCount(),
-//                        messagesReturned,
-//                        format.format( t.getMeanRate() ),
-//                        format.format( t.getOneMinuteRate() ),
-//                        format.format( t.getFiveMinuteRate() ),
-//                        format.format( t.getSnapshot().getMean() / nano ),
-//                        format.format( (double) t.getSnapshot().getMin() / nano ),
-//                        format.format( (double) t.getSnapshot().getMax() / nano ) );
-//                }
-
             } finally {
                 timer.close();
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 9257a0d..c40a3d9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -42,7 +42,7 @@ public class QueueActorRouter extends UntypedActor {
     public QueueActorRouter( Injector injector ) {
 
         this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
+            Props.create(GuiceActorProducer.class, QueueActor.class)), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index 2f70088..ae9969c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -43,7 +43,6 @@ public class QueueRefresher extends UntypedActor {
         if ( message instanceof QueueRefreshRequest ) {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
-            logger.debug( "running for queue {}", request.getQueueName() );
             String queueName = request.getQueueName();
             helper.queueRefresh( queueName );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 739e1c4..3dc695e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -61,19 +61,32 @@ public class QueueSender extends UntypedActor {
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final ActorSystemFig            actorSystemFig;
-    private final QakkaFig qakkaFig;
+    private final QakkaFig                  qakkaFig;
     private final MetricsService            metricsService;
 
 
     @Inject
-    public QueueSender( Injector injector ) {
-
-        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
-        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
-        qakkaFig                 = injector.getInstance( QakkaFig.class );
-        metricsService           = injector.getInstance( MetricsService.class );
+    public QueueSender(
+        ActorSystemManager        actorSystemManager,
+        TransferLogSerialization  transferLogSerialization,
+        AuditLogSerialization     auditLogSerialization,
+        ActorSystemFig            actorSystemFig,
+        QakkaFig                  qakkaFig,
+        MetricsService            metricsService
+    ) {
+        this.actorSystemManager = actorSystemManager;
+        this.transferLogSerialization = transferLogSerialization;
+        this.auditLogSerialization = auditLogSerialization;
+        this.actorSystemFig = actorSystemFig;
+        this.qakkaFig = qakkaFig;
+        this.metricsService = metricsService;
+
+//        actorSystemManager       = injector.getInstance( ActorSystemManager.class );
+//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
+//        actorSystemFig           = injector.getInstance( ActorSystemFig.class );
+//        qakkaFig                 = injector.getInstance( QakkaFig.class );
+//        metricsService           = injector.getInstance( MetricsService.class );
     }
 
     @Override
@@ -195,9 +208,6 @@ public class QueueSender extends UntypedActor {
         } else if ( writeStatus != null
                 && writeStatus.equals( QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ) ) {
 
-            //logger.debug( "Delivery Success, now removing transfer log: {}, {}, {}, {}",
-            //        new Object[]{queueName, actorSystemFig.getRegionLocal(), region, messageId} );
-
             // queue actor failed to clean up transfer log
             try {
                 transferLogSerialization.removeTransferLog(

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 92d0785..a205d71 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -41,7 +41,7 @@ public class QueueSenderRouter extends UntypedActor {
     public QueueSenderRouter( Injector injector ) {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueSender.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 9b11277..33f1dd9 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -53,24 +53,29 @@ public class QueueTimeouter extends UntypedActor {
     private final QueueMessageSerialization messageSerialization;
     private final MetricsService            metricsService;
     private final ActorSystemFig            actorSystemFig;
-    private final QakkaFig qakkaFig;
+    private final QakkaFig                  qakkaFig;
     private final CassandraClient           cassandraClient;
-    private final MessageCounterSerialization messageCounterSerialization;
-
-    private final AtomicLong runCount = new AtomicLong(0);
-    private final AtomicLong totalTimedout = new AtomicLong(0);
 
 
     @Inject
-    public QueueTimeouter( Injector injector) {
-
-        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
-        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
-        qakkaFig             = injector.getInstance( QakkaFig.class );
-        metricsService       = injector.getInstance( MetricsService.class );
-        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
-
-        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+    public QueueTimeouter(
+        QueueMessageSerialization messageSerialization,
+        MetricsService            metricsService,
+        ActorSystemFig            actorSystemFig,
+        QakkaFig                  qakkaFig,
+        CassandraClient           cassandraClient
+    ) {
+        this.messageSerialization = messageSerialization;
+        this.metricsService = metricsService;
+        this.actorSystemFig = actorSystemFig;
+        this.qakkaFig = qakkaFig;
+        this.cassandraClient = cassandraClient;
+
+//        messageSerialization = injector.getInstance( QueueMessageSerialization.class );
+//        actorSystemFig       = injector.getInstance( ActorSystemFig.class );
+//        qakkaFig             = injector.getInstance( QakkaFig.class );
+//        metricsService       = injector.getInstance( MetricsService.class );
+//        cassandraClient      = injector.getInstance( CassandraClientImpl.class );
     }
 
 
@@ -135,41 +140,8 @@ public class QueueTimeouter extends UntypedActor {
                     }
                 }
 
-
-                long runs = runCount.incrementAndGet();
-                long timeoutCount = totalTimedout.addAndGet( count );
-
-                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                    final DecimalFormat format = new DecimalFormat("##.###");
-                    final long nano = 1000000000;
-                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );
-
-                    logger.debug("QueueTimeouter for queue '{}' stats:\n" +
-                            "   Num runs={}\n" +
-                            "   Timeout count={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                        queueName,
-                        t.getCount(),
-                        timeoutCount,
-                        format.format( t.getMeanRate() ),
-                        format.format( t.getOneMinuteRate() ),
-                        format.format( t.getFiveMinuteRate() ),
-                        format.format( t.getSnapshot().getMean() / nano ),
-                        format.format( (double) t.getSnapshot().getMin() / nano ),
-                        format.format( (double) t.getSnapshot().getMax() / nano ) );
-                }
-
                 if (count > 0) {
                     logger.debug( "Timed out {} messages for queue {}", count, queueName );
-
-                    messageCounterSerialization.decrementCounter(
-                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 273f0b2..e014d59 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -46,25 +46,28 @@ public class QueueWriter extends UntypedActor {
 
     public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
 
-    private final DistributedQueueService   distributedQueueService;
     private final QueueMessageSerialization messageSerialization;
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
 
-    private final MessageCounterSerialization messageCounterSerialization;
-
 
     @Inject
-    public QueueWriter( Injector injector ) {
-
-        messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
-        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
-        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
-        metricsService           = injector.getInstance( MetricsService.class );
-
-        distributedQueueService     = injector.getInstance( DistributedQueueService.class );
-        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
+    public QueueWriter(
+        QueueMessageSerialization messageSerialization,
+        TransferLogSerialization  transferLogSerialization,
+        AuditLogSerialization     auditLogSerialization,
+        MetricsService            metricsService
+    ) {
+        this.messageSerialization = messageSerialization;
+        this.transferLogSerialization = transferLogSerialization;
+        this.auditLogSerialization = auditLogSerialization;
+        this.metricsService = metricsService;
+
+//        messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
+//        transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
+//        auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
+//        metricsService           = injector.getInstance( MetricsService.class );
     }
 
     @Override
@@ -97,9 +100,6 @@ public class QueueWriter extends UntypedActor {
 
                     messageSerialization.writeMessage( dbqm );
 
-                    messageCounterSerialization.incrementCounter(
-                        qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
-
                     //logger.debug("Wrote queue message id {} to queue name {}",
                     //        dbqm.getQueueMessageId(), dbqm.getQueueName());
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index f0540af..0e3e981 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -40,7 +40,7 @@ public class QueueWriterRouter extends UntypedActor {
     public QueueWriterRouter( Injector injector ) {
 
         this.router = getContext().actorOf( FromConfig.getInstance().props(
-            Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
+            Props.create( GuiceActorProducer.class, QueueWriter.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 65c3370..46dc0ed 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.distributed.messages.ShardCheckRequest;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
@@ -59,14 +60,27 @@ public class ShardAllocator extends UntypedActor {
 
 
     @Inject
-    public ShardAllocator( Injector injector ) {
-
-        this.qakkaFig                  = injector.getInstance( QakkaFig.class );
-        this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
-        this.shardSerialization        = injector.getInstance( ShardSerializationImpl.class );
-        this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
-        this.metricsService            = injector.getInstance( MetricsService.class );
-        this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
+    public ShardAllocator(
+        QakkaFig qakkaFig,
+        ActorSystemFig            actorSystemFig,
+        ShardSerialization        shardSerialization,
+        ShardCounterSerialization shardCounterSerialization,
+        MetricsService            metricsService,
+        CassandraClient           cassandraClient
+    ) {
+        this.qakkaFig = qakkaFig;
+        this.actorSystemFig = actorSystemFig;
+        this.shardSerialization = shardSerialization;
+        this.shardCounterSerialization = shardCounterSerialization;
+        this.metricsService = metricsService;
+        this.cassandraClient = cassandraClient;
+
+//        this.qakkaFig                  = injector.getInstance( QakkaFig.class );
+//        this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
+//        this.shardSerialization        = injector.getInstance( ShardSerializationImpl.class );
+//        this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
+//        this.metricsService            = injector.getInstance( MetricsService.class );
+//        this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
     }
 
 
@@ -106,7 +120,10 @@ public class ShardAllocator extends UntypedActor {
             }
 
             if (shard == null) {
-                logger.warn( "No shard found for {}, {}, {}", queueName, region, type );
+                shard = new Shard( queueName, actorSystemFig.getRegionLocal(),
+                    type, 1L, QakkaUtils.getTimeUuid());
+                shardSerialization.createShard( shard );
+
                 return;
             }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index af71247..9063242 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -23,10 +23,13 @@ import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.google.inject.Guice;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
@@ -53,23 +56,21 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
-    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public DistributedQueueServiceImpl(
+            Injector injector,
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig,
-            MessageCounterSerialization messageCounterSerialization ) {
+            QakkaFig qakkaFig
+            ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
-        this.messageCounterSerialization = messageCounterSerialization;
-
-
 
+        GuiceActorProducer.INJECTOR = injector;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
index 8c6adda..d74936b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueActorRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueActorRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueActorRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueActorRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueActorRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
index 885a559..4fa9048 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueSenderRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueSenderRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueSenderRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueSenderRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueSenderRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
index c8b5567..006f1a7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/QueueWriterRouterProducer.java
@@ -73,7 +73,7 @@ public class QueueWriterRouterProducer implements RouterProducer {
                 ClusterSingletonManagerSettings.create( system ).withRole( "io" );
 
         system.actorOf( ClusterSingletonManager.props(
-                Props.create( GuiceActorProducer.class, injector, QueueWriterRouter.class ),
+                Props.create( GuiceActorProducer.class, QueueWriterRouter.class ),
                 PoisonPill.getInstance(), settings ), "queueWriterRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fe08fa72/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 02862c4..d9a2543 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -36,6 +36,7 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
@@ -59,6 +60,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
     private final ShardCounterSerialization shardCounterSerialization;
+    private final MessageCounterSerialization messageCounterSerialization;
 
     public final static String COLUMN_QUEUE_NAME       = "queue_name";
     public final static String COLUMN_REGION           = "region";
@@ -114,14 +116,16 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
+            MessageCounterSerialization messageCounterSerialization,
             CassandraClient           cassandraClient,
             QakkaFig                  qakkaFig
         ) {
-        this.cassandraConfig              = cassandraConfig;
-        this.actorSystemFig            = actorSystemFig;
-        this.shardStrategy             = shardStrategy;
-        this.shardCounterSerialization = shardCounterSerialization;
-        this.cassandraClient = cassandraClient;
+        this.cassandraConfig             = cassandraConfig;
+        this.actorSystemFig              = actorSystemFig;
+        this.shardStrategy               = shardStrategy;
+        this.shardCounterSerialization   = shardCounterSerialization;
+        this.messageCounterSerialization = messageCounterSerialization;
+        this.cassandraClient             = cassandraClient;
 
         this.maxTtl = qakkaFig.getMaxTtlSeconds();
     }
@@ -162,6 +166,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
         shardCounterSerialization.incrementCounter( message.getQueueName(), shardType, message.getShardId(), 1 );
 
+        messageCounterSerialization.incrementCounter( message.getQueueName(), message.getType(), 1L );
+
         return queueMessageId;
     }
 
@@ -250,6 +256,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .and(queueMessageIdClause);
 
         cassandraClient.getQueueMessageSession().execute( delete );
+
+        messageCounterSerialization.decrementCounter( queueName, type, 1L );
     }
 
 


[5/5] usergrid git commit: Ensure correct keyspace names used

Posted by sn...@apache.org.
Ensure correct keyspace names used


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/fbce160a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/fbce160a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/fbce160a

Branch: refs/heads/usergrid-1318-queue
Commit: fbce160a1a87418ee6eb09c6183a9db8f35aec8c
Parents: 202d5be
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:43:19 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:43:19 2016 -0400

----------------------------------------------------------------------
 .../persistence/core/CassandraConfig.java       |  28 +++
 .../persistence/core/CassandraConfigImpl.java   |  69 ++++++++
 .../core/datastax/impl/DataStaxClusterImpl.java |  57 ++++---
 .../core/astyanax/ColumnNameIteratorTest.java   |  65 +++++++
 .../MultiKeyColumnNameIteratorTest.java         |  66 +++++++
 .../astyanax/MultiRowColumnIteratorTest.java    |  67 ++++++++
 .../qakka/core/impl/QueueManagerImpl.java       |  13 +-
 .../qakka/api/QueueResourceTest.java            | 170 ++++++++++---------
 8 files changed, 421 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
index f45ad43..499561e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfig.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.core;
 
 
 import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.AbstractCompositeType;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
 
 
 /**
@@ -77,4 +79,30 @@ public interface CassandraConfig {
     String getApplicationKeyspace();
 
     String getApplicationLocalKeyspace();
+
+    String getLocalDataCenter();
+
+    int getConnections();
+
+    int getTimeout();
+
+    int getPoolTimeout();
+
+    String getClusterName();
+
+    String getHosts();
+
+    String getVersion();
+
+    String getUsername();
+
+    String getPassword();
+
+    String getStrategy();
+
+    String getStrategyOptions();
+
+    String getStrategyLocal();
+
+    String getStrategyOptionsLocal();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 77f7228..9cdec95 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -26,6 +26,7 @@ import java.beans.PropertyChangeListener;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.model.ConsistencyLevel;
+import org.apache.log4j.lf5.viewer.categoryexplorer.CategoryPath;
 
 
 /**
@@ -35,6 +36,7 @@ import com.netflix.astyanax.model.ConsistencyLevel;
 @Singleton
 public class CassandraConfigImpl implements CassandraConfig {
 
+    private CassandraFig cassandraFig;
 
     private ConsistencyLevel readCl;
     private ConsistencyLevel writeCl;
@@ -54,6 +56,8 @@ public class CassandraConfigImpl implements CassandraConfig {
     @Inject
     public CassandraConfigImpl( final CassandraFig cassandraFig ) {
 
+        this.cassandraFig = cassandraFig;
+
         this.readCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxReadCL() );
 
         this.writeCl = ConsistencyLevel.valueOf( cassandraFig.getAstyanaxWriteCL() );
@@ -154,4 +158,69 @@ public class CassandraConfigImpl implements CassandraConfig {
         return applicationLocalKeyspace;
     }
 
+    @Override
+    public String getLocalDataCenter() {
+        return cassandraFig.getLocalDataCenter();
+    }
+
+    @Override
+    public int getConnections() {
+        return cassandraFig.getConnections();
+    }
+
+    @Override
+    public int getTimeout() {
+        return cassandraFig.getTimeout();
+    }
+
+    @Override
+    public int getPoolTimeout() {
+        return cassandraFig.getPoolTimeout();
+    }
+
+    @Override
+    public String getClusterName() {
+        return cassandraFig.getClusterName();
+    }
+
+    @Override
+    public String getHosts() {
+        return cassandraFig.getHosts();
+    }
+
+    @Override
+    public String getVersion() {
+        return cassandraFig.getVersion();
+    }
+
+    @Override
+    public String getUsername() {
+        return cassandraFig.getUsername();
+    }
+
+    @Override
+    public String getPassword() {
+        return cassandraFig.getPassword();
+    }
+
+    @Override
+    public String getStrategy() { // replication strategy used when creating the application keyspace
+        return cassandraFig.getStrategy(); // not getStrategyLocal(): that one serves the local keyspace
+    }
+
+    @Override
+    public String getStrategyOptions() {
+        return cassandraFig.getStrategyOptions();
+    }
+
+    @Override
+    public String getStrategyLocal() {
+        return cassandraFig.getStrategyLocal();
+    }
+
+    @Override
+    public String getStrategyOptionsLocal() {
+        return cassandraFig.getStrategyOptionsLocal();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
index c8ddf3e..67f6123 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DataStaxClusterImpl.java
@@ -23,6 +23,7 @@ import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
 import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.datastax.CQLUtils;
 import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
@@ -36,15 +37,15 @@ public class DataStaxClusterImpl implements DataStaxCluster {
     private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class );
 
 
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
     private Cluster cluster;
     private Session applicationSession;
     private Session queueMessageSession;
     private Session clusterSession;
 
     @Inject
-    public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception {
-        this.cassandraFig = cassandraFig;
+    public DataStaxClusterImpl(final CassandraConfig cassandraFig ) throws Exception {
+        this.cassandraConfig = cassandraFig;
         this.cluster = buildCluster();
 
         // always initialize the keyspaces
@@ -85,7 +86,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         // always grab cluster from getCluster() in case it was prematurely closed
         if ( applicationSession == null || applicationSession.isClosed() ){
-            applicationSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationKeyspace() ) );
+            applicationSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationKeyspace() ) );
         }
         return applicationSession;
     }
@@ -96,7 +97,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         // always grab cluster from getCluster() in case it was prematurely closed
         if ( queueMessageSession == null || queueMessageSession.isClosed() ){
-            queueMessageSession = getCluster().connect( CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace() ) );
+            queueMessageSession = getCluster().connect( CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace() ) );
         }
         return queueMessageSession;
     }
@@ -110,7 +111,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
     public void createApplicationKeyspace() throws Exception {
 
         boolean exists = getClusterSession().getCluster().getMetadata()
-            .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationKeyspace())) != null;
+            .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationKeyspace())) != null;
 
         if(exists){
             return;
@@ -118,14 +119,14 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         final String createApplicationKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
-            CQLUtils.quote(cassandraFig.getApplicationKeyspace()),
-            CQLUtils.getFormattedReplication(cassandraFig.getStrategy(), cassandraFig.getStrategyOptions())
+            CQLUtils.quote( cassandraConfig.getApplicationKeyspace()),
+            CQLUtils.getFormattedReplication( cassandraConfig.getStrategy(), cassandraConfig.getStrategyOptions())
 
         );
 
         getClusterSession().execute(createApplicationKeyspace);
 
-        logger.info("Created keyspace: {}", cassandraFig.getApplicationKeyspace());
+        logger.info("Created keyspace: {}", cassandraConfig.getApplicationKeyspace());
 
     }
 
@@ -139,7 +140,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
     public void createApplicationLocalKeyspace() throws Exception {
 
         boolean exists = getClusterSession().getCluster().getMetadata()
-            .getKeyspace(CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace())) != null;
+            .getKeyspace(CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace())) != null;
 
         if (exists) {
             return;
@@ -147,14 +148,14 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         final String createQueueMessageKeyspace = String.format(
             "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s",
-            CQLUtils.quote(cassandraFig.getApplicationLocalKeyspace()),
-            CQLUtils.getFormattedReplication(cassandraFig.getStrategyLocal(), cassandraFig.getStrategyOptionsLocal())
+            CQLUtils.quote( cassandraConfig.getApplicationLocalKeyspace()),
+            CQLUtils.getFormattedReplication( cassandraConfig.getStrategyLocal(), cassandraConfig.getStrategyOptionsLocal())
 
         );
 
         getClusterSession().execute(createQueueMessageKeyspace);
 
-        logger.info("Created keyspace: {}", cassandraFig.getApplicationLocalKeyspace());
+        logger.info("Created keyspace: {}", cassandraConfig.getApplicationLocalKeyspace());
 
     }
 
@@ -184,7 +185,7 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
         ConsistencyLevel defaultConsistencyLevel;
         try {
-            defaultConsistencyLevel = ConsistencyLevel.valueOf(cassandraFig.getReadCl());
+            defaultConsistencyLevel = cassandraConfig.getDataStaxReadCl();
         } catch (IllegalArgumentException e){
 
             logger.error("Unable to parse provided consistency level in property: {}, defaulting to: {}",
@@ -196,44 +197,44 @@ public class DataStaxClusterImpl implements DataStaxCluster {
 
 
         LoadBalancingPolicy loadBalancingPolicy;
-        if( !cassandraFig.getLocalDataCenter().isEmpty() ){
+        if( !cassandraConfig.getLocalDataCenter().isEmpty() ){
 
             loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder()
-                .withLocalDc( cassandraFig.getLocalDataCenter() ).build();
+                .withLocalDc( cassandraConfig.getLocalDataCenter() ).build();
         }else{
             loadBalancingPolicy = new DCAwareRoundRobinPolicy.Builder().build();
         }
 
         final PoolingOptions poolingOptions = new PoolingOptions()
-            .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections() / 2)
-            .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraFig.getConnections())
-            .setIdleTimeoutSeconds(cassandraFig.getTimeout() / 1000)
-            .setPoolTimeoutMillis(cassandraFig.getPoolTimeout());
+            .setCoreConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections() / 2)
+            .setMaxConnectionsPerHost(HostDistance.LOCAL, cassandraConfig.getConnections())
+            .setIdleTimeoutSeconds( cassandraConfig.getTimeout() / 1000)
+            .setPoolTimeoutMillis( cassandraConfig.getPoolTimeout());
 
         // purposely add a couple seconds to the driver's lower level socket timeouts vs. cassandra timeouts
         final SocketOptions socketOptions = new SocketOptions()
-            .setConnectTimeoutMillis(cassandraFig.getPoolTimeout() + 2000)
-            .setReadTimeoutMillis(cassandraFig.getTimeout() + 2000);
+            .setConnectTimeoutMillis( cassandraConfig.getPoolTimeout() + 2000)
+            .setReadTimeoutMillis( cassandraConfig.getTimeout() + 2000);
 
         final QueryOptions queryOptions = new QueryOptions()
             .setConsistencyLevel(defaultConsistencyLevel);
 
         Cluster.Builder datastaxCluster = Cluster.builder()
-            .withClusterName(cassandraFig.getClusterName())
-            .addContactPoints(cassandraFig.getHosts().split(","))
+            .withClusterName( cassandraConfig.getClusterName())
+            .addContactPoints( cassandraConfig.getHosts().split(","))
             .withMaxSchemaAgreementWaitSeconds(30)
             .withCompression(ProtocolOptions.Compression.LZ4)
             .withLoadBalancingPolicy(loadBalancingPolicy)
             .withPoolingOptions(poolingOptions)
             .withQueryOptions(queryOptions)
             .withSocketOptions(socketOptions)
-            .withProtocolVersion(getProtocolVersion(cassandraFig.getVersion()));
+            .withProtocolVersion(getProtocolVersion( cassandraConfig.getVersion()));
 
         // only add auth credentials if they were provided
-        if ( !cassandraFig.getUsername().isEmpty() && !cassandraFig.getPassword().isEmpty() ){
+        if ( !cassandraConfig.getUsername().isEmpty() && !cassandraConfig.getPassword().isEmpty() ){
             datastaxCluster.withCredentials(
-                cassandraFig.getUsername(),
-                cassandraFig.getPassword()
+                cassandraConfig.getUsername(),
+                cassandraConfig.getPassword()
             );
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
index c45fdd1..00856a5 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -126,6 +126,71 @@ public class ColumnNameIteratorTest {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
 
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
index 5cdf0e1..b12ea6f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -129,6 +129,72 @@ public class MultiKeyColumnNameIteratorTest {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
 
+
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 6331941..e509c45 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -132,6 +132,73 @@ public class MultiRowColumnIteratorTest {
             public String getApplicationLocalKeyspace() {
                 return cassandraFig.getApplicationLocalKeyspace();
             }
+
+
+            @Override
+            public String getLocalDataCenter() {
+                return cassandraFig.getLocalDataCenter();
+            }
+
+            @Override
+            public int getConnections() {
+                return cassandraFig.getConnections();
+            }
+
+            @Override
+            public int getTimeout() {
+                return cassandraFig.getTimeout();
+            }
+
+            @Override
+            public int getPoolTimeout() {
+                return cassandraFig.getPoolTimeout();
+            }
+
+            @Override
+            public String getClusterName() {
+                return cassandraFig.getClusterName();
+            }
+
+            @Override
+            public String getHosts() {
+                return cassandraFig.getHosts();
+            }
+
+            @Override
+            public String getVersion() {
+                return cassandraFig.getVersion();
+            }
+
+            @Override
+            public String getUsername() {
+                return cassandraFig.getUsername();
+            }
+
+            @Override
+            public String getPassword() {
+                return cassandraFig.getPassword();
+            }
+
+            @Override
+            public String getStrategy() {
+                return cassandraFig.getStrategy();
+            }
+
+            @Override
+            public String getStrategyOptions() {
+                return cassandraFig.getStrategyOptions();
+            }
+
+            @Override
+            public String getStrategyLocal() {
+                return cassandraFig.getStrategyLocal();
+            }
+
+            @Override
+            public String getStrategyOptionsLocal() {
+                return cassandraFig.getStrategyOptionsLocal();
+            }
+
         };
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index a8139a1..d51fe2d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -81,14 +81,13 @@ public class QueueManagerImpl implements QueueManager {
             }
         }
 
-        for ( String region : regions ) {
+        Shard available = new Shard( queue.getName(), actorSystemFig.getRegionLocal(),
+            Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
+        shardSerialization.createShard( available );
 
-            Shard available = new Shard( queue.getName(), region, Shard.Type.DEFAULT, 1L, QakkaUtils.getTimeUuid());
-            shardSerialization.createShard( available );
-
-            Shard inflight = new Shard( queue.getName(), region, Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
-            shardSerialization.createShard( inflight );
-        }
+        Shard inflight = new Shard( queue.getName(), actorSystemFig.getRegionLocal(),
+            Shard.Type.INFLIGHT, 1L, QakkaUtils.getTimeUuid());
+        shardSerialization.createShard( inflight );
 
         // only write the existence of a queue to the database if its dependent initial shards have been written
         queueSerialization.writeQueue(queue.toDatabaseQueue());

http://git-wip-us.apache.org/repos/asf/usergrid/blob/fbce160a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
index d723e97..620e946 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/api/QueueResourceTest.java
@@ -155,39 +155,43 @@ public class QueueResourceTest extends AbstractRestTest {
         }};
         target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
 
-        // send some messages
+        try {
 
-        ObjectMapper mapper = new ObjectMapper();
+            // send some messages
+
+            ObjectMapper mapper = new ObjectMapper();
 
-        int numMessages = 100;
-        for (int i = 0; i < numMessages; i++) {
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
 
-            final int number = i;
-            Map<String, Object> messageMap = new HashMap<String, Object>() {{
-                put( "message", "this is message #" + number );
-                put( "valid", true );
-            }};
-            String body = mapper.writeValueAsString( messageMap );
+                final int number = i;
+                Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                    put( "message", "this is message #" + number );
+                    put( "valid", true );
+                }};
+                String body = mapper.writeValueAsString( messageMap );
 
-            Response response;
-            if ( asJson ) {
-                response = target( "queues" ).path( queueName ).path( "messages" )
+                Response response;
+                if (asJson) {
+                    response = target( "queues" ).path( queueName ).path( "messages" )
                         .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
-            } else {
-                response = target( "queues" ).path( queueName ).path( "messages" )
+                } else {
+                    response = target( "queues" ).path( queueName ).path( "messages" )
                         .queryParam( "contentType", MediaType.APPLICATION_JSON )
                         .request().post( Entity.entity( body, MediaType.APPLICATION_OCTET_STREAM ) );
-            }
+                }
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        // get all messages, checking for dups
+            // get all messages, checking for dups
 
-        checkJsonMessages( queueName, numMessages );
+            checkJsonMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 
@@ -244,28 +248,32 @@ public class QueueResourceTest extends AbstractRestTest {
         }};
         target( "queues" ).request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE ) );
 
-        // send messages each with image/jpg payload
+        try {
 
-        InputStream is = getClass().getResourceAsStream("/qakka-duck.jpg");
-        byte[] bytes = ByteStreams.toByteArray( is );
+            // send messages each with image/jpg payload
 
-        int numMessages = 100;
-        for (int i = 0; i < numMessages; i++) {
+            InputStream is = getClass().getResourceAsStream( "/qakka-duck.jpg" );
+            byte[] bytes = ByteStreams.toByteArray( is );
 
-            Response response = target( "queues" ).path( queueName ).path( "messages" )
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
+
+                Response response = target( "queues" ).path( queueName ).path( "messages" )
                     .queryParam( "contentType", "image/jpg" )
                     .request()
-                    .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ));
+                    .post( Entity.entity( bytes, MediaType.APPLICATION_OCTET_STREAM ) );
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        // get all messages, checking for dups
+            // get all messages, checking for dups
 
-        checkBinaryMessages( queueName, numMessages );
+            checkBinaryMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 
@@ -320,71 +328,75 @@ public class QueueResourceTest extends AbstractRestTest {
         Map<String, Object> queueMap = new HashMap<String, Object>() {{ put("name", queueName); }};
         target("queues").request().post( Entity.entity( queueMap, MediaType.APPLICATION_JSON_TYPE));
 
-        // send some messages
+        try {
 
-        ObjectMapper mapper = new ObjectMapper();
+            // send some messages
 
-        int numMessages = 100;
-        for ( int i=0; i<numMessages; i++ ) {
+            ObjectMapper mapper = new ObjectMapper();
 
-            final int number = i;
-            Map<String, Object> messageMap = new HashMap<String, Object>() {{
-                put("message", "this is message #" + number);
-                put("valid", true );
-            }};
-            String body = mapper.writeValueAsString( messageMap );
+            int numMessages = 100;
+            for (int i = 0; i < numMessages; i++) {
 
-            Response response = target("queues").path( queueName ).path( "messages" )
-                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ));
+                final int number = i;
+                Map<String, Object> messageMap = new HashMap<String, Object>() {{
+                    put( "message", "this is message #" + number );
+                    put( "valid", true );
+                }};
+                String body = mapper.writeValueAsString( messageMap );
 
-            Assert.assertEquals( 200, response.getStatus() );
-        }
+                Response response = target( "queues" ).path( queueName ).path( "messages" )
+                    .request().post( Entity.entity( body, MediaType.APPLICATION_JSON ) );
 
-        // get all messages, checking for dups
+                Assert.assertEquals( 200, response.getStatus() );
+            }
 
-        Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
+            // get all messages, checking for dups
 
-        // there should be no more messages available
+            Set<UUID> messageIds = checkJsonMessages( queueName, numMessages );
 
-        Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
-        ApiResponse apiResponse = response.readEntity( ApiResponse.class );
-        Assert.assertNotNull( apiResponse.getQueueMessages() );
-        Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
+            // there should be no more messages available
+
+            Response response = target( "queues" ).path( queueName ).path( "messages" ).request().get();
+            ApiResponse apiResponse = response.readEntity( ApiResponse.class );
+            Assert.assertNotNull( apiResponse.getQueueMessages() );
+            Assert.assertTrue( apiResponse.getQueueMessages().isEmpty() );
 
-        // ack half of the messages
+            // ack half of the messages
 
-        int count = 0;
-        Set<UUID> ackedIds = new HashSet<>();
-        for ( UUID queueMessageId : messageIds ) {
-            response = target( "queues" )
+            int count = 0;
+            Set<UUID> ackedIds = new HashSet<>();
+            for (UUID queueMessageId : messageIds) {
+                response = target( "queues" )
                     .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
-            Assert.assertEquals( 200, response.getStatus() );
-            ackedIds.add( queueMessageId );
-            if ( ++count >= numMessages/2 ) {
-                break;
+                Assert.assertEquals( 200, response.getStatus() );
+                ackedIds.add( queueMessageId );
+                if (++count >= numMessages / 2) {
+                    break;
+                }
             }
-        }
-        messageIds.removeAll( ackedIds );
+            messageIds.removeAll( ackedIds );
 
-        // wait for remaining of the messages to timeout
+            // wait for remaining of the messages to timeout
 
-        QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
-        Thread.sleep( 2*qakkaFig.getQueueTimeoutSeconds() * 1000 );
+            QakkaFig qakkaFig = StartupListener.INJECTOR.getInstance( QakkaFig.class );
+            Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        // now, the remaining messages cannot be acked because they timed out
+            // now, the remaining messages cannot be acked because they timed out
 
-        for ( UUID queueMessageId : messageIds ) {
-            response = target( "queues" )
+            for (UUID queueMessageId : messageIds) {
+                response = target( "queues" )
                     .path( queueName ).path( "messages" ).path( queueMessageId.toString() ).request().delete();
-            Assert.assertEquals( 400, response.getStatus() );
-        }
+                Assert.assertEquals( 400, response.getStatus() );
+            }
 
-        // and, those same messages should be available again in the queue
+            // and, those same messages should be available again in the queue
 
-        checkJsonMessages( queueName, numMessages/2 );
+            checkJsonMessages( queueName, numMessages / 2 );
 
-        response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
-        Assert.assertEquals( 200, response.getStatus() );
+        } finally {
+            Response response = target( "queues" ).path( queueName ).queryParam( "confirm", true ).request().delete();
+            Assert.assertEquals( 200, response.getStatus() );
+        }
     }
 
 


[2/5] usergrid git commit: Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors

Posted by sn...@apache.org.
Ensure that injector is not used in Akka constructors to avoid "cannot serialize injector" errors


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d2153285
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d2153285
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d2153285

Branch: refs/heads/usergrid-1318-queue
Commit: d2153285dae4c9257f3b62ab533186f31f770926
Parents: fe08fa7
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:41:26 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:41:26 2016 -0400

----------------------------------------------------------------------
 .../persistence/actorsystem/GuiceActorProducer.java |  8 ++++----
 .../uniquevalues/UniqueValuesServiceImpl.java       |  2 +-
 .../distributed/actors/QueueTimeouterTest.java      | 16 +++++++++-------
 .../distributed/actors/ShardAllocatorTest.java      |  4 ++--
 4 files changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
index 9304c4c..d73f760 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/GuiceActorProducer.java
@@ -25,11 +25,11 @@ import com.google.inject.Injector;
 
 public class GuiceActorProducer implements IndirectActorProducer {
 
-    final Injector injector;
+    public static Injector INJECTOR = null;
+
     final Class<? extends Actor> actorClass;
 
-    public GuiceActorProducer(Injector injector, Class<? extends Actor> actorClass) {
-        this.injector = injector;
+    public GuiceActorProducer(Class<? extends Actor> actorClass) {
         this.actorClass = actorClass;
     }
 
@@ -40,7 +40,7 @@ public class GuiceActorProducer implements IndirectActorProducer {
 
     @Override
     public Actor produce() {
-        return injector.getInstance( actorClass );
+        return INJECTOR.getInstance( actorClass );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 8bdb02c..08de853 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -292,7 +292,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             ClusterSingletonManagerSettings.create( system ).withRole("io");
 
         system.actorOf( ClusterSingletonManager.props(
-            Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ),
+            Props.create( GuiceActorProducer.class, UniqueValuesRouter.class ),
             PoisonPill.getInstance(), settings ), "uvRouter" );
 
         ClusterSingletonProxySettings proxySettings =

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index 3079773..3bf87a6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRequest;
 import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -58,14 +59,15 @@ public class QueueTimeouterTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
         Injector injector = getInjector();
 
-        QakkaFig qakkaFig             = getInjector().getInstance( QakkaFig.class );
-        ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
-        QueueMessageSerialization qms = getInjector().getInstance( QueueMessageSerialization.class );
-        ShardSerialization shardSerialization = getInjector().getInstance( ShardSerialization.class );
+        injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
+
+        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+        QakkaFig qakkaFig             = injector.getInstance( QakkaFig.class );
+        ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
+        QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+        ShardSerialization shardSerialization = injector.getInstance( ShardSerialization.class );
 
         // create records in inflight table, with some being old enough to time out
 
@@ -113,7 +115,7 @@ public class QueueTimeouterTest extends AbstractTest {
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
         ActorRef timeouterRef = system.actorOf( Props.create(
-            GuiceActorProducer.class, injector, QueueTimeouter.class), "timeouter");
+            GuiceActorProducer.class, QueueTimeouter.class), "timeouter");
         QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
         timeouterRef.tell( qtr, null ); // tell sends message, returns immediately
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d2153285/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index c6831b7..919673c 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -69,7 +69,7 @@ public class ShardAllocatorTest extends AbstractTest {
 
         CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
 
-        injector.getInstance( App.class ); // init the INJECTOR
+        injector.getInstance( DistributedQueueService.class ); // init the INJECTOR
 
         ShardSerialization shardSer = injector.getInstance( ShardSerialization.class );
         QakkaFig qakkaFig           = injector.getInstance( QakkaFig.class );
@@ -113,7 +113,7 @@ public class ShardAllocatorTest extends AbstractTest {
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
         ActorRef shardAllocRef = system.actorOf( Props.create(
-            GuiceActorProducer.class, injector, ShardAllocator.class), "shardallocator");
+            GuiceActorProducer.class, ShardAllocator.class), "shardallocator");
 
         ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
         shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately


[3/5] usergrid git commit: Logging improvements

Posted by sn...@apache.org.
Logging improvements


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e3285843
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e3285843
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e3285843

Branch: refs/heads/usergrid-1318-queue
Commit: e3285843260c361b3126c5d235068d38802ff164
Parents: d215328
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:42:21 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:42:21 2016 -0400

----------------------------------------------------------------------
 .../persistence/qakka/MetricsService.java       | 20 ++++-----
 .../qakka/core/impl/InMemoryQueue.java          |  2 +-
 .../distributed/actors/QueueActorHelper.java    | 37 +----------------
 .../queue/LegacyQueueManagerTest.java           | 43 ++++++++++----------
 4 files changed, 33 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
index 378ba0d..b032be6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/MetricsService.java
@@ -24,16 +24,16 @@ import com.codahale.metrics.MetricRegistry;
 
 public interface MetricsService {
 
-    String SEND_TIME_TOTAL  = "org.apache.usergrid.persistence.qakka.send.time.total";
-    String SEND_TIME_SEND   = "org.apache.usergrid.persistence.qakka.send.time.send";
-    String SEND_TIME_WRITE  = "org.apache.usergrid.persistence.qakka.send.time.write";
-    String GET_TIME_TOTAL   = "org.apache.usergrid.persistence.qakka.get.time.total";
-    String GET_TIME_GET     = "org.apache.usergrid.persistence.qakka.get.time.get";
-    String ACK_TIME_TOTAL   = "org.apache.usergrid.persistence.qakka.ack.time.total";
-    String ACK_TIME_ACK     = "org.apache.usergrid.persistence.qakka.ack.time.ack";
-    String TIMEOUT_TIME     = "org.apache.usergrid.persistence.qakka.timeout.time";
-    String REFRESH_TIME     = "org.apache.usergrid.persistence.qakka.timeout.time";
-    String ALLOCATE_TIME    = "org.apache.usergrid.persistence.qakka.allocate.time";
+    String SEND_TIME_TOTAL  = "qakka.send.time.total";
+    String SEND_TIME_SEND   = "qakka.send.time.send";
+    String SEND_TIME_WRITE  = "qakka.send.time.write";
+    String GET_TIME_TOTAL   = "qakka.get.time.total";
+    String GET_TIME_GET     = "qakka.get.time.get";
+    String ACK_TIME_TOTAL   = "qakka.ack.time.total";
+    String ACK_TIME_ACK     = "qakka.ack.time.ack";
+    String TIMEOUT_TIME     = "qakka.timeout.time";
+    String REFRESH_TIME     = "qakka.refresh.time";
+    String ALLOCATE_TIME    = "qakka.allocate.time";
 
     MetricRegistry getMetricRegistry();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index e70ec75..6a26483 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -65,7 +65,7 @@ public class InMemoryQueue {
         } else {
             if ( databaseQueueMessage.getQueueMessageId().timestamp() > newest.timestamp() ) {
                 newest = databaseQueueMessage.getQueueMessageId();
-                logger.debug("New newest for queue {} is {}", queueName, newest.timestamp());
+                //logger.debug("New newest for queue {} is {}", queueName, newest.timestamp());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 0b31e16..e3996c5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -119,7 +119,7 @@ public class QueueActorHelper {
             }
         }
 
-        //logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
+        logger.debug("{} returning {} for queue {}", this, queueMessages.size(), queueName);
         return queueMessages;
 
     }
@@ -251,12 +251,6 @@ public class QueueActorHelper {
 
                 UUID since = inMemoryQueue.getNewest( queueName );
 
-//                if ( since != null ) {
-//                    logger.debug( "Loading queue {} messages newer than {}", queueName, since.timestamp() );
-//                } else {
-//                    logger.debug( "Loading queue {} messages newer than [null]", queueName );
-//                }
-
                 String region = actorSystemFig.getRegionLocal();
                 MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
                     cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
@@ -271,35 +265,6 @@ public class QueueActorHelper {
                     count++;
                 }
 
-//                long runs = runCount.incrementAndGet();
-//                long readCount = totalRead.addAndGet( count );
-//
-//                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-//
-//                    final DecimalFormat format = new DecimalFormat("##.###");
-//                    final long nano = 1000000000;
-//                    Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
-//
-//                    logger.debug("QueueRefresher for queue '{}' stats:\n" +
-//                            "   Num runs={}\n" +
-//                            "   Read count={}\n" +
-//                            "   Mean={}\n" +
-//                            "   One min rate={}\n" +
-//                            "   Five min rate={}\n" +
-//                            "   Snapshot mean={}\n" +
-//                            "   Snapshot min={}\n" +
-//                            "   Snapshot max={}",
-//                        queueName,
-//                        t.getCount(),
-//                        readCount,
-//                        format.format( t.getMeanRate() ),
-//                        format.format( t.getOneMinuteRate() ),
-//                        format.format( t.getFiveMinuteRate() ),
-//                        format.format( t.getSnapshot().getMean() / nano ),
-//                        format.format( (double) t.getSnapshot().getMin() / nano ),
-//                        format.format( (double) t.getSnapshot().getMax() / nano ) );
-//                }
-
                 if ( count > 0 ) {
                     logger.debug( "Added {} in-memory for queue {}, new size = {}",
                         count, queueName, inMemoryQueue.size( queueName ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/e3285843/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
index 65c3309..92075b6 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -29,7 +29,6 @@ import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -143,41 +142,41 @@ public class LegacyQueueManagerTest extends AbstractTest {
         final LegacyQueueScopeImpl scope =
             new LegacyQueueScopeImpl( "testQueue" + queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL );
         LegacyQueueManagerFactory qmf = myInjector.getInstance( LegacyQueueManagerFactory.class );
-        LegacyQueueManager qm = qmf.getQueueManager(scope);
+        LegacyQueueManager qm = qmf.getQueueManager( scope );
 
-        HashMap<String,String> values = new HashMap<>();
-        values.put("test", "Test");
+        HashMap<String, String> values = new HashMap<>();
+        values.put( "test", "Test" );
 
-        List<Map<String,String>> bodies = new ArrayList<>();
-        bodies.add(values);
+        List<Map<String, String>> bodies = new ArrayList<>();
+        bodies.add( values );
         long initialDepth = qm.getQueueDepth();
-        qm.sendMessages(bodies);
+        qm.sendMessages( bodies );
         long depth = 0;
-        for(int i=0; i<10;i++){
-             depth = qm.getQueueDepth();
-            if(depth>0){
+        for (int i = 0; i < 10; i++) {
+            depth = qm.getQueueDepth();
+            if (depth > 0) {
                 break;
             }
-            Thread.sleep(1000);
+            Thread.sleep( 1000 );
         }
-        assertTrue(depth>0);
+        assertTrue( depth > 0 );
 
-        List<LegacyQueueMessage> messageList = qm.getMessages(10, values.getClass());
-        assertTrue(messageList.size() <= 500);
-        for(LegacyQueueMessage message : messageList){
-            assertTrue(message.getBody().equals(values));
+        List<LegacyQueueMessage> messageList = qm.getMessages( 10, values.getClass() );
+        assertTrue( messageList.size() <= 500 );
+        for (LegacyQueueMessage message : messageList) {
+            assertTrue( message.getBody().equals( values ) );
         }
-        if(messageList.size()>0) {
-            qm.commitMessages(messageList);
+        if (messageList.size() > 0) {
+            qm.commitMessages( messageList );
         }
-        for(int i=0; i<10;i++){
+        for (int i = 0; i < 10; i++) {
             depth = qm.getQueueDepth();
-            if(depth==initialDepth){
+            if (depth == initialDepth) {
                 break;
             }
-            Thread.sleep(1000);
+            Thread.sleep( 1000 );
         }
-        assertEquals(initialDepth, depth);
+        assertEquals( initialDepth, depth );
 
         DistributedQueueService distributedQueueService = myInjector.getInstance( DistributedQueueService.class );
         distributedQueueService.shutdown();


[4/5] usergrid git commit: Queue info REST end-point

Posted by sn...@apache.org.
Queue info REST end-point


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/202d5bed
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/202d5bed
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/202d5bed

Branch: refs/heads/usergrid-1318-queue
Commit: 202d5beda527e54852c86439c9c65d6ab2157ad8
Parents: e328584
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 17:42:42 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 17:42:42 2016 -0400

----------------------------------------------------------------------
 .../rest/system/QueueSystemResource.java        | 78 ++++++++++++++++++++
 1 file changed, 78 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/202d5bed/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
index 15bdb34..86cd387 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/QueueSystemResource.java
@@ -19,9 +19,18 @@
  */
 package org.apache.usergrid.rest.system;
 
+import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.Queue;
+import org.apache.usergrid.persistence.qakka.core.QueueManager;
+import org.apache.usergrid.persistence.qakka.core.QueueMessageManager;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueManagerImpl;
+import org.apache.usergrid.persistence.qakka.core.impl.QueueMessageManagerImpl;
 import org.apache.usergrid.rest.AbstractContextResource;
 import org.apache.usergrid.rest.ApiResponse;
 import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
@@ -32,6 +41,10 @@ import org.springframework.stereotype.Component;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.DecimalFormat;
+import java.util.*;
 
 /**
  * retrieves queue stats
@@ -47,6 +60,7 @@ public class QueueSystemResource extends AbstractContextResource {
 
     public QueueSystemResource(){logger.info("queue resource initialized");}
 
+
     /**
      * Return queue depth of this Usergrid instance in JSON format.
      *
@@ -74,4 +88,68 @@ public class QueueSystemResource extends AbstractContextResource {
         return response;
     }
 
+
+    /**
+     * Return Qakka timer metrics plus per-queue statistics of this Usergrid instance in JSON format.
+     *
+     * @param callback JSONP callback name; accepted as a query parameter but not used by this method
+     * @return response whose "data" property holds the host name, timer metrics and queue list
+     */
+    @GET
+    @RequireSystemAccess
+    @Path("info")
+    public ApiResponse getQueueInfo(
+        @QueryParam("callback") @DefaultValue("callback") String callback ) {
+
+        ApiResponse response = createApiResponse();
+        response.setAction( "get queue info" );
+
+        final DecimalFormat format = new DecimalFormat("##.###");
+        final long nano = 1000000000; // divisor applied to snapshot values reported under the "(s)" keys
+
+        Map<String, Object> info = new HashMap<>();
+        info.put( "name", "Queue Info" );
+        try {
+            info.put( "host", InetAddress.getLocalHost().getHostName() );
+        } catch (UnknownHostException e) {
+            info.put( "host", "unknown" ); // best effort: still report metrics when the host cannot be resolved
+        }
+
+        // Iterate getTimers() rather than timer( name ) over getNames(): timer( name ) throws
+        // IllegalArgumentException when the name is registered for a non-Timer metric.
+        MetricsService metricsService = injector.getInstance( MetricsService.class );
+        for ( Map.Entry<String, Timer> entry : metricsService.getMetricRegistry().getTimers().entrySet() ) {
+            Timer t = entry.getValue();
+            Map<String, Object> timerInfo = new HashMap<>();
+            timerInfo.put( "count", String.valueOf( t.getCount() ) );
+            timerInfo.put( "mean_rate", format.format( t.getMeanRate() ) );
+            timerInfo.put( "one_minute_rate", format.format( t.getOneMinuteRate() ) );
+            timerInfo.put( "five_minute_rate", format.format( t.getFiveMinuteRate() ) );
+            timerInfo.put( "mean (s)", format.format( t.getSnapshot().getMean() / nano ) );
+            timerInfo.put( "min (s)", format.format( (double) t.getSnapshot().getMin() / nano ) );
+            timerInfo.put( "max (s)", format.format( (double) t.getSnapshot().getMax() / nano ) );
+            info.put( entry.getKey(), timerInfo );
+        }
+
+        QueueManager queueManager               = injector.getInstance( QueueManagerImpl.class );
+        QueueMessageManager queueMessageManager = injector.getInstance( QueueMessageManagerImpl.class );
+        InMemoryQueue inMemoryQueue             = injector.getInstance( InMemoryQueue.class );
+
+        List<Map<String, Object>> queues = new ArrayList<>();
+        for ( String queueName : queueManager.getListOfQueues() ) {
+            Map<String, Object> queueInfo = new HashMap<>();
+            queueInfo.put("name", queueName );
+            queueInfo.put("depth", queueMessageManager.getQueueDepth( queueName ));
+            queueInfo.put("inmemory", inMemoryQueue.size( queueName ));
+            // "since" is the timestamp of the newest in-memory UUID, or the string "null" when there is none
+            UUID newest = inMemoryQueue.getNewest( queueName );
+            queueInfo.put("since", newest == null ? "null" : newest.timestamp());
+            queues.add( queueInfo );
+        }
+        info.put("queues", queues);
+
+        response.setProperty( "data", info );
+        return response;
+    }
+
 }