You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/04/30 12:16:43 UTC
usergrid git commit: Re-work caching of push notification queue
managers such that producing and consuming leverage the same cache.
Repository: usergrid
Updated Branches:
refs/heads/release-2.1.1 c07cdb56c -> d16f4c17c
Re-work caching of push notification queue managers such that producing and consuming leverage the same cache.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d16f4c17
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d16f4c17
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d16f4c17
Branch: refs/heads/release-2.1.1
Commit: d16f4c17c7b4eda01a8eddf2020139ec3898521d
Parents: c07cdb5
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Apr 30 18:14:40 2016 +0800
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Apr 30 18:14:40 2016 +0800
----------------------------------------------------------------------
.../main/resources/usergrid-default.properties | 10 +-
.../shard/impl/ShardGroupCompactionImpl.java | 18 ++-
.../queue/impl/QueueManagerFactoryImpl.java | 27 ++--
.../ApplicationQueueManagerCache.java | 143 +++++++++++++++++++
.../notifications/NotificationsService.java | 6 +-
.../services/notifications/QueueListener.java | 58 ++------
.../impl/ApplicationQueueManagerImpl.java | 8 +-
7 files changed, 197 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 5cd7c7a..4f57cdd 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -455,12 +455,20 @@ usergrid.scheduler.job.queueName=/jobs
# Set the number of queue consumers to read from the in-region push notification queue.
#
-usergrid.push.worker_count=8
+usergrid.push.worker_count=2
# Set the sleep time between queue polling ( in milliseconds)
#
usergrid.push.sleep=100
+# This setting determines the in-memory cache TTL (in minutes) for push notification queue managers.
+#
+usergrid.push.queuemanager.cache.time-to-live=10
+
+# This setting determines the in-memory cache size (number of elements) for push notification queue managers.
+#
+usergrid.push.queuemanager.cache.size=200
+
############################### Usergrid Central SSO #############################
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index b26ee46..0853adb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -303,9 +303,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
}
- logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
-
-
resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
/**
@@ -351,8 +348,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
compactedShard.setShardEnd(targetShard.getShardEnd());
- logger.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard );
-
+ if(logger.isTraceEnabled()) {
+ logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard);
+ }
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
try {
@@ -402,7 +400,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
catch ( RejectedExecutionException ree ) {
- //ignore, if this happens we don't care, we're saturated, we can check later
+ // ignore, if this happens we don't care, we're saturated, we can check later
logger.info( "Rejected audit for shard of scope {} edge, meta {} and group {}", scope, edgeMeta, group );
return Futures.immediateFuture( AuditResult.NOT_CHECKED );
@@ -503,8 +501,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
*/
try {
CompactionResult result = compact( scope, edgeMeta, group );
- logger.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}",
- scope, edgeMeta, group, result );
+ if(logger.isTraceEnabled()) {
+ logger.trace("Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}",
+ scope, edgeMeta, group, result);
+ }
}
finally {
shardCompactionTaskTracker.complete( scope, edgeMeta, group );
@@ -535,8 +535,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
ShardEntryGroup group ) {
final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
- //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(), edgeMeta, group);
- //logger.info("checking hash value of: {}, already started: {}", hash, returned );
/**
* Someone already put the value
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
index de9cac5..93b2fb2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueManagerFactoryImpl.java
@@ -1,21 +1,18 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.usergrid.persistence.queue.impl;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
new file mode 100644
index 0000000..555e495
--- /dev/null
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/ApplicationQueueManagerCache.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.services.notifications;
+
+import com.google.common.cache.*;
+import com.google.inject.Singleton;
+import org.apache.usergrid.persistence.EntityManager;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.queue.QueueManager;
+import org.apache.usergrid.services.notifications.impl.ApplicationQueueManagerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Cache of {@link ApplicationQueueManager} instances keyed by application id, so that callers
+ * asking for the manager of the same application are handed the same cached instance.
+ *
+ * Entries expire a configurable number of minutes after they were written and the cache is
+ * bounded in size. Whenever an entry is removed, its manager is stopped. Both settings are read
+ * from JVM system properties when the cache is constructed.
+ */
+@Singleton
+public class ApplicationQueueManagerCache{
+
+    private static final Logger logger = LoggerFactory.getLogger( ApplicationQueueManagerCache.class );
+
+    private static final String CACHE_TTL_PROP = "usergrid.push.queuemanager.cache.time-to-live";
+    private static final String CACHE_MAX_SIZE_PROP = "usergrid.push.queuemanager.cache.size";
+
+    /** TTL, in minutes, used when the TTL property is missing or not a valid integer. */
+    private static final int DEFAULT_TTL_MINUTES = 10;
+
+    /** Cache size used when the size property is missing or not a valid integer. */
+    private static final int DEFAULT_MAX_SIZE = 200;
+
+    /** Upper bound applied to the configured cache size. */
+    private static final int MAX_SIZE_LIMIT = 1000;
+
+    private final Cache<UUID, ApplicationQueueManager> cache;
+
+
+    /**
+     * Builds the cache from the {@code usergrid.push.queuemanager.cache.*} system properties,
+     * falling back to the defaults when a property is absent or malformed.
+     */
+    public ApplicationQueueManagerCache(){
+
+        final long ttl = readIntProperty( CACHE_TTL_PROP, DEFAULT_TTL_MINUTES );
+        final int configuredMaxSize = readIntProperty( CACHE_MAX_SIZE_PROP, DEFAULT_MAX_SIZE );
+
+        this.cache = CacheBuilder.newBuilder()
+            // honour the configured size but never exceed the hard limit
+            .maximumSize(Math.min(MAX_SIZE_LIMIT, configuredMaxSize))
+            .expireAfterWrite(ttl, TimeUnit.MINUTES)
+            .removalListener(new RemovalListener<UUID, ApplicationQueueManager>() {
+                @Override
+                public void onRemoval(
+                    RemovalNotification<UUID, ApplicationQueueManager> queueManagerNotification) {
+                    try {
+                        final ApplicationQueueManager removed = queueManagerNotification.getValue();
+                        if ( removed != null ) {
+                            removed.stop();
+                        }
+                    } catch (Exception ie) {
+                        // pass the throwable itself so the stack trace is logged
+                        logger.error("Failed to shutdown push queue manager from cache", ie);
+                    }
+                }
+            }).build();
+
+    }
+
+
+    /**
+     * Reads an integer from the JVM system properties.
+     *
+     * @param name system property name
+     * @param defaultValue value returned when the property is unset or is not a valid integer
+     * @return the parsed property value, or {@code defaultValue}
+     */
+    private static int readIntProperty( final String name, final int defaultValue ){
+        try{
+            // Integer.parseInt(null) also throws NumberFormatException, covering the unset case
+            return Integer.parseInt(System.getProperty(name));
+        } catch (NumberFormatException e){
+            return defaultValue;
+        }
+    }
+
+
+    /** Stores {@code value} under {@code key}, replacing any existing entry. */
+    public void put(UUID key, ApplicationQueueManager value){
+        cache.put(key, value);
+    }
+
+
+    /** @return a concurrent map view of the entries currently held in the cache */
+    public ConcurrentMap<UUID, ApplicationQueueManager> asMap(){
+        return cache.asMap();
+    }
+
+
+    /** @return the manager cached under {@code key}, or {@code null} if there is none */
+    public ApplicationQueueManager get(UUID key){
+        return cache.getIfPresent(key);
+    }
+
+
+    /** Discards the entry cached under {@code key}, if any. */
+    public void invalidate(UUID key){
+        cache.invalidate(key);
+    }
+
+
+    /** Discards every entry in the cache. */
+    public void invalidateAll(){
+        cache.invalidateAll();
+    }
+
+
+    /**
+     * Returns the cached queue manager for the application that {@code entityManager} belongs to,
+     * creating and caching a new {@link ApplicationQueueManagerImpl} when none is cached.
+     *
+     * @param entityManager entity manager whose application id is used as the cache key
+     * @param queueManager queue manager handed to a newly created manager
+     * @param jobScheduler job scheduler handed to a newly created manager
+     * @param metricsService metrics factory handed to a newly created manager
+     * @param properties properties handed to a newly created manager
+     * @return the cached or newly created manager
+     */
+    public ApplicationQueueManager getApplicationQueueManager( final EntityManager entityManager,
+                                                               final QueueManager queueManager,
+                                                               final JobScheduler jobScheduler,
+                                                               final MetricsFactory metricsService,
+                                                               final Properties properties ) {
+
+        final UUID applicationId = entityManager.getApplicationId();
+
+        final ApplicationQueueManager cached = cache.getIfPresent(applicationId);
+        if(cached != null){
+            if(logger.isTraceEnabled()){
+                logger.trace("Returning push queue manager from cache for application: {}", applicationId);
+            }
+            return cached;
+        }
+
+        if(logger.isTraceEnabled()) {
+            logger.trace("Push queue manager not found in cache, loading for application: {}", applicationId);
+        }
+
+        // NOTE(review): this lookup-then-put is not atomic; two threads missing at the same time
+        // would each create a manager and the later put would replace (and stop) the earlier one.
+        // Confirm whether callers can race here before relying on a single instance per app.
+        final ApplicationQueueManager created = new ApplicationQueueManagerImpl(
+            jobScheduler,
+            entityManager,
+            queueManager,
+            metricsService,
+            properties
+        );
+
+        cache.put(applicationId, created);
+
+        return created;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 65425d7..907638e 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -77,6 +77,7 @@ public class NotificationsService extends AbstractCollectionService {
private ServiceManagerFactory smf;
private EntityManagerFactory emf;
private QueueManagerFactory queueManagerFactory;
+ private ApplicationQueueManagerCache applicationQueueManagerCache;
public NotificationsService() {
if (logger.isTraceEnabled()) {
@@ -99,7 +100,10 @@ public class NotificationsService extends AbstractCollectionService {
QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCAL);
queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
- notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
+ applicationQueueManagerCache = getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+ notificationQueueManager = applicationQueueManagerCache
+ .getApplicationQueueManager(em,queueManager, jobScheduler, metricsService ,props);
+
gracePeriod = JobScheduler.SCHEDULER_GRACE_PERIOD;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 55d1491..478d5ed 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -57,6 +57,7 @@ public class QueueListener {
private ServiceManagerFactory smf;
private EntityManagerFactory emf;
+ private ApplicationQueueManagerCache applicationQueueManagerCache;
private Properties properties;
@@ -79,6 +80,8 @@ public class QueueListener {
this.emf = emf;
this.metricsService = smf.getApplicationContext().getBean( Injector.class ).getInstance(MetricsFactory.class);
this.properties = props;
+ this.applicationQueueManagerCache = smf.getApplicationContext().getBean(Injector.class).getInstance(ApplicationQueueManagerCache.class);
+
}
/**
@@ -86,8 +89,6 @@ public class QueueListener {
*/
public void start(){
//TODO refactor this into a central component that will start/stop services
-// boolean shouldRun = new Boolean(properties.getProperty("usergrid.notifications.listener.run", "false"));
-
if (logger.isDebugEnabled()) {
logger.debug("QueueListener: starting.");
@@ -166,9 +167,6 @@ public class QueueListener {
// run until there are no more active jobs
final AtomicLong runCount = new AtomicLong(0);
- //cache to retrieve push manager, cached per notifier, so many notifications will get same push manager
- LoadingCache<UUID, ApplicationQueueManager> queueManagerMap = getQueueManagerCache(queueManager);
-
while ( true ) {
Timer.Context timerContext = timer.time();
@@ -207,7 +205,16 @@ public class QueueListener {
//send each set of app ids together
for (Map.Entry<UUID, List<QueueMessage>> entry : messageMap.entrySet()) {
UUID applicationId = entry.getKey();
- ApplicationQueueManager manager = queueManagerMap.get(applicationId);
+
+ ApplicationQueueManager manager = applicationQueueManagerCache
+ .getApplicationQueueManager(
+ emf.getEntityManager(applicationId),
+ queueManager,
+ new JobScheduler(smf.getServiceManager(applicationId), emf.getEntityManager(applicationId)),
+ metricsService,
+ properties
+ );
+
if (logger.isTraceEnabled()) {
logger.trace("send batch for app {} of {} messages", entry.getKey(), entry.getValue().size());
}
@@ -238,7 +245,7 @@ public class QueueListener {
}
if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){
- for(ApplicationQueueManager applicationQueueManager : queueManagerMap.asMap().values()){
+ for(ApplicationQueueManager applicationQueueManager : applicationQueueManagerCache.asMap().values()){
try {
applicationQueueManager.asyncCheckForInactiveDevices();
}catch (Exception inactiveDeviceException){
@@ -280,43 +287,6 @@ public class QueueListener {
}
}
- private LoadingCache<UUID, ApplicationQueueManager> getQueueManagerCache(final QueueManager queueManager) {
- return CacheBuilder
- .newBuilder()
- .expireAfterAccess(10, TimeUnit.MINUTES)
- .removalListener(new RemovalListener<UUID, ApplicationQueueManager>() {
- @Override
- public void onRemoval(
- RemovalNotification<UUID, ApplicationQueueManager> queueManagerNotifiication) {
- try {
- queueManagerNotifiication.getValue().stop();
- } catch (Exception ie) {
- logger.error("Failed to shutdown from cache", ie);
- }
- }
- }).build(new CacheLoader<UUID, ApplicationQueueManager>() {
- @Override
- public ApplicationQueueManager load(final UUID applicationId) {
- try {
- EntityManager entityManager = emf.getEntityManager(applicationId);
- ServiceManager serviceManager = smf.getServiceManager(applicationId);
-
- ApplicationQueueManagerImpl manager = new ApplicationQueueManagerImpl(
- new JobScheduler(serviceManager, entityManager),
- entityManager,
- queueManager,
- metricsService,
- properties
- );
- return manager;
- } catch (Exception e) {
- logger.error("Could not instantiate queue manager", e);
- return null;
- }
- }
- });
- }
-
public void stop(){
if (logger.isDebugEnabled()) {
logger.debug("stop processes");
http://git-wip-us.apache.org/repos/asf/usergrid/blob/d16f4c17/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
index 80e8cbe..eb5d794 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java
@@ -141,7 +141,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
logger.trace("notification {} start query", notification.getUuid());
}
- logger.info("Notification {} started processing", notification.getUuid());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Notification {} started processing", notification.getUuid());
+ }
@@ -366,7 +368,9 @@ public class ApplicationQueueManagerImpl implements ApplicationQueueManager {
notification.setProcessingFinished(System.currentTimeMillis());
notification.setDeviceProcessedCount(deviceCount.get());
em.update(notification);
- logger.info("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get());
+ if(logger.isTraceEnabled()) {
+ logger.trace("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get());
+ }
} catch (Exception e) {
logger.error("Unable to set processing finished timestamp for notification");