You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:34 UTC

[05/25] usergrid git commit: Move under org.apache.usergrid.persistence package in preparation for integration into Usergrid.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
deleted file mode 100644
index d57beab..0000000
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.queue;
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.google.inject.Inject;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-
-@RunWith( ITRunner.class )
-@UseModules( { TestQueueModule.class } )
-public class QueueManagerTest {
-
-    @Inject
-    protected QueueFig queueFig;
-    @Inject
-    protected QueueManagerFactory qmf;
-
-    /**
-     * Mark tests as ignored if no AWS creds are present
-     */
-    @Rule
-    public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
-
-
-    protected QueueScope scope;
-    private QueueManager qm;
-
-    public static long queueSeed = System.currentTimeMillis();
-
-
-    @Before
-    public void mockApp() {
-
-        this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
-        qm = qmf.getQueueManager(scope);
-    }
-
-    @org.junit.After
-    public void cleanup(){
-        qm.deleteQueue();
-    }
-
-
-    @Test
-    public void send() throws Exception{
-        String value = "bodytest";
-        qm.sendMessage(value);
-        List<QueueMessage> messageList = qm.getMessages(1, String.class);
-        assertTrue(messageList.size() >= 1);
-        for(QueueMessage message : messageList){
-            assertTrue(message.getBody().equals(value));
-            qm.commitMessage(message);
-        }
-
-        messageList = qm.getMessages(1, String.class);
-        assertTrue(messageList.size() <= 0);
-
-    }
-
-    @Test
-    public void sendMore() throws Exception{
-        HashMap<String,String> values = new HashMap<>();
-        values.put("test","Test");
-
-        List<Map<String,String>> bodies = new ArrayList<>();
-        bodies.add(values);
-        qm.sendMessages(bodies);
-        List<QueueMessage> messageList = qm.getMessages(1, values.getClass());
-        assertTrue(messageList.size() >= 1);
-        for(QueueMessage message : messageList){
-            assertTrue(message.getBody().equals(values));
-        }
-        qm.commitMessages(messageList);
-
-        messageList = qm.getMessages(1, values.getClass());
-        assertTrue(messageList.size() <= 0);
-
-    }
-
-    @Test
-    public void queueSize() throws Exception{
-        HashMap<String,String> values = new HashMap<>();
-        values.put("test", "Test");
-
-        List<Map<String,String>> bodies = new ArrayList<>();
-        bodies.add(values);
-        long initialDepth = qm.getQueueDepth();
-        qm.sendMessages(bodies);
-        long depth = 0;
-        for(int i=0; i<10;i++){
-             depth = qm.getQueueDepth();
-            if(depth>0){
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        assertTrue(depth>0);
-
-        List<QueueMessage> messageList = qm.getMessages(10, values.getClass());
-        assertTrue(messageList.size() <= 500);
-        for(QueueMessage message : messageList){
-            assertTrue(message.getBody().equals(values));
-        }
-        if(messageList.size()>0) {
-            qm.commitMessages(messageList);
-        }
-        for(int i=0; i<10;i++){
-            depth = qm.getQueueDepth();
-            if(depth==initialDepth){
-                break;
-            }
-            Thread.sleep(1000);
-        }
-        assertEquals(initialDepth, depth);
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 3f0ca69..adad92a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -22,7 +22,7 @@ package org.apache.usergrid.services.notifications;
 
 import org.apache.usergrid.batch.JobExecution;
 import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
 import rx.Observable;
 
 import java.util.List;
@@ -54,7 +54,7 @@ public interface ApplicationQueueManager {
      * @param queuePath
      * @return
      */
-    Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
+    Observable sendBatchToProviders(List<LegacyQueueMessage> messages, String queuePath);
 
     /**
      * stop processing and send message to providers to stop

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
index 2ef567d..3b2eff3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
@@ -20,7 +20,7 @@ import com.google.common.cache.*;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
 import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,7 +103,7 @@ public class ApplicationQueueManagerCache{
 
 
     public ApplicationQueueManager getApplicationQueueManager( final EntityManager entityManager,
-                                                               final QueueManager queueManager,
+                                                               final LegacyQueueManager legacyQueueManager,
                                                                final JobScheduler jobScheduler,
                                                                final MetricsFactory metricsService,
                                                                final Properties properties ) {
@@ -124,7 +124,7 @@ public class ApplicationQueueManagerCache{
             manager = new ApplicationQueueManagerImpl(
                 jobScheduler,
                 entityManager,
-                queueManager,
+                legacyQueueManager,
                 metricsService,
                 properties
             );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 907638e..b43594a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -19,7 +19,6 @@ package org.apache.usergrid.services.notifications;
 
 import java.util.*;
 
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.services.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +35,10 @@ import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
 import org.apache.usergrid.persistence.index.query.Identifier;
 import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
 import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 
@@ -76,7 +75,7 @@ public class NotificationsService extends AbstractCollectionService {
     private long gracePeriod;
     private ServiceManagerFactory smf;
     private EntityManagerFactory emf;
-    private QueueManagerFactory queueManagerFactory;
+    private LegacyQueueManagerFactory queueManagerFactory;
     private ApplicationQueueManagerCache applicationQueueManagerCache;
 
     public NotificationsService() {
@@ -97,12 +96,12 @@ public class NotificationsService extends AbstractCollectionService {
         postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
         String name = ApplicationQueueManagerImpl.getQueueNames( props );
-        QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL);
-        queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
-        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        LegacyQueueScope queueScope = new LegacyQueueScopeImpl( name, LegacyQueueScope.RegionImplementation.LOCAL);
+        queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
+        LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
         applicationQueueManagerCache = getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
         notificationQueueManager = applicationQueueManagerCache
-            .getApplicationQueueManager(em,queueManager, jobScheduler, metricsService ,props);
+            .getApplicationQueueManager(em, legacyQueueManager, jobScheduler, metricsService ,props);
 
         gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
index 245a36f..0ba5c1e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
@@ -24,7 +24,6 @@ import javax.annotation.PostConstruct;
 import com.google.inject.Injector;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 478d5ed..20fbd84 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -18,17 +18,14 @@ package org.apache.usergrid.services.notifications;
 
 import com.codahale.metrics.*;
 import com.codahale.metrics.Timer;
-import com.google.common.cache.*;
 import com.google.inject.Injector;
 
-import org.apache.usergrid.persistence.EntityManager;
 import org.apache.usergrid.persistence.EntityManagerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
 import org.slf4j.Logger;
@@ -46,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class QueueListener  {
 
-    private final QueueManagerFactory queueManagerFactory;
+    private final LegacyQueueManagerFactory queueManagerFactory;
 
     public static long DEFAULT_SLEEP = 100;
 
@@ -75,7 +72,7 @@ public class QueueListener  {
     private int consecutiveCallsToRemoveDevices;
 
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
-        this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
+        this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
         this.smf = smf;
         this.emf = emf;
         this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
@@ -161,8 +158,8 @@ public class QueueListener  {
             logger.trace("getting from queue {} ", queueName);
         }
 
-        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
-        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
+        LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
 
         // run until there are no more active jobs
         final AtomicLong runCount = new AtomicLong(0);
@@ -170,7 +167,7 @@ public class QueueListener  {
         while ( true ) {
 
                 Timer.Context timerContext = timer.time();
-                rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
+                rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
                     .buffer(MAX_TAKE)
                     .doOnNext(messages -> {
 
@@ -180,10 +177,10 @@ public class QueueListener  {
                             }
 
                             if (messages.size() > 0) {
-                                HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
+                                HashMap<UUID, List<LegacyQueueMessage>> messageMap = new HashMap<>(messages.size());
 
                                 //group messages into hash map by app id
-                                for (QueueMessage message : messages) {
+                                for (LegacyQueueMessage message : messages) {
                                     //TODO: stop copying around this area as it gets notification specific.
                                     ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                                     UUID applicationId = queueMessage.getApplicationId();
@@ -191,7 +188,7 @@ public class QueueListener  {
                                     //Groups queue messages by application Id, ( they are all probably going to the same place )
                                     if (!messageMap.containsKey(applicationId)) {
                                         //For each app id it sends the set.
-                                        List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+                                        List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>();
                                         applicationQueueMessages.add(message);
                                         messageMap.put(applicationId, applicationQueueMessages);
                                     } else {
@@ -203,13 +200,13 @@ public class QueueListener  {
                                 Observable merge = null;
 
                                 //send each set of app ids together
-                                for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
+                                for (Map.Entry<UUID, List<LegacyQueueMessage>> entry : messageMap.entrySet()) {
                                     UUID applicationId = entry.getKey();
 
                                     ApplicationQueueManager manager = applicationQueueManagerCache
                                         .getApplicationQueueManager(
                                             emf.getEntityManager(applicationId),
-                                            queueManager,
+                                            legacyQueueManager,
                                             new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)),
                                             metricsService,
                                             properties
@@ -230,7 +227,7 @@ public class QueueListener  {
                                 if(merge!=null) {
                                     merge.toBlocking().lastOrDefault(null);
                                 }
-                                queueManager.commitMessages(messages);
+                                legacyQueueManager.commitMessages(messages);
 
                                 meter.mark(messages.size());
                                 if (logger.isTraceEnabled()) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 23b21f2..96e2dbd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -47,8 +47,8 @@ import org.apache.usergrid.persistence.entities.Notifier;
 import org.apache.usergrid.persistence.entities.Receipt;
 import org.apache.usergrid.persistence.entities.User;
 import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
 import org.apache.usergrid.services.notifications.ApplicationQueueManager;
 import org.apache.usergrid.services.notifications.ApplicationQueueMessage;
 import org.apache.usergrid.services.notifications.JobScheduler;
@@ -72,7 +72,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
     private static final Logger logger = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class);
 
     private final EntityManager em;
-    private final QueueManager qm;
+    private final LegacyQueueManager qm;
     private final JobScheduler jobScheduler;
     private final MetricsFactory metricsFactory;
     private final String queueName;
@@ -93,11 +93,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
 
 
 
-    public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
-                                        QueueManager queueManager, MetricsFactory metricsFactory,
-                                        Properties properties) {
+    public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager,
+                                       LegacyQueueManager legacyQueueManager, MetricsFactory metricsFactory,
+                                       Properties properties) {
         this.em = entityManager;
-        this.qm = queueManager;
+        this.qm = legacyQueueManager;
         this.jobScheduler = jobScheduler;
         this.metricsFactory = metricsFactory;
         this.queueName = getQueueNames(properties);
@@ -473,7 +473,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
      * @throws Exception
      */
     @Override
-    public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) {
+    public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) {
         if (logger.isTraceEnabled()) {
             logger.trace("sending batch of {} notifications.", messages.size());
         }
@@ -483,7 +483,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
         final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
         final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
 
-        final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage -> {
+        final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> {
             boolean messageCommitted = false;
             ApplicationQueueMessage message = null;
             try {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index a95475d..e2a2808 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -20,16 +20,14 @@ package org.apache.usergrid.services.queues;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 
 import org.apache.usergrid.management.importer.ImportService;
-import org.apache.usergrid.management.importer.ImportServiceImpl;
 
 import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
 import org.apache.usergrid.services.ServiceManagerFactory;
 
 import com.google.inject.Inject;
@@ -65,7 +63,7 @@ public class ImportQueueListener extends QueueListener {
      * @param messages
      */
     @Override
-    public void onMessage( final List<QueueMessage> messages ) throws Exception {
+    public void onMessage( final List<LegacyQueueMessage> messages ) throws Exception {
         /**
          * Much like in the original queueListener , we need to translate the Messages that we get
          * back from the QueueMessage into something like an Import message. The way that a
@@ -76,7 +74,7 @@ public class ImportQueueListener extends QueueListener {
         if (logger.isTraceEnabled()) {
             logger.trace("Doing work in onMessage in ImportQueueListener");
         }
-        for (QueueMessage message : messages) {
+        for (LegacyQueueMessage message : messages) {
             ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody();
 
 //        TODO   We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index f3c65c7..d9db84a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -26,17 +26,17 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
 
 
 /**
  * Manages the queueManager implementation for Import
  */
-public class ImportQueueManager implements QueueManager {
+public class ImportQueueManager implements LegacyQueueManager {
 
     @Override
-    public List<QueueMessage> getMessages(final int limit, final Class klass) {
+    public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) {
         return new ArrayList<>();
     }
 
@@ -47,13 +47,13 @@ public class ImportQueueManager implements QueueManager {
 
 
     @Override
-    public void commitMessage( final QueueMessage queueMessage ) {
+    public void commitMessage( final LegacyQueueMessage queueMessage ) {
 
     }
 
 
     @Override
-    public void commitMessages( final List<QueueMessage> queueMessages ) {
+    public void commitMessages( final List<LegacyQueueMessage> queueMessages ) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 9d95d87..965e95e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -24,8 +24,8 @@ import org.apache.usergrid.persistence.EntityManagerFactory;
 
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
 import org.apache.usergrid.services.ServiceManager;
 import org.apache.usergrid.services.ServiceManagerFactory;
 import org.slf4j.Logger;
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public abstract class QueueListener  {
     public  final int MESSAGE_TRANSACTION_TIMEOUT =  25 * 1000;
-    private final QueueManagerFactory queueManagerFactory;
+    private final LegacyQueueManagerFactory queueManagerFactory;
 
     public  long DEFAULT_SLEEP = 5000;
 
@@ -83,7 +83,7 @@ public abstract class QueueListener  {
      */
     public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){
         //TODO: change current injectors to use service module instead of CpSetup
-        this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class );
+        this.queueManagerFactory = injector.getInstance( LegacyQueueManagerFactory.class );
         this.smf = smf;
         this.emf = injector.getInstance( EntityManagerFactory.class ); //emf;
         this.metricsService = injector.getInstance(MetricsFactory.class);
@@ -169,8 +169,8 @@ public abstract class QueueListener  {
         if (logger.isTraceEnabled()) {
             logger.trace("getting from queue {} ", queueName);
         }
-        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
-        QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+        LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
+        LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
         long runCount = 0;
 
@@ -181,7 +181,7 @@ public abstract class QueueListener  {
                 Timer.Context timerContext = timer.time();
                 //Get the messages out of the queue.
                 //TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
-                rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
+                rx.Observable.from( legacyQueueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
                     .buffer(getBatchSize())
                     .doOnNext(messages -> {
                         try {
@@ -197,7 +197,7 @@ public abstract class QueueListener  {
                                 // asking for a onMessage call.
                                 onMessage(messages);
 
-                                queueManager.commitMessages(messages);
+                                legacyQueueManager.commitMessages(messages);
 
                                 meter.mark(messages.size());
                                 if (logger.isTraceEnabled()) {
@@ -267,7 +267,7 @@ public abstract class QueueListener  {
      * This will be the method that does the job dependant execution.
      * @param messages
      */
-    public abstract void onMessage(List<QueueMessage> messages) throws Exception;
+    public abstract void onMessage(List<LegacyQueueMessage> messages) throws Exception;
 
     public abstract String getQueueName();