You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/25 11:02:45 UTC

svn commit: r1475680 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: JobHandler.java queues/AbstractJobQueue.java queues/OrderedJobQueue.java

Author: cziegeler
Date: Thu Apr 25 09:02:44 2013
New Revision: 1475680

URL: http://svn.apache.org/r1475680
Log:
SLING-2829 :  Add API for starting a job and service interface for executing a job - first shot at an async processing api

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Thu Apr 25 09:02:44 2013
@@ -75,6 +75,6 @@ public class JobHandler {
 
     @Override
     public String toString() {
-        return "JobEvent(" + this.job.getId() + ")";
+        return "JobHandler(" + this.job.getId() + ")";
     }
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Thu Apr 25 09:02:44 2013
@@ -239,11 +239,7 @@ public abstract class AbstractJobQueue
      */
     @Override
     public boolean sendAcknowledge(final Event job) {
-        final String location = (String)job.getProperty(JobUtil.JOB_ID);
-        return this.sendAcknowledge(location);
-    }
-
-    private boolean sendAcknowledge(final String jobId) {
+        final String jobId = (String)job.getProperty(JobUtil.JOB_ID);
         final JobHandler ack;
         synchronized ( this.startedJobsLists ) {
             ack = this.startedJobsLists.remove(jobId);
@@ -397,6 +393,7 @@ public abstract class AbstractJobQueue
     protected boolean canBeMarkedForRemoval() {
         return this.isEmpty() && !this.isWaiting &&!this.isSuspended();
     }
+
     /**
      * Mark this queue for removal.
      */
@@ -488,24 +485,15 @@ public abstract class AbstractJobQueue
                 }
                 try {
                     handler.started = System.currentTimeMillis();
-                    // let's add the event to our processing list
-                    synchronized ( this.startedJobsLists ) {
-                        this.startedJobsLists.put(job.getId(), handler);
-                    }
 
                     if ( consumer != null ) {
-                        // first check for a notifier context to send an acknowledge
-                        boolean notify = true;
-                        if ( !this.sendAcknowledge(job.getId()) ) {
-                            // if we don't get an ack, someone else is already processing this job.
-                            // we process but do not notify the job event handler.
-                            logger.info("Someone else is already processing job {}.", Utility.toString(job));
-                            notify = false;
+                        final long queueTime = handler.started - handler.queued;
+                        this.addActive(queueTime);
+                        Utility.sendNotification(this.eventAdmin, JobUtil.TOPIC_JOB_STARTED, job, queueTime);
+                        synchronized ( this.processsingJobsLists ) {
+                            this.processsingJobsLists.put(job.getId(), handler);
                         }
 
-                        final JobUtil.JobPriority priority = job.getJobPriority();
-                        final boolean notifyResult = notify;
-
                         final Runnable task = new Runnable() {
 
                             /**
@@ -519,8 +507,8 @@ public abstract class AbstractJobQueue
                                 final int oldPriority = currentThread.getPriority();
 
                                 currentThread.setName(oldName + "-" + job.getQueueName() + "(" + job.getTopic() + ")");
-                                if ( priority != null ) {
-                                    switch ( priority ) {
+                                if ( job.getJobPriority() != null ) {
+                                    switch ( job.getJobPriority() ) {
                                         case NORM : currentThread.setPriority(Thread.NORM_PRIORITY);
                                                     break;
                                         case MIN  : currentThread.setPriority(Thread.MIN_PRIORITY);
@@ -573,7 +561,7 @@ public abstract class AbstractJobQueue
                                     job.setProperty(JobConsumer.PROPERTY_JOB_ASYNC_HANDLER, null);
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
-                                    if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
+                                    if ( result != JobConsumer.JobResult.ASYNC ) {
                                         finishedJob(job.getId(), result, false);
                                     }
                                 }
@@ -607,6 +595,10 @@ public abstract class AbstractJobQueue
                         }
 
                     } else {
+                        // let's add the event to our processing list
+                        synchronized ( this.startedJobsLists ) {
+                            this.startedJobsLists.put(job.getId(), handler);
+                        }
                         final Event jobEvent = this.getJobEvent(handler);
                         // we need async delivery, otherwise we might create a deadlock
                         // as this method runs inside a synchronized block and the finishedJob
@@ -674,11 +666,6 @@ public abstract class AbstractJobQueue
     }
 
     /**
-     * Reschedule a job.
-     */
-    protected abstract JobHandler reschedule(final JobHandler info);
-
-    /**
      * @see org.apache.sling.event.jobs.Queue#getStatistics()
      */
     @Override
@@ -771,6 +758,11 @@ public abstract class AbstractJobQueue
     }
 
     /**
+     * Reschedule a job.
+     */
+    protected abstract JobHandler reschedule(final JobHandler info);
+
+    /**
      * Put another job into the queue.
      */
     protected abstract void put(final JobHandler event);

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java?rev=1475680&r1=1475679&r2=1475680&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/OrderedJobQueue.java Thu Apr 25 09:02:44 2013
@@ -20,9 +20,11 @@ package org.apache.sling.event.impl.jobs
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Set;
+import java.util.TreeSet;
 
 import org.apache.sling.event.impl.jobs.JobConsumerManager;
 import org.apache.sling.event.impl.jobs.JobHandler;
@@ -38,14 +40,20 @@ import org.osgi.service.event.EventAdmin
  */
 public final class OrderedJobQueue extends AbstractJobQueue {
 
-    /** The job event for rescheduling. */
-    private volatile JobHandler jobEvent;
+    /** The job handler for rescheduling. */
+    private volatile JobHandler jobHandler;
 
     /** Lock and status object for handling the sleep phase. */
     private final SleepLock sleepLock = new SleepLock();
 
-    /** The queue. */
-    private final BlockingQueue<JobHandler> queue = new LinkedBlockingQueue<JobHandler>();
+    /** The queue - we use a set which is sorted by job creation date. */
+    private final Set<JobHandler> queue = new TreeSet<JobHandler>(new Comparator<JobHandler>() {
+
+        @Override
+        public int compare(final JobHandler o1, final JobHandler o2) {
+            return o1.getJob().getCreated().compareTo(o2.getJob().getCreated());
+        }
+    });
 
     private final Object syncLock = new Object();
 
@@ -62,12 +70,12 @@ public final class OrderedJobQueue exten
     }
 
     @Override
-    protected JobHandler start(final JobHandler processInfo) {
-        JobHandler rescheduleInfo = null;
+    protected JobHandler start(final JobHandler handler) {
+        JobHandler rescheduleHandler = null;
 
         // if we are ordered we simply wait for the finish
         synchronized ( this.syncLock ) {
-            if ( this.executeJob(processInfo) ) {
+            if ( this.executeJob(handler) ) {
                 this.isWaiting = true;
                 this.logger.debug("Job queue {} is waiting for finish.", this.queueName);
                 while ( this.isWaiting ) {
@@ -78,18 +86,18 @@ public final class OrderedJobQueue exten
                     }
                 }
                 this.logger.debug("Job queue {} is continuing.", this.queueName);
-                rescheduleInfo = this.jobEvent;
-                this.jobEvent = null;
+                rescheduleHandler = this.jobHandler;
+                this.jobHandler = null;
             }
         }
-        return rescheduleInfo;
+        return rescheduleHandler;
     }
 
     private void wakeUp(final boolean discardJob) {
         synchronized ( this.sleepLock ) {
             if ( this.sleepLock.sleepingSince != -1 ) {
                 if ( discardJob ) {
-                    this.sleepLock.jobEvent = null;
+                    this.sleepLock.jobHandler = null;
                 }
                 this.sleepLock.notify();
             }
@@ -103,34 +111,41 @@ public final class OrderedJobQueue exten
     }
 
     @Override
-    protected void put(final JobHandler event) {
-        try {
-            this.queue.put(event);
-        } catch (final InterruptedException e) {
-            // this should never happen
-            this.ignoreException(e);
+    protected void put(final JobHandler handler) {
+        synchronized ( this.queue ) {
+            this.queue.add(handler);
+            this.queue.notify();
         }
     }
 
     @Override
     protected JobHandler take() {
-        try {
-            return this.queue.take();
-        } catch (final InterruptedException e) {
-            // this should never happen
-            this.ignoreException(e);
+        synchronized ( this.queue ) {
+            while ( this.queue.isEmpty() ) {
+                try {
+                    this.queue.wait();
+                } catch (final InterruptedException e) {
+                    this.ignoreException(e);
+                }
+            }
+            // get the first element and remove it
+            final Iterator<JobHandler> i = this.queue.iterator();
+            final JobHandler result = i.next();
+            i.remove();
+            return result;
         }
-        return null;
     }
 
     @Override
     protected boolean isEmpty() {
-        return this.queue.isEmpty();
+        synchronized ( this.queue ) {
+            return this.queue.isEmpty();
+        }
     }
 
     @Override
-    protected void notifyFinished(final JobHandler rescheduleInfo) {
-        this.jobEvent = rescheduleInfo;
+    protected void notifyFinished(final JobHandler rescheduleHandler) {
+        this.jobHandler = rescheduleHandler;
         this.logger.debug("Notifying job queue {} to continue processing.", this.queueName);
         synchronized ( this.syncLock ) {
             this.isWaiting = false;
@@ -139,17 +154,17 @@ public final class OrderedJobQueue exten
     }
 
     @Override
-    protected JobHandler reschedule(final JobHandler info) {
+    protected JobHandler reschedule(final JobHandler handler) {
         // we just sleep for the delay time - if none, we continue and retry
         // this job again
         long delay = this.configuration.getRetryDelayInMs();
-        if ( info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY) != null ) {
-            delay = info.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
+        if ( handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY) != null ) {
+            delay = handler.getJob().getProperty(Job.PROPERTY_JOB_RETRY_DELAY, Long.class);
         }
         if ( delay > 0 ) {
             synchronized ( this.sleepLock ) {
                 this.sleepLock.sleepingSince = System.currentTimeMillis();
-                this.sleepLock.jobEvent = info;
+                this.sleepLock.jobHandler = handler;
                 this.logger.debug("Job queue {} is sleeping for {}ms.", this.queueName, delay);
                 try {
                     this.sleepLock.wait(delay);
@@ -157,16 +172,16 @@ public final class OrderedJobQueue exten
                     this.ignoreException(e);
                 }
                 this.sleepLock.sleepingSince = -1;
-                final JobHandler result = this.sleepLock.jobEvent;
-                this.sleepLock.jobEvent = null;
+                final JobHandler result = this.sleepLock.jobHandler;
+                this.sleepLock.jobHandler = null;
 
                 if ( result == null ) {
-                    info.remove();
+                    handler.remove();
                 }
                 return result;
             }
         }
-        return info;
+        return handler;
     }
 
     /**
@@ -174,7 +189,9 @@ public final class OrderedJobQueue exten
      */
     @Override
     public void clear() {
-        this.queue.clear();
+        synchronized ( this.queue ) {
+            this.queue.clear();
+        }
         super.clear();
     }
 
@@ -182,14 +199,17 @@ public final class OrderedJobQueue exten
     public synchronized void removeAll() {
         // remove all remaining jobs first
         super.removeAll();
-        this.jobEvent = null;
+        this.jobHandler = null;
         this.wakeUp(true);
     }
 
     @Override
     protected Collection<JobHandler> removeAllJobs() {
         final List<JobHandler> events = new ArrayList<JobHandler>();
-        this.queue.drainTo(events);
+        synchronized ( this.queue ) {
+            events.addAll(this.queue);
+            this.queue.clear();
+        }
         return events;
     }
 
@@ -207,7 +227,7 @@ public final class OrderedJobQueue exten
         public volatile long sleepingSince = -1;
 
         /** The job event to be returned after sleeping. */
-        public volatile JobHandler jobEvent;
+        public volatile JobHandler jobHandler;
     }
 }