You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/04 03:11:56 UTC
[2/4] git commit: dependency issues, removing old queueing mechanism
dependency issues, removing old queueing mechanism
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fadf5277
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fadf5277
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fadf5277
Branch: refs/heads/sqs_queues
Commit: fadf5277011d980525e8a35302cd776068446c4a
Parents: eb5204b
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Oct 3 16:12:35 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Oct 3 16:12:35 2014 -0600
----------------------------------------------------------------------
stack/core/pom.xml | 15 +++++
.../usergrid/corepersistence/GuiceModule.java | 4 ++
stack/corepersistence/map/pom.xml | 16 +++++
stack/corepersistence/pom.xml | 16 +++++
stack/corepersistence/queue/pom.xml | 1 +
.../persistence/queue/QueueManagerFactory.java | 3 -
.../queue/impl/SQSQueueManagerImpl.java | 3 +-
stack/pom.xml | 6 +-
stack/queue/pom.xml | 16 -----
.../notifications/ApplicationQueueManager.java | 69 +++++++-------------
.../notifications/ApplicationQueueMessage.java | 67 +++++++------------
.../notifications/NotificationsService.java | 15 ++++-
.../services/notifications/QueueListener.java | 56 ++++++++--------
.../services/notifications/QueueManager.java | 31 ---------
.../services/notifications/TaskManager.java | 27 ++++----
.../apns/NotificationsServiceIT.java | 2 +-
.../gcm/NotificationsServiceIT.java | 2 +-
17 files changed, 164 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index 6cd5dac..30a14d1 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -526,6 +526,21 @@
<type>jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.usergrid</groupId>
+ <artifactId>map</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.usergrid</groupId>
+ <artifactId>queue</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <type>jar</type>
+ </dependency>
+
+
<!--<dependency>-->
<!--<artifactId>lucene-core</artifactId>-->
<!--<groupId>org.apache.lucene</groupId>-->
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 1f3d615..42f81d5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -17,6 +17,8 @@
package org.apache.usergrid.corepersistence;
+import org.apache.usergrid.persistence.map.guice.MapModule;
+import org.apache.usergrid.persistence.queue.guice.QueueModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +43,8 @@ public class GuiceModule extends AbstractModule {
install(new CollectionModule());
install(new GraphModule());
install(new IndexModule());
+ install(new MapModule());
+ install(new QueueModule());
bind(CpEntityDeleteListener.class).asEagerSingleton();
bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/map/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/pom.xml b/stack/corepersistence/map/pom.xml
index 93b1030..e9cb5ab 100644
--- a/stack/corepersistence/map/pom.xml
+++ b/stack/corepersistence/map/pom.xml
@@ -1,4 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 90b4838..7482271 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -1,4 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/pom.xml b/stack/corepersistence/queue/pom.xml
index 6795248..94f11a8 100644
--- a/stack/corepersistence/queue/pom.xml
+++ b/stack/corepersistence/queue/pom.xml
@@ -85,6 +85,7 @@
<version>1.8.11</version>
</dependency>
+
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
index 79bcba9..4cdb5e2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueManagerFactory.java
@@ -17,9 +17,6 @@
*/
package org.apache.usergrid.persistence.queue;
-/**
- * Created by ApigeeCorporation on 10/3/14.
- */
public interface QueueManagerFactory {
public QueueManager getQueueManager( final QueueScope scope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
index 84e3cab..4c898f3 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SQSQueueManagerImpl.java
@@ -67,7 +67,8 @@ public class SQSQueueManagerImpl implements QueueManager {
}
private String getName() {
- return scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+ String name = scope.getApplication().getType() + scope.getApplication().getUuid().toString() + scope.getName();
+ return name;
}
public Queue getQueue(){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index e105c79..2e0d835 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -276,7 +276,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
- <version>4.1.3</version>
+ <version>4.2</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
@@ -457,6 +457,10 @@
<version>${cassandra-version}</version>
<exclusions>
<exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/queue/pom.xml
----------------------------------------------------------------------
diff --git a/stack/queue/pom.xml b/stack/queue/pom.xml
deleted file mode 100644
index 4cdf79d..0000000
--- a/stack/queue/pom.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>persistence</artifactId>
- <groupId>org.apache.usergrid</groupId>
- <version>2.0.0-SNAPSHOT</version>
- <relativePath>../corepersistence/pom.xml</relativePath>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>queue</artifactId>
-
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
index c88cead..bb0061d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManager.java
@@ -27,6 +27,8 @@ import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
import org.apache.usergrid.services.notifications.apns.APNsAdapter;
import org.apache.usergrid.services.notifications.gcm.GCMAdapter;
import org.slf4j.Logger;
@@ -42,13 +44,10 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * Created by ApigeeCorporation on 8/27/14.
- */
-public class ApplicationQueueManager implements QueueManager {
+public class ApplicationQueueManager {
- public static String DEFAULT_QUEUE_NAME = "notifications/queuelistenerv1_40;notifications/queuelistenerv1_41;notifications/queuelistenerv1_42";
- public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queueName";
+ public static String DEFAULT_QUEUE_NAME = "queuelistenerv1_60";
+ public static final String DEFAULT_QUEUE_PROPERTY = "usergrid.notifications.listener.queue";
private static final Logger LOG = LoggerFactory.getLogger(ApplicationQueueManager.class);
//this is for tests, will not mark initial post complete, set to false for tests
@@ -57,11 +56,10 @@ public class ApplicationQueueManager implements QueueManager {
public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
private final EntityManager em;
- private final org.apache.usergrid.mq.QueueManager qm;
+ private final QueueManager qm;
private final JobScheduler jobScheduler;
private final MetricsFactory metricsFactory;
- private final String[] queueNames;
- private boolean sendNow = true;
+ private final String queueName;
HashMap<Object, Notifier> notifierHashMap; // only retrieve notifiers once
@@ -77,13 +75,12 @@ public class ApplicationQueueManager implements QueueManager {
public static ProviderAdapter TEST_ADAPTER = new TestAdapter();
- public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, org.apache.usergrid.mq.QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
+ public ApplicationQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory, Properties properties){
this.em = entityManager;
this.qm = queueManager;
this.jobScheduler = jobScheduler;
this.metricsFactory = metricsFactory;
- this.queueNames = getQueueNames(properties);
- this.sendNow = new Boolean(properties.getProperty("usergrid.notifications.sendNow",""+sendNow));
+ this.queueName = getQueueNames(properties);
}
@@ -115,7 +112,6 @@ public class ApplicationQueueManager implements QueueManager {
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
final HashMap<Object,Notifier> notifierMap = getNotifierMap();
- final String queueName = getRandomQueue(queueNames);
final List<ApplicationQueueMessage> messages = new ArrayList<>();
//get devices in querystring, and make sure you have access
@@ -132,7 +128,6 @@ public class ApplicationQueueManager implements QueueManager {
final UUID appId = em.getApplication().getUuid();
final Map<String,Object> payloads = notification.getPayloads();
- final boolean sendNow = this.sendNow; //&& jobExecution == null;
final Func1<Entity,Entity> entityListFunct = new Func1<Entity, Entity>() {
@Override
@@ -183,11 +178,8 @@ public class ApplicationQueueManager implements QueueManager {
LOG.info("ApplicationQueueMessage: notification {} device {} queue time set. duration "+(System.currentTimeMillis()-now)+" ms", notification.getUuid(), deviceRef.getUuid());
}
now = System.currentTimeMillis();
- if(sendNow){ //if(jobExecution == null && sendNow) {
- messages.add(message);
- }else{
- qm.postToQueue(queueName, message);
- }
+
+ qm.sendMessage(message);
LOG.info("ApplicationQueueMessage: notification {} post-queue to device {} duration " + (System.currentTimeMillis() - now) + " ms "+queueName+" queue", notification.getUuid(), deviceRef.getUuid());
deviceCount.incrementAndGet();
queueMeter.mark();
@@ -242,7 +234,7 @@ public class ApplicationQueueManager implements QueueManager {
//do i have devices, and have i already started batching.
if (deviceCount.get() <= 0) {
- TaskManager taskManager = new TaskManager(em, qm, this, notification,queueName);
+ TaskManager taskManager = new TaskManager(em, this, notification,this.qm);
//if i'm in a test value will be false, do not mark finished for test orchestration, not ideal need real tests
taskManager.finishedBatch();
}
@@ -251,13 +243,6 @@ public class ApplicationQueueManager implements QueueManager {
long elapsed = notification.getQueued() != null ? notification.getQueued() - startTime : 0;
LOG.info("ApplicationQueueMessage: notification {} done queuing to {} devices in "+elapsed+" ms",notification.getUuid().toString(),deviceCount.get());
}
-
- if(messages.size()>0){
- now = System.currentTimeMillis();
- sendBatchToProviders(messages,null).toBlocking().lastOrDefault(null);
- LOG.info("ApplicationQueueMessage: notification {} done sending to "+messages.size()+" devicess in {} ms", notification.getUuid(), System.currentTimeMillis() - now);
- }
-
}
/**
@@ -300,20 +285,22 @@ public class ApplicationQueueManager implements QueueManager {
* @throws Exception
*/
- public Observable sendBatchToProviders( final List<ApplicationQueueMessage> messages, final String queuePath) {
+ public Observable sendBatchToProviders( final List<QueueMessage> messages, final String queuePath) {
LOG.info("sending batch of {} notifications.", messages.size());
final Meter sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
final Map<Object, Notifier> notifierMap = getNotifierMap();
- final QueueManager proxy = this;
+ final ApplicationQueueManager proxy = this;
final ConcurrentHashMap<UUID,TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
final ConcurrentHashMap<UUID,Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
- final Func1<ApplicationQueueMessage, ApplicationQueueMessage> func = new Func1<ApplicationQueueMessage, ApplicationQueueMessage>() {
+ final Func1<QueueMessage, ApplicationQueueMessage> func = new Func1<QueueMessage, ApplicationQueueMessage>() {
@Override
- public ApplicationQueueMessage call(ApplicationQueueMessage message) {
+ public ApplicationQueueMessage call(QueueMessage queueMessage) {
boolean messageCommitted = false;
+ ApplicationQueueMessage message = null;
try {
+ message = (ApplicationQueueMessage) queueMessage.getBody();
LOG.info("start sending notification for device {} for Notification: {} on thread "+Thread.currentThread().getId(), message.getDeviceId(), message.getNotificationId());
UUID deviceUUID = message.getDeviceId();
@@ -325,7 +312,7 @@ public class ApplicationQueueManager implements QueueManager {
}
TaskManager taskManager = taskMap.get(message.getNotificationId());
if (taskManager == null) {
- taskManager = new TaskManager(em, qm, proxy, notification,queuePath);
+ taskManager = new TaskManager(em, proxy, notification, qm);
taskMap.putIfAbsent(message.getNotificationId(), taskManager);
taskManager = taskMap.get(message.getNotificationId());
}
@@ -334,7 +321,7 @@ public class ApplicationQueueManager implements QueueManager {
final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
LOG.info("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
- taskManager.addMessage(deviceUUID,message);
+ taskManager.addMessage(deviceUUID,queueMessage);
try {
String notifierName = message.getNotifierKey().toLowerCase();
Notifier notifier = notifierMap.get(notifierName.toLowerCase());
@@ -368,7 +355,7 @@ public class ApplicationQueueManager implements QueueManager {
LOG.error("Failure while sending",e);
try {
if(!messageCommitted && queuePath != null) {
- qm.commitTransaction(queuePath, message.getTransaction(), null);
+ qm.commitMessage(queueMessage);
}
}catch (Exception queueException){
LOG.error("Failed to commit message.",queueException);
@@ -378,9 +365,9 @@ public class ApplicationQueueManager implements QueueManager {
}
};
Observable o = rx.Observable.from(messages)
- .parallel(new Func1<rx.Observable<ApplicationQueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
+ .parallel(new Func1<rx.Observable<QueueMessage>, rx.Observable<ApplicationQueueMessage>>() {
@Override
- public rx.Observable<ApplicationQueueMessage> call(rx.Observable<ApplicationQueueMessage> messageObservable) {
+ public rx.Observable<ApplicationQueueMessage> call(rx.Observable<QueueMessage> messageObservable) {
return messageObservable.map(func);
}
}, Schedulers.io())
@@ -446,14 +433,8 @@ public class ApplicationQueueManager implements QueueManager {
return translatedPayloads;
}
- public static String[] getQueueNames(Properties properties) {
- String[] names = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME).split(";");
- return names;
- }
- public static String getRandomQueue(String[] queueNames) {
- int size = queueNames.length;
- Random random = new Random();
- String name = queueNames[random.nextInt(size)];
+ public static String getQueueNames(Properties properties) {
+ String name = properties.getProperty(ApplicationQueueManager.DEFAULT_QUEUE_PROPERTY,ApplicationQueueManager.DEFAULT_QUEUE_NAME);
return name;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
index 91f1312..fa75531 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueMessage.java
@@ -16,6 +16,7 @@
*/
package org.apache.usergrid.services.notifications;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.usergrid.mq.Message;
@@ -28,23 +29,25 @@ import org.slf4j.LoggerFactory;
/**
* Created by ApigeeCorporation on 9/4/14.
*/
-public class ApplicationQueueMessage extends Message {
+public class ApplicationQueueMessage implements Serializable {
private static final Logger log = LoggerFactory.getLogger(ApplicationQueueMessage.class);
-
- static final String MESSAGE_PROPERTY_DEVICE_UUID = "deviceUUID";
- static final String MESSAGE_PROPERTY_APPLICATION_UUID = "applicationUUID";
- static final String MESSAGE_PROPERTY_NOTIFIER_ID = "notifierId";
- static final String MESSAGE_PROPERTY_NOTIFICATION_ID = "notificationId";
- static final String MESSAGE_PROPERTY_NOTIFIER_NAME = "notifierName";
+ private UUID applicationId;
+ private UUID notificationId;
+ private UUID deviceId;
+ private String notifierKey;
+ private String notifierId;
public ApplicationQueueMessage() {
}
public ApplicationQueueMessage(UUID applicationId, UUID notificationId, UUID deviceId, String notifierKey, String notifierId) {
- setApplicationId(applicationId);
- setDeviceId(deviceId);
+ this.applicationId = applicationId;
+ this.notificationId = notificationId;
+ this.deviceId = deviceId;
+ this.notifierKey = notifierKey;
+ this.notifierId = notifierId;
setNotificationId(notificationId);
setNotifierKey(notifierKey);
setNotifierId(notifierId);
@@ -58,71 +61,45 @@ public class ApplicationQueueMessage extends Message {
return new UUID( msb, lsb );
}
- public static ApplicationQueueMessage generate(Message message) {
-
- // this crazyness may indicate that Core Persistence is not storing UUIDs correctly
-
- byte[] mpaBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
- UUID mpaUuid = bytesToUuid(mpaBytes);
-
- byte[] mpnBytes = (byte[])message.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
- UUID mpnUuid = bytesToUuid(mpnBytes);
-
- final UUID mpdUuid;
- Object o = message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
- if ( o instanceof UUID ) {
- mpdUuid = (UUID)message.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
- } else {
- byte[] mpdBytes = (byte[])o;
- mpdUuid = bytesToUuid(mpdBytes);
- }
-
- // end of crazyness
-
- return new ApplicationQueueMessage(
- mpaUuid, mpnUuid, mpdUuid,
- message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME),
- message.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID));
- }
public UUID getApplicationId() {
- return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_APPLICATION_UUID);
+ return applicationId;
}
public void setApplicationId(UUID applicationId) {
- this.setProperty(MESSAGE_PROPERTY_APPLICATION_UUID, applicationId);
+ this.applicationId = applicationId;
}
public UUID getDeviceId() {
- return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_DEVICE_UUID);
+ return deviceId;
}
public void setDeviceId(UUID deviceId) {
- this.setProperty(MESSAGE_PROPERTY_DEVICE_UUID, deviceId);
+ this.deviceId = deviceId;
}
public UUID getNotificationId() {
- return (UUID) this.getObjectProperty(MESSAGE_PROPERTY_NOTIFICATION_ID);
+ return notificationId;
}
public void setNotificationId(UUID notificationId) {
- this.setProperty(MESSAGE_PROPERTY_NOTIFICATION_ID, notificationId);
+ this.notificationId = notificationId;
}
public String getNotifierId() {
- return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_ID);
+ return notifierId;
}
public void setNotifierId(String notifierId) {
- this.setProperty(MESSAGE_PROPERTY_NOTIFIER_ID, notifierId);
+ this.notifierId = notifierId;
}
public String getNotifierKey() {
- return this.getStringProperty(MESSAGE_PROPERTY_NOTIFIER_NAME);
+ return notifierKey;
}
public void setNotifierKey(String name) {
- this.setProperty(MESSAGE_PROPERTY_NOTIFIER_NAME, name);
+ notifierKey = name;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 4e5692e..c5cd3c4 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -20,6 +20,7 @@ import java.util.*;
import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
+import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.metrics.MetricsFactory;
import org.apache.usergrid.mq.Message;
import org.apache.usergrid.persistence.*;
@@ -28,6 +29,11 @@ import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueManagerFactory;
+import org.apache.usergrid.persistence.queue.QueueScope;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,10 +86,9 @@ public class NotificationsService extends AbstractCollectionService {
private ApplicationQueueManager notificationQueueManager;
private long gracePeriod;
- @Autowired
private ServiceManagerFactory smf;
- @Autowired
private EntityManagerFactory emf;
+ private QueueManagerFactory queueManagerFactory;
public NotificationsService() {
LOG.info("/notifications");
@@ -99,7 +104,11 @@ public class NotificationsService extends AbstractCollectionService {
postMeter = metricsService.getMeter(NotificationsService.class, "requests");
postTimer = metricsService.getTimer(this.getClass(), "execution_rest");
JobScheduler jobScheduler = new JobScheduler(sm,em);
- notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,smf.getServiceManager(smf.getManagementAppId()).getQueueManager(),metricsService,props);
+ String name = ApplicationQueueManager.getQueueNames(props);
+ QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),name);
+ queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
+ QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+ notificationQueueManager = new ApplicationQueueManager(jobScheduler,em,queueManager,metricsService,props);
gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 7d76ee9..a9e0702 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -16,11 +16,17 @@
*/
package org.apache.usergrid.services.notifications;
+import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.metrics.MetricsFactory;
-import org.apache.usergrid.mq.*;
+
+import org.apache.usergrid.mq.QueueResults;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.EntityManagerFactory;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.queue.*;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.impl.QueueScopeImpl;
import org.apache.usergrid.services.ServiceManager;
import org.apache.usergrid.services.ServiceManagerFactory;
import org.slf4j.Logger;
@@ -35,7 +41,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
public class QueueListener {
- public final long MESSAGE_TRANSACTION_TIMEOUT = 1 * 60 * 1000;
+ public final int MESSAGE_TRANSACTION_TIMEOUT = 25 * 1000;
+ private final QueueManagerFactory queueManagerFactory;
public long DEFAULT_SLEEP = 5000;
@@ -50,7 +57,6 @@ public class QueueListener {
private Properties properties;
- private org.apache.usergrid.mq.QueueManager queueManager;
private ServiceManager svcMgr;
@@ -62,12 +68,11 @@ public class QueueListener {
private List<Future> futures;
public final String MAX_THREADS = "2";
- private Integer batchSize = 100;
- private String[] queueNames;
-
-
+ private Integer batchSize = 10;
+ private String queueName;
public QueueListener(ServiceManagerFactory smf, EntityManagerFactory emf, MetricsFactory metricsService, Properties props){
+ this.queueManagerFactory = CpSetup.getInjector().getInstance(QueueManagerFactory.class);
this.smf = smf;
this.emf = emf;
this.metricsService = metricsService;
@@ -76,7 +81,7 @@ public class QueueListener {
@PostConstruct
public void start(){
- boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
+ boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "true"));
if(shouldRun) {
LOG.info("QueueListener: starting.");
@@ -86,7 +91,7 @@ public class QueueListener {
sleepBetweenRuns = new Long(properties.getProperty("usergrid.notifications.listener.sleep.between", "0")).longValue();
sleepWhenNoneFound = new Long(properties.getProperty("usergrid.notifications.listener.sleep.after", ""+DEFAULT_SLEEP)).longValue();
batchSize = new Integer(properties.getProperty("usergrid.notifications.listener.batchSize", (""+batchSize)));
- queueNames = ApplicationQueueManager.getQueueNames(properties);
+ queueName = ApplicationQueueManager.getQueueNames(properties);
int maxThreads = new Integer(properties.getProperty("usergrid.notifications.listener.maxThreads", MAX_THREADS));
futures = new ArrayList<Future>(maxThreads);
@@ -120,7 +125,7 @@ public class QueueListener {
}
private void execute(){
- Thread.currentThread().setDaemon(true);
+// Thread.currentThread().setDaemon(true);
Thread.currentThread().setName("Notifications_Processor"+UUID.randomUUID());
final AtomicInteger consecutiveExceptions = new AtomicInteger();
@@ -130,31 +135,30 @@ public class QueueListener {
while ( true ) {
try {
svcMgr = smf.getServiceManager(smf.getManagementAppId());
- queueManager = svcMgr.getQueueManager();
- String queueName = ApplicationQueueManager.getRandomQueue(queueNames);
LOG.info("getting from queue {} ", queueName);
- QueueResults results = getDeliveryBatch(queueManager,queueName);
- LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", results.size(),queueName);
+ QueueScope queueScope = new QueueScopeImpl(new SimpleId(smf.getManagementAppId(),"notifications"),queueName);
+ QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
+ List<QueueMessage> messages = getDeliveryBatch(queueManager);
+ LOG.info("QueueListener: retrieved batch of {} messages from queue {} ", messages.size(),queueName);
- List<Message> messages = results.getMessages();
if (messages.size() > 0) {
- HashMap<UUID, List<ApplicationQueueMessage>> messageMap = new HashMap<>(messages.size());
+ HashMap<UUID, List<QueueMessage>> messageMap = new HashMap<>(messages.size());
//group messages into hash map by app id
- for (Message message : messages) {
- ApplicationQueueMessage queueMessage = ApplicationQueueMessage.generate(message);
+ for (QueueMessage message : messages) {
+ ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
UUID applicationId = queueMessage.getApplicationId();
if (!messageMap.containsKey(applicationId)) {
- List<ApplicationQueueMessage> applicationQueueMessages = new ArrayList<ApplicationQueueMessage>();
- applicationQueueMessages.add(queueMessage);
+ List<QueueMessage> applicationQueueMessages = new ArrayList<QueueMessage>();
+ applicationQueueMessages.add(message);
messageMap.put(applicationId, applicationQueueMessages);
} else {
- messageMap.get(applicationId).add(queueMessage);
+ messageMap.get(applicationId).add(message);
}
}
long now = System.currentTimeMillis();
Observable merge = null;
//send each set of app ids together
- for (Map.Entry<UUID, List<ApplicationQueueMessage>> entry : messageMap.entrySet()) {
+ for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
UUID applicationId = entry.getKey();
EntityManager entityManager = emf.getEntityManager(applicationId);
ServiceManager serviceManager = smf.getServiceManager(applicationId);
@@ -218,11 +222,9 @@ public class QueueListener {
pool.shutdownNow();
}
- private QueueResults getDeliveryBatch(org.apache.usergrid.mq.QueueManager queueManager,String queuePath) throws Exception {
- QueueQuery qq = new QueueQuery();
- qq.setLimit(this.getBatchSize());
- qq.setTimeout(MESSAGE_TRANSACTION_TIMEOUT);
- QueueResults results = queueManager.getFromQueue(queuePath, qq);
+ private List<QueueMessage> getDeliveryBatch(QueueManager queueManager) throws Exception {
+
+ List<QueueMessage> results = queueManager.getMessages(getBatchSize(),MESSAGE_TRANSACTION_TIMEOUT);
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
deleted file mode 100644
index 0024417..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueManager.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.services.notifications;
-
-import org.apache.usergrid.persistence.entities.Notifier;
-
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Created by ApigeeCorporation on 9/4/14.
- */
-public interface QueueManager {
-
- public void asyncCheckForInactiveDevices(Set<Notifier> notifiers) throws Exception ;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
index 08f067d..42bd65c 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/TaskManager.java
@@ -23,6 +23,8 @@ import org.apache.usergrid.persistence.entities.Device;
import org.apache.usergrid.persistence.entities.Notification;
import org.apache.usergrid.persistence.entities.Notifier;
import org.apache.usergrid.persistence.entities.Receipt;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.persistence.queue.QueueMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,28 +35,26 @@ import java.util.concurrent.atomic.AtomicLong;
public class TaskManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskManager.class);
- private final QueueManager proxy;
- private final String queuePath;
+ private final ApplicationQueueManager proxy;
+ private final QueueManager queueManager;
private Notification notification;
private AtomicLong successes = new AtomicLong();
private AtomicLong failures = new AtomicLong();
- private org.apache.usergrid.mq.QueueManager qm;
private EntityManager em;
- private ConcurrentHashMap<UUID, ApplicationQueueMessage> messageMap;
+ private ConcurrentHashMap<UUID, QueueMessage> messageMap;
private boolean hasFinished;
- public TaskManager(EntityManager em, org.apache.usergrid.mq.QueueManager qm, QueueManager proxy, Notification notification, String queuePath) {
+ public TaskManager(EntityManager em,ApplicationQueueManager proxy, Notification notification, QueueManager queueManager) {
this.em = em;
- this.qm = qm;
this.notification = notification;
this.proxy = proxy;
- this.messageMap = new ConcurrentHashMap<UUID, ApplicationQueueMessage>();
+ this.messageMap = new ConcurrentHashMap<UUID, QueueMessage>();
hasFinished = false;
- this.queuePath = queuePath;
+ this.queueManager = queueManager;
}
- public void addMessage(UUID deviceId, ApplicationQueueMessage message) {
+ public void addMessage(UUID deviceId, QueueMessage message) {
messageMap.put(deviceId, message);
}
@@ -62,9 +62,7 @@ public class TaskManager {
LOG.debug("REMOVED {}", deviceUUID);
try {
LOG.debug("notification {} removing device {} from remaining", notification.getUuid(), deviceUUID);
- if(queuePath!=null){
- qm.commitTransaction(queuePath, messageMap.get(deviceUUID).getTransaction(), null);
- }
+
EntityRef deviceRef = new SimpleEntityRef(Device.ENTITY_TYPE, deviceUUID);
if (receipt != null) {
@@ -148,6 +146,11 @@ public class TaskManager {
}
public void finishedBatch() throws Exception {
+ if(queueManager!=null){
+ List<QueueMessage> list = new ArrayList<QueueMessage>();
+ list.addAll(messageMap.values());
+ queueManager.commitMessages(list);
+ }
long successes = this.successes.getAndSet(0); //reset counters
long failures = this.failures.getAndSet(0); //reset counters
this.hasFinished = true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
index 27e6d27..12d0925 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/apns/NotificationsServiceIT.java
@@ -65,7 +65,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fadf5277/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
----------------------------------------------------------------------
diff --git a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
index 8907226..034723e 100644
--- a/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
+++ b/stack/services/src/test/java/org/apache/usergrid/services/notifications/gcm/NotificationsServiceIT.java
@@ -61,7 +61,7 @@ public class NotificationsServiceIT extends AbstractServiceNotificationIT {
@BeforeClass
public static void setup(){
- ApplicationQueueManager.DEFAULT_QUEUE_NAME = "notifications/test/" + UUID.randomUUID().toString()+";"+"notifications/test/" + UUID.randomUUID().toString();
+ ApplicationQueueManager.DEFAULT_QUEUE_NAME = "test";
}
@Override
@Before