You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2014/10/16 15:00:28 UTC

svn commit: r1632292 - in /sling/trunk/bundles/extensions/event/src: main/java/org/apache/sling/event/impl/jobs/ main/java/org/apache/sling/event/impl/jobs/queues/ main/java/org/apache/sling/event/impl/jobs/topics/ test/java/org/apache/sling/event/it/

Author: cziegeler
Date: Thu Oct 16 13:00:28 2014
New Revision: 1632292

URL: http://svn.apache.org/r1632292
Log:
SLING-4048 : Avoid keeping jobs in memory. Fix job removal (WiP)

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
    sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Thu Oct 16 13:00:28 2014
@@ -107,4 +107,13 @@ public class JobHandler {
     public String toString() {
         return "JobHandler(" + this.job.getId() + ")";
     }
+
+    public void addToRetryList() {
+        this.jobManager.addJobToRetryList(this.job);
+
+    }
+
+    public void removeFromRetryList() {
+        this.jobManager.removeJobFromRetryList(this.job);
+    }
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Thu Oct 16 13:00:28 2014
@@ -46,7 +46,6 @@ import org.apache.sling.api.resource.Val
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.commons.threads.ThreadPoolManager;
 import org.apache.sling.event.EventUtil;
-import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
 import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
 import org.apache.sling.event.impl.jobs.notifications.NotificationUtility;
@@ -379,12 +378,20 @@ public class JobManagerImpl
     private boolean internalRemoveJobById(final String jobId, final boolean forceRemove) {
         logger.debug("Trying to remove job {}", jobId);
         boolean result = true;
-        final JobImpl job = (JobImpl)this.getJobById(jobId);
+        JobImpl job = (JobImpl)this.getJobById(jobId);
         if ( job != null ) {
-            logger.debug("Found removal job: {}", job);
+            if ( logger.isDebugEnabled() ) {
+                logger.debug("Found removal job: {}", Utility.toString(job));
+            }
+            final JobImpl retryJob = this.getJobFromRetryList(jobId);
+            if ( retryJob != null ) {
+                job = retryJob;
+            }
             // currently running?
             if ( !forceRemove && job.getProcessingStarted() != null ) {
-                logger.debug("Unable to remove job - job is started: {}", job);
+                if ( logger.isDebugEnabled() ) {
+                    logger.debug("Unable to remove job - job is started: {}", Utility.toString(job));
+                }
                 result = false;
             } else {
                 final boolean isHistoryJob = this.configuration.isStoragePath(job.getResourcePath());
@@ -506,7 +513,7 @@ public class JobManagerImpl
                     final JobImpl job = Utility.readJob(logger, jobResource);
                     if ( job != null ) {
                         if ( logger.isDebugEnabled() ) {
-                            logger.debug("Found job with id {} = {}", id, job);
+                            logger.debug("Found job with id {} = {}", id, Utility.toString(job));
                         }
                         return job;
                     }
@@ -987,15 +994,49 @@ public class JobManagerImpl
         return new JobImpl(jobTopic, jobName, jobId, properties);
     }
 
+    /**
+     * Reassign a job to a new instance
+     * @param job The job to reassign.
+     */
     public void reassign(final JobImpl job) {
         final QueueInfo queueInfo = queueManager.getQueueInfo(job.getTopic());
-        final InternalQueueConfiguration config = queueInfo.queueConfiguration;
 
         // Sanity check if queue configuration has changed
-        String targetId = null;
         final TopologyCapabilities caps = this.topologyCapabilities;
-        targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
-        this.maintenanceTask.reassignJob(job, targetId);
+        final String targetId = (caps == null ? null : caps.detectTarget(job.getTopic(), job.getProperties(), queueInfo));
+
+        final ResourceResolver resolver = this.configuration.createResourceResolver();
+        try {
+            final Resource jobResource = resolver.getResource(job.getResourcePath());
+            if ( jobResource != null ) {
+                try {
+                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
+                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
+
+                    final Map<String, Object> props = new HashMap<String, Object>(vm);
+                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
+                    if ( targetId == null ) {
+                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
+                    } else {
+                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
+                    }
+                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
+
+                    try {
+                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
+                        resolver.delete(jobResource);
+                        resolver.commit();
+                    } catch ( final PersistenceException pe ) {
+                        this.ignoreException(pe);
+                    }
+                } catch (final InstantiationException ie) {
+                    // something happened with the resource in the meantime
+                    this.ignoreException(ie);
+                }
+            }
+        } finally {
+            resolver.close();
+        }
     }
 
     /**
@@ -1175,4 +1216,29 @@ public class JobManagerImpl
         }
         return null;
     }
+
+    private final List<JobImpl> retryList = new ArrayList<JobImpl>();
+
+    public void addJobToRetryList(final JobImpl job) {
+        synchronized ( retryList ) {
+            retryList.add(job);
+        }
+    }
+
+    public void removeJobFromRetryList(final JobImpl job) {
+        synchronized ( retryList ) {
+            retryList.remove(job);
+        }
+    }
+
+    private JobImpl getJobFromRetryList(final String jobId) {
+        synchronized ( retryList ) {
+            for(final JobImpl j : retryList) {
+                if ( jobId.equals(j.getId())) {
+                    return j;
+                }
+            }
+        }
+        return null;
+    }
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/MaintenanceTask.java Thu Oct 16 13:00:28 2014
@@ -20,10 +20,8 @@ package org.apache.sling.event.impl.jobs
 
 import java.util.ArrayList;
 import java.util.Calendar;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -32,8 +30,6 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.resource.ValueMap;
 import org.apache.sling.event.impl.jobs.topology.TopologyCapabilities;
 import org.apache.sling.event.impl.support.BatchResourceRemover;
-import org.apache.sling.event.impl.support.ResourceHelper;
-import org.apache.sling.event.jobs.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,37 +125,6 @@ public class MaintenanceTask {
                     this.ignoreException(pe);
                     resolver.refresh();
                 }
-
-/* Old implementation using a query
-                final StringBuilder buf = new StringBuilder(64);
-
-                buf.append("//element(*)[@");
-                buf.append(ISO9075.encode(ResourceResolver.PROPERTY_RESOURCE_TYPE));
-                buf.append(" = '");
-                buf.append(Utility.RESOURCE_TYPE_LOCK);
-                buf.append("' and @");
-                buf.append(ISO9075.encode(Utility.PROPERTY_LOCK_CREATED));
-                buf.append(" < xs:dateTime('");
-                buf.append(ISO8601.format(startDate));
-                buf.append("')]");
-                final Iterator<Resource> result = resolver.findResources(buf.toString(), "xpath");
-
-                while ( caps.isActive() && result.hasNext() ) {
-                    final Resource lockResource = result.next();
-                    // sanity check for the path
-                    if ( this.configuration.isLock(lockResource.getPath()) ) {
-                        try {
-                            resolver.delete(lockResource);
-                            resolver.commit();
-                        } catch ( final PersistenceException pe) {
-                            this.ignoreException(pe);
-                            resolver.refresh();
-                        }
-                    }
-                }
-            } catch (final QuerySyntaxException qse) {
-                this.ignoreException(qse);
-*/
             } finally {
                 resolver.close();
             }
@@ -348,46 +313,6 @@ public class MaintenanceTask {
     }
 
     /**
-     * Reassign a job to a different target
-     * @param job The job
-     * @param targetId New target or <code>null</code> if unknown
-     */
-    public void reassignJob(final JobImpl job, final String targetId) {
-        final ResourceResolver resolver = this.configuration.createResourceResolver();
-        try {
-            final Resource jobResource = resolver.getResource(job.getResourcePath());
-            if ( jobResource != null ) {
-                try {
-                    final ValueMap vm = ResourceHelper.getValueMap(jobResource);
-                    final String newPath = this.configuration.getUniquePath(targetId, job.getTopic(), job.getId(), job.getProperties());
-
-                    final Map<String, Object> props = new HashMap<String, Object>(vm);
-                    props.remove(Job.PROPERTY_JOB_QUEUE_NAME);
-                    if ( targetId == null ) {
-                        props.remove(Job.PROPERTY_JOB_TARGET_INSTANCE);
-                    } else {
-                        props.put(Job.PROPERTY_JOB_TARGET_INSTANCE, targetId);
-                    }
-                    props.remove(Job.PROPERTY_JOB_STARTED_TIME);
-
-                    try {
-                        ResourceHelper.getOrCreateResource(resolver, newPath, props);
-                        resolver.delete(jobResource);
-                        resolver.commit();
-                    } catch ( final PersistenceException pe ) {
-                        this.ignoreException(pe);
-                    }
-                } catch (final InstantiationException ie) {
-                    // something happened with the resource in the meantime
-                    this.ignoreException(ie);
-                }
-            }
-        } finally {
-            resolver.close();
-        }
-    }
-
-    /**
      * Helper method which just logs the exception in debug mode.
      * @param e
      */

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Oct 16 13:00:28 2014
@@ -286,7 +286,7 @@ public abstract class AbstractJobQueue
                 if ( process ) {
                     if ( handler.reschedule() ) {
                         this.logger.info("No acknowledge received for job {} stored at {}. Requeueing job.", Utility.toString(handler.getJob()), handler.getJob().getId());
-                        this.reschedule(handler);
+                        handler.getJob().retry();
                         this.services.topicManager.reschedule(handler);
                         this.notifyFinished(true);
                     }
@@ -607,7 +607,7 @@ public abstract class AbstractJobQueue
                     NotificationUtility.sendNotification(this.services.eventAdmin, NotificationConstants.TOPIC_JOB_CANCELLED, handler.getJob(), null);
                 } else {
                     info.reschedule = true;
-                    this.reschedule(handler);
+                    handler.getJob().retry();
                     if ( this.logger.isDebugEnabled() ) {
                         this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
                     }
@@ -680,7 +680,7 @@ public abstract class AbstractJobQueue
             final boolean keepJobs = resultState != Job.JobState.SUCCEEDED || this.configuration.isKeepJobs();
             handler.finished(resultState, keepJobs, rescheduleInfo.processingTime);
         } else {
-            this.services.topicManager.reschedule(handler);
+            this.reschedule(handler);
         }
         this.notifyFinished(rescheduleInfo.reschedule);
 
@@ -820,8 +820,7 @@ public abstract class AbstractJobQueue
     }
 
     protected void reschedule(final JobHandler handler) {
-        // update event with retry count and retries
-        handler.getJob().retry();
+        this.services.topicManager.reschedule(handler);
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/ParallelJobQueue.java Thu Oct 16 13:00:28 2014
@@ -116,6 +116,7 @@ public final class ParallelJobQueue exte
         // this job again
         final long delay = this.getRetryDelay(handler);
         if ( delay > 0 ) {
+            handler.addToRetryList();
             final Date fireDate = new Date();
             fireDate.setTime(System.currentTimeMillis() + delay);
 
@@ -123,6 +124,7 @@ public final class ParallelJobQueue exte
             final Runnable t = new Runnable() {
                 @Override
                 public void run() {
+                    handler.removeFromRetryList();
                     ParallelJobQueue.super.reschedule(handler);
                 }
             };

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/topics/TopicManager.java Thu Oct 16 13:00:28 2014
@@ -249,7 +249,7 @@ public class TopicManager implements Eve
             this.queueLocks.remove(queueName);
         }
         if ( logger.isDebugEnabled() ) {
-            logger.debug("Took new job for {} : {}", queueName, Utility.toString(result));
+            logger.debug("Returning job for {} : {}", queueName, Utility.toString(result));
         }
         return (result != null ? new JobHandler( result, (JobManagerImpl)this.jobManager) : null);
     }

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/JobHandlingTest.java Thu Oct 16 13:00:28 2014
@@ -230,9 +230,9 @@ public class JobHandlingTest extends Abs
             jobManager.addJob(TOPIC, "myid2", null);
             cb.block();
 
-            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());
+            assertEquals(1, jobManager.findJobs(JobManager.QueryType.ALL, TOPIC, -1, (Map<String, Object>[])null).size());
             // job is currently waiting, therefore cancel fails
-            final Event e1 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+            final Event e1 = jobManager.findJob(TOPIC, Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
             assertNotNull(e1);
             assertFalse(jobManager.removeJob((String)e1.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
             cb2.block(); // and continue job
@@ -240,7 +240,7 @@ public class JobHandlingTest extends Abs
             sleep(200);
 
             // the job is now in the queue again
-            final Event e2 = jobManager.findJob("sling/test", Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
+            final Event e2 = jobManager.findJob(TOPIC, Collections.singletonMap(JobUtil.PROPERTY_JOB_NAME, (Object)"myid2"));
             assertNotNull(e2);
             assertTrue(jobManager.removeJob((String)e2.getProperty(ResourceHelper.PROPERTY_JOB_ID)));
             assertEquals(0, jobManager.findJobs(JobManager.QueryType.ALL, "sling/test", -1, (Map<String, Object>[])null).size());

Modified: sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java?rev=1632292&r1=1632291&r2=1632292&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java (original)
+++ sling/trunk/bundles/extensions/event/src/test/java/org/apache/sling/event/it/RoundRobinQueueTest.java Thu Oct 16 13:00:28 2014
@@ -45,7 +45,6 @@ import org.ops4j.pax.exam.spi.reactors.P
 import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventHandler;
-import org.slf4j.LoggerFactory;
 
 @RunWith(PaxExam.class)
 @ExamReactorStrategy(PerMethod.class)
@@ -123,7 +122,6 @@ public class RoundRobinQueueTest extends
 
                     @Override
                     public void handleEvent(final Event event) {
-                        LoggerFactory.getLogger("test").info("Received finished event {}", event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID));
                         count.incrementAndGet();
                     }
                 });