You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/09/16 22:34:34 UTC
[05/25] usergrid git commit: Move under
org.apache.usergrid.persistence package in preparation for integration into
Usergrid.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
deleted file mode 100644
index d57beab..0000000
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.queue;
-
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.aws.NoAWSCredsRule;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.queue.guice.TestQueueModule;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-
-import com.google.inject.Inject;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeTrue;
-
-
-@RunWith( ITRunner.class )
-@UseModules( { TestQueueModule.class } )
-public class QueueManagerTest {
-
- @Inject
- protected QueueFig queueFig;
- @Inject
- protected QueueManagerFactory qmf;
-
- /**
- * Mark tests as ignored if no AWS creds are present
- */
- @Rule
- public NoAWSCredsRule awsCredsRule = new NoAWSCredsRule();
-
-
- protected QueueScope scope;
- private QueueManager qm;
-
- public static long queueSeed = System.currentTimeMillis();
-
-
- @Before
- public void mockApp() {
-
- this.scope = new QueueScopeImpl( "testQueue"+queueSeed++, QueueScope.RegionImplementation.LOCAL);
- qm = qmf.getQueueManager(scope);
- }
-
- @org.junit.After
- public void cleanup(){
- qm.deleteQueue();
- }
-
-
- @Test
- public void send() throws Exception{
- String value = "bodytest";
- qm.sendMessage(value);
- List<QueueMessage> messageList = qm.getMessages(1, String.class);
- assertTrue(messageList.size() >= 1);
- for(QueueMessage message : messageList){
- assertTrue(message.getBody().equals(value));
- qm.commitMessage(message);
- }
-
- messageList = qm.getMessages(1, String.class);
- assertTrue(messageList.size() <= 0);
-
- }
-
- @Test
- public void sendMore() throws Exception{
- HashMap<String,String> values = new HashMap<>();
- values.put("test","Test");
-
- List<Map<String,String>> bodies = new ArrayList<>();
- bodies.add(values);
- qm.sendMessages(bodies);
- List<QueueMessage> messageList = qm.getMessages(1, values.getClass());
- assertTrue(messageList.size() >= 1);
- for(QueueMessage message : messageList){
- assertTrue(message.getBody().equals(values));
- }
- qm.commitMessages(messageList);
-
- messageList = qm.getMessages(1, values.getClass());
- assertTrue(messageList.size() <= 0);
-
- }
-
- @Test
- public void queueSize() throws Exception{
- HashMap<String,String> values = new HashMap<>();
- values.put("test", "Test");
-
- List<Map<String,String>> bodies = new ArrayList<>();
- bodies.add(values);
- long initialDepth = qm.getQueueDepth();
- qm.sendMessages(bodies);
- long depth = 0;
- for(int i=0; i<10;i++){
- depth = qm.getQueueDepth();
- if(depth>0){
- break;
- }
- Thread.sleep(1000);
- }
- assertTrue(depth>0);
-
- List<QueueMessage> messageList = qm.getMessages(10, values.getClass());
- assertTrue(messageList.size() <= 500);
- for(QueueMessage message : messageList){
- assertTrue(message.getBody().equals(values));
- }
- if(messageList.size()>0) {
- qm.commitMessages(messageList);
- }
- for(int i=0; i<10;i++){
- depth = qm.getQueueDepth();
- if(depth==initialDepth){
- break;
- }
- Thread.sleep(1000);
- }
- assertEquals(initialDepth, depth);
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index 3f0ca69..adad92a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -22,7 +22,7 @@ package org.apache.usergrid.services.notifications;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
import rx.Observable;
import java.util.List;
@@ -54,7 +54,7 @@ public interface ApplicationQueueManager {
* @param queuePath
* @return
*/
- Observable sendBatchToProviders(List<QueueMessage> messages, String queuePath);
+ Observable sendBatchToProviders(List<LegacyQueueMessage> messages, String queuePath);
/**
* stop processing and send message to providers to stop
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
index 2ef567d..3b2eff3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
@@ -20,7 +20,7 @@ import com.google.common.cache.*;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,7 +103,7 @@ public class ApplicationQueueManagerCache{
public ApplicationQueueManager getApplicationQueueManager( final EntityManager entityManager,
- final QueueManager queueManager,
+ final LegacyQueueManager legacyQueueManager,
final JobScheduler jobScheduler,
final MetricsFactory metricsService,
final Properties properties ) {
@@ -124,7 +124,7 @@ public class ApplicationQueueManagerCache{
manager = new ApplicationQueueManagerImpl(
jobScheduler,
entityManager,
- queueManager,
+ legacyQueueManager,
metricsService,
properties
);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 907638e..b43594a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -19,7 +19,6 @@ package org.apache.usergrid.services.notifications;
import java.util.*;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,10 +35,10 @@ import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueScope;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManagerFactory;
+import org.apache.usergrid.persistence.queue.LegacyQueueScope;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.services.exceptions.ForbiddenServiceOperationException;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
@@ -76,7 +75,7 @@ public class NotificationsService extends AbstractCollectionService {
private long gracePeriod;
private ServiceManagerFactory smf;
private EntityManagerFactory emf;
- private QueueManagerFactory queueManagerFactory;
+ private LegacyQueueManagerFactory queueManagerFactory;
private ApplicationQueueManagerCache applicationQueueManagerCache;
public NotificationsService() {
@@ -97,12 +96,12 @@ public class NotificationsService extends AbstractCollectionService {
postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
JobScheduler jobScheduler = new JobScheduler(sm,em);
String name = ApplicationQueueManagerImpl.getQueueNames( props );
- QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL);
- queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
- QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+ LegacyQueueScope queueScope = new LegacyQueueScopeImpl( name, LegacyQueueScope.RegionImplementation.LOCAL);
+ queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
+ LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
applicationQueueManagerCache = getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
notificationQueueManager = applicationQueueManagerCache
- .getApplicationQueueManager(em,queueManager, jobScheduler, metricsService ,props);
+ .getApplicationQueueManager(em, legacyQueueManager, jobScheduler, metricsService ,props);
gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
index 245a36f..0ba5c1e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueJob.java
@@ -24,7 +24,6 @@ import javax.annotation.PostConstruct;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.entities.Notification;
-import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 478d5ed..20fbd84 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -18,17 +18,14 @@ package org.apache.usergrid.services.notifications;
import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
-import com.google.common.cache.*;
import com.google.inject.Injector;
-import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
-import org.apache.usergrid.services.ServiceManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
import org.slf4j.Logger;
@@ -46,7 +43,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class QueueListener {
- private final QueueManagerFactory queueManagerFactory;
+ private final LegacyQueueManagerFactory queueManagerFactory;
public static long DEFAULT_SLEEP = 100;
@@ -75,7 +72,7 @@ public class QueueListener {
private int consecutiveCallsToRemoveDevices;
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Properties props){
- this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
+ this.queueManagerFactory = smf.getApplicationContext().getBean( Injector.class ).getInstance(LegacyQueueManagerFactory.class);
this.smf = smf;
this.emf = emf;
this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
@@ -161,8 +158,8 @@ public class QueueListener {
logger.trace("getting from queue {} ", queueName);
}
- QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
- QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+ LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
+ LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
final AtomicLong runCount = new AtomicLong(0);
@@ -170,7 +167,7 @@ public class QueueListener {
while ( true ) {
Timer.Context timerContext = timer.time();
- rx.Observable.from(queueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
+ rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
.buffer(MAX_TAKE)
.doOnNext(messages -> {
@@ -180,10 +177,10 @@ public class QueueListener {
}
if (messages.size() > 0) {
- HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
+ HashMap<UUID, List<LegacyQueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
- for (QueueMessage message : messages) {
+ for (LegacyQueueMessage message : messages) {
//TODO: stop copying around this area as it gets notification specific.
ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
UUID applicationId = queueMessage.getApplicationId();
@@ -191,7 +188,7 @@ public class QueueListener {
//Groups queue messages by application Id, ( they are all probably going to the same place )
if (!messageMap.containsKey(applicationId)) {
//For each app id it sends the set.
- List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+ List<LegacyQueueMessage> applicationQueueMessages = new ArrayList<LegacyQueueMessage>();
applicationQueueMessages.add(message);
messageMap.put(applicationId, applicationQueueMessages);
} else {
@@ -203,13 +200,13 @@ public class QueueListener {
Observable merge = null;
//send each set of app ids together
- for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
+ for (Map.Entry<UUID, List<LegacyQueueMessage>> entry : messageMap.entrySet()) {
UUID applicationId = entry.getKey();
ApplicationQueueManager manager = applicationQueueManagerCache
.getApplicationQueueManager(
emf.getEntityManager(applicationId),
- queueManager,
+ legacyQueueManager,
new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)),
metricsService,
properties
@@ -230,7 +227,7 @@ public class QueueListener {
if(merge!=null) {
merge.toBlocking().lastOrDefault(null);
}
- queueManager.commitMessages(messages);
+ legacyQueueManager.commitMessages(messages);
meter.mark(messages.size());
if (logger.isTraceEnabled()) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 23b21f2..96e2dbd 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -47,8 +47,8 @@ import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.entities.User;
import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
import org.apache.usergrid.services.notifications.ApplicationQueueManager;
import org.apache.usergrid.services.notifications.ApplicationQueueMessage;
import org.apache.usergrid.services.notifications.JobScheduler;
@@ -72,7 +72,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
private static final Logger logger = LoggerFactory.getLogger(ApplicationQueueManagerImpl.class);
private final EntityManager em;
- private final QueueManager qm;
+ private final LegacyQueueManager qm;
private final JobScheduler jobScheduler;
private final MetricsFactory metricsFactory;
private final String queueName;
@@ -93,11 +93,11 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
- public ApplicationQueueManagerImpl( JobScheduler jobScheduler, EntityManager entityManager,
- QueueManager queueManager, MetricsFactory metricsFactory,
- Properties properties) {
+ public ApplicationQueueManagerImpl(JobScheduler jobScheduler, EntityManager entityManager,
+ LegacyQueueManager legacyQueueManager, MetricsFactory metricsFactory,
+ Properties properties) {
this.em = entityManager;
- this.qm = queueManager;
+ this.qm = legacyQueueManager;
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
this.queueName = getQueueNames(properties);
@@ -473,7 +473,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
* @throws Exception
*/
@Override
- public Observable sendBatchToProviders(final List<QueueMessage> messages, final String queuePath) {
+ public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) {
if (logger.isTraceEnabled()) {
logger.trace("sending batch of {} notifications.", messages.size());
}
@@ -483,7 +483,7 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
- final Func1<QueueMessage, ApplicationQueueMessage> func = queueMessage -> {
+ final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> {
boolean messageCommitted = false;
ApplicationQueueMessage message = null;
try {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
index a95475d..e2a2808 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueListener.java
@@ -20,16 +20,14 @@ package org.apache.usergrid.services.queues;
import java.util.List;
import java.util.Properties;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.usergrid.management.importer.ImportService;
-import org.apache.usergrid.management.importer.ImportServiceImpl;
import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
import org.apache.usergrid.services.ServiceManagerFactory;
import com.google.inject.Inject;
@@ -65,7 +63,7 @@ public class ImportQueueListener extends QueueListener {
* @param messages
*/
@Override
- public void onMessage( final List<QueueMessage> messages ) throws Exception {
+ public void onMessage( final List<LegacyQueueMessage> messages ) throws Exception {
/**
* Much like in the original queueListener , we need to translate the Messages that we get
* back from the QueueMessage into something like an Import message. The way that a
@@ -76,7 +74,7 @@ public class ImportQueueListener extends QueueListener {
if (logger.isTraceEnabled()) {
logger.trace("Doing work in onMessage in ImportQueueListener");
}
- for (QueueMessage message : messages) {
+ for (LegacyQueueMessage message : messages) {
ImportQueueMessage queueMessage = ( ImportQueueMessage ) message.getBody();
// TODO We still need to hide this queue behind the scheduler importService.downloadAndImportFile( queueMessage );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
index f3c65c7..d9db84a 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/ImportQueueManager.java
@@ -26,17 +26,17 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.QueueMessage;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.LegacyQueueMessage;
/**
* Manages the queueManager implementation for Import
*/
-public class ImportQueueManager implements QueueManager {
+public class ImportQueueManager implements LegacyQueueManager {
@Override
- public List<QueueMessage> getMessages(final int limit, final Class klass) {
+ public List<LegacyQueueMessage> getMessages(final int limit, final Class klass) {
return new ArrayList<>();
}
@@ -47,13 +47,13 @@ public class ImportQueueManager implements QueueManager {
@Override
- public void commitMessage( final QueueMessage queueMessage ) {
+ public void commitMessage( final LegacyQueueMessage queueMessage ) {
}
@Override
- public void commitMessages( final List<QueueMessage> queueMessages ) {
+ public void commitMessages( final List<LegacyQueueMessage> queueMessages ) {
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9016fd29/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index 9d95d87..965e95e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -24,8 +24,8 @@ import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.queue.*;
-import org.apache.usergrid.persistence.queue.QueueManager;
-import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
+import org.apache.usergrid.persistence.queue.LegacyQueueManager;
+import org.apache.usergrid.persistence.queue.impl.LegacyQueueScopeImpl;
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.slf4j.Logger;
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class QueueListener {
public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000;
- private final QueueManagerFactory queueManagerFactory;
+ private final LegacyQueueManagerFactory queueManagerFactory;
public long DEFAULT_SLEEP = 5000;
@@ -83,7 +83,7 @@ public abstract class QueueListener {
*/
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, Injector injector, Properties props){
//TODO: change current injectors to use service module instead of CpSetup
- this.queueManagerFactory = injector.getInstance( QueueManagerFactory.class );
+ this.queueManagerFactory = injector.getInstance( LegacyQueueManagerFactory.class );
this.smf = smf;
this.emf = injector.getInstance( EntityManagerFactory.class ); //emf;
this.metricsService = injector.getInstance(MetricsFactory.class);
@@ -169,8 +169,8 @@ public abstract class QueueListener {
if (logger.isTraceEnabled()) {
logger.trace("getting from queue {} ", queueName);
}
- QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCAL);
- QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+ LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
+ LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
long runCount = 0;
@@ -181,7 +181,7 @@ public abstract class QueueListener {
Timer.Context timerContext = timer.time();
//Get the messages out of the queue.
//TODO: a model class to get generic queueMessages out of the queueManager. Ask Shawn what should go here.
- rx.Observable.from( queueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
+ rx.Observable.from( legacyQueueManager.getMessages(getBatchSize(), ImportQueueMessage.class))
.buffer(getBatchSize())
.doOnNext(messages -> {
try {
@@ -197,7 +197,7 @@ public abstract class QueueListener {
// asking for a onMessage call.
onMessage(messages);
- queueManager.commitMessages(messages);
+ legacyQueueManager.commitMessages(messages);
meter.mark(messages.size());
if (logger.isTraceEnabled()) {
@@ -267,7 +267,7 @@ public abstract class QueueListener {
* This will be the method that does the job dependant execution.
* @param messages
*/
- public abstract void onMessage(List<QueueMessage> messages) throws Exception;
+ public abstract void onMessage(List<LegacyQueueMessage> messages) throws Exception;
public abstract String getQueueName();