You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:35 UTC

[06/25] usergrid git commit: Move under org.apache.usergrid.persistence package in preparation for integration into Usergrid.

Move under org.apache.usergrid.persistence package in preparation for integration into Usergrid.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9016fd29
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9016fd29
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9016fd29

Branch: refs/heads/usergrid-1318-queue
Commit: 9016fd290e80b88ba3199c0020fca7722b82d780
Parents: 1574b3a
Author: Dave Johnson <sn...@apache.org>
Authored: Fri Sep 9 17:31:44 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Fri Sep 9 17:31:44 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      |  58 +++----
 .../asyncevents/AsyncIndexProvider.java         |  12 +-
 .../index/AsyncEventServiceImplTest.java        |   8 +-
 .../usergrid/mq/LegacyQueuePathsTest.java       |  48 ++++++
 .../org/apache/usergrid/mq/QueuePathsTest.java  |  48 ------
 .../usergrid/persistence/queue/LegacyQueue.java |  35 +++++
 .../persistence/queue/LegacyQueueFig.java       | 106 +++++++++++++
 .../persistence/queue/LegacyQueueManager.java   |  79 ++++++++++
 .../queue/LegacyQueueManagerFactory.java        |  23 +++
 .../LegacyQueueManagerInternalFactory.java      |  28 ++++
 .../persistence/queue/LegacyQueueMessage.java   |  70 +++++++++
 .../persistence/queue/LegacyQueueScope.java     |  45 ++++++
 .../persistence/queue/LocalQueueManager.java    |  18 +--
 .../usergrid/persistence/queue/Queue.java       |  35 -----
 .../usergrid/persistence/queue/QueueFig.java    | 106 -------------
 .../persistence/queue/QueueManager.java         |  79 ----------
 .../persistence/queue/QueueManagerFactory.java  |  23 ---
 .../queue/QueueManagerInternalFactory.java      |  28 ----
 .../persistence/queue/QueueMessage.java         |  70 ---------
 .../usergrid/persistence/queue/QueueScope.java  |  45 ------
 .../persistence/queue/guice/QueueModule.java    |  16 +-
 .../queue/impl/LegacyQueueScopeImpl.java        |  67 ++++++++
 .../queue/impl/QueueManagerFactoryImpl.java     |  20 +--
 .../persistence/queue/impl/QueueScopeImpl.java  |  68 --------
 .../queue/impl/SNSQueueManagerImpl.java         |  48 +++---
 .../queue/util/AmazonNotificationUtils.java     |   4 +-
 .../queue/LegacyQueueManagerTest.java           | 156 +++++++++++++++++++
 .../persistence/queue/QueueManagerTest.java     | 156 -------------------
 .../notifications/ApplicationQueueManager.java  |   4 +-
 .../ApplicationQueueManagerCache.java           |   6 +-
 .../notifications/NotificationsService.java     |  19 ++-
 .../services/notifications/QueueJob.java        |   1 -
 .../services/notifications/QueueListener.java   |  29 ++--
 .../impl/ApplicationQueueManagerImpl.java       |  18 +--
 .../services/queues/ImportQueueListener.java    |   8 +-
 .../services/queues/ImportQueueManager.java     |  12 +-
 .../usergrid/services/queues/QueueListener.java |  18 +--
 37 files changed, 803 insertions(+), 811 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..6add88c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -57,12 +57,12 @@ import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
@@ -104,9 +104,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     public int MAX_TAKE = 10;
     public static final String QUEUE_NAME = "index"; //keep this short as AWS limits queue name size to 80 chars
 
-    private final QueueManager queue;
+    private final LegacyQueueManager queue;
     private final IndexProcessorFig indexProcessorFig;
-    private final QueueFig queueFig;
+    private final LegacyQueueFig queueFig;
     private final IndexProducer indexProducer;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final IndexLocationStrategyFactory indexLocationStrategyFactory;
@@ -134,7 +134,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
 
     @Inject
-    public AsyncEventServiceImpl(final QueueManagerFactory queueManagerFactory,
+    public AsyncEventServiceImpl(final LegacyQueueManagerFactory queueManagerFactory,
                                  final IndexProcessorFig indexProcessorFig,
                                  final IndexProducer indexProducer,
                                  final MetricsFactory metricsFactory,
@@ -143,7 +143,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                  final EntityIndexFactory entityIndexFactory,
                                  final EventBuilder eventBuilder,
                                  final MapManagerFactory mapManagerFactory,
-                                 final QueueFig queueFig,
+                                 final LegacyQueueFig queueFig,
                                  @EventExecutionScheduler
                                     final RxTaskScheduler rxTaskScheduler ) {
         this.indexProducer = indexProducer;
@@ -159,7 +159,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
         this.rxTaskScheduler = rxTaskScheduler;
 
-        QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALL);
+        LegacyQueueScope queueScope = new LegacyQueueScopeImpl(QUEUE_NAME, LegacyQueueScope.RegionImplementation.ALL);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
 
         this.indexProcessorFig = indexProcessorFig;
@@ -233,7 +233,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Take message from SQS
      */
-    private List<QueueMessage> take() {
+    private List<LegacyQueueMessage> take() {
 
         final Timer.Context timer = this.readTimer.time();
 
@@ -251,7 +251,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     /**
      * Ack message in SQS
      */
-    public void ack(final List<QueueMessage> messages) {
+    public void ack(final List<LegacyQueueMessage> messages) {
 
         final Timer.Context timer = this.ackTimer.time();
 
@@ -275,7 +275,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * @param messages
      * @return
      */
-    private List<IndexEventResult> callEventHandlers(final List<QueueMessage> messages) {
+    private List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {
 
         if (logger.isDebugEnabled()) {
             logger.debug("callEventHandlers with {} message(s)", messages.size());
@@ -422,7 +422,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     }
 
-    private IndexOperationMessage handleEntityIndexUpdate(final QueueMessage message) {
+    private IndexOperationMessage handleEntityIndexUpdate(final LegacyQueueMessage message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" );
 
@@ -457,7 +457,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     }
 
-    private IndexOperationMessage handleEdgeIndex(final QueueMessage message) {
+    private IndexOperationMessage handleEdgeIndex(final LegacyQueueMessage message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeIndex" );
 
@@ -486,7 +486,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offer( new EdgeDeleteEvent( queueFig.getPrimaryRegion(), applicationScope, edge ) );
     }
 
-    private IndexOperationMessage  handleEdgeDelete(final QueueMessage message) {
+    private IndexOperationMessage  handleEdgeDelete(final LegacyQueueMessage message) {
 
         Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEdgeDelete" );
 
@@ -668,7 +668,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         offer( new EntityDeleteEvent(queueFig.getPrimaryRegion(), new EntityIdScope( applicationScope, entityId ) ) );
     }
 
-    private IndexOperationMessage handleEntityDelete(final QueueMessage message) {
+    private IndexOperationMessage handleEntityDelete(final LegacyQueueMessage message) {
 
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete");
 
@@ -700,7 +700,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     }
 
 
-    private void handleInitializeApplicationIndex(final AsyncEvent event, final QueueMessage message) {
+    private void handleInitializeApplicationIndex(final AsyncEvent event, final LegacyQueueMessage message) {
         Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
         Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass()));
 
@@ -741,15 +741,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
     private void startWorker() {
         synchronized (mutex) {
 
-            Observable<List<QueueMessage>> consumer =
-                    Observable.create( new Observable.OnSubscribe<List<QueueMessage>>() {
+            Observable<List<LegacyQueueMessage>> consumer =
+                    Observable.create( new Observable.OnSubscribe<List<LegacyQueueMessage>>() {
                         @Override
-                        public void call( final Subscriber<? super List<QueueMessage>> subscriber ) {
+                        public void call( final Subscriber<? super List<LegacyQueueMessage>> subscriber ) {
 
                             //name our thread so it's easy to see
                             Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
 
-                            List<QueueMessage> drainList = null;
+                            List<LegacyQueueMessage> drainList = null;
 
                             do {
                                 try {
@@ -799,7 +799,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
                                                      List<IndexEventResult> indexEventResults = callEventHandlers( messages );
 
                                                      // submit the processed messages to index producer
-                                                     List<QueueMessage> messagesToAck = submitToIndex( indexEventResults );
+                                                     List<LegacyQueueMessage> messagesToAck = submitToIndex( indexEventResults );
 
                                                      if ( messagesToAck.size() < messages.size() ) {
                                                          logger.warn( "Missing {} message(s) from index processing",
@@ -834,7 +834,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      * Submit results to index and return the queue messages to be ack'd
      *
      */
-    private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
+    private List<LegacyQueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
 
         // if nothing came back then return empty list
         if(indexEventResults==null){
@@ -842,7 +842,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
         IndexOperationMessage combined = new IndexOperationMessage();
-        List<QueueMessage> queueMessages = indexEventResults.stream()
+        List<LegacyQueueMessage> queueMessages = indexEventResults.stream()
 
             // filter out messages that are not present, they were not processed and put into the results
             .filter( result -> result.getQueueMessage().isPresent() )
@@ -898,10 +898,10 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     public class IndexEventResult{
         private final Optional<IndexOperationMessage> indexOperationMessage;
-        private final Optional<QueueMessage> queueMessage;
+        private final Optional<LegacyQueueMessage> queueMessage;
         private final long creationTime;
 
-        public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<QueueMessage> queueMessage, long creationTime){
+        public IndexEventResult(Optional<IndexOperationMessage> indexOperationMessage, Optional<LegacyQueueMessage> queueMessage, long creationTime){
 
             this.queueMessage = queueMessage;
             this.creationTime = creationTime;
@@ -912,7 +912,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
             return indexOperationMessage;
         }
 
-        public Optional<QueueMessage> getQueueMessage() {
+        public Optional<LegacyQueueMessage> getQueueMessage() {
             return queueMessage;
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index abd4ce1..aac0e66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -29,8 +29,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.queue.LocalQueueManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Provider;
@@ -45,7 +45,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
 
     private final IndexProcessorFig indexProcessorFig;
 
-    private final QueueManagerFactory queueManagerFactory;
+    private final LegacyQueueManagerFactory queueManagerFactory;
     private final MetricsFactory metricsFactory;
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -54,14 +54,14 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final EntityIndexFactory entityIndexFactory;
     private final IndexProducer indexProducer;
     private final MapManagerFactory mapManagerFactory;
-    private final QueueFig queueFig;
+    private final LegacyQueueFig queueFig;
 
     private AsyncEventService asyncEventService;
 
 
     @Inject
     public AsyncIndexProvider(final IndexProcessorFig indexProcessorFig,
-                              final QueueManagerFactory queueManagerFactory,
+                              final LegacyQueueManagerFactory queueManagerFactory,
                               final MetricsFactory metricsFactory,
                               @EventExecutionScheduler final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
@@ -70,7 +70,7 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                               final EntityIndexFactory entityIndexFactory,
                               final IndexProducer indexProducer,
                               final MapManagerFactory mapManagerFactory,
-                              final QueueFig queueFig) {
+                              final LegacyQueueFig queueFig) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
index 4c0058b..92b5983 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncEventServiceImplTest.java
@@ -34,8 +34,8 @@ import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.impl.EsRunner;
 import org.apache.usergrid.persistence.index.impl.IndexProducer;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
 
@@ -53,13 +53,13 @@ public class AsyncEventServiceImplTest extends AsyncIndexServiceTest {
 
 
     @Inject
-    public QueueManagerFactory queueManagerFactory;
+    public LegacyQueueManagerFactory queueManagerFactory;
 
     @Inject
     public IndexProcessorFig indexProcessorFig;
 
     @Inject
-    public QueueFig queueFig;
+    public LegacyQueueFig queueFig;
 
 
     @Inject

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
new file mode 100644
index 0000000..5ffa553
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/mq/LegacyQueuePathsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.mq;
+
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.mq.Queue.getQueueParentPaths;
+import static org.apache.usergrid.mq.Queue.normalizeQueuePath;
+import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
+
+
+
+public class LegacyQueuePathsTest {
+    private static final Logger logger = LoggerFactory.getLogger( LegacyQueuePathsTest.class );
+
+
+    @Test
+    // TODO - why does this test case not have assertions to test results?
+    // tests should not be written like this: what's the point? If it's
+    // code coverage this is still bad.
+    public void testPaths() throws Exception {
+        logger.info( normalizeQueuePath( "a/b/c" ) );
+        logger.info( normalizeQueuePath( "a/b/c/" ) );
+        logger.info( normalizeQueuePath( "/a/b/c" ) );
+        logger.info( normalizeQueuePath( "/////a/b/c" ) );
+        logger.info( normalizeQueuePath( "/" ) );
+
+        logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) );
+        logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java b/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
deleted file mode 100644
index 86dc8bc..0000000
--- a/stack/core/src/test/java/org/apache/usergrid/mq/QueuePathsTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.mq;
-
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.usergrid.mq.Queue.getQueueParentPaths;
-import static org.apache.usergrid.mq.Queue.normalizeQueuePath;
-import static org.apache.usergrid.utils.JsonUtils.mapToFormattedJsonString;
-
-
-
-public class QueuePathsTest {
-    private static final Logger logger = LoggerFactory.getLogger( QueuePathsTest.class );
-
-
-    @Test
-    // TODO - why does this test case not have assertions to test results?
-    // tests should not be written like this: what's the point? If it's
-    // code coverage this is still bad.
-    public void testPaths() throws Exception {
-        logger.info( normalizeQueuePath( "a/b/c" ) );
-        logger.info( normalizeQueuePath( "a/b/c/" ) );
-        logger.info( normalizeQueuePath( "/a/b/c" ) );
-        logger.info( normalizeQueuePath( "/////a/b/c" ) );
-        logger.info( normalizeQueuePath( "/" ) );
-
-        logger.info( mapToFormattedJsonString( getQueueParentPaths( "/a/b/c" ) ) );
-        logger.info( mapToFormattedJsonString( getQueueParentPaths( "/" ) ) );
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
new file mode 100644
index 0000000..c12dc41
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueue.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+
+public class LegacyQueue {
+    private final String url;
+
+    public LegacyQueue(String url) {
+        this.url = url;
+    }
+
+    public String getUrl(){
+        return url;
+    }
+
+    public boolean isEmpty(){
+        return url == null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
new file mode 100644
index 0000000..907eec2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueFig.java
@@ -0,0 +1,106 @@
+package org.apache.usergrid.persistence.queue;
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+@FigSingleton
+public interface LegacyQueueFig extends GuicyFig {
+
+    /**
+     * Any region value string must exactly match the region names specified on this page:
+     *
+     * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
+     */
+
+
+    /**
+     * Primary region to use for Amazon queues.
+     */
+    @Key( "usergrid.queue.region" )
+    @Default("us-east-1")
+    String getPrimaryRegion();
+
+    /**
+     * Flag to determine if Usergrid should use a multi-region Amazon queue
+     * implementation.
+     */
+    @Key( "usergrid.queue.multiregion" )
+    @Default("false")
+    boolean isMultiRegion();
+
+    /**
+     * Comma-separated list of one or more Amazon regions to use if multiregion
+     * is set to true.
+     */
+    @Key( "usergrid.queue.regionList" )
+    @Default("us-east-1")
+    String getRegionList();
+
+
+    /**
+     * Set the amount of time (in minutes) to retain messages in a queue.
+     * 1209600 = 14 days (maximum retention period)
+     */
+    @Key( "usergrid.queue.retention" )
+    @Default("1209600")
+    String getRetentionPeriod();
+
+    /**
+     * Set the amount of time (in minutes) to retain messages in a dead letter queue.
+     * 1209600 = 14 days (maximum retention period)
+     */
+    @Key( "usergrid.queue.deadletter.retention" )
+    @Default("1209600")
+    String getDeadletterRetentionPeriod();
+
+    /**
+     * The maximum number of attempts to attempt to deliver before failing into the DLQ
+     */
+    @Key( "usergrid.queue.deliveryLimit" )
+    @Default("10")
+    String getQueueDeliveryLimit();
+
+    @Key("usergrid.use.default.queue")
+    @Default("false")
+    boolean overrideQueueForDefault();
+
+    @Key("usergrid.queue.publish.threads")
+    @Default("100")
+    int getAsyncMaxThreads();
+
+    // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
+    @Key("usergrid.queue.publish.queuesize")
+    @Default("250000")
+    int getAsyncQueueSize();
+
+    /**
+     * Set the visibility timeout (in milliseconds) for faster retries
+     * @return
+     */
+    @Key( "usergrid.queue.visibilityTimeout" )
+    @Default("5000") // 5 seconds
+    int getVisibilityTimeout();
+
+    @Key( "usergrid.queue.localquorum.timeout")
+    @Default("30000") // 30 seconds
+    int getLocalQuorumTimeout();
+
+    @Key( "usergrid.queue.client.connection.timeout")
+    @Default( "5000" ) // 5 seconds
+    int getQueueClientConnectionTimeout();
+
+    @Key( "usergrid.queue.client.socket.timeout")
+    @Default( "50000" ) // 50 seconds
+    int getQueueClientSocketTimeout();
+
+    @Key( "usergrid.queue.poll.timeout")
+    @Default( "10000" ) // 10 seconds
+    int getQueuePollTimeout();
+
+    @Key( "usergrid.queue.quorum.fallback")
+    @Default("false") // 30 seconds
+    boolean getQuorumFallback();
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
new file mode 100644
index 0000000..053dd36
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**ctor
+ * Manages queues for usergrid.  Current implementation is sqs based.
+ */
+public interface LegacyQueueManager {
+
+    /**
+     * Read messages from queue
+     * @param limit
+     * @param klass class to cast the return from
+     * @return List of Queue Messages
+     */
+    List<LegacyQueueMessage> getMessages(int limit, Class klass);
+
+    /**
+     * get the queue depth
+     * @return
+     */
+    long getQueueDepth();
+
+    /**
+     * Commit the transaction
+     * @param queueMessage
+     */
+    void commitMessage( LegacyQueueMessage queueMessage);
+
+    /**
+     * commit multiple messages
+     * @param queueMessages
+     */
+    void commitMessages( List<LegacyQueueMessage> queueMessages);
+
+    /**
+     * send messages to queue
+     * @param bodies body objects must be serializable
+     * @throws IOException
+     */
+    void sendMessages(List bodies) throws IOException;
+
+    /**
+     * send a message to queue
+     * @param body
+     * @throws IOException
+     */
+    <T extends Serializable> void sendMessage(T body)throws IOException;
+
+    /**
+     * Send a messae to the topic to be sent to other queues
+     * @param body
+     */
+    <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
+
+    /**
+     * purge messages
+     */
+    void deleteQueue();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
new file mode 100644
index 0000000..53afc22
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerFactory.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public interface LegacyQueueManagerFactory {
+    public LegacyQueueManager getQueueManager(final LegacyQueueScope scope );
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
new file mode 100644
index 0000000..986f211
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerInternalFactory.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.queue;
+
+/**
+ * QueueManagerInternal guice factory
+ */
+public interface LegacyQueueManagerInternalFactory {
+    LegacyQueueManager getQueueManager(final LegacyQueueScope scope );
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
new file mode 100644
index 0000000..939443d
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueMessage.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue;
+
+public class LegacyQueueMessage {
+    private final Object body;
+    private final String messageId;
+    private final String handle;
+    private final String type;
+    private String stringBody;
+    private int receiveCount;
+
+
+    public LegacyQueueMessage(String messageId, String handle, Object body, String type) {
+        this.body = body;
+        this.messageId = messageId;
+        this.handle = handle;
+        this.type = type;
+        this.stringBody = "";
+        this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
+    }
+
+    public String getHandle() {
+        return handle;
+    }
+
+    public Object getBody(){
+        return body;
+    }
+
+    public String getMessageId() {
+        return messageId;
+    }
+
+
+    public String getType() {
+        return type;
+    }
+
+    public void setStringBody(String stringBody) {
+        this.stringBody = stringBody;
+    }
+
+    public String getStringBody() {
+        return stringBody;
+    }
+
+    public void setReceiveCount(int receiveCount){
+        this.receiveCount = receiveCount;
+    }
+
+    public int getReceiveCount(){
+        return receiveCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
new file mode 100644
index 0000000..1f932b2
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LegacyQueueScope.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.queue;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+public interface LegacyQueueScope {
+
+    /**
+     * LOCAL will create a SNS topic with a queue subscription in a single AWS region.
+     * ALL will create SNS topics and queue subscriptions  in ALL AWS regions.
+     */
+    enum RegionImplementation {
+        LOCAL,
+        ALL
+    }
+
+    /**
+     * Get the name of the the map
+     * @return
+     */
+    public String getName();
+
+    /**
+     * Get the Usergrid region enum
+     */
+    public RegionImplementation getRegionImplementation();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
index 1f4261a..630c2e7 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/LocalQueueManager.java
@@ -34,17 +34,17 @@ import java.util.concurrent.TimeUnit;
 /**
  * Default queue manager implementation, uses in memory linked queue
  */
-public class LocalQueueManager implements QueueManager {
+public class LocalQueueManager implements LegacyQueueManager {
 
     private static final Logger logger = LoggerFactory.getLogger(LocalQueueManager.class);
 
-    public ArrayBlockingQueue<QueueMessage> queue = new ArrayBlockingQueue<>(10000);
+    public ArrayBlockingQueue<LegacyQueueMessage> queue = new ArrayBlockingQueue<>(10000);
 
     @Override
-    public    List<QueueMessage> getMessages(int limit, Class klass) {
-        List<QueueMessage> returnQueue = new ArrayList<>();
+    public    List<LegacyQueueMessage> getMessages(int limit, Class klass) {
+        List<LegacyQueueMessage> returnQueue = new ArrayList<>();
         try {
-            QueueMessage message=null;
+            LegacyQueueMessage message=null;
             int count = 5;
             do {
                 message = queue.poll(100, TimeUnit.MILLISECONDS);
@@ -64,11 +64,11 @@ public class LocalQueueManager implements QueueManager {
     }
 
     @Override
-    public void commitMessage(QueueMessage queueMessage) {
+    public void commitMessage(LegacyQueueMessage queueMessage) {
     }
 
     @Override
-    public void commitMessages(List<QueueMessage> queueMessages) {
+    public void commitMessages(List<LegacyQueueMessage> queueMessages) {
     }
 
     @Override
@@ -76,7 +76,7 @@ public class LocalQueueManager implements QueueManager {
         for(Object body : bodies){
             String uuid = UUID.randomUUID().toString();
             try {
-                queue.put(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"));
+                queue.put(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"));
             }catch (InterruptedException ie){
                 throw new RuntimeException(ie);
             }
@@ -88,7 +88,7 @@ public class LocalQueueManager implements QueueManager {
     public <T extends Serializable> void sendMessage( final T body ) throws IOException {
         String uuid = UUID.randomUUID().toString();
         try {
-            queue.offer(new QueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
+            queue.offer(new LegacyQueueMessage(uuid, "handle_" + uuid, body, "put type here"),5000,TimeUnit.MILLISECONDS);
         }catch (InterruptedException ie){
             throw new RuntimeException(ie);
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
deleted file mode 100644
index 24070d0..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/Queue.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-
-public class Queue {
-    private final String url;
-
-    public Queue(String url) {
-        this.url = url;
-    }
-
-    public String getUrl(){
-        return url;
-    }
-
-    public boolean isEmpty(){
-        return url == null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
deleted file mode 100644
index 74912ae..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package org.apache.usergrid.persistence.queue;
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-@FigSingleton
-public interface QueueFig extends GuicyFig {
-
-    /**
-     * Any region value string must exactly match the region names specified on this page:
-     *
-     * http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html*
-     */
-
-
-    /**
-     * Primary region to use for Amazon queues.
-     */
-    @Key( "usergrid.queue.region" )
-    @Default("us-east-1")
-    String getPrimaryRegion();
-
-    /**
-     * Flag to determine if Usergrid should use a multi-region Amazon queue
-     * implementation.
-     */
-    @Key( "usergrid.queue.multiregion" )
-    @Default("false")
-    boolean isMultiRegion();
-
-    /**
-     * Comma-separated list of one or more Amazon regions to use if multiregion
-     * is set to true.
-     */
-    @Key( "usergrid.queue.regionList" )
-    @Default("us-east-1")
-    String getRegionList();
-
-
-    /**
-     * Set the amount of time (in minutes) to retain messages in a queue.
-     * 1209600 = 14 days (maximum retention period)
-     */
-    @Key( "usergrid.queue.retention" )
-    @Default("1209600")
-    String getRetentionPeriod();
-
-    /**
-     * Set the amount of time (in minutes) to retain messages in a dead letter queue.
-     * 1209600 = 14 days (maximum retention period)
-     */
-    @Key( "usergrid.queue.deadletter.retention" )
-    @Default("1209600")
-    String getDeadletterRetentionPeriod();
-
-    /**
-     * The maximum number of attempts to attempt to deliver before failing into the DLQ
-     */
-    @Key( "usergrid.queue.deliveryLimit" )
-    @Default("10")
-    String getQueueDeliveryLimit();
-
-    @Key("usergrid.use.default.queue")
-    @Default("false")
-    boolean overrideQueueForDefault();
-
-    @Key("usergrid.queue.publish.threads")
-    @Default("100")
-    int getAsyncMaxThreads();
-
-    // current msg size 1.2kb * 850000 = 1.02 GB (let this default be the most we'll queue in heap)
-    @Key("usergrid.queue.publish.queuesize")
-    @Default("250000")
-    int getAsyncQueueSize();
-
-    /**
-     * Set the visibility timeout (in milliseconds) for faster retries
-     * @return
-     */
-    @Key( "usergrid.queue.visibilityTimeout" )
-    @Default("5000") // 5 seconds
-    int getVisibilityTimeout();
-
-    @Key( "usergrid.queue.localquorum.timeout")
-    @Default("30000") // 30 seconds
-    int getLocalQuorumTimeout();
-
-    @Key( "usergrid.queue.client.connection.timeout")
-    @Default( "5000" ) // 5 seconds
-    int getQueueClientConnectionTimeout();
-
-    @Key( "usergrid.queue.client.socket.timeout")
-    @Default( "50000" ) // 50 seconds
-    int getQueueClientSocketTimeout();
-
-    @Key( "usergrid.queue.poll.timeout")
-    @Default( "10000" ) // 10 seconds
-    int getQueuePollTimeout();
-
-    @Key( "usergrid.queue.quorum.fallback")
-    @Default("false") // 30 seconds
-    boolean getQuorumFallback();
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
deleted file mode 100644
index d2e29cb..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-/**ctor
- * Manages queues for usergrid.  Current implementation is sqs based.
- */
-public interface QueueManager {
-
-    /**
-     * Read messages from queue
-     * @param limit
-     * @param klass class to cast the return from
-     * @return List of Queue Messages
-     */
-    List<QueueMessage> getMessages(int limit, Class klass);
-
-    /**
-     * get the queue depth
-     * @return
-     */
-    long getQueueDepth();
-
-    /**
-     * Commit the transaction
-     * @param queueMessage
-     */
-    void commitMessage( QueueMessage queueMessage);
-
-    /**
-     * commit multiple messages
-     * @param queueMessages
-     */
-    void commitMessages( List<QueueMessage> queueMessages);
-
-    /**
-     * send messages to queue
-     * @param bodies body objects must be serializable
-     * @throws IOException
-     */
-    void sendMessages(List bodies) throws IOException;
-
-    /**
-     * send a message to queue
-     * @param body
-     * @throws IOException
-     */
-    <T extends Serializable> void sendMessage(T body)throws IOException;
-
-    /**
-     * Send a messae to the topic to be sent to other queues
-     * @param body
-     */
-    <T extends Serializable> void sendMessageToTopic(T body) throws IOException;
-
-    /**
-     * purge messages
-     */
-    void deleteQueue();
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
deleted file mode 100644
index 4cdb5e2..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-public interface QueueManagerFactory {
-    public QueueManager getQueueManager( final QueueScope scope );
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
deleted file mode 100644
index 119c064..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerInternalFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.queue;
-
-/**
- * QueueManagerInternal guice factory
- */
-public interface QueueManagerInternalFactory {
-    QueueManager getQueueManager( final QueueScope scope );
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
deleted file mode 100644
index f8ce6ef..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueMessage.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue;
-
-public class QueueMessage {
-    private final Object body;
-    private final String messageId;
-    private final String handle;
-    private final String type;
-    private String stringBody;
-    private int receiveCount;
-
-
-    public QueueMessage(String messageId, String handle, Object body,String type) {
-        this.body = body;
-        this.messageId = messageId;
-        this.handle = handle;
-        this.type = type;
-        this.stringBody = "";
-        this.receiveCount = 1; // we'll always receive once if we're taking it off the in mem or AWS queue
-    }
-
-    public String getHandle() {
-        return handle;
-    }
-
-    public Object getBody(){
-        return body;
-    }
-
-    public String getMessageId() {
-        return messageId;
-    }
-
-
-    public String getType() {
-        return type;
-    }
-
-    public void setStringBody(String stringBody) {
-        this.stringBody = stringBody;
-    }
-
-    public String getStringBody() {
-        return stringBody;
-    }
-
-    public void setReceiveCount(int receiveCount){
-        this.receiveCount = receiveCount;
-    }
-
-    public int getReceiveCount(){
-        return receiveCount;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
deleted file mode 100644
index f58584f..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.queue;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-
-public interface QueueScope  {
-
-    /**
-     * LOCAL will create a SNS topic with a queue subscription in a single AWS region.
-     * ALL will create SNS topics and queue subscriptions  in ALL AWS regions.
-     */
-    enum RegionImplementation {
-        LOCAL,
-        ALL
-    }
-
-    /**
-     * Get the name of the the map
-     * @return
-     */
-    public String getName();
-
-    /**
-     * Get the Usergrid region enum
-     */
-    public RegionImplementation getRegionImplementation();
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
index caf61bf..6d62da0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/guice/QueueModule.java
@@ -18,14 +18,14 @@
 package org.apache.usergrid.persistence.queue.guice;
 
 
-import org.apache.usergrid.persistence.queue.QueueManagerInternalFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerInternalFactory;
 import org.apache.usergrid.persistence.queue.impl.QueueManagerFactoryImpl;
 import org.apache.usergrid.persistence.queue.impl.SNSQueueManagerImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
@@ -42,11 +42,11 @@ public class QueueModule extends AbstractModule {
     @Override
     protected void configure() {
 
-        install(new GuicyFigModule(QueueFig.class));
+        install(new GuicyFigModule(LegacyQueueFig.class));
 
-        bind(QueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
-        install(new FactoryModuleBuilder().implement(QueueManager.class, SNSQueueManagerImpl.class)
-            .build(QueueManagerInternalFactory.class));
+        bind(LegacyQueueManagerFactory.class).to(QueueManagerFactoryImpl.class);
+        install(new FactoryModuleBuilder().implement(LegacyQueueManager.class, SNSQueueManagerImpl.class)
+            .build(LegacyQueueManagerInternalFactory.class));
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
new file mode 100644
index 0000000..51d6c03
--- /dev/null
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/LegacyQueueScopeImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.usergrid.persistence.queue.impl;
+
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+
+public class LegacyQueueScopeImpl implements LegacyQueueScope {
+
+    private final String name;
+    private final RegionImplementation regionImpl;
+
+    public LegacyQueueScopeImpl(final String name, final RegionImplementation regionImpl) {
+        this.name = name;
+        this.regionImpl = regionImpl;
+    }
+
+
+
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+    @Override
+    public boolean equals( final Object o ) {
+        if ( this == o ) {
+            return true;
+        }
+        if ( !( o instanceof LegacyQueueScopeImpl) ) {
+            return false;
+        }
+
+        final LegacyQueueScopeImpl queueScope = (LegacyQueueScopeImpl) o;
+
+        if ( !name.equals( queueScope.name ) ) {
+            return false;
+        }
+
+
+        return true;
+    }
+
+
+    @Override
+    public int hashCode() {
+        return name.hashCode();
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index 93b2fb2..2d51903 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -33,26 +33,26 @@ import java.util.concurrent.ExecutionException;
  * manages whether we take in an external in memory override for queues.
  */
 @Singleton
-public class QueueManagerFactoryImpl implements QueueManagerFactory {
+public class QueueManagerFactoryImpl implements LegacyQueueManagerFactory {
 
     private static final Logger logger = LoggerFactory.getLogger( QueueManagerFactoryImpl.class );
 
-    private final QueueFig queueFig;
-    private final QueueManagerInternalFactory queuemanagerInternalFactory;
-    private final Map<String,QueueManager> defaultManager;
-    private final LoadingCache<QueueScope, QueueManager> queueManager =
+    private final LegacyQueueFig queueFig;
+    private final LegacyQueueManagerInternalFactory queuemanagerInternalFactory;
+    private final Map<String,LegacyQueueManager> defaultManager;
+    private final LoadingCache<LegacyQueueScope, LegacyQueueManager> queueManager =
         CacheBuilder
             .newBuilder()
             .initialCapacity(5)
             .maximumSize(100)
-            .build(new CacheLoader<QueueScope, QueueManager>() {
+            .build(new CacheLoader<LegacyQueueScope, LegacyQueueManager>() {
 
                 @Override
-                public QueueManager load( QueueScope scope ) throws Exception {
+                public LegacyQueueManager load(LegacyQueueScope scope ) throws Exception {
 
                     if ( queueFig.overrideQueueForDefault() ){
 
-                        QueueManager manager = defaultManager.get( scope.getName() );
+                        LegacyQueueManager manager = defaultManager.get( scope.getName() );
                         if ( manager == null ) {
                             manager = new LocalQueueManager();
                             defaultManager.put( scope.getName(), manager );
@@ -67,14 +67,14 @@ public class QueueManagerFactoryImpl implements QueueManagerFactory {
             });
 
     @Inject
-    public QueueManagerFactoryImpl(final QueueFig queueFig, final QueueManagerInternalFactory queuemanagerInternalFactory){
+    public QueueManagerFactoryImpl(final LegacyQueueFig queueFig, final LegacyQueueManagerInternalFactory queuemanagerInternalFactory){
         this.queueFig = queueFig;
         this.queuemanagerInternalFactory = queuemanagerInternalFactory;
         this.defaultManager = new HashMap<>(10);
     }
 
     @Override
-    public QueueManager getQueueManager(QueueScope scope) {
+    public LegacyQueueManager getQueueManager(LegacyQueueScope scope) {
 
         try {
             return queueManager.get(scope);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
deleted file mode 100644
index fa50b49..0000000
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.queue.impl;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.queue.QueueScope;
-
-public class QueueScopeImpl implements QueueScope {
-
-    private final String name;
-    private final RegionImplementation regionImpl;
-
-    public QueueScopeImpl(  final String name, final RegionImplementation regionImpl) {
-        this.name = name;
-        this.regionImpl = regionImpl;
-    }
-
-
-
-
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    @Override
-    public RegionImplementation getRegionImplementation() {return regionImpl;}
-
-    @Override
-    public boolean equals( final Object o ) {
-        if ( this == o ) {
-            return true;
-        }
-        if ( !( o instanceof QueueScopeImpl ) ) {
-            return false;
-        }
-
-        final QueueScopeImpl queueScope = ( QueueScopeImpl ) o;
-
-        if ( !name.equals( queueScope.name ) ) {
-            return false;
-        }
-
-
-        return true;
-    }
-
-
-    @Override
-    public int hashCode() {
-        return name.hashCode();
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index ae11517..6d4e0c4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -35,11 +35,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.usergrid.persistence.core.CassandraFig;
 import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.guicyfig.ClusterFig;
-import org.apache.usergrid.persistence.queue.Queue;
-import org.apache.usergrid.persistence.queue.QueueFig;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
-import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.LegacyQueue;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
 import org.apache.usergrid.persistence.queue.util.AmazonNotificationUtils;
 
 import com.amazonaws.AmazonServiceException;
@@ -80,12 +80,12 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 
-public class SNSQueueManagerImpl implements QueueManager {
+public class SNSQueueManagerImpl implements LegacyQueueManager {
 
     private static final Logger logger = LoggerFactory.getLogger( SNSQueueManagerImpl.class );
 
-    private final QueueScope scope;
-    private final QueueFig fig;
+    private final LegacyQueueScope scope;
+    private final LegacyQueueFig fig;
     private final ClusterFig clusterFig;
     private final CassandraFig cassandraFig;
     private final ClientConfiguration clientConfiguration;
@@ -121,16 +121,16 @@ public class SNSQueueManagerImpl implements QueueManager {
             }
         } );
 
-    private final LoadingCache<String, Queue> readQueueUrlMap =
-        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, Queue>() {
+    private final LoadingCache<String, LegacyQueue> readQueueUrlMap =
+        CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<String, LegacyQueue>() {
             @Override
-            public Queue load( String queueName ) throws Exception {
+            public LegacyQueue load(String queueName ) throws Exception {
 
-                Queue queue = null;
+                LegacyQueue queue = null;
 
                 try {
                     GetQueueUrlResult result = sqs.getQueueUrl( queueName );
-                    queue = new Queue( result.getQueueUrl() );
+                    queue = new LegacyQueue( result.getQueueUrl() );
                 }
                 catch ( QueueDoesNotExistException queueDoesNotExistException ) {
                     logger.error( "Queue {} does not exist, will create", queueName );
@@ -142,7 +142,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
                 if ( queue == null ) {
                     String url = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
-                    queue = new Queue( url );
+                    queue = new LegacyQueue( url );
                 }
 
                 setupTopics( queueName );
@@ -153,8 +153,8 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Inject
-    public SNSQueueManagerImpl( @Assisted QueueScope scope, QueueFig fig, ClusterFig clusterFig,
-                                CassandraFig cassandraFig, QueueFig queueFig ) {
+    public SNSQueueManagerImpl(@Assisted LegacyQueueScope scope, LegacyQueueFig fig, ClusterFig clusterFig,
+                               CassandraFig cassandraFig, LegacyQueueFig queueFig ) {
         this.scope = scope;
         this.fig = fig;
         this.clusterFig = clusterFig;
@@ -232,7 +232,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                 "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e );
         }
 
-        if ( fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALL ) {
+        if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) {
 
             String multiRegion = fig.getRegionList();
 
@@ -391,7 +391,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     }
 
 
-    public Queue getReadQueue() {
+    public LegacyQueue getReadQueue() {
         String queueName = getName();
 
         try {
@@ -414,7 +414,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public List<QueueMessage> getMessages(final int limit, final Class klass) {
+    public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) {
 
         if ( sqs == null ) {
             logger.error( "SQS is null - was not initialized properly" );
@@ -457,7 +457,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                 logger.trace( "Received {} messages from {}", messages.size(), url );
             }
 
-            List<QueueMessage> queueMessages = new ArrayList<>( messages.size() );
+            List<LegacyQueueMessage> queueMessages = new ArrayList<>( messages.size() );
 
             for ( Message message : messages ) {
 
@@ -487,7 +487,7 @@ public class SNSQueueManagerImpl implements QueueManager {
                     throw new RuntimeException( e );
                 }
 
-                QueueMessage queueMessage = new QueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
+                LegacyQueueMessage queueMessage = new LegacyQueueMessage( message.getMessageId(), message.getReceiptHandle(), payload,
                     message.getAttributes().get( "type" ) );
                 queueMessage.setStringBody( originalBody );
                 int receiveCount = Integer.valueOf(message.getAttributes().get("ApproximateReceiveCount"));
@@ -634,7 +634,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public void commitMessage( final QueueMessage queueMessage ) {
+    public void commitMessage( final LegacyQueueMessage queueMessage ) {
         String url = getReadQueue().getUrl();
         if ( logger.isTraceEnabled() ) {
             logger.trace( "Commit message {} to queue {}", queueMessage.getMessageId(), url );
@@ -646,7 +646,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     @Override
-    public void commitMessages( final List<QueueMessage> queueMessages ) {
+    public void commitMessages( final List<LegacyQueueMessage> queueMessages ) {
         String url = getReadQueue().getUrl();
 
         if ( logger.isTraceEnabled() ) {
@@ -655,7 +655,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
 
-        for ( QueueMessage message : queueMessages ) {
+        for ( LegacyQueueMessage message : queueMessages ) {
             entries.add( new DeleteMessageBatchRequestEntry( message.getMessageId(), message.getHandle() ) );
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index b6b1aaa..56bef91 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -9,7 +9,7 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.queue.QueueFig;
+import org.apache.usergrid.persistence.queue.LegacyQueueFig;
 
 import com.amazonaws.auth.policy.Condition;
 import com.amazonaws.auth.policy.Policy;
@@ -40,7 +40,7 @@ public class AmazonNotificationUtils {
     private static final Logger logger = LoggerFactory.getLogger( AmazonNotificationUtils.class );
 
 
-    public static String createQueue( final AmazonSQSClient sqs, final String queueName, final QueueFig fig )
+    public static String createQueue( final AmazonSQSClient sqs, final String queueName, final LegacyQueueFig fig )
         throws Exception {
 
         final String deadletterQueueName = String.format( "%s_dead", queueName );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
new file mode 100644
index 0000000..69655e5
--- /dev/null
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/LegacyQueueManagerTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.queue;
+
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
+import org.apache.usergrid.persistence.core.test.ITRunner;
+import org.apache.usergrid.persistence.core.test.UseModules;
+import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
+
+import com.google.inject.Inject;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+
+@RunWith( ITRunner.class )
+@UseModules( { TestQueueModule.class } )
+public class LegacyQueueManagerTest {
+
+    @Inject
+    protected LegacyQueueFig queueFig;
+    @Inject
+    protected LegacyQueueManagerFactory qmf;
+
+    /**
+     * Mark tests as ignored if no AWS creds are present
+     */
+    @Rule
+    public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
+
+
+    protected LegacyQueueScope scope;
+    private LegacyQueueManager qm;
+
+    public static long queueSeed = System.currentTimeMillis();
+
+
+    @Before
+    public void mockApp() {
+
+        this.scope = new LegacyQueueScopeImpl( "testQueue"+queueSeed++, LegacyQueueScope.RegionImplementation.LOCAL);
+        qm = qmf.getQueueManager(scope);
+    }
+
+    @org.junit.After
+    public void cleanup(){
+        qm.deleteQueue();
+    }
+
+
+    @Test
+    public void send() throws Exception{
+        String value = "bodytest";
+        qm.sendMessage(value);
+        List<LegacyQueueMessage> messageList = qm.getMessages(1, String.class);
+        assertTrue(messageList.size() >= 1);
+        for(LegacyQueueMessage message : messageList){
+            assertTrue(message.getBody().equals(value));
+            qm.commitMessage(message);
+        }
+
+        messageList = qm.getMessages(1, String.class);
+        assertTrue(messageList.size() <= 0);
+
+    }
+
+    @Test
+    public void sendMore() throws Exception{
+        HashMap<String,String> values = new HashMap<>();
+        values.put("test","Test");
+
+        List<Map<String,String>> bodies = new ArrayList<>();
+        bodies.add(values);
+        qm.sendMessages(bodies);
+        List<LegacyQueueMessage> messageList = qm.getMessages(1, values.getClass());
+        assertTrue(messageList.size() >= 1);
+        for(LegacyQueueMessage message : messageList){
+            assertTrue(message.getBody().equals(values));
+        }
+        qm.commitMessages(messageList);
+
+        messageList = qm.getMessages(1, values.getClass());
+        assertTrue(messageList.size() <= 0);
+
+    }
+
+    @Test
+    public void queueSize() throws Exception{
+        HashMap<String,String> values = new HashMap<>();
+        values.put("test", "Test");
+
+        List<Map<String,String>> bodies = new ArrayList<>();
+        bodies.add(values);
+        long initialDepth = qm.getQueueDepth();
+        qm.sendMessages(bodies);
+        long depth = 0;
+        for(int i=0; i<10;i++){
+             depth = qm.getQueueDepth();
+            if(depth>0){
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        assertTrue(depth>0);
+
+        List<LegacyQueueMessage> messageList = qm.getMessages(10, values.getClass());
+        assertTrue(messageList.size() <= 500);
+        for(LegacyQueueMessage message : messageList){
+            assertTrue(message.getBody().equals(values));
+        }
+        if(messageList.size()>0) {
+            qm.commitMessages(messageList);
+        }
+        for(int i=0; i<10;i++){
+            depth = qm.getQueueDepth();
+            if(depth==initialDepth){
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        assertEquals(initialDepth, depth);
+    }
+
+
+
+}