You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/08/22 22:36:47 UTC
[3/3] git commit: removing props
removing props
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8a8f7e6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8a8f7e6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8a8f7e6b
Branch: refs/heads/two-dot-o-notifications-queue
Commit: 8a8f7e6b0fceac6fade29887f39b955272c8f38b
Parents: 5be4d45
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Aug 22 14:35:33 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Aug 22 14:35:33 2014 -0600
----------------------------------------------------------------------
.../NotificationsQueueManager.java | 28 ++------------------
.../notifications/NotificationsService.java | 2 +-
.../services/notifications/QueueListener.java | 20 +++++---------
3 files changed, 9 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a8f7e6b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
index 9f49b2a..e2b9df3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsQueueManager.java
@@ -59,15 +59,10 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
private final Histogram queueSize;
private static ExecutorService INACTIVE_DEVICE_CHECK_POOL = Executors.newFixedThreadPool(5);
public static final String NOTIFIER_ID_POSTFIX = ".notifier.id";
- // If this property is set Notifications are automatically expired in
- // the isOkToSent() method after the specified number of milliseconds
- public static final String PUSH_AUTO_EXPIRE_AFTER_PROPNAME = "usergrid.push-auto-expire-after";
+
private final EntityManager em;
private final QueueManager qm;
private final JobScheduler jobScheduler;
- private final Properties props;
- private final InflectionUtils utils;
- private Long pushAutoExpireAfter = null;
public final Map<String, ProviderAdapter> providerAdapters = new HashMap<String, ProviderAdapter>(3);
{
@@ -105,14 +100,12 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}
});;
- public NotificationsQueueManager(JobScheduler jobScheduler, EntityManager entityManager, Properties props, QueueManager queueManager, MetricsFactory metricsFactory){
+ public NotificationsQueueManager(JobScheduler jobScheduler, EntityManager entityManager, QueueManager queueManager, MetricsFactory metricsFactory){
this.em = entityManager;
this.qm = queueManager;
this.jobScheduler = jobScheduler;
- this.props = props;
this.sendMeter = metricsFactory.getMeter(NotificationsService.class, "send");
this.queueSize = metricsFactory.getHistogram(NotificationsService.class, "queue_size");
- utils = new InflectionUtils();
}
public boolean scheduleQueueJob(Notification notification) throws Exception{
@@ -132,8 +125,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
LOG.info("notification {} start queuing", notification.getUuid());
final PathQuery<Device> pathQuery = notification.getPathQuery(); //devices query
final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
- final AtomicInteger batchCount = new AtomicInteger(); //count devices so you can make a judgement on batching
- final int numCurrentBatchesConfig = getNumConcurrentBatches();
final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<String>(); //build up list of issues
final HashMap<Object,Notifier> notifierMap = getNotifierMap();
final Map<String,Object> payloads = notification.getPayloads();
@@ -208,7 +199,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}, Schedulers.io()).toBlocking().lastOrDefault(null);
}
- batchCount.set(Math.min(numCurrentBatchesConfig, batchCount.get()));
// update queued time
Map<String, Object> properties = new HashMap<String, Object>(2);
properties.put("queued", notification.getQueued());
@@ -451,9 +441,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
return translatedPayloads;
}
- private int getNumConcurrentBatches() {
- return Integer.parseInt(props.getProperty(NOTIFICATION_CONCURRENT_BATCHES, "1"));
- }
private static final class IteratorObservable<T> implements rx.Observable.OnSubscribe<T> {
private final Iterator<T> input;
@@ -549,17 +536,6 @@ public class NotificationsQueueManager implements NotificationServiceProxy {
}
private boolean isOkToSend(Notification notification) {
- String autoExpireAfterString = props.getProperty(PUSH_AUTO_EXPIRE_AFTER_PROPNAME);
-
- if (autoExpireAfterString != null) {
- pushAutoExpireAfter = Long.parseLong(autoExpireAfterString);
- }
- if (pushAutoExpireAfter != null) {
- if (notification.getCreated() < System.currentTimeMillis() - pushAutoExpireAfter) {
- notification.setExpire(System.currentTimeMillis() - 1L);
- }
- }
-
if (notification.getFinished() != null) {
LOG.info("notification {} already processed. not sending.",
notification.getUuid());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a8f7e6b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 1a76b10..478e9c2 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -96,7 +96,7 @@ public class NotificationsService extends AbstractCollectionService {
queueSize = metricsService.getHistogram(NotificationsService.class, "queue_size");
outstandingQueue = metricsService.getCounter(NotificationsService.class,"current_queue");
JobScheduler jobScheduler = new JobScheduler(sm,em);
- notificationQueueManager = new NotificationsQueueManager(jobScheduler,em,sm.getProperties(),sm.getQueueManager(),metricsService);
+ notificationQueueManager = new NotificationsQueueManager(jobScheduler,em,sm.getQueueManager(),metricsService);
gracePeriod = jobScheduler.SCHEDULER_GRACE_PERIOD;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8a8f7e6b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 18ea0ca..a1ab0db 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -33,6 +33,8 @@ import rx.Observable;
import javax.annotation.PostConstruct;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@Component( "notificationsQueueListener" )
@@ -53,25 +55,16 @@ public class QueueListener {
private EntityManagerFactory emf;
private QueueManager queueManager;
private ServiceManager svcMgr;
- private Properties properties;
-
+ ExecutorService pool;
public QueueListener() {
+ // pool = Executors.newFixedThreadPool(1);
}
@PostConstruct
void init() {
- svcMgr = smf.getServiceManager(smf.getManagementAppId());
+ svcMgr = smf.getServiceManager(smf.getManagementAppId());
queueManager = svcMgr.getQueueManager();
- properties = new Properties();
- try {
- properties.load(Thread.currentThread()
- .getContextClassLoader()
- .getResourceAsStream("usergrid.properties"));
- } catch (Exception e) {
- LOG.error("Could not load props","");
- }
-
- run();
+ // run();
}
public void run(){
@@ -103,7 +96,6 @@ public class QueueListener {
NotificationsQueueManager manager = new NotificationsQueueManager(
new JobScheduler(serviceManager,entityManager),
entityManager,
- properties,
queueManager,
metricsService
);