You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/30 21:18:46 UTC

[01/10] usergrid git commit: Implement configurable long polling for Qakka queue gets

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue c08e8f0e7 -> 5a19ba9a7


Implement configurable long polling for Qakka queue gets


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/434e53e7
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/434e53e7
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/434e53e7

Branch: refs/heads/usergrid-1318-queue
Commit: 434e53e7eed46fbdc3cdeafde9464c90101fa7e3
Parents: c08e8f0
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 12:18:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 12:18:33 2016 -0400

----------------------------------------------------------------------
 .../actorsystem/ActorSystemManagerImpl.java     |  2 +-
 .../apache/usergrid/persistence/qakka/App.java  | 25 ++++++---
 .../usergrid/persistence/qakka/QakkaFig.java    | 13 +++++
 .../impl/DistributedQueueServiceImpl.java       | 23 ++++++++-
 .../qakka/common/CassandraClientTest.java       | 46 -----------------
 .../qakka/core/CassandraClientTest.java         | 46 +++++++++++++++++
 .../qakka/core/QueueMessageManagerTest.java     |  3 +-
 .../distributed/QueueActorServiceTest.java      |  8 +--
 .../queue/src/test/resources/qakka.properties   |  4 ++
 .../services/notifications/QueueListener.java   | 54 ++++++++++++--------
 10 files changed, 140 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 9fb39b8..3fc191d 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -333,7 +333,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     /**
      * Create cluster system for this the current region
      */
-    private ActorSystem createClusterSystem( Config config ) {
+    private synchronized ActorSystem createClusterSystem( Config config ) {
 
         // there is only 1 akka system for a Usergrid cluster
         final String clusterName = getClusterName();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index 41bc6fa..abbf3da 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -22,11 +22,16 @@ package org.apache.usergrid.persistence.qakka;
 import com.codahale.metrics.MetricRegistry;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
 import org.apache.usergrid.persistence.core.migration.schema.MigrationManager;
+import org.apache.usergrid.persistence.qakka.core.Queue;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueActorRouterProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterProducer;
+import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +40,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Akka queueing application
  */
+@Singleton
 public class App implements MetricsService {
     private static final Logger logger = LoggerFactory.getLogger( App.class );
 
@@ -50,6 +56,7 @@ public class App implements MetricsService {
     @Inject
     public App(
             Injector                  injector,
+            QakkaFig                  qakkaFig,
             ActorSystemFig            actorSystemFig,
             ActorSystemManager        actorSystemManager,
             DistributedQueueService   distributedQueueService,
@@ -59,12 +66,18 @@ public class App implements MetricsService {
         this.actorSystemFig = actorSystemFig;
         this.actorSystemManager = actorSystemManager;
         this.distributedQueueService = distributedQueueService;
-//
-//        try {
-//            migrationManager.migrate();
-//        } catch (MigrationException e) {
-//            throw new QakkaRuntimeException( "Error running migration", e );
-//        }
+
+        if ( qakkaFig.getStandalone() ) {
+
+            try {
+                migrationManager.migrate();
+            } catch (MigrationException e) {
+                throw new QakkaRuntimeException( "Error running migration", e );
+            }
+            actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 3b901b2..aa4e349 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -30,6 +30,8 @@ import java.io.Serializable;
 @FigSingleton
 public interface QakkaFig extends GuicyFig, Serializable {
 
+    String QUEUE_STANDALONE                       = "queue.standalone";
+
     String QUEUE_NUM_ACTORS                       = "queue.num.actors";
 
     String QUEUE_SENDER_NUM_ACTORS                = "queue.sender.num.actors";
@@ -58,6 +60,13 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_SHARD_MAX_SIZE                   = "queue.shard.max.size";
 
+    String QUEUE_LONG_POLL_TIME_MILLIS            = "queue.long.polling.time.millis";
+
+
+    /** True if Qakka is running standalone */
+    @Key(QUEUE_STANDALONE)
+    @Default("false")
+    boolean getStandalone();
 
     /** Queue senders send to queue writers */
     @Key(QUEUE_SENDER_NUM_ACTORS)
@@ -128,4 +137,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_SHARD_MAX_SIZE)
     @Default("400000")
     long getMaxShardSize();
+
+    @Key(QUEUE_LONG_POLL_TIME_MILLIS)
+    @Default("5000")
+    long getLongPollTimeMillis();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 4737347..be20cde 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,6 +25,7 @@ import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.log4j.net.SyslogAppender;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -107,7 +109,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     @Override
     public void refreshQueue(String queueName) {
-        logger.info("Refreshing queue: {}", queueName);
+        logger.info("{} Requesting refresh for queue: {}", this, queueName);
         QueueRefreshRequest request = new QueueRefreshRequest( queueName );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );
@@ -186,6 +188,25 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     @Override
     public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int count ) {
+        List<DatabaseQueueMessage> ret = new ArrayList<>();
+
+        long startTime = System.currentTimeMillis();
+
+        while ( ret.size() < count
+            && System.currentTimeMillis() - startTime < qakkaFig.getLongPollTimeMillis()) {
+
+            ret.addAll( getNextMessagesInternal( queueName, count ));
+
+            if ( ret.size() < count ) {
+                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {}
+            }
+        }
+
+        return ret;
+    }
+
+
+    public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
 
         List<String> queueNames = queueManager.getListOfQueues();
         if ( !queueNames.contains( queueName ) ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
deleted file mode 100644
index e1f0c7e..0000000
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/common/CassandraClientTest.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.qakka.common;
-
-import com.datastax.driver.core.Session;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.AbstractTest;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.junit.Test;
-
-
-/**
- * Created by russo on 6/8/16.
- */
-public class CassandraClientTest extends AbstractTest {
-
-    @Test
-    public void getClient(){
-
-        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-        Session session = cassandraClient.getApplicationSession();
-
-        session.getLoggedKeyspace();
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
new file mode 100644
index 0000000..416de0e
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/CassandraClientTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.core;
+
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.junit.Test;
+
+
+/**
+ * Created by russo on 6/8/16.
+ */
+public class CassandraClientTest extends AbstractTest {
+
+    @Test
+    public void getClient(){
+
+        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
+
+        Session session = cassandraClient.getApplicationSession();
+
+        session.getLoggedKeyspace();
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 124cb86..0413f81 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -171,8 +171,7 @@ public class QueueMessageManagerTest extends AbstractTest {
         int maxRetries = 15;
         int retries = 0;
         while ( retries++ < maxRetries ) {
-            //distributedQueueService.refresh();
-            Thread.sleep( 1000 );
+            distributedQueueService.refresh();
             if (inMemoryQueue.size( queueName ) == 40) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 0883650..7423424 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -65,8 +65,6 @@ public class QueueActorServiceTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -117,8 +115,6 @@ public class QueueActorServiceTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
         String region = actorSystemFig.getRegionLocal();
 
@@ -151,16 +147,16 @@ public class QueueActorServiceTest extends AbstractTest {
                     queueName, region, region, messageId , null, null);
         }
 
-        int maxRetries = 15;
+        int maxRetries = 25;
         int retries = 0;
         int count = 0;
         while ( retries++ < maxRetries ) {
-            Thread.sleep( 1000 );
             distributedQueueService.refresh();
             if (inMemoryQueue.size( queueName ) == 100) {
                 count = 100;
                 break;
             }
+            Thread.sleep(1000);
         }
 
         Assert.assertEquals( 100, count );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index dc7ef48..aacc187 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -18,6 +18,8 @@
 
 # Properties for JUnit tests
 
+queue.standalone=true
+
 usergrid.cluster_name=Test Cluster
 
 usergrid.cluster.hostname=localhost
@@ -43,6 +45,8 @@ queue.shard.allocation.advance.time.millis=200
 
 queue.max.inmemory.shard.counter = 100
 
+queue.long.polling.time.millis=2000
+
 cassandra.hosts=localhost
 
 cassandra.keyspace.application=qakka_test_application

http://git-wip-us.apache.org/repos/asf/usergrid/blob/434e53e7/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 20fbd84..796450b 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -72,12 +72,14 @@ public class QueueListener  {
     private int consecutiveCallsToRemoveDevices;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
-        this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
+        this.queueManagerFactory =
+            smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
         this.properties = props;
-        this.applicationQueueManagerCache = smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+        this.applicationQueueManagerCache =
+            smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
 
     }
 
@@ -94,12 +96,15 @@ public class QueueListener  {
 
             try {
 
-                sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
-                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.sleep", "" + DEFAULT_SLEEP));
-                consecutiveCallsToRemoveDevices = new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
+                sleepBetweenRuns = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+                sleepWhenNoneFound = new Long(properties.getProperty("usergrid.push.worker.sleep", "" + DEFAULT_SLEEP));
+
+                consecutiveCallsToRemoveDevices =
+                    new Integer(properties.getProperty("usergrid.notifications.inactive.interval", ""+200));
                 queueName = ApplicationQueueManagerImpl.getQueueNames(properties);
 
-                int maxThreads = new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
+                int maxThreads =
+                    new Integer(properties.getProperty("usergrid.push.worker_count", ""+PUSH_CONSUMER_MAX_THREADS));
 
                 futures = new ArrayList<>(maxThreads);
 
@@ -166,6 +171,13 @@ public class QueueListener  {
 
         while ( true ) {
 
+                if(sleepBetweenRuns > 0) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
+                    }
+                    try { Thread.sleep(sleepBetweenRuns); } catch (InterruptedException ignored) { }
+                }
+
                 Timer.Context timerContext = timer.time();
                 rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
                     .buffer(MAX_TAKE)
@@ -173,7 +185,7 @@ public class QueueListener  {
 
                         try {
                             if (logger.isTraceEnabled()) {
-                                logger.trace("retrieved batch of {} messages from queue {}", messages.size(), queueName);
+                                logger.trace("retrieved batch of {} messages from queue {}",messages.size(),queueName);
                             }
 
                             if (messages.size() > 0) {
@@ -185,12 +197,13 @@ public class QueueListener  {
                                     ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                                     UUID applicationId = queueMessage.getApplicationId();
 
-                                    //Groups queue messages by application Id, ( they are all probably going to the same place )
+                                    // Groups queue messages by application Id,
+                                    // (they are all probably going to the same place)
                                     if (!messageMap.containsKey(applicationId)) {
                                         //For each app id it sends the set.
-                                        List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>();
-                                        applicationQueueMessages.add(message);
-                                        messageMap.put(applicationId, applicationQueueMessages);
+                                        List<LegacyQueueMessage> lqms = new ArrayList<LegacyQueueMessage>();
+                                        lqms.add(message);
+                                        messageMap.put(applicationId, lqms);
                                     } else {
                                         messageMap.get(applicationId).add(message);
                                     }
@@ -207,13 +220,15 @@ public class QueueListener  {
                                         .getApplicationQueueManager(
                                             emf.getEntityManager(applicationId),
                                             legacyQueueManager,
-                                            new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)),
+                                            new JobScheduler(smf.getServiceManager(applicationId),
+                                                             emf.getEntityManager(applicationId)),
                                             metricsService,
                                             properties
                                         );
 
                                     if (logger.isTraceEnabled()) {
-                                        logger.trace("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
+                                        logger.trace("send batch for app {} of {} messages",
+                                            entry.getKey(), entry.getValue().size());
                                     }
                                     Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
 
@@ -231,20 +246,15 @@ public class QueueListener  {
 
                                 meter.mark(messages.size());
                                 if (logger.isTraceEnabled()) {
-                                    logger.trace("sent batch {} messages duration {} ms", messages.size(), System.currentTimeMillis() - now);
+                                    logger.trace("sent batch {} messages duration {} ms",
+                                        messages.size(), System.currentTimeMillis() - now);
                                 }
 
-                                if(sleepBetweenRuns > 0) {
-                                    if (logger.isTraceEnabled()) {
-                                        logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
-                                    }
-                                    Thread.sleep(sleepBetweenRuns);
-                                }
 
                                 if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){
-                                    for(ApplicationQueueManager applicationQueueManager : applicationQueueManagerCache.asMap().values()){
+                                    for(ApplicationQueueManager aqm : applicationQueueManagerCache.asMap().values()){
                                         try {
-                                            applicationQueueManager.asyncCheckForInactiveDevices();
+                                            aqm.asyncCheckForInactiveDevices();
                                         }catch (Exception inactiveDeviceException){
                                             logger.error("Inactive Device Get failed",inactiveDeviceException);
                                         }


[06/10] usergrid git commit: Use same timeout value as old queue implementation did

Posted by sn...@apache.org.
Use same timeout value as old queue implementation did


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c08e02a9
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c08e02a9
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c08e02a9

Branch: refs/heads/usergrid-1318-queue
Commit: c08e02a914d98bd048f997cddd47b343ba9908c1
Parents: 727ff1d
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Sep 22 07:47:07 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Sep 22 07:47:07 2016 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c08e02a9/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index da47c98..472c241 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -89,7 +89,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** Time for queue messages to timeout, if not set per queue */
     @Key(QUEUE_TIMEOUT_SECONDS)
-    @Default("10")
+    @Default("30")
     int getQueueTimeoutSeconds();
 
     /** How often to refresh each queue's in-memory data */


[03/10] usergrid git commit: Introduce maxTtl for queue message and message payload data, default is two weeks.

Posted by sn...@apache.org.
Introduce maxTtl for queue message and message payload data, default is two weeks.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8b79fb87
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8b79fb87
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8b79fb87

Branch: refs/heads/usergrid-1318-queue
Commit: 8b79fb875a3aea88458e08429286d2119d6a3c88
Parents: 6c204b9
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 14:30:20 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 14:30:20 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |  7 +++++++
 .../impl/QueueMessageSerializationImpl.java     | 22 ++++++++++++--------
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index aa4e349..c66001d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -62,6 +62,8 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_LONG_POLL_TIME_MILLIS            = "queue.long.polling.time.millis";
 
+    String QUEUE_MAX_TTL                          = "queue.max.ttl";
+
 
     /** True if Qakka is running standalone */
     @Key(QUEUE_STANDALONE)
@@ -141,4 +143,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Key(QUEUE_LONG_POLL_TIME_MILLIS)
     @Default("5000")
     long getLongPollTimeMillis();
+
+    /** Max time-to-live for queue message and payload data */
+    @Key(QUEUE_MAX_TTL)
+    @Default("1209600") // default is two weeks
+    int getMaxTtlSeconds();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8b79fb87/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index d868021..02862c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
 
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
@@ -32,6 +31,7 @@ import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -49,12 +49,13 @@ import java.util.UUID;
 
 
 public class QueueMessageSerializationImpl implements QueueMessageSerialization {
-
     private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
 
+    private final int maxTtl;
+
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
     private final ShardCounterSerialization shardCounterSerialization;
@@ -109,17 +110,20 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
     @Inject
     public QueueMessageSerializationImpl(
-            CassandraConfig              cassandraConfig,
+            CassandraConfig           cassandraConfig,
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
-            CassandraClient           cassandraClient
+            CassandraClient           cassandraClient,
+            QakkaFig                  qakkaFig
         ) {
         this.cassandraConfig              = cassandraConfig;
         this.actorSystemFig            = actorSystemFig;
         this.shardStrategy             = shardStrategy;
         this.shardCounterSerialization = shardCounterSerialization;
         this.cassandraClient = cassandraClient;
+
+        this.maxTtl = qakkaFig.getMaxTtlSeconds();
     }
 
 
@@ -151,7 +155,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .value( COLUMN_MESSAGE_ID,       message.getMessageId())
                 .value( COLUMN_QUEUE_MESSAGE_ID, queueMessageId)
                 .value( COLUMN_INFLIGHT_AT,      inflightAt )
-                .value( COLUMN_QUEUED_AT,        queuedAt);
+                .value( COLUMN_QUEUED_AT,        queuedAt)
+            .using( QueryBuilder.ttl( maxTtl ) );
 
         cassandraClient.getQueueMessageSession().execute(insert);
 
@@ -244,9 +249,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
                 .and(shardIdClause)
                 .and(queueMessageIdClause);
 
-        ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( delete );
-
-        String s = "s";
+        cassandraClient.getQueueMessageSession().execute( delete );
     }
 
 
@@ -275,7 +278,8 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
         Statement insert = QueryBuilder.insertInto(TABLE_MESSAGE_DATA)
                 .value( COLUMN_MESSAGE_ID, messageId)
                 .value( COLUMN_MESSAGE_DATA, messageBody.getBlob())
-                .value( COLUMN_CONTENT_TYPE, messageBody.getContentType());
+                .value( COLUMN_CONTENT_TYPE, messageBody.getContentType())
+            .using( QueryBuilder.ttl( maxTtl ) );
 
         cassandraClient.getApplicationSession().execute(insert);
     }


[08/10] usergrid git commit: Now supports elasticsearch.queue_impl=MULTIREGION setting instead of SNS, also more/better DEBUG logging

Posted by sn...@apache.org.
Now supports elasticsearch.queue_impl=MULTIREGION setting instead of SNS, also more/better DEBUG logging


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2cd8ecb3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2cd8ecb3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2cd8ecb3

Branch: refs/heads/usergrid-1318-queue
Commit: 2cd8ecb30fc5f97927ec000573daba2ba16e914f
Parents: 00eb139
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 23 12:47:29 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 23 12:47:29 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncIndexProvider.java         | 46 ++++++++++++----
 .../qakka/distributed/actors/QueueActor.java    | 53 ++++++++++++++++--
 .../distributed/actors/QueueRefresher.java      | 56 +++++++++++++++-----
 .../distributed/actors/QueueTimeouter.java      | 44 +++++++++++++--
 .../impl/DistributedQueueServiceImpl.java       | 12 +++--
 .../queue/impl/QueueManagerFactoryImpl.java     |  7 ++-
 .../queue/src/test/resources/log4j.properties   | 11 ++--
 tests/performance/results.txt                   |  1 +
 8 files changed, 188 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index aac0e66..f4a9bd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -103,16 +103,44 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         final Implementations impl = Implementations.valueOf(value);
 
         switch (impl) {
+
             case LOCAL:
-                AsyncEventServiceImpl eventService = new AsyncEventServiceImpl(scope -> new LocalQueueManager(), indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory, eventBuilder,mapManagerFactory, queueFig,rxTaskScheduler);
+                AsyncEventServiceImpl eventService =
+                    new AsyncEventServiceImpl(scope -> new LocalQueueManager(),
+                        indexProcessorFig,
+                        indexProducer,
+                        metricsFactory,
+                        entityCollectionManagerFactory,
+                        indexLocationStrategyFactory,
+                        entityIndexFactory,
+                        eventBuilder,
+                        mapManagerFactory,
+                        queueFig,rxTaskScheduler);
                 eventService.MAX_TAKE = 1000;
                 return eventService;
+
             case SQS:
-                throw new IllegalArgumentException("Configuration value of SQS is no longer allowed. Use SNS instead with only a single region");
+                throw new IllegalArgumentException(
+                    "Configuration value of SQS is no longer allowed. Use SNS instead with only a single region.");
+
             case SNS:
-                return new AsyncEventServiceImpl(queueManagerFactory, indexProcessorFig, indexProducer, metricsFactory,
-                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory, eventBuilder, mapManagerFactory, queueFig, rxTaskScheduler );
+                throw new IllegalArgumentException(
+                    "Configuration value of SNS is no longer allowed. Use MULTIREGION instead. ");
+
+            case MULTIREGION:
+                return new AsyncEventServiceImpl(
+                    queueManagerFactory,
+                    indexProcessorFig,
+                    indexProducer,
+                    metricsFactory,
+                    entityCollectionManagerFactory,
+                    indexLocationStrategyFactory,
+                    entityIndexFactory,
+                    eventBuilder,
+                    mapManagerFactory,
+                    queueFig,
+                    rxTaskScheduler );
+
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }
@@ -135,12 +163,12 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     /**
      * Different implementations
      */
-    public static enum Implementations { //TODO see about removing SNS and SQS and use AMZN? - michaelarusso
+    public static enum Implementations {
         TEST,
         LOCAL,
-        SQS,
-        SNS;
-
+        SQS,         // deprecated
+        SNS,         // deprecated
+        MULTIREGION; // built-in Akka-based queue
 
         public String asString() {
             return toString();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 315975c..87342ad 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -37,11 +37,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.text.DecimalFormat;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActor extends UntypedActor {
@@ -62,6 +61,10 @@ public class QueueActor extends UntypedActor {
     private final Map<String, ActorRef> queueTimeoutersByQueueName = new HashMap<>();
     private final Map<String, ActorRef> shardAllocatorsByQueueName = new HashMap<>();
 
+    private final AtomicLong runCount = new AtomicLong(0);
+    private final AtomicLong messageCount = new AtomicLong(0);
+    private final Set<String> queuesSeen = new HashSet<>();
+
 
     public QueueActor() {
 
@@ -81,6 +84,8 @@ public class QueueActor extends UntypedActor {
         if ( message instanceof QueueInitRequest) {
             QueueInitRequest request = (QueueInitRequest)message;
 
+            queuesSeen.add( request.getQueueName() );
+
             if ( refreshSchedulersByQueueName.get( request.getQueueName() ) == null ) {
                 Cancellable scheduler = getContext().system().scheduler().schedule(
                     Duration.create( 0, TimeUnit.MILLISECONDS),
@@ -120,6 +125,8 @@ public class QueueActor extends UntypedActor {
         } else if ( message instanceof QueueRefreshRequest ) {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
 
+            queuesSeen.add( request.getQueueName() );
+
             if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
 
                 if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
@@ -135,6 +142,8 @@ public class QueueActor extends UntypedActor {
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
 
+            queuesSeen.add( request.getQueueName() );
+
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
                 ActorRef readerRef = getContext().actorOf( Props.create(
                     QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
@@ -148,6 +157,8 @@ public class QueueActor extends UntypedActor {
         } else if ( message instanceof ShardCheckRequest ) {
             ShardCheckRequest request = (ShardCheckRequest)message;
 
+            queuesSeen.add( request.getQueueName() );
+
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
                 ActorRef readerRef = getContext().actorOf( Props.create(
                         ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
@@ -164,6 +175,8 @@ public class QueueActor extends UntypedActor {
             try {
                 QueueGetRequest queueGetRequest = (QueueGetRequest) message;
 
+                queuesSeen.add( queueGetRequest.getQueueName() );
+
                 Collection<DatabaseQueueMessage> queueMessages = new ArrayList<>();
 
                 while (queueMessages.size() < queueGetRequest.getNumRequested()) {
@@ -189,6 +202,36 @@ public class QueueActor extends UntypedActor {
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
 
+                long runs = runCount.incrementAndGet();
+                long messagesReturned = messageCount.addAndGet( queueMessages.size() );
+
+                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+                    final DecimalFormat format = new DecimalFormat("##.###");
+                    final long nano = 1000000000;
+                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.GET_TIME_GET );
+
+                    logger.debug("QueueActor get stats (queues {}):\n" +
+                            "   Num runs={}\n" +
+                            "   Messages={}\n" +
+                            "   Mean={}\n" +
+                            "   One min rate={}\n" +
+                            "   Five min rate={}\n" +
+                            "   Snapshot mean={}\n" +
+                            "   Snapshot min={}\n" +
+                            "   Snapshot max={}",
+                        queuesSeen.toArray(),
+                        t.getCount(),
+                        messagesReturned,
+                        format.format( t.getMeanRate() ),
+                        format.format( t.getOneMinuteRate() ),
+                        format.format( t.getFiveMinuteRate() ),
+                        format.format( t.getSnapshot().getMean() / nano ),
+                        format.format( (double) t.getSnapshot().getMin() / nano ),
+                        format.format( (double) t.getSnapshot().getMax() / nano ) );
+                }
+
+
             } finally {
                 timer.close();
             }
@@ -201,6 +244,8 @@ public class QueueActor extends UntypedActor {
 
                 QueueAckRequest queueAckRequest = (QueueAckRequest) message;
 
+                queuesSeen.add( queueAckRequest.getQueueName() );
+
                 DistributedQueueService.Status status = queueActorHelper.ackQueueMessage(
                         queueAckRequest.getQueueName(),
                         queueAckRequest.getQueueMessageId() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index ebc328f..dbd5235 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -39,28 +39,31 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.DecimalFormat;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
 
-    private final String                    queueName;
-
-    private final QueueMessageSerialization serialization;
-    private final InMemoryQueue             inMemoryQueue;
-    private final QakkaFig qakkaFig;
-    private final ActorSystemFig            actorSystemFig;
-    private final MetricsService            metricsService;
+    private final String          queueName;
+    private final InMemoryQueue   inMemoryQueue;
+    private final QakkaFig        qakkaFig;
+    private final ActorSystemFig  actorSystemFig;
+    private final MetricsService  metricsService;
     private final CassandraClient cassandraClient;
 
+    private final AtomicLong      runCount = new AtomicLong(0);
+    private final AtomicLong      totalRead = new AtomicLong(0);
+
+
     public QueueRefresher(String queueName ) {
         this.queueName = queueName;
 
         Injector injector = App.INJECTOR;
 
-        serialization  = injector.getInstance( QueueMessageSerialization.class );
         inMemoryQueue  = injector.getInstance( InMemoryQueue.class );
         qakkaFig       = injector.getInstance( QakkaFig.class );
         actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -76,7 +79,7 @@ public class QueueRefresher extends UntypedActor {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
 
-            logger.debug( "running for queue {}", queueName );
+            //logger.debug( "running for queue {}", queueName );
 
             if (!request.getQueueName().equals( queueName )) {
                 throw new QakkaRuntimeException(
@@ -109,10 +112,39 @@ public class QueueRefresher extends UntypedActor {
                         count++;
                     }
 
-                    if ( count > 0 ) {
-                        logger.debug( "Added {} in-memory for queue {}, new size = {}",
-                                count, queueName, inMemoryQueue.size( queueName ) );
+                    long runs = runCount.incrementAndGet();
+                    long readCount = totalRead.addAndGet( count );
+
+                    if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+                        final DecimalFormat format = new DecimalFormat("##.###");
+                        final long nano = 1000000000;
+                        Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
+
+                        logger.debug("QueueRefresher for queue '{}' stats:\n" +
+                            "   Num runs={}\n" +
+                            "   Read count={}\n" +
+                            "   Mean={}\n" +
+                            "   One min rate={}\n" +
+                            "   Five min rate={}\n" +
+                            "   Snapshot mean={}\n" +
+                            "   Snapshot min={}\n" +
+                            "   Snapshot max={}",
+                            queueName,
+                            t.getCount(),
+                            readCount,
+                            format.format( t.getMeanRate() ),
+                            format.format( t.getOneMinuteRate() ),
+                            format.format( t.getFiveMinuteRate() ),
+                            format.format( t.getSnapshot().getMean() / nano ),
+                            format.format( (double) t.getSnapshot().getMin() / nano ),
+                            format.format( (double) t.getSnapshot().getMax() / nano ) );
                     }
+
+//                    if ( count > 0 ) {
+//                        logger.debug( "Added {} in-memory for queue {}, new size = {}",
+//                                count, queueName, inMemoryQueue.size( queueName ) );
+//                    }
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index 7806d30..b47aac6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -40,8 +40,10 @@ import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterato
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.DecimalFormat;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueTimeouter extends UntypedActor {
@@ -54,9 +56,11 @@ public class QueueTimeouter extends UntypedActor {
     private final ActorSystemFig            actorSystemFig;
     private final QakkaFig qakkaFig;
     private final CassandraClient           cassandraClient;
-
     private final MessageCounterSerialization messageCounterSerialization;
 
+    private final AtomicLong runCount = new AtomicLong(0);
+    private final AtomicLong totalTimedout = new AtomicLong(0);
+
 
     public QueueTimeouter(String queueName ) {
         this.queueName = queueName;
@@ -137,13 +141,43 @@ public class QueueTimeouter extends UntypedActor {
                     }
                 }
 
-                if (count > 0) {
-                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
 
-                    messageCounterSerialization.decrementCounter(
-                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+                long runs = runCount.incrementAndGet();
+                long timeoutCount = totalTimedout.addAndGet( count );
+
+                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+                    final DecimalFormat format = new DecimalFormat("##.###");
+                    final long nano = 1000000000;
+                    Timer t = metricsService.getMetricRegistry().timer(MetricsService.TIMEOUT_TIME );
+
+                    logger.debug("QueueTimeouter for queue '{}' stats:\n" +
+                            "   Num runs={}\n" +
+                            "   Timeout count={}\n" +
+                            "   Mean={}\n" +
+                            "   One min rate={}\n" +
+                            "   Five min rate={}\n" +
+                            "   Snapshot mean={}\n" +
+                            "   Snapshot min={}\n" +
+                            "   Snapshot max={}",
+                        queueName,
+                        t.getCount(),
+                        timeoutCount,
+                        format.format( t.getMeanRate() ),
+                        format.format( t.getOneMinuteRate() ),
+                        format.format( t.getFiveMinuteRate() ),
+                        format.format( t.getSnapshot().getMean() / nano ),
+                        format.format( (double) t.getSnapshot().getMin() / nano ),
+                        format.format( (double) t.getSnapshot().getMax() / nano ) );
                 }
 
+//                if (count > 0) {
+//                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
+//
+//                    messageCounterSerialization.decrementCounter(
+//                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+//                }
+
             } finally {
                 timer.close();
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 9d71c31..bcb6b79 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 
@@ -168,7 +165,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                             logger.debug("SUCCESS after {} retries", retries );
                         }
 
-                        logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+                        // send refresh-queue-if-empty message
                         QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
                         clientActor.tell( qrr, null );
 
@@ -221,6 +218,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
         }
 
+        if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
+            logger.error("Akka Actor System is not ready yet for requests.");
+            return Collections.EMPTY_LIST;
+        }
+
         int maxRetries = qakkaFig.getMaxGetRetries();
         int retries = 0;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index e1bf239..4a41dda 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -53,15 +53,18 @@ public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
                     if ( queueFig.overrideQueueForDefault() ){
 
                         LegacyQueueManager manager = defaultManager.get( scope.getName() );
-                        logger.info("Using Queue Implemention: {}", manager.getClass().getSimpleName());
                         if ( manager == null ) {
+                            logger.info("Using LocalQueueManager for scope {}", scope.getName() );
                             manager = new LocalQueueManager();
                             defaultManager.put( scope.getName(), manager );
                         }
                         return manager;
 
                     } else {
-                        return queuemanagerInternalFactory.getQueueManager(scope);
+                        LegacyQueueManager queueManager = queuemanagerInternalFactory.getQueueManager( scope );
+                        logger.info("Using queue manager {} for scope {}",
+                            queueManager.getClass().getSimpleName(), scope.getName() );
+                        return queueManager;
                     }
 
                 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index 9e14f29..b207ea3 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -21,12 +21,13 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
 
-log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-
 log4j.logger.org.apache.cassandra=WARN
 log4j.logger.org.glassfish=WARN
 
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
+#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
 
+log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
+log4j.logger.org.apache.usergrid.corepersistence.asyncevents=DEBUG

http://git-wip-us.apache.org/repos/asf/usergrid/blob/2cd8ecb3/tests/performance/results.txt
----------------------------------------------------------------------
diff --git a/tests/performance/results.txt b/tests/performance/results.txt
new file mode 100644
index 0000000..fb63637
--- /dev/null
+++ b/tests/performance/results.txt
@@ -0,0 +1 @@
+collection,name,uuid,modified,status,error,lastStatus


[02/10] usergrid git commit: Ensure local keyspace has unique name per datacenter / region.

Posted by sn...@apache.org.
Ensure local keyspace has unique name per datacenter / region.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/6c204b9f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/6c204b9f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/6c204b9f

Branch: refs/heads/usergrid-1318-queue
Commit: 6c204b9f046fa9ea234d7e608b940ead60fb6720
Parents: 434e53e
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Sep 20 13:27:17 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Sep 20 13:27:17 2016 -0400

----------------------------------------------------------------------
 .../persistence/core/CassandraConfigImpl.java     |  7 +++++--
 .../usergrid/persistence/core/CassandraFig.java   |  1 +
 .../auditlog/impl/AuditLogSerializationImpl.java  | 10 +++++-----
 .../impl/MessageCounterSerializationImpl.java     | 10 +++++-----
 .../impl/QueueMessageSerializationImpl.java       | 14 +++++++-------
 .../queues/impl/QueueSerializationImpl.java       | 10 +++++-----
 .../impl/ShardCounterSerializationImpl.java       | 10 +++++-----
 .../sharding/impl/ShardSerializationImpl.java     | 12 ++++++------
 .../impl/TransferLogSerializationImpl.java        | 10 +++++-----
 .../persistence/queue/impl/QakkaQueueManager.java |  6 ------
 .../queue/impl/SNSQueueManagerImpl.java           | 10 +++++-----
 ...ultiShardDatabaseQueueMessageIteratorTest.java |  6 +++---
 .../serialization/sharding/ShardIteratorTest.java | 14 +++++++-------
 .../sharding/ShardSerializationTest.java          | 18 +++++++++---------
 14 files changed, 68 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
index 729f5b2..77f7228 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraConfigImpl.java
@@ -64,13 +64,16 @@ public class CassandraConfigImpl implements CassandraConfig {
 
         this.dataStaxReadCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadCl());
 
-        this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getReadClConsistent());
+        this.dataStaxReadConsistentCl = com.datastax.driver.core.ConsistencyLevel.valueOf(
+            cassandraFig.getReadClConsistent());
 
         this.dataStaxWriteCl = com.datastax.driver.core.ConsistencyLevel.valueOf( cassandraFig.getWriteCl() );
 
         this.applicationKeyspace = cassandraFig.getApplicationKeyspace();
 
-        this.applicationLocalKeyspace = cassandraFig.getApplicationLocalKeyspace();
+        this.applicationLocalKeyspace =
+              cassandraFig.getApplicationLocalKeyspace() + "_"
+            + cassandraFig.getLocalDataCenter().replace("-", "_");
 
         //add the listeners to update the values
         cassandraFig.addPropertyChangeListener( new PropertyChangeListener() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
index bc8d087..1faf1e7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CassandraFig.java
@@ -91,6 +91,7 @@ public interface CassandraFig extends GuicyFig {
     @Default( "Usergrid_Applications" )
     String getApplicationKeyspace();
 
+    /** Prefix for local keyspace name. Name will be this prefix plus "_" plus local data center name. */
     @Key( "cassandra.keyspace.application.local" )
     @Default( "Usergrid_Applications_Local" )
     String getApplicationLocalKeyspace();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
index ddbd345..93dfe4b 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/auditlog/impl/AuditLogSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -47,7 +47,7 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
     private static final Logger logger = LoggerFactory.getLogger( AuditLogSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     public final static String TABLE_AUDIT_LOG   = "audit_log";
 
@@ -77,8 +77,8 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
 
 
     @Inject
-    public AuditLogSerializationImpl( CassandraFig cassandraFig, CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public AuditLogSerializationImpl( CassandraConfig cassandraConfig, CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -147,6 +147,6 @@ public class AuditLogSerializationImpl implements AuditLogSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_AUDIT_LOG, CQL ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 5206ec7..f198d05 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -51,7 +51,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
     private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     final static String TABLE_SHARD_COUNTERS       = "counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
@@ -95,8 +95,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
 
     @Inject
-    public MessageCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;
     }
@@ -202,7 +202,7 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 99ff783..d868021 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -28,7 +28,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -53,7 +53,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     private final ActorSystemFig            actorSystemFig;
     private final ShardStrategy             shardStrategy;
@@ -109,13 +109,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
 
     @Inject
     public QueueMessageSerializationImpl(
-            CassandraFig              cassandraFig,
+            CassandraConfig              cassandraConfig,
             ActorSystemFig            actorSystemFig,
             ShardStrategy             shardStrategy,
             ShardCounterSerialization shardCounterSerialization,
             CassandraClient           cassandraClient
         ) {
-        this.cassandraFig              = cassandraFig;
+        this.cassandraConfig              = cassandraConfig;
         this.actorSystemFig            = actorSystemFig;
         this.shardStrategy             = shardStrategy;
         this.shardCounterSerialization = shardCounterSerialization;
@@ -316,13 +316,13 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
     public Collection<TableDefinition> getTables() {
         return Lists.newArrayList(
 
-            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
                 TABLE_MESSAGES_AVAILABLE, MESSAGES_AVAILABLE ),
 
-            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
                 TABLE_MESSAGES_INFLIGHT, MESSAGES_INFLIGHT ),
 
-            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(),
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(),
                 TABLE_MESSAGE_DATA, MESSAGE_DATA )
         );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
index 07a201c..17a48c6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queues/impl/QueueSerializationImpl.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.Clause;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -47,7 +47,7 @@ public class QueueSerializationImpl implements QueueSerialization {
     private static final Logger logger = LoggerFactory.getLogger( QueueMessageSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     public final static String COLUMN_QUEUE_NAME = "queue_name";
     public final static String COLUMN_REGIONS = "regions";
@@ -74,8 +74,8 @@ public class QueueSerializationImpl implements QueueSerialization {
 
 
     @Inject
-    public QueueSerializationImpl( CassandraFig cassandraFig,  CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public QueueSerializationImpl( CassandraConfig cassandraConfig,  CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -155,7 +155,7 @@ public class QueueSerializationImpl implements QueueSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), "queues", CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), "queues", CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index f14d234..bcfb74d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -24,7 +24,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -51,7 +51,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
     private static final Logger logger = LoggerFactory.getLogger( ShardCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     final static String TABLE_COUNTERS       = "shard_counters";
     final static String COLUMN_QUEUE_NAME    = "queue_name";
@@ -91,8 +91,8 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
 
 
     @Inject
-    public ShardCounterSerializationImpl( CassandraFig cassandraFig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public ShardCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
         this.cassandraClient = cassandraClient;
     }
@@ -197,6 +197,6 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(), "shard_counters", CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), "shard_counters", CQL ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
index 989622b..cc5caab 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardSerializationImpl.java
@@ -26,7 +26,7 @@ import com.datastax.driver.core.querybuilder.Clause;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -46,7 +46,7 @@ public class ShardSerializationImpl implements ShardSerialization {
     private static final Logger logger = LoggerFactory.getLogger( ShardSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     public final static String COLUMN_QUEUE_NAME = "queue_name";
     public final static String COLUMN_REGION = "region";
@@ -82,8 +82,8 @@ public class ShardSerializationImpl implements ShardSerialization {
 
 
     @Inject
-    public ShardSerializationImpl( CassandraFig cassandraFig,  CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public ShardSerializationImpl( CassandraConfig cassandraConfig,  CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -195,9 +195,9 @@ public class ShardSerializationImpl implements ShardSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
         return Lists.newArrayList(
-                new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
                     TABLE_SHARDS_MESSAGES_AVAILABLE, SHARDS_MESSAGES_AVAILABLE ),
-                new TableDefinitionStringImpl( cassandraFig.getApplicationLocalKeyspace(),
+                new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(),
                     TABLE_SHARDS_MESSAGES_INFLIGHT, SHARDS_MESSAGES_AVAILABLE_INFLIGHT )
         );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
index 9ebb841..51a168e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/transferlog/impl/TransferLogSerializationImpl.java
@@ -25,7 +25,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.google.inject.Inject;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.datastax.TableDefinition;
 import org.apache.usergrid.persistence.core.datastax.impl.TableDefinitionStringImpl;
@@ -45,7 +45,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
     private static final Logger logger = LoggerFactory.getLogger( TransferLogSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
 
     public final static String TABLE_TRANSFER_LOG   = "transfer_log";
 
@@ -67,8 +67,8 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
 
 
     @Inject
-    public TransferLogSerializationImpl( CassandraFig cassandraFig,  CassandraClient cassandraClient ) {
-        this.cassandraFig = cassandraFig;
+    public TransferLogSerializationImpl( CassandraConfig cassandraConfig,  CassandraClient cassandraClient ) {
+        this.cassandraConfig = cassandraConfig;
         this.cassandraClient = cassandraClient;
     }
 
@@ -164,7 +164,7 @@ public class TransferLogSerializationImpl implements TransferLogSerialization {
     @Override
     public Collection<TableDefinition> getTables() {
         return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraFig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) );
+            new TableDefinitionStringImpl( cassandraConfig.getApplicationKeyspace(), TABLE_TRANSFER_LOG, CQL ) );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 832cecd..0eb609d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -42,27 +42,21 @@ public class QakkaQueueManager implements LegacyQueueManager {
     private static final Logger logger = LoggerFactory.getLogger( QakkaQueueManager.class );
 
     private final LegacyQueueScope    scope;
-    private final LegacyQueueFig      fig;
     private final QueueManager        queueManager;
     private final QueueMessageManager queueMessageManager;
-    private final QakkaFig            qakkaFig;
     private final Regions             regions;
 
 
     @Inject
     public QakkaQueueManager(
         @Assisted LegacyQueueScope scope,
-        LegacyQueueFig      fig,
         QueueManager        queueManager,
         QueueMessageManager queueMessageManager,
-        QakkaFig            qakkaFig,
         Regions             regions
     ) {
 
         this.scope = scope;
-        this.fig = fig;
         this.queueManager = queueManager;
-        this.qakkaFig = qakkaFig;
         this.queueMessageManager = queueMessageManager;
         this.regions = regions;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 6d4e0c4..637f157 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -32,7 +32,7 @@ import com.amazonaws.ClientConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
 import org.apache.usergrid.persistence.queue.LegacyQueue;
@@ -87,7 +87,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
     private final LegacyQueueScope scope;
     private final LegacyQueueFig fig;
     private final ClusterFig clusterFig;
-    private final CassandraFig cassandraFig;
+    private final CassandraConfig cassandraConfig;
     private final ClientConfiguration clientConfiguration;
     private final AmazonSQSClient sqs;
     private final AmazonSNSClient sns;
@@ -154,11 +154,11 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
 
     @Inject
     public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig,
-                               CassandraFig cassandraFig, LegacyQueueFig queueFig ) {
+                               CassandraConfig cassandraConfig, LegacyQueueFig queueFig ) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
-        this.cassandraFig = cassandraFig;
+        this.cassandraConfig = cassandraConfig;
 
 
         // create our own executor which has a bounded queue w/ caller runs policy for rejected tasks
@@ -382,7 +382,7 @@ public class SNSQueueManagerImpl implements LegacyQueueManager {
 
     private String getName() {
         String name =
-            clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace() + "_" + scope.getName() + "_"
+            clusterFig.getClusterName() + "_" + cassandraConfig.getApplicationKeyspace() + "_" + scope.getName() + "_"
                 + scope.getRegionImplementation();
         name = name.toLowerCase(); //user lower case values
         Preconditions.checkArgument( name.length() <= 80, "Your name must be < than 80 characters" );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
index 5fa3434..053fdd1 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/MultiShardDatabaseQueueMessageIteratorTest.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.persistence.qakka.serialization;
 
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -51,8 +51,8 @@ public class MultiShardDatabaseQueueMessageIteratorTest extends AbstractTest {
     public void testIterator() throws InterruptedException {
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         QueueMessageSerialization queueMessageSerialization =
                 getInjector().getInstance( QueueMessageSerialization.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
index 0d593aa..0c305fa 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardIteratorTest.java
@@ -20,7 +20,7 @@
 package org.apache.usergrid.persistence.qakka.serialization.sharding;
 
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.impl.ShardSerializationImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
@@ -47,8 +47,8 @@ public class ShardIteratorTest extends AbstractTest {
     public void getActiveShards(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
 
@@ -80,8 +80,8 @@ public class ShardIteratorTest extends AbstractTest {
     public void seekActiveShards(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         String queueName = "queue_sit_" + RandomStringUtils.randomAlphanumeric( 10 );
 
@@ -114,8 +114,8 @@ public class ShardIteratorTest extends AbstractTest {
     public void shardIteratorOrdering() throws Exception {
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         int numShards = 10;
         String region = "default";

http://git-wip-us.apache.org/repos/asf/usergrid/blob/6c204b9f/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
index e67db28..debfdd3 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardSerializationTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.sharding;
 
-import org.apache.usergrid.persistence.core.CassandraFig;
+import org.apache.usergrid.persistence.core.CassandraConfig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
@@ -47,8 +47,8 @@ public class ShardSerializationTest extends AbstractTest {
     public void writeNewShard(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
         shardSerialization.createShard(shard1);
@@ -58,8 +58,8 @@ public class ShardSerializationTest extends AbstractTest {
     public void deleteShard(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
 
@@ -75,8 +75,8 @@ public class ShardSerializationTest extends AbstractTest {
     public void loadNullShard(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         Shard shard1 = new Shard("junk", "region1", Shard.Type.DEFAULT, 100L, null);
 
@@ -90,8 +90,8 @@ public class ShardSerializationTest extends AbstractTest {
     public void updatePointer(){
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-        CassandraFig cassandraFig = getInjector().getInstance( CassandraFig.class );
-        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraFig, cassandraClient );
+        CassandraConfig cassandraConfig = getInjector().getInstance( CassandraConfig.class );
+        ShardSerialization shardSerialization = new ShardSerializationImpl( cassandraConfig, cassandraClient );
 
         Shard shard1 = new Shard("test", "region1", Shard.Type.DEFAULT, 100L, null);
         shardSerialization.createShard(shard1);


[04/10] usergrid git commit: Complete message counter and add support for getQueueDepth() in QakkaQueueManager

Posted by sn...@apache.org.
Complete message counter and add support for getQueueDepth() in QakkaQueueManager


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9306f12e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9306f12e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9306f12e

Branch: refs/heads/usergrid-1318-queue
Commit: 9306f12eea8cd8f0ad0b2ec4751cd8a9b9ba5382
Parents: 8b79fb8
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 08:34:28 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 08:34:28 2016 -0400

----------------------------------------------------------------------
 .../usergrid/persistence/qakka/QakkaFig.java    |   9 +-
 .../usergrid/persistence/qakka/QakkaModule.java |  29 ++--
 .../qakka/core/QueueMessageManager.java         |   4 +
 .../core/impl/QueueMessageManagerImpl.java      |  13 +-
 .../qakka/distributed/actors/QueueActor.java    |  14 +-
 .../distributed/actors/QueueTimeouter.java      |   8 +
 .../qakka/distributed/actors/QueueWriter.java   | 138 ++++++++--------
 .../impl/DistributedQueueServiceImpl.java       |   6 +-
 .../MessageCounterSerialization.java            |   4 +-
 .../impl/MessageCounterSerializationImpl.java   | 159 +++++++++++++------
 .../queue/impl/QakkaQueueManager.java           |   4 +-
 .../qakka/core/QueueMessageManagerTest.java     |   2 +
 .../impl/MessageCounterSerializationTest.java   |  90 +++++++++++
 .../sharding/ShardCounterSerializationTest.java |   3 -
 .../queue/src/test/resources/qakka.properties   |   1 +
 15 files changed, 347 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c66001d..c3f4189 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,7 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_GET_TIMEOUT                      = "queue.get.timeout.seconds";
 
-    String QUEUE_MAX_SHARD_COUNTER                = "queue.max.inmemory.shard.counter";
+    String QUEUE_MAX_SHARD_COUNTER                = "queue.max.inmemory.max.shard.counter";
+
+    String QUEUE_MAX_MESSAGE_CHANGES              = "queue.max.inmemory.max.message.changes";
 
     String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
 
@@ -125,6 +127,11 @@ public interface QakkaFig extends GuicyFig, Serializable {
     @Default("100")
     long getMaxInMemoryShardCounter();
 
+    /** Once counter reaches this value, write it to permanent storage */
+    @Key(QUEUE_MAX_MESSAGE_CHANGES)
+    @Default("100")
+    long getMaxInMemoryMessageCounter();
+
     /** How often to check whether new shard is needed for each queue */
     @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
     @Default("5000")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
index d1d8d7e..e3113e1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaModule.java
@@ -37,7 +37,9 @@ import org.apache.usergrid.persistence.qakka.distributed.impl.QueueSenderRouterP
 import org.apache.usergrid.persistence.qakka.distributed.impl.QueueWriterRouterProducer;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.impl.AuditLogSerializationImpl;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.MessageCounterSerializationImpl;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl.QueueMessageSerializationImpl;
 import org.apache.usergrid.persistence.qakka.serialization.queues.QueueSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queues.impl.QueueSerializationImpl;
@@ -76,23 +78,24 @@ public class QakkaModule extends AbstractModule {
 
         bind( App.class );
 
-        bind( CassandraClient.class ).to(           CassandraClientImpl.class );
-        bind( MetricsService.class ).to(            App.class );
+        bind( CassandraClient.class ).to(             CassandraClientImpl.class );
+        bind( MetricsService.class ).to(              App.class );
 
-        bind( QueueManager.class ).to(              QueueManagerImpl.class );
-        bind( QueueSerialization.class ).to(        QueueSerializationImpl.class );
+        bind( QueueManager.class ).to(                QueueManagerImpl.class );
+        bind( QueueSerialization.class ).to(          QueueSerializationImpl.class );
 
-        bind( QueueMessageManager.class ).to(       QueueMessageManagerImpl.class );
-        bind( QueueMessageSerialization.class ).to( QueueMessageSerializationImpl.class );
+        bind( QueueMessageManager.class ).to(         QueueMessageManagerImpl.class );
+        bind( QueueMessageSerialization.class ).to(   QueueMessageSerializationImpl.class );
 
-        bind( ShardSerialization.class ).to(        ShardSerializationImpl.class );
-        bind( ShardStrategy.class ).to(             ShardStrategyImpl.class );
+        bind( ShardSerialization.class ).to(          ShardSerializationImpl.class );
+        bind( ShardStrategy.class ).to(               ShardStrategyImpl.class );
 
-        bind( ShardCounterSerialization.class ).to( ShardCounterSerializationImpl.class );
+        bind( ShardCounterSerialization.class ).to(   ShardCounterSerializationImpl.class );
+        bind( MessageCounterSerialization.class ).to( MessageCounterSerializationImpl.class );
 
-        bind( TransferLogSerialization.class ).to(  TransferLogSerializationImpl.class );
-        bind( AuditLogSerialization.class ).to(     AuditLogSerializationImpl.class );
-        bind( DistributedQueueService.class ).to(   DistributedQueueServiceImpl.class );
+        bind( TransferLogSerialization.class ).to(    TransferLogSerializationImpl.class );
+        bind( AuditLogSerialization.class ).to(       AuditLogSerializationImpl.class );
+        bind( DistributedQueueService.class ).to(     DistributedQueueServiceImpl.class );
 
         bind( QueueActorRouterProducer.class );
         bind( QueueWriterRouterProducer.class );
@@ -110,6 +113,6 @@ public class QakkaModule extends AbstractModule {
         migrationBinder.addBinding().to( Key.get( ShardCounterSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( ShardSerialization.class ) );
         migrationBinder.addBinding().to( Key.get( TransferLogSerialization.class ) );
-        //migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
+        migrationBinder.addBinding().to( Key.get( MessageCounterSerialization.class ) );
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
index 15203d8..b540fce 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.usergrid.persistence.qakka.core;
 
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.UUID;
@@ -80,4 +82,6 @@ public interface QueueMessageManager {
      * Get message from messages available or messages inflight storage.
      */
     QueueMessage getMessage(String queueName, UUID queueMessageId);
+
+    long getQueueDepth(String queueName);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index bcd0f58..691c1a6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessageBody;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -58,6 +59,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     private final DistributedQueueService   distributedQueueService;
     private final TransferLogSerialization  transferLogSerialization;
     private final URIStrategy uriStrategy;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
@@ -67,8 +69,8 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
             QueueMessageSerialization queueMessageSerialization,
             DistributedQueueService   distributedQueueService,
             TransferLogSerialization  transferLogSerialization,
-            URIStrategy               uriStrategy
-            ) {
+            URIStrategy               uriStrategy,
+            MessageCounterSerialization messageCounterSerialization ) {
 
         this.actorSystemFig            = actorSystemFig;
         this.queueManager              = queueManager;
@@ -76,6 +78,7 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
         this.distributedQueueService   = distributedQueueService;
         this.transferLogSerialization  = transferLogSerialization;
         this.uriStrategy               = uriStrategy;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
 
@@ -296,4 +299,10 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
         return queueMessage;
     }
 
+
+    @Override
+    public long getQueueDepth(String queueName) {
+        return messageCounterSerialization.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 6fed13b..3b50711 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
@@ -46,11 +47,13 @@ import java.util.concurrent.TimeUnit;
 public class QueueActor extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueActor.class );
 
-    private final QakkaFig qakkaFig;
-    private final InMemoryQueue inMemoryQueue;
+    private final QakkaFig         qakkaFig;
+    private final InMemoryQueue    inMemoryQueue;
     private final QueueActorHelper queueActorHelper;
     private final MetricsService   metricsService;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
     private final Map<String, Cancellable> refreshSchedulersByQueueName = new HashMap<>();
     private final Map<String, Cancellable> timeoutSchedulersByQueueName = new HashMap<>();
     private final Map<String, Cancellable> shardAllocationSchedulersByQueueName = new HashMap<>();
@@ -68,6 +71,8 @@ public class QueueActor extends UntypedActor {
         inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
         queueActorHelper = injector.getInstance( QueueActorHelper.class );
         metricsService   = injector.getInstance( MetricsService.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
     @Override
@@ -173,6 +178,11 @@ public class QueueActor extends UntypedActor {
                     }
                 }
 
+                messageCounterSerialization.decrementCounter(
+                    queueGetRequest.getQueueName(),
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueMessages.size());
+
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index fcd2161..7806d30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueTimeoutRe
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
@@ -54,6 +55,8 @@ public class QueueTimeouter extends UntypedActor {
     private final QakkaFig qakkaFig;
     private final CassandraClient           cassandraClient;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
 
     public QueueTimeouter(String queueName ) {
         this.queueName = queueName;
@@ -65,6 +68,8 @@ public class QueueTimeouter extends UntypedActor {
         qakkaFig             = injector.getInstance( QakkaFig.class );
         metricsService       = injector.getInstance( MetricsService.class );
         cassandraClient      = injector.getInstance( CassandraClientImpl.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
 
@@ -134,6 +139,9 @@ public class QueueTimeouter extends UntypedActor {
 
                 if (count > 0) {
                     logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+                    messageCounterSerialization.decrementCounter(
+                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
                 }
 
             } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 8657370..7166ef1 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResp
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.transferlog.TransferLogSerialization;
 import org.slf4j.Logger;
@@ -48,6 +49,8 @@ public class QueueWriter extends UntypedActor {
     private final AuditLogSerialization     auditLogSerialization;
     private final MetricsService            metricsService;
 
+    private final MessageCounterSerialization messageCounterSerialization;
+
 
     public QueueWriter() {
 
@@ -57,96 +60,101 @@ public class QueueWriter extends UntypedActor {
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );
         auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
         metricsService           = injector.getInstance( MetricsService.class );
+
+        messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 
     @Override
     public void onReceive(Object message) {
 
-            if (message instanceof QueueWriteRequest) {
-
-                Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
+        if (message instanceof QueueWriteRequest) {
 
-                try {
-                    QueueWriteRequest qa = (QueueWriteRequest) message;
-
-                    UUID queueMessageId = QakkaUtils.getTimeUuid();
+            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.SEND_TIME_WRITE ).time();
 
-                    // TODO: implement deliveryTime and expirationTime
+            try {
+                QueueWriteRequest qa = (QueueWriteRequest) message;
 
-                    DatabaseQueueMessage dbqm = null;
-                    long currentTime = System.currentTimeMillis();
+                UUID queueMessageId = QakkaUtils.getTimeUuid();
 
-                    try {
-                        dbqm = new DatabaseQueueMessage(
-                                qa.getMessageId(),
-                                DatabaseQueueMessage.Type.DEFAULT,
-                                qa.getQueueName(),
-                                qa.getDestRegion(),
-                                null,
-                                currentTime,
-                                currentTime,
-                                queueMessageId );
+                // TODO: implement deliveryTime and expirationTime
 
-                        messageSerialization.writeMessage( dbqm );
+                DatabaseQueueMessage dbqm = null;
+                long currentTime = System.currentTimeMillis();
 
-                        //logger.debug("Wrote queue message id {} to queue name {}",
-                        //        dbqm.getQueueMessageId(), dbqm.getQueueName());
+                try {
+                    dbqm = new DatabaseQueueMessage(
+                            qa.getMessageId(),
+                            DatabaseQueueMessage.Type.DEFAULT,
+                            qa.getQueueName(),
+                            qa.getDestRegion(),
+                            null,
+                            currentTime,
+                            currentTime,
+                            queueMessageId );
 
-                    } catch (Throwable t) {
-                        logger.debug("Error creating database queue message", t);
+                    messageSerialization.writeMessage( dbqm );
 
-                        auditLogSerialization.recordAuditLog(
-                                AuditLog.Action.SEND,
-                                AuditLog.Status.ERROR,
-                                qa.getQueueName(),
-                                qa.getDestRegion(),
-                                qa.getMessageId(),
-                                dbqm.getMessageId() );
+                    messageCounterSerialization.incrementCounter(
+                        qa.getQueueName(), DatabaseQueueMessage.Type.DEFAULT, 1);
 
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.ERROR ), getSender() );
+                    //logger.debug("Wrote queue message id {} to queue name {}",
+                    //        dbqm.getQueueMessageId(), dbqm.getQueueName());
 
-                        return;
-                    }
+                } catch (Throwable t) {
+                    logger.debug("Error creating database queue message", t);
 
                     auditLogSerialization.recordAuditLog(
                             AuditLog.Action.SEND,
-                            AuditLog.Status.SUCCESS,
+                            AuditLog.Status.ERROR,
                             qa.getQueueName(),
                             qa.getDestRegion(),
                             qa.getMessageId(),
-                            dbqm.getQueueMessageId() );
-
-                    try {
-                        transferLogSerialization.removeTransferLog(
-                                qa.getQueueName(),
-                                qa.getSourceRegion(),
-                                qa.getDestRegion(),
-                                qa.getMessageId() );
-
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
-
-                    } catch (Throwable e) {
-                        logger.debug( "Unable to delete transfer log for {} {} {} {}",
-                                qa.getQueueName(),
-                                qa.getSourceRegion(),
-                                qa.getDestRegion(),
-                                qa.getMessageId() );
-                        logger.debug("Error deleting transferlog", e);
-
-                        getSender().tell( new QueueWriteResponse(
-                                QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
-                    }
-
-                } finally {
-                    timer.close();
+                            dbqm.getMessageId() );
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.ERROR ), getSender() );
+
+                    return;
                 }
 
-            } else {
-                unhandled( message );
+                auditLogSerialization.recordAuditLog(
+                        AuditLog.Action.SEND,
+                        AuditLog.Status.SUCCESS,
+                        qa.getQueueName(),
+                        qa.getDestRegion(),
+                        qa.getMessageId(),
+                        dbqm.getQueueMessageId() );
+
+                try {
+                    transferLogSerialization.removeTransferLog(
+                            qa.getQueueName(),
+                            qa.getSourceRegion(),
+                            qa.getDestRegion(),
+                            qa.getMessageId() );
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_DELETED ), getSender() );
+
+                } catch (Throwable e) {
+                    logger.debug( "Unable to delete transfer log for {} {} {} {}",
+                            qa.getQueueName(),
+                            qa.getSourceRegion(),
+                            qa.getDestRegion(),
+                            qa.getMessageId() );
+                    logger.debug("Error deleting transferlog", e);
+
+                    getSender().tell( new QueueWriteResponse(
+                            QueueWriter.WriteStatus.SUCCESS_XFERLOG_NOTDELETED ), getSender() );
+                }
+
+            } finally {
+                timer.close();
             }
 
+        } else {
+            unhandled( message );
+        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index be20cde..3d6a808 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
@@ -54,17 +55,20 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     private final ActorSystemManager actorSystemManager;
     private final QueueManager queueManager;
     private final QakkaFig qakkaFig;
+    private final MessageCounterSerialization messageCounterSerialization;
 
 
     @Inject
     public DistributedQueueServiceImpl(
             ActorSystemManager actorSystemManager,
             QueueManager queueManager,
-            QakkaFig qakkaFig ) {
+            QakkaFig qakkaFig,
+            MessageCounterSerialization messageCounterSerialization ) {
 
         this.actorSystemManager = actorSystemManager;
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
+        this.messageCounterSerialization = messageCounterSerialization;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
index cbbf11f..6c81863 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/MessageCounterSerialization.java
@@ -21,11 +21,11 @@ package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 
 
-public interface MessageCounterSerialization  extends Migration {
+public interface MessageCounterSerialization extends Migration {
 
     void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
 
-    void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment);
+    void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement);
 
     long getCounterValue(String name, DatabaseQueueMessage.Type type);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index f198d05..0fdb47e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClient;
 import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardCounterSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,49 +43,56 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 
 @Singleton
-public class MessageCounterSerializationImpl implements ShardCounterSerialization {
+public class MessageCounterSerializationImpl implements MessageCounterSerialization {
     private static final Logger logger = LoggerFactory.getLogger( MessageCounterSerializationImpl.class );
 
     private final CassandraClient cassandraClient;
     private final CassandraConfig cassandraConfig;
 
-    final static String TABLE_SHARD_COUNTERS       = "counters";
-    final static String COLUMN_QUEUE_NAME    = "queue_name";
-    final static String COLUMN_SHARD_ID      = "shard_id";
-    final static String COLUMN_COUNTER_VALUE = "counter_value";
-    final static String COLUMN_SHARD_TYPE    = "shard_type";
+    final static String TABLE_MESSAGE_COUNTERS = "message_counters";
+    final static String COLUMN_QUEUE_NAME      = "queue_name";
+    final static String COLUMN_COUNTER_VALUE   = "counter_value";
+    final static String COLUMN_MESSAGE_TYPE    = "message_type";
 
     // design note: counters based on DataStax example here:
     // https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_counter_t.html
 
     static final String CQL =
-        "CREATE TABLE IF NOT EXISTS shard_counters ( " +
+        "CREATE TABLE IF NOT EXISTS message_counters ( " +
                 "counter_value counter, " +
                 "queue_name    varchar, " +
-                "shard_type    varchar, " +
-                "shard_id      bigint, " +
-                "PRIMARY KEY (queue_name, shard_type, shard_id) " +
+                "message_type  varchar, " +
+                "PRIMARY KEY (queue_name, message_type) " +
         ");";
 
 
-    final long maxInMemoryIncrement;
+    /** number of changes since last save to database */
+    final AtomicInteger numChanges = new AtomicInteger( 0 );
+
+    final long maxChangesBeforeSave;
 
     class InMemoryCount {
         long baseCount;
         final AtomicLong increment = new AtomicLong( 0L );
+        final AtomicLong decrement = new AtomicLong( 0L );
+
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
-        public long value() {
-            return baseCount + increment.get();
-        }
         public AtomicLong getIncrement() {
             return increment;
         }
+        public AtomicLong getDecrement() {
+            return decrement;
+        }
+        public long value() {
+            return baseCount + increment.get() - decrement.get();
+        }
         void setBaseCount( long baseCount ) {
             this.baseCount = baseCount;
         }
@@ -95,64 +102,89 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
 
     @Inject
-    public MessageCounterSerializationImpl( CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+    public MessageCounterSerializationImpl(
+        CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
+
         this.cassandraConfig = cassandraConfig;
-        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
         this.cassandraClient = cassandraClient;
     }
 
 
+    private String buildKey( String queueName, DatabaseQueueMessage.Type type ) {
+        return queueName + "_" + type;
+    }
+
+
     @Override
-    public void incrementCounter(String queueName, Shard.Type type, long shardId, long increment ) {
+    public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+        String key = buildKey( queueName, type );
 
-        String key = queueName + type + shardId;
         synchronized ( inMemoryCounters ) {
 
             if ( inMemoryCounters.get( key ) == null ) {
 
-                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+                Long value = retrieveCounterFromStorage( queueName, type );
 
                 if ( value == null ) {
-                    incrementCounterInStorage( queueName, type, shardId, 0L );
+                    incrementCounterInStorage( queueName, type, 0L );
                     inMemoryCounters.put( key, new InMemoryCount( 0L ));
                 } else {
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
-                inMemoryCounters.get( key ).getIncrement().addAndGet( increment );
-                return;
             }
         }
 
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        inMemoryCount.getIncrement().addAndGet( increment );
 
-        synchronized ( inMemoryCount ) {
-            long totalIncrement = inMemoryCount.getIncrement().addAndGet( increment );
+        saveIfNeeded( queueName, type );
+    }
 
-            if (totalIncrement > maxInMemoryIncrement) {
-                incrementCounterInStorage( queueName, type, shardId, totalIncrement );
-                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type, shardId ) );
-                inMemoryCount.getIncrement().set( 0L );
+
+    @Override
+    public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, long decrement) {
+
+        String key = buildKey( queueName, type );
+
+        synchronized ( inMemoryCounters ) {
+
+            if ( inMemoryCounters.get( key ) == null ) {
+
+                Long value = retrieveCounterFromStorage( queueName, type );
+
+                if ( value == null ) {
+                    decrementCounterInStorage( queueName, type, 0L );
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                } else {
+                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                }
             }
         }
 
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        inMemoryCount.getDecrement().addAndGet( decrement );
+
+        saveIfNeeded( queueName, type );
     }
 
 
     @Override
-    public long getCounterValue( String queueName, Shard.Type type, long shardId ) {
+    public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) {
 
-        String key = queueName + type + shardId;
+        String key = buildKey( queueName, type );
 
         synchronized ( inMemoryCounters ) {
 
             if ( inMemoryCounters.get( key ) == null ) {
 
-                Long value = retrieveCounterFromStorage( queueName, type, shardId );
+                Long value = retrieveCounterFromStorage( queueName, type );
 
                 if ( value == null ) {
                     throw new NotFoundException(
-                            MessageFormat.format( "No counter found for queue {0} type {1} shardId {2}",
-                                    queueName, type, shardId ));
+                            MessageFormat.format( "No counter found for queue {0} type {1}",
+                                    queueName, type ));
                 } else {
                     inMemoryCounters.put( key, new InMemoryCount( value ));
                 }
@@ -162,30 +194,39 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
         return inMemoryCounters.get( key ).value();
     }
 
-    void incrementCounterInStorage( String queueName, Shard.Type type, long shardId, long increment ) {
 
-        Statement update = QueryBuilder.update( TABLE_SHARD_COUNTERS )
+    void incrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long increment ) {
+
+        Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
-                .and(   QueryBuilder.eq(   COLUMN_SHARD_TYPE, type.toString() ) )
-                .and(   QueryBuilder.eq(   COLUMN_SHARD_ID, shardId ) )
+                .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, type.toString() ) )
                 .with(  QueryBuilder.incr( COLUMN_COUNTER_VALUE, increment ) );
         cassandraClient.getQueueMessageSession().execute( update );
     }
 
 
-    Long retrieveCounterFromStorage( String queueName, Shard.Type type, long shardId ) {
+    void decrementCounterInStorage( String queueName, DatabaseQueueMessage.Type type, long decrement ) {
 
-        Statement query = QueryBuilder.select().from( TABLE_SHARD_COUNTERS )
+        Statement update = QueryBuilder.update( TABLE_MESSAGE_COUNTERS )
+            .where( QueryBuilder.eq(   COLUMN_QUEUE_NAME, queueName ) )
+            .and(   QueryBuilder.eq(   COLUMN_MESSAGE_TYPE, type.toString() ) )
+            .with(  QueryBuilder.decr( COLUMN_COUNTER_VALUE, decrement ) );
+        cassandraClient.getQueueMessageSession().execute( update );
+    }
+
+
+    Long retrieveCounterFromStorage( String queueName, DatabaseQueueMessage.Type type ) {
+
+        Statement query = QueryBuilder.select().from( TABLE_MESSAGE_COUNTERS )
                 .where( QueryBuilder.eq( COLUMN_QUEUE_NAME, queueName ) )
-                .and( QueryBuilder.eq( COLUMN_SHARD_TYPE, type.toString()) )
-                .and( QueryBuilder.eq( COLUMN_SHARD_ID, shardId ) );
+                .and( QueryBuilder.eq( COLUMN_MESSAGE_TYPE, type.toString()) );
 
         ResultSet resultSet = cassandraClient.getQueueMessageSession().execute( query );
         List<Row> all = resultSet.all();
 
         if ( all.size() > 1 ) {
             throw new QakkaRuntimeException(
-                    "Multiple rows for counter " + queueName + " type " + type + " shardId " + shardId );
+                    "Multiple rows for counter " + queueName + " type " + type );
         }
         if ( all.isEmpty() ) {
             return null;
@@ -194,6 +235,32 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
     }
 
 
+    private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {
+
+        String key = buildKey( queueName, type );
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
+
+            if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+
+                long totalIncrement = inMemoryCount.getIncrement().get();
+                incrementCounterInStorage( queueName, type, totalIncrement );
+
+                long totalDecrement = inMemoryCount.getDecrement().get();
+                decrementCounterInStorage( queueName, type, totalDecrement );
+
+                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+                inMemoryCount.getIncrement().set( 0L );
+                inMemoryCount.getDecrement().set( 0L );
+
+                numChanges.set( 0 );
+            }
+        }
+    }
+
+
     @Override
     public Collection<MultiTenantColumnFamilyDefinition> getColumnFamilies() {
         return Collections.EMPTY_LIST;
@@ -201,8 +268,8 @@ public class MessageCounterSerializationImpl implements ShardCounterSerializatio
 
     @Override
     public Collection<TableDefinition> getTables() {
-        return Collections.singletonList(
-            new TableDefinitionStringImpl( cassandraConfig.getApplicationLocalKeyspace(), TABLE_SHARD_COUNTERS, CQL ) );
+        return Collections.singletonList( new TableDefinitionStringImpl(
+            cassandraConfig.getApplicationLocalKeyspace(), TABLE_MESSAGE_COUNTERS, CQL ) );
     }
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
index 0eb609d..f3cae86 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QakkaQueueManager.java
@@ -24,6 +24,7 @@ import com.google.inject.assistedinject.Assisted;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.*;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
 import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
@@ -139,10 +140,9 @@ public class QakkaQueueManager implements LegacyQueueManager {
         return messages;
     }
 
-
     @Override
     public long getQueueDepth() {
-        return 0;
+        return queueMessageManager.getQueueDepth( scope.getName() );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 0413f81..3225a66 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -177,6 +177,8 @@ public class QueueMessageManagerTest extends AbstractTest {
             }
         }
 
+        Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+
         // get all messages from queue
 
         List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
new file mode 100644
index 0000000..a4ea0f1
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.qakka.serialization.queuemessages.impl;
+
+import com.google.inject.Injector;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.qakka.AbstractTest;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
+import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+
+/**
+ * Created by Dave Johnson (snoopdave@apache.org) on 9/20/16.
+ */
+public class MessageCounterSerializationTest extends AbstractTest {
+
+    @Test
+    public void testBasicOperation() {
+
+        Injector injector = getInjector();
+        MessageCounterSerialization mcs = injector.getInstance( MessageCounterSerialization.class );
+
+        String queueName = "mcst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );
+
+        try {
+            mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT );
+            fail("Should have throw NotFoundException");
+        } catch ( NotFoundException expected ) {
+            // pass
+        }
+
+        for ( int i=0; i<10; i++ ) {
+            mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 1 );
+            Assert.assertEquals( i+1, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+        }
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 0, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 10, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 20, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 30, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 40, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 50, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.incrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 50 );
+        Assert.assertEquals( 100, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 90, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 80, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+
+        mcs.decrementCounter( queueName, DatabaseQueueMessage.Type.DEFAULT, 10 );
+        Assert.assertEquals( 70, mcs.getCounterValue( queueName, DatabaseQueueMessage.Type.DEFAULT ) );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
index f9c2951..8dc16bb 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/serialization/sharding/ShardCounterSerializationTest.java
@@ -36,9 +36,6 @@ public class ShardCounterSerializationTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
         ShardCounterSerialization scs = getInjector().getInstance( ShardCounterSerialization.class );
 
         String queueName = "scst_queue_" + RandomStringUtils.randomAlphanumeric( 20 );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9306f12e/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index aacc187..fb46f3d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -44,6 +44,7 @@ queue.shard.allocation.check.frequency.millis=100
 queue.shard.allocation.advance.time.millis=200
 
 queue.max.inmemory.shard.counter = 100
+queue.max.inmemory.max.message.changes=3
 
 queue.long.polling.time.millis=2000
 


[09/10] usergrid git commit: Change to use proper Guice injection instead of static injector kludge.

Posted by sn...@apache.org.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 19c1211..ba3c0f8 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -22,8 +22,11 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
 import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
@@ -52,10 +55,7 @@ public class QueueReaderTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
-
-
-        getInjector().getInstance( App.class ); // init the INJECTOR
+        Injector injector = getInjector();
 
         QakkaFig qakkaFig = getInjector().getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
@@ -94,7 +94,8 @@ public class QueueReaderTest extends AbstractTest {
         // run the QueueRefresher to fill up the in-memory queue
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
-        ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
+        ActorRef queueReaderRef = system.actorOf(
+            Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ), "queueReader");
         QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
 
         // need to wait for refresh to complete

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
index e3541a4..3079773 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouterTest.java
@@ -22,8 +22,10 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -58,7 +60,7 @@ public class QueueTimeouterTest extends AbstractTest {
 
         CassandraClient cassandraClient = getInjector().getInstance( CassandraClientImpl.class );
 
-        getInjector().getInstance( App.class ); // init the INJECTOR
+        Injector injector = getInjector();
 
         QakkaFig qakkaFig             = getInjector().getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = getInjector().getInstance( ActorSystemFig.class );
@@ -110,7 +112,8 @@ public class QueueTimeouterTest extends AbstractTest {
         // run timeouter actor
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
-        ActorRef timeouterRef = system.actorOf( Props.create( QueueTimeouter.class, queueName ), "timeouter");
+        ActorRef timeouterRef = system.actorOf( Props.create(
+            GuiceActorProducer.class, injector, QueueTimeouter.class), "timeouter");
         QueueTimeoutRequest qtr = new QueueTimeoutRequest( queueName );
         timeouterRef.tell( qtr, null ); // tell sends message, returns immediately
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
index 7fd664f..b602177 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocatorTest.java
@@ -28,6 +28,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.AbstractTest;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.QakkaModule;
@@ -111,7 +112,8 @@ public class ShardAllocatorTest extends AbstractTest {
         // Run shard allocator actor by sending message to it
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
-        ActorRef shardAllocRef = system.actorOf( Props.create( ShardAllocator.class, queueName ), "shardallocator");
+        ActorRef shardAllocRef = system.actorOf( Props.create(
+            GuiceActorProducer.class, injector, ShardAllocator.class), "shardallocator");
 
         ShardCheckRequest checkRequest = new ShardCheckRequest( queueName );
         shardAllocRef.tell( checkRequest, null ); // tell sends message, returns immediately
@@ -187,26 +189,32 @@ public class ShardAllocatorTest extends AbstractTest {
 
         queueManager.createQueue( new Queue( queueName ));
 
-        // Create 4000 messages
+        try {
 
-        int numMessages = 4000;
+            // Create 4000 messages
 
-        for ( int i=0; i<numMessages; i++ ) {
-            queueMessageManager.sendMessages(
+            int numMessages = 4000;
+
+            for (int i = 0; i < numMessages; i++) {
+                queueMessageManager.sendMessages(
                     queueName,
                     Collections.singletonList( region ),
                     null, // delay
                     null, // expiration
                     "application/json",
                     DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
-        }
+            }
 
-        distributedQueueService.refresh();
-        Thread.sleep(3000);
+            distributedQueueService.refresh();
+            Thread.sleep( 3000 );
 
-        // Test that 8 shards were created
+            // Test that enough shards were created (about 8 expected; assertion requires at least 7)
 
-        Assert.assertTrue("num shards >= 7",
-            countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 );
+            Assert.assertTrue( "num shards >= 7",
+                countShards( cassandraClient, shardCounterSer, queueName, region, Shard.Type.DEFAULT ) >= 7 );
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index b207ea3..e1cbda4 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -28,6 +28,6 @@ log4j.logger.org.glassfish=WARN
 #log4j.logger.org.apache.usergrid.persistence.actorsystem=DEBUG
 #log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
 
-log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
-log4j.logger.org.apache.usergrid.corepersistence.asyncevents=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka.distributed.actors=DEBUG
+log4j.logger.org.apache.usergrid.persistence.queue=INFO
+log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO


[05/10] usergrid git commit: When writing to queue, if in-memory queue is empty then refresh will be done.

Posted by sn...@apache.org.
When writing to queue, if in-memory queue is empty then refresh will be done.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/727ff1d6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/727ff1d6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/727ff1d6

Branch: refs/heads/usergrid-1318-queue
Commit: 727ff1d642fdaf3768ab2b1f0c07364854ec2e60
Parents: 9306f12
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Sep 21 13:48:47 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Sep 21 13:48:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/usergrid/persistence/qakka/QakkaFig.java  |  2 +-
 .../persistence/qakka/core/impl/InMemoryQueue.java       |  4 ++++
 .../persistence/qakka/distributed/actors/QueueActor.java | 11 +++++++----
 .../qakka/distributed/actors/QueueWriter.java            |  3 +++
 .../distributed/impl/DistributedQueueServiceImpl.java    |  8 ++++++--
 .../qakka/distributed/messages/QueueRefreshRequest.java  |  8 +++++++-
 .../persistence/qakka/core/QueueMessageManagerTest.java  |  1 +
 .../qakka/distributed/actors/QueueReaderTest.java        |  2 +-
 8 files changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index c3f4189..da47c98 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -94,7 +94,7 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     /** How often to refresh each queue's in-memory data */
     @Key(QUEUE_REFRESH_MILLISECONDS)
-    @Default("500")
+    @Default("1000")
     int getQueueRefreshMilliseconds();
 
     /** How many queue messages to keep in-memory */

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 474ef5c..27de079 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -78,6 +78,10 @@ public class InMemoryQueue {
         return getQueue( queueName ).poll();
     }
 
+    public DatabaseQueueMessage peek( String queueName ) {
+        return getQueue( queueName ).peek();
+    }
+
     public int size( String queueName ) {
         return getQueue( queueName ).size();
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 3b50711..315975c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -86,7 +86,7 @@ public class QueueActor extends UntypedActor {
                     Duration.create( 0, TimeUnit.MILLISECONDS),
                     Duration.create( qakkaFig.getQueueRefreshMilliseconds(), TimeUnit.MILLISECONDS),
                     self(),
-                    new QueueRefreshRequest( request.getQueueName() ),
+                    new QueueRefreshRequest( request.getQueueName(), false ),
                     getContext().dispatcher(),
                     getSelf());
                 refreshSchedulersByQueueName.put( request.getQueueName(), scheduler );
@@ -121,9 +121,12 @@ public class QueueActor extends UntypedActor {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
 
             if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                    QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
-                queueReadersByQueueName.put( request.getQueueName(), readerRef );
+
+                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+                    ActorRef readerRef = getContext().actorOf( Props.create(
+                        QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
+                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
+                }
             }
 
             // hand-off to queue's reader

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index 7166ef1..e54d916 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -25,6 +25,7 @@ import com.google.inject.Injector;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.core.QakkaUtils;
+import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteResponse;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
@@ -44,6 +45,7 @@ public class QueueWriter extends UntypedActor {
 
     public enum WriteStatus { SUCCESS_XFERLOG_DELETED, SUCCESS_XFERLOG_NOTDELETED, ERROR };
 
+    private final DistributedQueueService   distributedQueueService;
     private final QueueMessageSerialization messageSerialization;
     private final TransferLogSerialization  transferLogSerialization;
     private final AuditLogSerialization     auditLogSerialization;
@@ -61,6 +63,7 @@ public class QueueWriter extends UntypedActor {
         auditLogSerialization    = injector.getInstance( AuditLogSerialization.class );
         metricsService           = injector.getInstance( MetricsService.class );
 
+        distributedQueueService     = injector.getInstance( DistributedQueueService.class );
         messageCounterSerialization = injector.getInstance( MessageCounterSerialization.class );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 3d6a808..9d71c31 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -25,7 +25,6 @@ import akka.util.Timeout;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.apache.log4j.net.SyslogAppender;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.actorsystem.ClientActor;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -114,7 +113,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public void refreshQueue(String queueName) {
         logger.info("{} Requesting refresh for queue: {}", this, queueName);
-        QueueRefreshRequest request = new QueueRefreshRequest( queueName );
+        QueueRefreshRequest request = new QueueRefreshRequest( queueName, false );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );
     }
@@ -168,6 +167,11 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                         if ( retries > 1 ) {
                             logger.debug("SUCCESS after {} retries", retries );
                         }
+
+                        logger.debug("{} Requesting refresh if empty for queue: {}", this, queueName);
+                        QueueRefreshRequest qrr = new QueueRefreshRequest( queueName, false );
+                        clientActor.tell( qrr, null );
+
                         return qarm.getSendStatus();
 
                     } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
index a81a6fd..b65ad3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/messages/QueueRefreshRequest.java
@@ -24,10 +24,16 @@ import org.apache.commons.lang3.builder.ToStringBuilder;
 
 public class QueueRefreshRequest implements QakkaMessage {
     private final String queueName;
+    private final boolean onlyIfEmpty;
 
 
-    public QueueRefreshRequest(String queueName ) {
+    public QueueRefreshRequest( String queueName, boolean onlyIfEmpty ) {
         this.queueName = queueName;
+        this.onlyIfEmpty = onlyIfEmpty;
+    }
+
+    public boolean isOnlyIfEmpty() {
+        return onlyIfEmpty;
     }
 
     public String getQueueName() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index 3225a66..c10d1f5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -175,6 +175,7 @@ public class QueueMessageManagerTest extends AbstractTest {
             if (inMemoryQueue.size( queueName ) == 40) {
                 break;
             }
+            Thread.sleep( 500 );
         }
 
         Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/727ff1d6/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
index 0b8b795..19c1211 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueReaderTest.java
@@ -95,7 +95,7 @@ public class QueueReaderTest extends AbstractTest {
 
         ActorSystem system = ActorSystem.create("Test-" + queueName);
         ActorRef queueReaderRef = system.actorOf( Props.create( QueueRefresher.class, queueName ), "queueReader");
-        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName );
+        QueueRefreshRequest refreshRequest = new QueueRefreshRequest( queueName, false );
 
         // need to wait for refresh to complete
         int maxRetries = 10;


[10/10] usergrid git commit: Change to use proper Guice injection instead of static injector kludge.

Posted by sn...@apache.org.
Change to use proper Guice injection instead of static injector kludge.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5a19ba9a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5a19ba9a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5a19ba9a

Branch: refs/heads/usergrid-1318-queue
Commit: 5a19ba9a748c5d96f5356d77df1e7845aa92fc0f
Parents: 2cd8ecb
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 30 17:18:24 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 30 17:18:24 2016 -0400

----------------------------------------------------------------------
 build.log                                       |  15 ++
 .../apache/usergrid/persistence/qakka/App.java  |  17 +-
 .../qakka/core/impl/InMemoryQueue.java          |  11 +-
 .../core/impl/QueueMessageManagerImpl.java      |  18 +-
 .../distributed/DistributedQueueService.java    |   2 +-
 .../qakka/distributed/actors/QueueActor.java    |  56 +++---
 .../distributed/actors/QueueActorHelper.java    | 105 ++++++++++-
 .../distributed/actors/QueueActorRouter.java    |  11 +-
 .../distributed/actors/QueueRefresher.java      | 120 +-----------
 .../qakka/distributed/actors/QueueSender.java   |   5 +-
 .../distributed/actors/QueueSenderRouter.java   |  10 +-
 .../distributed/actors/QueueTimeouter.java      |  26 +--
 .../qakka/distributed/actors/QueueWriter.java   |   6 +-
 .../distributed/actors/QueueWriterRouter.java   |  11 +-
 .../distributed/actors/ShardAllocator.java      |  22 +--
 .../impl/DistributedQueueServiceImpl.java       |  26 ++-
 .../qakka/core/QueueMessageManagerTest.java     | 178 +++++++++---------
 .../distributed/QueueActorServiceTest.java      | 112 ++++++-----
 .../actors/QueueActorHelperTest.java            | 186 +++++++++++--------
 .../distributed/actors/QueueReaderTest.java     |  11 +-
 .../distributed/actors/QueueTimeouterTest.java  |   7 +-
 .../distributed/actors/ShardAllocatorTest.java  |  30 +--
 .../queue/src/test/resources/log4j.properties   |   6 +-
 23 files changed, 531 insertions(+), 460 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/build.log
----------------------------------------------------------------------
diff --git a/build.log b/build.log
new file mode 100644
index 0000000..43ffacd
--- /dev/null
+++ b/build.log
@@ -0,0 +1,15 @@
+[INFO] Scanning for projects...
+[INFO] ------------------------------------------------------------------------
+[INFO] BUILD FAILURE
+[INFO] ------------------------------------------------------------------------
+[INFO] Total time: 0.086 s
+[INFO] Finished at: 2016-09-30T07:54:28-04:00
+[INFO] Final Memory: 46M/6710M
+[INFO] ------------------------------------------------------------------------
+[ERROR] The goal you specified requires a project to execute but there is no POM in this directory (/Users/ApigeeCorporation/src/usergrid-snoopdave). Please verify you invoked Maven from the correct directory. -> [Help 1]
+[ERROR] 
+[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
+[ERROR] Re-run Maven using the -X switch to enable full debug logging.
+[ERROR] 
+[ERROR] For more information about the errors and possible solutions, please read the following articles:
+[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MissingProjectException

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
index abbf3da..35fdb20 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/App.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
 public class App implements MetricsService {
     private static final Logger logger = LoggerFactory.getLogger( App.class );
 
-    // TODO: can we avoid this kludge with better Akka-Guice integration?
-    static public Injector INJECTOR;
-
     private final ActorSystemFig          actorSystemFig;
     private final ActorSystemManager      actorSystemManager;
     private final DistributedQueueService distributedQueueService;
@@ -55,14 +52,16 @@ public class App implements MetricsService {
 
     @Inject
     public App(
-            Injector                  injector,
             QakkaFig                  qakkaFig,
             ActorSystemFig            actorSystemFig,
             ActorSystemManager        actorSystemManager,
             DistributedQueueService   distributedQueueService,
-            MigrationManager          migrationManager) {
+            MigrationManager          migrationManager,
+            QueueActorRouterProducer  queueActorRouterProducer,
+            QueueWriterRouterProducer queueWriterRouterProducer,
+            QueueSenderRouterProducer queueSenderRouterProducer
+            ) {
 
-        this.INJECTOR = injector;
         this.actorSystemFig = actorSystemFig;
         this.actorSystemManager = actorSystemManager;
         this.distributedQueueService = distributedQueueService;
@@ -74,9 +73,9 @@ public class App implements MetricsService {
             } catch (MigrationException e) {
                 throw new QakkaRuntimeException( "Error running migration", e );
             }
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueActorRouterProducer.class ) );
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueWriterRouterProducer.class ) );
-            actorSystemManager.registerRouterProducer( injector.getInstance( QueueSenderRouterProducer.class ) );
+            actorSystemManager.registerRouterProducer( queueActorRouterProducer );
+            actorSystemManager.registerRouterProducer( queueWriterRouterProducer );
+            actorSystemManager.registerRouterProducer( queueSenderRouterProducer );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
index 27de079..1f6fe6e 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/InMemoryQueue.java
@@ -19,8 +19,10 @@
 
 package org.apache.usergrid.persistence.qakka.core.impl;
 
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.distributed.actors.QueueRefresher;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
@@ -71,7 +73,14 @@ public class InMemoryQueue {
     }
 
     public UUID getNewest( String queueName ) {
-        return newestByQueueName.get( queueName );
+        UUID newest = newestByQueueName.get( queueName );
+//        if ( newest == null ) {
+//            // Create oldest UUID from a UNIX timestamp via DataStax utility
+//            // https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/utils/UUIDs.html
+//            newest = UUIDs.startOf( 0L );
+//            newestByQueueName.put( queueName, newest );
+//        }
+        return newest;
     }
 
     public DatabaseQueueMessage poll( String queueName ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
index 691c1a6..59e0ce0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueMessageManagerImpl.java
@@ -138,10 +138,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public List<QueueMessage> getNextMessages(String queueName, int count) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         Collection<DatabaseQueueMessage> dbMessages = distributedQueueService.getNextMessages( queueName, count );
 
         List<QueueMessage> queueMessages = joinMessages( queueName, dbMessages );
@@ -210,15 +206,14 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public void ackMessage(String queueName, UUID queueMessageId) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         DistributedQueueService.Status status = distributedQueueService.ackMessage( queueName, queueMessageId );
 
-        if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+        if ( DistributedQueueService.Status.NOT_INFLIGHT.equals( status )) {
             throw new BadRequestException( "Message not inflight" );
 
+        } else if ( DistributedQueueService.Status.BAD_REQUEST.equals( status )) {
+            throw new BadRequestException( "Bad request" );
+
         } else if ( DistributedQueueService.Status.ERROR.equals( status )) {
             throw new QakkaRuntimeException( "Unable to ack message due to error" );
         }
@@ -228,10 +223,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
     @Override
     public void requeueMessage(String queueName, UUID messageId, Long delayMs) {
 
-        if ( queueManager.getQueueConfig( queueName ) == null ) {
-            throw new NotFoundException( "Queue not found: " + queueName );
-        }
-
         // TODO: implement requeueMessage
 
         throw new UnsupportedOperationException( "requeueMessage not yet implemented" );
@@ -268,7 +259,6 @@ public class QueueMessageManagerImpl implements QueueMessageManager {
 
         // first look in INFLIGHT storage
 
-
         DatabaseQueueMessage dbMessage = queueMessageSerialization.loadMessage(
                 queueName, actorSystemFig.getRegionLocal(), null,
                 DatabaseQueueMessage.Type.INFLIGHT, queueMessageId );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
index b02a623..b11dcff 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/DistributedQueueService.java
@@ -30,7 +30,7 @@ import java.util.UUID;
  */
 public interface DistributedQueueService {
 
-    enum Status { SUCCESS, ERROR, BAD_REQUEST };
+    enum Status { SUCCESS, ERROR, BAD_REQUEST, NOT_INFLIGHT };
 
     void init();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
index 87342ad..5ebba3d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActor.java
@@ -24,7 +24,9 @@ import akka.actor.Cancellable;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
 import org.apache.usergrid.persistence.qakka.QakkaFig;
@@ -65,10 +67,13 @@ public class QueueActor extends UntypedActor {
     private final AtomicLong messageCount = new AtomicLong(0);
     private final Set<String> queuesSeen = new HashSet<>();
 
+    private final Injector injector;
 
-    public QueueActor() {
 
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueActor( Injector injector ) {
+
+        this.injector = injector;
 
         qakkaFig         = injector.getInstance( QakkaFig.class );
         inMemoryQueue    = injector.getInstance( InMemoryQueue.class );
@@ -107,7 +112,7 @@ public class QueueActor extends UntypedActor {
                         getContext().dispatcher(),
                         getSelf());
                 timeoutSchedulersByQueueName.put( request.getQueueName(), scheduler );
-                logger.debug("Created scheduler for queue {}", request.getQueueName() );
+                logger.debug("Created timeouter for queue {}", request.getQueueName() );
             }
 
             if ( shardAllocationSchedulersByQueueName.get( request.getQueueName() ) == null ) {
@@ -126,18 +131,20 @@ public class QueueActor extends UntypedActor {
             QueueRefreshRequest request = (QueueRefreshRequest)message;
 
             queuesSeen.add( request.getQueueName() );
-
-            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
-
-                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
-                    ActorRef readerRef = getContext().actorOf( Props.create(
-                        QueueRefresher.class, request.getQueueName()), request.getQueueName() + "_reader");
-                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
-                }
-            }
-
-            // hand-off to queue's reader
-            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
+            queueActorHelper.queueRefresh( request.getQueueName() );
+
+//            if ( queueReadersByQueueName.get( request.getQueueName() ) == null ) {
+//
+//                if ( !request.isOnlyIfEmpty() || inMemoryQueue.peek( request.getQueueName()) == null ) {
+//                    ActorRef readerRef = getContext().actorOf(
+//                        Props.create( GuiceActorProducer.class, injector, QueueRefresher.class ),
+//                        request.getQueueName() + "_reader");
+//                    queueReadersByQueueName.put( request.getQueueName(), readerRef );
+//                }
+//            }
+//
+//            // hand-off to queue's reader
+//            queueReadersByQueueName.get( request.getQueueName() ).tell( request, self() );
 
         } else if ( message instanceof QueueTimeoutRequest ) {
             QueueTimeoutRequest request = (QueueTimeoutRequest)message;
@@ -145,8 +152,9 @@ public class QueueActor extends UntypedActor {
             queuesSeen.add( request.getQueueName() );
 
             if ( queueTimeoutersByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                    QueueTimeouter.class, request.getQueueName()), request.getQueueName() + "_timeouter");
+                ActorRef readerRef = getContext().actorOf(
+                    Props.create( GuiceActorProducer.class, injector, QueueTimeouter.class),
+                    request.getQueueName() + "_timeouter");
                 queueTimeoutersByQueueName.put( request.getQueueName(), readerRef );
             }
 
@@ -160,8 +168,9 @@ public class QueueActor extends UntypedActor {
             queuesSeen.add( request.getQueueName() );
 
             if ( shardAllocatorsByQueueName.get( request.getQueueName() ) == null ) {
-                ActorRef readerRef = getContext().actorOf( Props.create(
-                        ShardAllocator.class, request.getQueueName()), request.getQueueName() + "_shard_allocator");
+                ActorRef readerRef = getContext().actorOf(
+                    Props.create( GuiceActorProducer.class, injector, ShardAllocator.class),
+                    request.getQueueName() + "_shard_allocator");
                 shardAllocatorsByQueueName.put( request.getQueueName(), readerRef );
             }
 
@@ -181,15 +190,15 @@ public class QueueActor extends UntypedActor {
 
                 while (queueMessages.size() < queueGetRequest.getNumRequested()) {
 
-                    DatabaseQueueMessage queueMessage = inMemoryQueue.poll( queueGetRequest.getQueueName() );
+                    DatabaseQueueMessage queueMessage = inMemoryQueue.peek( queueGetRequest.getQueueName() );
 
                     if (queueMessage != null) {
                         if (queueActorHelper.putInflight( queueGetRequest.getQueueName(), queueMessage )) {
                             queueMessages.add( queueMessage );
                         }
                     } else {
-//                        logger.debug("in-memory queue for {} is empty, object is: {}",
-//                                queueGetRequest.getQueueName(), inMemoryQueue );
+                        logger.debug("in-memory queue for {} is empty, object is: {}",
+                                queueGetRequest.getQueueName(), inMemoryQueue );
                         break;
                     }
                 }
@@ -199,6 +208,9 @@ public class QueueActor extends UntypedActor {
                     DatabaseQueueMessage.Type.DEFAULT,
                     queueMessages.size());
 
+                logger.debug("{} returning {} for queue {}",
+                    this, queueMessages.size(), queueGetRequest.getQueueName());
+
                 getSender().tell( new QueueGetResponse(
                         DistributedQueueService.Status.SUCCESS, queueMessages ), getSender() );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
index 26db903..68250df 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelper.java
@@ -19,17 +19,28 @@
 
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
+import com.codahale.metrics.Timer;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
+import org.apache.usergrid.persistence.qakka.MetricsService;
+import org.apache.usergrid.persistence.qakka.QakkaFig;
+import org.apache.usergrid.persistence.qakka.core.CassandraClient;
+import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
+import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLog;
 import org.apache.usergrid.persistence.qakka.serialization.auditlog.AuditLogSerialization;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
+import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.DecimalFormat;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class QueueActorHelper {
@@ -38,18 +49,33 @@ public class QueueActorHelper {
     private final ActorSystemFig            actorSystemFig;
     private final QueueMessageSerialization messageSerialization;
     private final AuditLogSerialization     auditLogSerialization;
+    private final InMemoryQueue             inMemoryQueue;
+    private final QakkaFig                  qakkaFig;
+    private final MetricsService            metricsService;
+    private final CassandraClient           cassandraClient;
+
+    private final AtomicLong runCount = new AtomicLong(0);
+    private final AtomicLong totalRead = new AtomicLong(0);
 
 
     @Inject
     public QueueActorHelper(
-            ActorSystemFig actorSystemFig,
+            QakkaFig                  qakkaFig,
+            ActorSystemFig            actorSystemFig,
             QueueMessageSerialization messageSerialization,
-            AuditLogSerialization auditLogSerialization
+            AuditLogSerialization     auditLogSerialization,
+            InMemoryQueue             inMemoryQueue,
+            MetricsService            metricsService,
+            CassandraClient           cassandraClient
             ) {
 
-        this.actorSystemFig = actorSystemFig;
-        this.messageSerialization = messageSerialization;
+        this.actorSystemFig        = actorSystemFig;
+        this.messageSerialization  = messageSerialization;
         this.auditLogSerialization = auditLogSerialization;
+        this.inMemoryQueue         = inMemoryQueue;
+        this.qakkaFig              = qakkaFig;
+        this.metricsService        = metricsService;
+        this.cassandraClient       = cassandraClient;
     }
 
 
@@ -78,7 +104,7 @@ public class QueueActorHelper {
                 queueName, queueMessageId, DatabaseQueueMessage.Type.INFLIGHT );
 
         if ( queueMessage == null ) {
-            return DistributedQueueService.Status.BAD_REQUEST;
+            return DistributedQueueService.Status.NOT_INFLIGHT;
         }
 
         boolean error = false;
@@ -164,4 +190,73 @@ public class QueueActorHelper {
 
         return true;
     }
+
+    void queueRefresh( String queueName ) {
+
+        Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
+
+        try {
+
+            if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
+
+                ShardIterator shardIterator = new ShardIterator(
+                    cassandraClient, queueName, actorSystemFig.getRegionLocal(),
+                    Shard.Type.DEFAULT, Optional.empty() );
+
+                UUID since = inMemoryQueue.getNewest( queueName );
+
+                String region = actorSystemFig.getRegionLocal();
+                MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
+                    cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
+                    shardIterator, since);
+
+                int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
+                int count = 0;
+
+                while ( multiShardIterator.hasNext() && count < need ) {
+                    DatabaseQueueMessage queueMessage = multiShardIterator.next();
+                    inMemoryQueue.add( queueName, queueMessage );
+                    count++;
+                }
+
+                long runs = runCount.incrementAndGet();
+                long readCount = totalRead.addAndGet( count );
+
+                if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
+
+                    final DecimalFormat format = new DecimalFormat("##.###");
+                    final long nano = 1000000000;
+                    Timer t = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME );
+
+                    logger.debug("QueueRefresher for queue '{}' stats:\n" +
+                            "   Num runs={}\n" +
+                            "   Read count={}\n" +
+                            "   Mean={}\n" +
+                            "   One min rate={}\n" +
+                            "   Five min rate={}\n" +
+                            "   Snapshot mean={}\n" +
+                            "   Snapshot min={}\n" +
+                            "   Snapshot max={}",
+                        queueName,
+                        t.getCount(),
+                        readCount,
+                        format.format( t.getMeanRate() ),
+                        format.format( t.getOneMinuteRate() ),
+                        format.format( t.getFiveMinuteRate() ),
+                        format.format( t.getSnapshot().getMean() / nano ),
+                        format.format( (double) t.getSnapshot().getMin() / nano ),
+                        format.format( (double) t.getSnapshot().getMax() / nano ) );
+                }
+
+                if ( count > 0 ) {
+                    logger.debug( "Added {} in-memory for queue {}, new size = {}",
+                        count, queueName, inMemoryQueue.size( queueName ) );
+                }
+            }
+
+        } finally {
+            timer.close();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
index 97e591c..9257a0d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorRouter.java
@@ -24,6 +24,9 @@ import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.ConsistentHashingRouter;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
 
 
@@ -35,9 +38,11 @@ public class QueueActorRouter extends UntypedActor {
     private final ActorRef routerRef;
 
 
-    public QueueActorRouter() {
-        routerRef = getContext().actorOf(
-                FromConfig.getInstance().props( Props.create(QueueActor.class)), "router");
+    @Inject
+    public QueueActorRouter( Injector injector ) {
+
+        this.routerRef = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create(GuiceActorProducer.class, injector, QueueActor.class)), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
index dbd5235..2f70088 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueRefresher.java
@@ -20,55 +20,20 @@
 package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
-import com.codahale.metrics.Timer;
-import com.google.inject.Injector;
-import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
-import org.apache.usergrid.persistence.qakka.App;
-import org.apache.usergrid.persistence.qakka.MetricsService;
-import org.apache.usergrid.persistence.qakka.QakkaFig;
-import org.apache.usergrid.persistence.qakka.core.CassandraClient;
-import org.apache.usergrid.persistence.qakka.core.CassandraClientImpl;
-import org.apache.usergrid.persistence.qakka.core.impl.InMemoryQueue;
+import com.google.inject.Inject;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueRefreshRequest;
-import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
-import org.apache.usergrid.persistence.qakka.serialization.MultiShardMessageIterator;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
-import org.apache.usergrid.persistence.qakka.serialization.queuemessages.QueueMessageSerialization;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.Shard;
-import org.apache.usergrid.persistence.qakka.serialization.sharding.ShardIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.DecimalFormat;
-import java.util.Optional;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
 
 public class QueueRefresher extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueRefresher.class );
 
-    private final String          queueName;
-    private final InMemoryQueue   inMemoryQueue;
-    private final QakkaFig        qakkaFig;
-    private final ActorSystemFig  actorSystemFig;
-    private final MetricsService  metricsService;
-    private final CassandraClient cassandraClient;
-
-    private final AtomicLong      runCount = new AtomicLong(0);
-    private final AtomicLong      totalRead = new AtomicLong(0);
-
+    final QueueActorHelper helper;
 
-    public QueueRefresher(String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
-
-        inMemoryQueue  = injector.getInstance( InMemoryQueue.class );
-        qakkaFig       = injector.getInstance( QakkaFig.class );
-        actorSystemFig = injector.getInstance( ActorSystemFig.class );
-        metricsService = injector.getInstance( MetricsService.class );
-        cassandraClient = injector.getInstance( CassandraClientImpl.class );
+    @Inject
+    public QueueRefresher( QueueActorHelper helper ) {
+        this.helper = helper;
     }
 
 
@@ -78,78 +43,9 @@ public class QueueRefresher extends UntypedActor {
         if ( message instanceof QueueRefreshRequest ) {
 
             QueueRefreshRequest request = (QueueRefreshRequest) message;
-
-            //logger.debug( "running for queue {}", queueName );
-
-            if (!request.getQueueName().equals( queueName )) {
-                throw new QakkaRuntimeException(
-                        "QueueWriter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-            }
-
-            Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.REFRESH_TIME).time();
-
-            try {
-
-                if (inMemoryQueue.size( queueName ) < qakkaFig.getQueueInMemorySize()) {
-
-                    ShardIterator shardIterator = new ShardIterator(
-                            cassandraClient, queueName, actorSystemFig.getRegionLocal(),
-                            Shard.Type.DEFAULT, Optional.empty() );
-
-                    UUID since = inMemoryQueue.getNewest( queueName );
-
-                    String region = actorSystemFig.getRegionLocal();
-                    MultiShardMessageIterator multiShardIterator = new MultiShardMessageIterator(
-                            cassandraClient, queueName, region, DatabaseQueueMessage.Type.DEFAULT,
-                            shardIterator, since);
-
-                    int need = qakkaFig.getQueueInMemorySize() - inMemoryQueue.size( queueName );
-                    int count = 0;
-
-                    while ( multiShardIterator.hasNext() && count < need ) {
-                        DatabaseQueueMessage queueMessage = multiShardIterator.next();
-                        inMemoryQueue.add( queueName, queueMessage );
-                        count++;
-                    }
-
-                    long runs = runCount.incrementAndGet();
-                    long readCount = totalRead.addAndGet( count );
-
-                    if ( logger.isDebugEnabled() && runs % 100 == 0 ) {
-
-                        final DecimalFormat format = new DecimalFormat("##.###");
-                        final long nano = 1000000000;
-                        Timer t = metricsService.getMetricRegistry().timer(MetricsService.REFRESH_TIME );
-
-                        logger.debug("QueueRefresher for queue '{}' stats:\n" +
-                            "   Num runs={}\n" +
-                            "   Read count={}\n" +
-                            "   Mean={}\n" +
-                            "   One min rate={}\n" +
-                            "   Five min rate={}\n" +
-                            "   Snapshot mean={}\n" +
-                            "   Snapshot min={}\n" +
-                            "   Snapshot max={}",
-                            queueName,
-                            t.getCount(),
-                            readCount,
-                            format.format( t.getMeanRate() ),
-                            format.format( t.getOneMinuteRate() ),
-                            format.format( t.getFiveMinuteRate() ),
-                            format.format( t.getSnapshot().getMean() / nano ),
-                            format.format( (double) t.getSnapshot().getMin() / nano ),
-                            format.format( (double) t.getSnapshot().getMax() / nano ) );
-                    }
-
-//                    if ( count > 0 ) {
-//                        logger.debug( "Added {} in-memory for queue {}, new size = {}",
-//                                count, queueName, inMemoryQueue.size( queueName ) );
-//                    }
-                }
-
-            } finally {
-                timer.close();
-            }
+            logger.debug( "running for queue {}", request.getQueueName() );
+            String queueName = request.getQueueName();
+            helper.queueRefresh( queueName );
 
         } else {
             unhandled( message );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 03d1216..739e1c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -25,6 +25,7 @@ import akka.cluster.client.ClusterClient;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
@@ -63,9 +64,9 @@ public class QueueSender extends UntypedActor {
     private final QakkaFig qakkaFig;
     private final MetricsService            metricsService;
 
-    public QueueSender() {
 
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueSender( Injector injector ) {
 
         actorSystemManager       = injector.getInstance( ActorSystemManager.class );
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
index 20603a5..92d0785 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSenderRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueSendRequest;
 
 
@@ -34,10 +37,11 @@ public class QueueSenderRouter extends UntypedActor {
     private final ActorRef router;
 
 
-    public QueueSenderRouter() {
+    @Inject
+    public QueueSenderRouter( Injector injector ) {
 
-        router = getContext().actorOf(
-                FromConfig.getInstance().props(Props.create(QueueSender.class )), "router");
+        this.router = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create( GuiceActorProducer.class, injector, QueueSender.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
index b47aac6..9b11277 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueTimeouter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
 public class QueueTimeouter extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( QueueTimeouter.class );
 
-    private final String                    queueName;
-
     private final QueueMessageSerialization messageSerialization;
     private final MetricsService            metricsService;
     private final ActorSystemFig            actorSystemFig;
@@ -62,10 +61,8 @@ public class QueueTimeouter extends UntypedActor {
     private final AtomicLong totalTimedout = new AtomicLong(0);
 
 
-    public QueueTimeouter(String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueTimeouter( Injector injector) {
 
         messageSerialization = injector.getInstance( QueueMessageSerialization.class );
         actorSystemFig       = injector.getInstance( ActorSystemFig.class );
@@ -88,10 +85,7 @@ public class QueueTimeouter extends UntypedActor {
 
                 QueueTimeoutRequest request = (QueueTimeoutRequest) message;
 
-                if (!request.getQueueName().equals( queueName )) {
-                    throw new QakkaRuntimeException(
-                            "QueueTimeouter for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-                }
+                String queueName = request.getQueueName();
 
                 //logger.debug("Processing timeouts for queue {} ", queueName );
 
@@ -171,12 +165,12 @@ public class QueueTimeouter extends UntypedActor {
                         format.format( (double) t.getSnapshot().getMax() / nano ) );
                 }
 
-//                if (count > 0) {
-//                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
-//
-//                    messageCounterSerialization.decrementCounter(
-//                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
-//                }
+                if (count > 0) {
+                    logger.debug( "Timed out {} messages for queue {}", count, queueName );
+
+                    messageCounterSerialization.decrementCounter(
+                        queueName, DatabaseQueueMessage.Type.DEFAULT, count);
+                }
 
             } finally {
                 timer.close();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
index e54d916..273f0b2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriter.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.qakka.App;
 import org.apache.usergrid.persistence.qakka.MetricsService;
@@ -54,9 +55,8 @@ public class QueueWriter extends UntypedActor {
     private final MessageCounterSerialization messageCounterSerialization;
 
 
-    public QueueWriter() {
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public QueueWriter( Injector injector ) {
 
         messageSerialization     = injector.getInstance( QueueMessageSerialization.class );
         transferLogSerialization = injector.getInstance( TransferLogSerialization.class );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
index 9cf06d9..f0540af 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueWriterRouter.java
@@ -23,6 +23,9 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
 import akka.routing.FromConfig;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.apache.usergrid.persistence.qakka.distributed.messages.QueueWriteRequest;
 
 
@@ -33,11 +36,11 @@ public class QueueWriterRouter extends UntypedActor {
 
     private final ActorRef router;
 
+    @Inject
+    public QueueWriterRouter( Injector injector ) {
 
-    public QueueWriterRouter() {
-
-        router = getContext().actorOf(
-                FromConfig.getInstance().props(Props.create(QueueWriter.class )), "router");
+        this.router = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create( GuiceActorProducer.class, injector, QueueWriter.class )), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 46e4906..65c3370 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.qakka.distributed.actors;
 import akka.actor.UntypedActor;
 import com.codahale.metrics.Timer;
 import com.datastax.driver.core.utils.UUIDs;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.qakka.App;
@@ -49,8 +50,6 @@ import java.util.UUID;
 public class ShardAllocator extends UntypedActor {
     private static final Logger logger = LoggerFactory.getLogger( ShardAllocator.class );
 
-    private final String queueName;
-
     private final QakkaFig qakkaFig;
     private final ActorSystemFig            actorSystemFig;
     private final ShardSerialization        shardSerialization;
@@ -59,10 +58,8 @@ public class ShardAllocator extends UntypedActor {
     private final CassandraClient           cassandraClient;
 
 
-    public ShardAllocator( String queueName ) {
-        this.queueName = queueName;
-
-        Injector injector = App.INJECTOR;
+    @Inject
+    public ShardAllocator( Injector injector ) {
 
         this.qakkaFig                  = injector.getInstance( QakkaFig.class );
         this.shardCounterSerialization = injector.getInstance( ShardCounterSerializationImpl.class );
@@ -70,8 +67,6 @@ public class ShardAllocator extends UntypedActor {
         this.actorSystemFig            = injector.getInstance( ActorSystemFig.class );
         this.metricsService            = injector.getInstance( MetricsService.class );
         this.cassandraClient           = injector.getInstance( CassandraClientImpl.class );
-
-        logger.debug( "Created shard allocator for queue {}", queueName );
     }
 
 
@@ -82,14 +77,9 @@ public class ShardAllocator extends UntypedActor {
 
             ShardCheckRequest request = (ShardCheckRequest) message;
 
-            if (!request.getQueueName().equals( queueName )) {
-                throw new QakkaRuntimeException(
-                        "ShardAllocator for " + queueName + ": Incorrect queueName " + request.getQueueName() );
-            }
-
             // check both types of shard
-            checkLatestShard( Shard.Type.DEFAULT );
-            checkLatestShard( Shard.Type.INFLIGHT );
+            checkLatestShard( request.getQueueName(), Shard.Type.DEFAULT );
+            checkLatestShard( request.getQueueName(), Shard.Type.INFLIGHT );
 
         } else {
             unhandled( message );
@@ -97,7 +87,7 @@ public class ShardAllocator extends UntypedActor {
 
     }
 
-    private void checkLatestShard( Shard.Type type ) {
+    private void checkLatestShard( String queueName, Shard.Type type ) {
 
         Timer.Context timer = metricsService.getMetricRegistry().timer( MetricsService.ALLOCATE_TIME).time();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index bcb6b79..e24bdb4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.persistence.qakka.QakkaFig;
 import org.apache.usergrid.persistence.qakka.core.QueueManager;
 import org.apache.usergrid.persistence.qakka.distributed.DistributedQueueService;
 import org.apache.usergrid.persistence.qakka.distributed.messages.*;
+import org.apache.usergrid.persistence.qakka.exceptions.NotFoundException;
 import org.apache.usergrid.persistence.qakka.exceptions.QakkaRuntimeException;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.DatabaseQueueMessage;
 import org.apache.usergrid.persistence.qakka.serialization.queuemessages.MessageCounterSerialization;
@@ -134,9 +135,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             String queueName, String sourceRegion, String destRegion, UUID messageId,
             Long deliveryTime, Long expirationTime ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         int maxRetries = qakkaFig.getMaxSendRetries();
@@ -213,9 +213,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     public Collection<DatabaseQueueMessage> getNextMessagesInternal( String queueName, int count ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         if ( actorSystemManager.getClientActor() == null || !actorSystemManager.isReady() ) {
@@ -280,9 +279,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status ackMessage(String queueName, UUID queueMessageId ) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            return Status.BAD_REQUEST;
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, queueMessageId );
@@ -293,9 +291,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status requeueMessage(String queueName, UUID messageId) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         QueueAckRequest message = new QueueAckRequest( queueName, messageId );
@@ -306,9 +303,8 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
     @Override
     public Status clearMessages(String queueName) {
 
-        List<String> queueNames = queueManager.getListOfQueues();
-        if ( !queueNames.contains( queueName ) ) {
-            throw new QakkaRuntimeException( "Queue name: " + queueName + " does not exist");
+        if ( queueManager.getQueueConfig( queueName ) == null ) {
+            throw new NotFoundException( "Queue not found: " + queueName );
         }
 
         // TODO: implement clear queue

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
index c10d1f5..8ce9822 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/core/QueueMessageManagerTest.java
@@ -70,9 +70,9 @@ public class QueueMessageManagerTest extends AbstractTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        Injector injector = getInjector();
+        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
+        Injector injector = getInjector();
 
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -82,54 +82,60 @@ public class QueueMessageManagerTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         // create queue and send one message to it
-        String queueName = "qmmt_queue_" + RandomStringUtils.randomAlphanumeric(15);
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
-        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
-        String jsonData = "{}";
-        qmm.sendMessages( queueName, Collections.singletonList(region), null, null,
-                "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED) );
 
-        distributedQueueService.refresh();
-        Thread.sleep(1000);
+        try {
 
-        // get message from the queue
-        List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
-        Assert.assertEquals( 1, messages.size() );
-        QueueMessage message = messages.get(0);
+            QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+            queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+            String jsonData = "{}";
+            qmm.sendMessages( queueName, Collections.singletonList( region ), null, null,
+                "application/json", DataType.serializeValue( jsonData, ProtocolVersion.NEWEST_SUPPORTED ) );
 
-        // test that queue message data is present and correct
-        QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
-        DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
-        Assert.assertNotNull( data );
-        Assert.assertEquals( "application/json", data.getContentType() );
-        String jsonDataReturned = new String( data.getBlob().array(), Charset.forName("UTF-8") );
-        Assert.assertEquals( jsonData, jsonDataReturned );
-
-        // test that transfer log is empty for our queue
-        TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
-        Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
-        List<TransferLog> logs = all.getEntities().stream()
+            distributedQueueService.refresh();
+            Thread.sleep( 1000 );
+
+            // get message from the queue
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, 1 );
+            Assert.assertEquals( 1, messages.size() );
+            QueueMessage message = messages.get( 0 );
+
+            // test that queue message data is present and correct
+            QueueMessageSerialization qms = injector.getInstance( QueueMessageSerialization.class );
+            DatabaseQueueMessageBody data = qms.loadMessageData( message.getMessageId() );
+            Assert.assertNotNull( data );
+            Assert.assertEquals( "application/json", data.getContentType() );
+            String jsonDataReturned = new String( data.getBlob().array(), Charset.forName( "UTF-8" ) );
+            Assert.assertEquals( jsonData, jsonDataReturned );
+
+            // test that transfer log is empty for our queue
+            TransferLogSerialization tlogs = injector.getInstance( TransferLogSerialization.class );
+            Result<TransferLog> all = tlogs.getAllTransferLogs( null, 1000 );
+            List<TransferLog> logs = all.getEntities().stream()
                 .filter( log -> log.getQueueName().equals( queueName ) ).collect( Collectors.toList() );
-        Assert.assertTrue( logs.isEmpty() );
+            Assert.assertTrue( logs.isEmpty() );
 
-        // ack the message
-        qmm.ackMessage( queueName, message.getQueueMessageId() );
+            // ack the message
+            qmm.ackMessage( queueName, message.getQueueMessageId() );
 
-        // test that message is no longer stored in non-replicated keyspace
+            // test that message is no longer stored in non-replicated keyspace
 
-        Assert.assertNull( qms.loadMessage( queueName, region, null,
-                DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ));
+            Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.DEFAULT, message.getQueueMessageId() ) );
 
-        Assert.assertNull( qms.loadMessage( queueName, region, null,
-                DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ));
+            Assert.assertNull( qms.loadMessage( queueName, region, null,
+                DatabaseQueueMessage.Type.INFLIGHT, message.getQueueMessageId() ) );
 
-        // test that audit log entry was written
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 3, auditLogs.getEntities().size() );
+            // test that audit log entry was written
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 3, auditLogs.getEntities().size() );
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -138,8 +144,6 @@ public class QueueMessageManagerTest extends AbstractTest {
 
         Injector injector = getInjector();
 
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
         DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
         QakkaFig qakkaFig             = injector.getInstance( QakkaFig.class );
         ActorSystemFig actorSystemFig = injector.getInstance( ActorSystemFig.class );
@@ -152,74 +156,82 @@ public class QueueMessageManagerTest extends AbstractTest {
         // create some number of queue messages
 
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        QueueMessageManager qmm   = injector.getInstance( QueueMessageManager.class );
-        String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric(15);
-        queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        int numMessages = 40;
+        String queueName = "queue_testQueueMessageTimeouts_" + RandomStringUtils.randomAlphanumeric( 15 );
 
-        for ( int i=0; i<numMessages; i++ ) {
-            qmm.sendMessages(
+        try {
+
+            QueueMessageManager qmm = injector.getInstance( QueueMessageManager.class );
+            queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
+
+            int numMessages = 40;
+
+            for (int i = 0; i < numMessages; i++) {
+                qmm.sendMessages(
                     queueName,
                     Collections.singletonList( region ),
                     null, // delay
                     null, // expiration
                     "application/json",
                     DataType.serializeValue( "{}", ProtocolVersion.NEWEST_SUPPORTED ) );
-        }
+            }
 
-        int maxRetries = 15;
-        int retries = 0;
-        while ( retries++ < maxRetries ) {
-            distributedQueueService.refresh();
-            if (inMemoryQueue.size( queueName ) == 40) {
-                break;
+            int maxRetries = 15;
+            int retries = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                if (inMemoryQueue.size( queueName ) == 40) {
+                    break;
+                }
+                Thread.sleep( 500 );
             }
-            Thread.sleep( 500 );
-        }
 
-        Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
+            Assert.assertEquals( numMessages, qmm.getQueueDepth( queueName ) );
 
-        // get all messages from queue
+            // get all messages from queue
 
-        List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
-        Assert.assertEquals( numMessages, messages.size() );
+            List<QueueMessage> messages = qmm.getNextMessages( queueName, numMessages );
+            Assert.assertEquals( numMessages, messages.size() );
 
-        // ack half of the messages
+            // ack half of the messages
 
-        List<QueueMessage> remove = new ArrayList<>();
+            List<QueueMessage> remove = new ArrayList<>();
 
-        for ( int i=0; i<numMessages/2; i++ ) {
-            QueueMessage queueMessage = messages.get( i );
-            qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
-            remove.add( queueMessage );
-        }
+            for (int i = 0; i < numMessages / 2; i++) {
+                QueueMessage queueMessage = messages.get( i );
+                qmm.ackMessage( queueName, queueMessage.getQueueMessageId() );
+                remove.add( queueMessage );
+            }
 
-        for ( QueueMessage message : remove ) {
-            messages.remove( message );
-        }
+            for (QueueMessage message : remove) {
+                messages.remove( message );
+            }
 
-        // wait for twice timeout period
+            // wait for twice timeout period
 
-        Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds()*1000 );
+            Thread.sleep( 2 * qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        distributedQueueService.processTimeouts();
+            distributedQueueService.processTimeouts();
 
-        Thread.sleep( qakkaFig.getQueueTimeoutSeconds()*1000 );
+            Thread.sleep( qakkaFig.getQueueTimeoutSeconds() * 1000 );
 
-        // attempt to ack other half of messages
+            // attempt to ack other half of messages
 
-        for ( QueueMessage message : messages ) {
-            try {
-                qmm.ackMessage( queueName, message.getQueueMessageId() );
-                Assert.fail("Message should have timed out by now");
+            for (QueueMessage message : messages) {
+                try {
+                    qmm.ackMessage( queueName, message.getQueueMessageId() );
+                    Assert.fail( "Message should have timed out by now" );
 
-            } catch ( QakkaRuntimeException expected ) {
-                // keep on going...
+                } catch (QakkaRuntimeException expected) {
+                    // keep on going...
+                }
             }
-        }
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7423424..53f9224 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -78,35 +78,42 @@ public class QueueActorServiceTest extends AbstractTest {
         QueueManager queueManager = injector.getInstance( QueueManager.class );
         queueManager.createQueue( new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        // send 1 queue message, get back one queue message
-        UUID messageId = UUIDGen.getTimeUUID();
+        try {
 
-        final String data = "my test data";
-        final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+            // send 1 queue message, get back one queue message
+            UUID messageId = UUIDGen.getTimeUUID();
+
+            final String data = "my test data";
+            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                 DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
-        serialization.writeMessageData( messageId, messageBody );
+            serialization.writeMessageData( messageId, messageBody );
 
-        distributedQueueService.sendMessageToRegion(
-                queueName, region, region, messageId, null, null);
+            distributedQueueService.sendMessageToRegion(
+                queueName, region, region, messageId, null, null );
 
-        distributedQueueService.refresh();
-        Thread.sleep(1000);
+            distributedQueueService.refresh();
+            Thread.sleep( 1000 );
 
-        Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
-        Assert.assertEquals( 1, qmReturned.size() );
+            Collection<DatabaseQueueMessage> qmReturned = distributedQueueService.getNextMessages( queueName, 1 );
+            Assert.assertEquals( 1, qmReturned.size() );
 
-        DatabaseQueueMessage dqm = qmReturned.iterator().next();
-        DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
-        ByteBuffer blob = dqmb.getBlob();
+            DatabaseQueueMessage dqm = qmReturned.iterator().next();
+            DatabaseQueueMessageBody dqmb = serialization.loadMessageData( dqm.getMessageId() );
+            ByteBuffer blob = dqmb.getBlob();
 
-        String returnedData = new String( blob.array(), "UTF-8");
+            String returnedData = new String( blob.array(), "UTF-8" );
 //        ByteArrayInputStream bais = new ByteArrayInputStream( blob.array() );
 //        ObjectInputStream ios = new ObjectInputStream( bais );
 //        String returnedData = (String)ios.readObject();
 
-        Assert.assertEquals( data, returnedData );
+            Assert.assertEquals( data, returnedData );
+
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
 
-        distributedQueueService.shutdown();
     }
 
 
@@ -128,51 +135,58 @@ public class QueueActorServiceTest extends AbstractTest {
 
         String queueName = "queue_testGetMultipleQueueMessages_" + UUID.randomUUID();
         QueueManager queueManager = injector.getInstance( QueueManager.class );
-        queueManager.createQueue(
-                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ));
 
-        for ( int i=0; i<100; i++ ) {
+        try {
 
-            UUID messageId = UUIDGen.getTimeUUID();
+            queueManager.createQueue(
+                new Queue( queueName, "test-type", region, region, 0L, 5, 10, null ) );
 
-            final String data = "my test data";
-            final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
+            for (int i = 0; i < 100; i++) {
+
+                UUID messageId = UUIDGen.getTimeUUID();
+
+                final String data = "my test data";
+                final DatabaseQueueMessageBody messageBody = new DatabaseQueueMessageBody(
                     DataType.serializeValue( data, ProtocolVersion.NEWEST_SUPPORTED ), "text/plain" );
-            serialization.writeMessageData( messageId, messageBody );
+                serialization.writeMessageData( messageId, messageBody );
 
-            xferLogSerialization.recordTransferLog(
-                queueName, actorSystemFig.getRegionLocal(), region, messageId );
+                xferLogSerialization.recordTransferLog(
+                    queueName, actorSystemFig.getRegionLocal(), region, messageId );
 
-            distributedQueueService.sendMessageToRegion(
-                    queueName, region, region, messageId , null, null);
-        }
+                distributedQueueService.sendMessageToRegion(
+                    queueName, region, region, messageId, null, null );
+            }
 
-        int maxRetries = 25;
-        int retries = 0;
-        int count = 0;
-        while ( retries++ < maxRetries ) {
-            distributedQueueService.refresh();
-            if (inMemoryQueue.size( queueName ) == 100) {
-                count = 100;
-                break;
+            int maxRetries = 25;
+            int retries = 0;
+            int count = 0;
+            while (retries++ < maxRetries) {
+                distributedQueueService.refresh();
+                if (inMemoryQueue.size( queueName ) == 100) {
+                    count = 100;
+                    break;
+                }
+                Thread.sleep( 1000 );
             }
-            Thread.sleep(1000);
-        }
 
-        Assert.assertEquals( 100, count );
+            Assert.assertEquals( 100, count );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
 
-        Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-        Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
+            Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
 
-        distributedQueueService.shutdown();
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/5a19ba9a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
index 3bf352f..791650e 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueActorHelperTest.java
@@ -53,7 +53,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void loadDatabaseQueueMessage() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
 
         injector.getInstance( App.class ); // init the INJECTOR
 
@@ -66,33 +65,39 @@ public class QueueActorHelperTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
-        queueManager.createQueue( new Queue( queueName ) );
 
-        UUID queueMessageId = QakkaUtils.getTimeUuid();
+        try {
+            queueManager.createQueue( new Queue( queueName ) );
 
-        // write message
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
-                QakkaUtils.getTimeUuid(),
-                DatabaseQueueMessage.Type.DEFAULT,
-                queueName,
-                actorSystemFig.getRegionLocal(),
-                null,
-                System.currentTimeMillis(),
-                null,
-                queueMessageId);
-        qms.writeMessage( message );
+            // write message
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
+                    QakkaUtils.getTimeUuid(),
+                    DatabaseQueueMessage.Type.DEFAULT,
+                    queueName,
+                    actorSystemFig.getRegionLocal(),
+                    null,
+                    System.currentTimeMillis(),
+                    null,
+                    queueMessageId);
+            qms.writeMessage( message );
 
-        // load message
+            // load message
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
-                queueName, message.getQueueMessageId(), message.getType() );
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+                    queueName, message.getQueueMessageId(), message.getType() );
 
-        Assert.assertNotNull( queueMessage );
+            Assert.assertNotNull( queueMessage );
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -100,8 +105,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void loadDatabaseQueueMessageNotFound() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
         QueueManager queueManager = injector.getInstance( QueueManager.class );
@@ -112,20 +115,27 @@ public class QueueActorHelperTest extends AbstractTest {
         app.start( "localhost", getNextAkkaPort(), region );
 
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
+
         queueManager.createQueue( new Queue( queueName ) );
 
-        // don't write any message
+        try {
+
+            // don't write any message
 
-        // load message
+            // load message
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            DatabaseQueueMessage queueMessage = helper.loadDatabaseQueueMessage(
                 queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT );
 
-        Assert.assertNull( queueMessage );
+            Assert.assertNull( queueMessage );
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
+
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -133,8 +143,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void putInflight() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
 
@@ -153,7 +161,9 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
+        try {
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
                 QakkaUtils.getTimeUuid(),
                 DatabaseQueueMessage.Type.DEFAULT,
                 queueName,
@@ -161,42 +171,46 @@ public class QueueActorHelperTest extends AbstractTest {
                 null,
                 System.currentTimeMillis(),
                 null,
-                queueMessageId);
-        qms.writeMessage( message );
+                queueMessageId );
+            qms.writeMessage( message );
 
-        // put message inflight
+            // put message inflight
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        helper.putInflight( queueName, message );
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            helper.putInflight( queueName, message );
 
-        // message must be gone from messages_available table
+            // message must be gone from messages_available table
 
-        Assert.assertNull( qms.loadMessage(
+            Assert.assertNull( qms.loadMessage(
                 queueName,
                 actorSystemFig.getRegionLocal(),
                 null,
                 DatabaseQueueMessage.Type.DEFAULT,
                 message.getQueueMessageId() ) );
 
-        // message must be present in messages_inflight table
+            // message must be present in messages_inflight table
 
-        Assert.assertNotNull( qms.loadMessage(
+            Assert.assertNotNull( qms.loadMessage(
                 queueName,
                 actorSystemFig.getRegionLocal(),
                 null,
                 DatabaseQueueMessage.Type.INFLIGHT,
                 message.getQueueMessageId() ) );
 
-        // there must be an audit log record of the successful get operation
+            // there must be an audit log record of the successful get operation
+
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 1, auditLogs.getEntities().size() );
+            Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+            Assert.assertEquals( AuditLog.Action.GET, auditLogs.getEntities().get( 0 ).getAction() );
 
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 1, auditLogs.getEntities().size() );
-        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
-        Assert.assertEquals( AuditLog.Action.GET,     auditLogs.getEntities().get(0).getAction()  );
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -222,9 +236,11 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        // write message to messages_inflight table
+        try {
 
-        DatabaseQueueMessage message = new DatabaseQueueMessage(
+            // write message to messages_inflight table
+
+            DatabaseQueueMessage message = new DatabaseQueueMessage(
                 QakkaUtils.getTimeUuid(),
                 DatabaseQueueMessage.Type.INFLIGHT,
                 queueName,
@@ -232,34 +248,38 @@ public class QueueActorHelperTest extends AbstractTest {
                 null,
                 System.currentTimeMillis(),
                 null,
-                queueMessageId);
-        qms.writeMessage( message );
+                queueMessageId );
+            qms.writeMessage( message );
+
+            // ack message
 
-        // ack message
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            helper.ackQueueMessage( queueName, message.getQueueMessageId() );
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        helper.ackQueueMessage( queueName, message.getQueueMessageId() );
+            // message must be gone from messages_available table
 
-        // message must be gone from messages_available table
+            Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ) );
 
-        Assert.assertNull( helper.loadDatabaseQueueMessage(
-                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.INFLIGHT ));
+            // message must be gone from messages_inflight table
 
-        // message must be gone from messages_inflight table
+            Assert.assertNull( helper.loadDatabaseQueueMessage(
+                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ) );
 
-        Assert.assertNull( helper.loadDatabaseQueueMessage(
-                queueName, QakkaUtils.getTimeUuid(), DatabaseQueueMessage.Type.DEFAULT ));
+            // there should be an audit log record of the successful ack operation
 
-        // there should be an audit log record of the successful ack operation
+            AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
+            Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
+            Assert.assertEquals( 1, auditLogs.getEntities().size() );
+            Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get( 0 ).getStatus() );
+            Assert.assertEquals( AuditLog.Action.ACK, auditLogs.getEntities().get( 0 ).getAction() );
 
-        AuditLogSerialization auditLogSerialization = injector.getInstance( AuditLogSerialization.class );
-        Result<AuditLog> auditLogs = auditLogSerialization.getAuditLogs( message.getMessageId() );
-        Assert.assertEquals( 1, auditLogs.getEntities().size() );
-        Assert.assertEquals( AuditLog.Status.SUCCESS, auditLogs.getEntities().get(0).getStatus()  );
-        Assert.assertEquals( AuditLog.Action.ACK,     auditLogs.getEntities().get(0).getAction()  );
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 
 
@@ -267,8 +287,6 @@ public class QueueActorHelperTest extends AbstractTest {
     public void ackQueueMessageNotFound() throws Exception {
 
         Injector injector = getInjector();
-        CassandraClient cassandraClient = injector.getInstance( CassandraClientImpl.class );
-
 
         injector.getInstance( App.class ); // init the INJECTOR
         QueueManager queueManager     = injector.getInstance( QueueManager.class );
@@ -281,17 +299,23 @@ public class QueueActorHelperTest extends AbstractTest {
         String queueName = "qat_queue_" + RandomStringUtils.randomAlphanumeric( 10 );
         queueManager.createQueue( new Queue( queueName ) );
 
-        // don't write message, just make up some bogus IDs
+        try {
 
-        UUID queueMessageId = QakkaUtils.getTimeUuid();
+            // don't write message, just make up some bogus IDs
+
+            UUID queueMessageId = QakkaUtils.getTimeUuid();
+
+            // ack message must fail
 
-        // ack message must fail
+            QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
+            Assert.assertEquals( DistributedQueueService.Status.NOT_INFLIGHT,
+                helper.ackQueueMessage( queueName, queueMessageId ) );
 
-        QueueActorHelper helper = injector.getInstance( QueueActorHelper.class );
-        Assert.assertEquals( DistributedQueueService.Status.BAD_REQUEST,
-            helper.ackQueueMessage( queueName, queueMessageId ));
+            DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
+            distributedQueueService.shutdown();
 
-        DistributedQueueService distributedQueueService = injector.getInstance( DistributedQueueService.class );
-        distributedQueueService.shutdown();
+        } finally {
+            queueManager.deleteQueue( queueName );
+        }
     }
 }


[07/10] usergrid git commit: Make DatabaseQueueMessage serializable because it is sent in Akka messages

Posted by sn...@apache.org.
Make DatabaseQueueMessage serializable because it is sent in Akka messages


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/00eb1396
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/00eb1396
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/00eb1396

Branch: refs/heads/usergrid-1318-queue
Commit: 00eb13965e2b31bc1c57e437ceac0a42f6610861
Parents: c08e02a
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Sep 22 07:47:44 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Sep 22 07:47:44 2016 -0400

----------------------------------------------------------------------
 .../qakka/serialization/queuemessages/DatabaseQueueMessage.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/00eb1396/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
index dab47d5..89a3eeb 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/DatabaseQueueMessage.java
@@ -19,10 +19,11 @@
 
 package org.apache.usergrid.persistence.qakka.serialization.queuemessages;
 
+import java.io.Serializable;
 import java.util.UUID;
 
 
-public class DatabaseQueueMessage {
+public class DatabaseQueueMessage implements Serializable {
 
     public enum Type {
         DEFAULT, INFLIGHT