You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 16:19:56 UTC

svn commit: r1632319 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/queues/ main/java/org/apache/sling/event/impl/jobs/topics/ main/java/org/apache/sling/event/impl...

Author: cziegeler
Date: Thu Oct 16 14:19:55 2014
New Revision: 1632319

URL: http://svn.apache.org/r1632319
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix suspend/resume handling (WiP)

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 14:19:55 2014
@@ -97,7 +97,7 @@ public class JobManagerImpl
     implements JobManager, EventHandler, Runnable, TopologyAware {
 
     /** Default logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
     private TopologyHandler topologyHandler;
@@ -891,7 +891,7 @@ public class JobManagerImpl
      * Persist the job in the resource tree
      * @param jobTopic The required job topic
      * @param jobName The optional job name
-     * @param jobProperties The optional job properties
+     * @param passedJobProperties The optional job properties
      * @return The persisted job or <code>null</code>.
      */
     private Job addJobInteral(final String jobTopic,

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/TestLogger.java Thu Oct 16 14:19:55 2014
@@ -5,7 +5,7 @@ import org.slf4j.Marker;
 
 public class TestLogger implements Logger {
 
-    private final boolean DEBUG = false;
+    private final boolean DEBUG = true;
 
     private final Logger logger;
 

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 14:19:55 2014
@@ -35,7 +35,6 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobExecutionResultImpl;
 import org.apache.sling.event.impl.jobs.JobHandler;
 import org.apache.sling.event.impl.jobs.JobImpl;
-import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.deprecated.JobStatusNotifier;
@@ -123,7 +122,7 @@ public abstract class AbstractJobQueue
         this.queueName = name;
         this.configuration = config;
         this.services = services;
-        this.logger = new TestLogger(LoggerFactory.getLogger(this.getClass().getName() + '.' + name));
+        this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
         this.running = true;
     }
 
@@ -307,7 +306,7 @@ public abstract class AbstractJobQueue
             }
 
             // if we're suspended we drop the current item
-            if ( this.running && info != null && !checkSuspended() ) {
+            if ( this.running && info != null && !checkSuspended(info) ) {
                 // if we still have a job and are running, let's go
                 this.start(info);
             }
@@ -321,10 +320,11 @@ public abstract class AbstractJobQueue
     /**
      * Check if the queue is suspended and go into suspend mode
      */
-    private boolean checkSuspended() {
+    private boolean checkSuspended(final JobHandler handler) {
         boolean wasSuspended = false;
         synchronized ( this.suspendLock ) {
             while ( this.suspendedSince != -1 ) {
+                this.services.topicManager.reschedule(handler);
                 logger.debug("Sleeping as queue {} is suspended.", this.getName());
                 wasSuspended = true;
                 final long diff = System.currentTimeMillis() - this.suspendedSince;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java Thu Oct 16 14:19:55 2014
@@ -35,7 +35,6 @@ import org.apache.sling.commons.schedule
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
-import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -66,7 +65,7 @@ public class QueueManager
     implements Runnable {
 
     /** Default logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
     private EventAdmin eventAdmin;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/QueueJobCache.java Thu Oct 16 14:19:55 2014
@@ -32,7 +32,6 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobImpl;
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.jobs.QueueConfiguration.Type;
 import org.slf4j.Logger;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
 public class QueueJobCache {
 
     /** Logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     /** The maximum of pre loaded jobs for a topic. */
     private final int maxPreloadLimit = 10;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 14:19:55 2014
@@ -41,7 +41,6 @@ import org.apache.sling.event.impl.jobs.
 import org.apache.sling.event.impl.jobs.JobManagerConfiguration;
 import org.apache.sling.event.impl.jobs.JobManagerImpl;
 import org.apache.sling.event.impl.jobs.JobTopicTraverser;
-import org.apache.sling.event.impl.jobs.TestLogger;
 import org.apache.sling.event.impl.jobs.Utility;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
@@ -70,7 +69,7 @@ import org.slf4j.LoggerFactory;
 public class TopicManager implements EventHandler, TopologyAware {
 
     /** Logger. */
-    private final Logger logger = new TestLogger(LoggerFactory.getLogger(this.getClass()));
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
     private JobManagerConfiguration configuration;

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topology/TopologyCapabilities.java Thu Oct 16 14:19:55 2014
@@ -132,7 +132,7 @@ public class TopologyCapabilities {
         for(final InstanceDescription desc : view.getInstances() ) {
             final String topics = desc.getProperty(PROPERTY_TOPICS);
             if ( topics != null && topics.length() > 0 ) {
-                this.logger.info("Detected capabilities of {} : {}", desc.getSlingId(), topics);
+                this.logger.debug("Detected capabilities of {} : {}", desc.getSlingId(), topics);
                 for(final String topic : topics.split(",") ) {
                     List<InstanceDescription> list = newCaps.get(topic);
                     if ( list == null ) {

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java?rev=1632319&r1=1632318&r2=1632319&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/OrderedQueueTest.java Thu Oct 16 14:19:55 2014
@@ -96,7 +96,7 @@ public class OrderedQueueTest extends Ab
         final ServiceRegistration jcReg = this.registerJobConsumer("sling/orderedtest/*",
                 new JobConsumer() {
 
-                    private int lastCounter = -1;
+                    private volatile int lastCounter = -1;
 
                     @Override
                     public JobResult process(final Job job) {