You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 16:19:56 UTC
svn commit: r1632319 - in /sling/trunk/bundles/extensions/event/src:
main/java/org/apache/sling/event/impl/jobs/
main/java/org/apache/sling/event/impl/jobs/queues/
main/java/org/apache/sling/event/impl/jobs/topics/
main/java/org/apache/sling/event/impl...
Author: cziegeler
Date: Thu Oct 16 14:19:55 2014
New Revision: 1632319
URL: http://svn.apache.org/r1632319
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix suspending/resume (WiP)
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 14:19:55 2014
@@ -97,7 +97,7 @@ public class JobManagerImpl
implements JobManager, EventHandler, Runnable, TopologyAware {
/** Default logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
private TopologyHandler topologyHandler;
@@ -891,7 +891,7 @@ public class JobManagerImpl
* Persist the job in the resource tree
* @param jobTopic The required job topic
* @param jobName The optional job name
- * @param jobProperties The optional job properties
+ * @param passedJobProperties The optional job properties
* @return The persisted job or <code>null</code>.
*/
private Job addJobInteral(final String jobTopic,
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Thu Oct 16 14:19:55 2014
@@ -5,7 +5,7 @@ import org.slf4j.Marker;
public class TestLogger implements Logger {
- private final boolean DEBUG = false;
+ private final boolean DEBUG = true;
private final Logger logger;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 14:19:55 2014
@@ -35,7 +35,6 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
import org.apache.sling.event.impl.jobs.JobHandler;
import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -123,7 +122,7 @@ public abstract class AbstractJobQueue
this.queueName = name;
this.configuration = config;
this.services = services;
- this.logger = new TestLogger(LoggerFactory.getLogger(this.getClass().getName() + '.' + name));
+ this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
this.running = true;
}
@@ -307,7 +306,7 @@ public abstract class AbstractJobQueue
}
// if we're suspended we drop the current item
- if ( this.running && info != null && !checkSuspended() ) {
+ if ( this.running && info != null && !checkSuspended(info) ) {
// if we still have a job and are running, let's go
this.start(info);
}
@@ -321,10 +320,11 @@ public abstract class AbstractJobQueue
/**
* Check if the queue is suspended and go into suspend mode
*/
- private boolean checkSuspended() {
+ private boolean checkSuspended(final JobHandler handler) {
boolean wasSuspended = false;
synchronized ( this.suspendLock ) {
while ( this.suspendedSince != -1 ) {
+ this.services.topicManager.reschedule(handler);
logger.debug("Sleeping as queue {} is suspended.", this.getName());
wasSuspended = true;
final long diff = System.currentTimeMillis() - this.suspendedSince;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu Oct 16 14:19:55 2014
@@ -35,7 +35,6 @@ import org.apache.sling.commons.schedule
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.impl.jobs.JobConsumerManager;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -66,7 +65,7 @@ public class QueueManager
implements Runnable {
/** Default logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
private EventAdmin eventAdmin;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 14:19:55 2014
@@ -32,7 +32,6 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobImpl;
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
import org.apache.sling.event.jobs.QueueConfiguration.Type;
import org.slf4j.Logger;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
public class QueueJobCache {
/** Logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
/** The maximum of pre loaded jobs for a topic. */
private final int maxPreloadLimit = 10;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 14:19:55 2014
@@ -41,7 +41,6 @@ import org.apache.sling.event.impl.jobs.
import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
import org.apache.sling.event.impl.jobs.JobManagerImpl;
import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
import org.apache.sling.event.impl.jobs.Utility;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -70,7 +69,7 @@ import org.slf4j.LoggerFactory;
public class TopicManager implements EventHandler, TopologyAware {
/** Logger. */
- private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
private JobManagerConfiguration configuration;
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java Thu Oct 16 14:19:55 2014
@@ -132,7 +132,7 @@ public class TopologyCapabilities {
for(final InstanceDescription desc : view.getInstances() ) {
final String topics = desc.getProperty(PROPERTY_TOPICS);
if ( topics != null && topics.length() > 0 ) {
- this.logger.info("Detected capabilities of {} : {}", desc.getSlingId(), topics);
+ this.logger.debug("Detected capabilities of {} : {}", desc.getSlingId(), topics);
for(final String topic : topics.split(",") ) {
List<InstanceDescription> list = newCaps.get(topic);
if ( list == null ) {
Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Thu Oct 16 14:19:55 2014
@@ -96,7 +96,7 @@ public class OrderedQueueTest extends Ab
final ServiceRegistration jcReg = this.registerJobConsumer("sling/orderedtest/*",
new JobConsumer() {
- private int lastCounter = -1;
+ private volatile int lastCounter = -1;
@Override
public JobResult process(final Job job) {