You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/24 17:03:07 UTC

svn commit: r1471455 - in /sling/trunk/bundles/extensions/event/src/main: java/org/apache/sling/event/impl/jobs/config/ java/org/apache/sling/event/impl/jobs/queues/ java/org/apache/sling/event/jobs/ resources/OSGI-INF/metatype/

Author: cziegeler
Date: Wed Apr 24 15:03:06 2013
New Revision: 1471455

URL: http://svn.apache.org/r1471455
Log:
SLING-2829 : Add API for starting a job and a service interface for executing a job - first shot at an async processing API

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
    sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Wed Apr 24 15:03:06 2013
@@ -38,4 +38,5 @@ public abstract class ConfigurationConst
     public static final String PROP_RETRIES = "queue.retries";
     public static final String PROP_RETRY_DELAY = "queue.retrydelay";
     public static final String PROP_PRIORITY = "queue.priority";
+    public static final String PROP_WAIT_FOR_ASYNC = "queue.waitforasync";
 }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed Apr 24 15:03:06 2013
@@ -63,7 +63,8 @@ import org.osgi.framework.Constants;
                      @PropertyOption(name="MIN",value="Min"),
                      @PropertyOption(name="MAX",value="Max")}),
     @Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
-              label="%queue.ranking.name", description="%queue.ranking.description")
+              label="%queue.ranking.name", description="%queue.ranking.description"),
+    @Property(name=ConfigurationConstants.PROP_WAIT_FOR_ASYNC, boolValue=true)
 })
 public class InternalQueueConfiguration
     implements QueueConfiguration, Comparable<InternalQueueConfiguration> {
@@ -98,6 +99,9 @@ public class InternalQueueConfiguration
     /** Valid flag. */
     private boolean valid = false;
 
+    /** Whether the queue waits for asynchronously processing job consumers to finish. */
+    private boolean waitForAsync = true;
+
     private String pid;
 
     /**
@@ -133,6 +137,7 @@ public class InternalQueueConfiguration
             this.topics = topicsParam;
         }
         this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+        this.waitForAsync = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_WAIT_FOR_ASYNC), true);
         this.pid = (String)params.get(Constants.SERVICE_PID);
         this.valid = this.checkIsValid();
     }
@@ -269,6 +274,14 @@ public class InternalQueueConfiguration
         return null;
     }
 
+    /**
+     * @see org.apache.sling.event.jobs.QueueConfiguration#waitForAsyncJobConsumers()
+     */
+    @Override
+    public boolean waitForAsyncJobConsumers() {
+        return this.waitForAsync;
+    }
+
     @Override
     public String toString() {
         return "Queue-Configuration(" + this.hashCode() + ") : {" +

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Apr 24 15:03:06 2013
@@ -320,10 +320,12 @@ public abstract class AbstractJobQueue
     @Override
     public boolean finishedJob(final Event job, final boolean shouldReschedule) {
         final String location = (String)job.getProperty(JobUtil.JOB_ID);
-        return this.finishedJob(location, shouldReschedule ? JobResult.FAILED : JobResult.OK);
+        return this.finishedJob(location, shouldReschedule ? JobResult.FAILED : JobResult.OK, false);
     }
 
-    private boolean finishedJob(final String jobId, final JobConsumer.JobResult result) {
+    private boolean finishedJob(final String jobId,
+                                final JobConsumer.JobResult result,
+                                final boolean isAsync) {
         if ( this.logger.isDebugEnabled() ) {
             this.logger.debug("Received finish for job {}, result={}", jobId, result);
         }
@@ -341,9 +343,7 @@ public abstract class AbstractJobQueue
         }
 
         if ( !this.running ) {
-            if ( this.logger.isDebugEnabled() ) {
-                this.logger.debug("Queue is not running anymore. Discarding finish for {}", jobId);
-            }
+            this.logger.warn("Queue is not running anymore. Discarding finish for {}", jobId);
             return false;
         }
 
@@ -367,12 +367,23 @@ public abstract class AbstractJobQueue
             finishSuccessful = info.reschedule();
         }
 
-        if ( !finishSuccessful || !reschedule ) {
-            checkForNotify(null);
-            return false;
+        if ( !isAsync || this.configuration.waitForAsyncJobConsumers() ) {
+            if ( !finishSuccessful || !reschedule ) {
+                checkForNotify(null);
+                return false;
+            }
+            checkForNotify(info);
+            return true;
+        } else {
+            // async result
+            if ( finishSuccessful && reschedule ) {
+                final JobHandler reprocessHandler = this.reschedule(info);
+                if ( reprocessHandler != null ) {
+                    this.put(reprocessHandler);
+                }
+            }
+            return true;
         }
-        checkForNotify(info);
-        return true;
     }
 
     private void checkForNotify(final JobHandler info) {
@@ -464,7 +475,7 @@ public abstract class AbstractJobQueue
     }
 
     /**
-     * Process a job
+     * Execute a job
      */
     protected boolean executeJob(final JobHandler handler) {
         final JobImpl job = handler.getJob();
@@ -520,15 +531,15 @@ public abstract class AbstractJobQueue
                                 }
                                 JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL;
                                 final Object asyncLock = new Object();
-                                final AtomicBoolean asnycDone = new AtomicBoolean(false);
+                                final AtomicBoolean asyncDone = new AtomicBoolean(false);
                                 final JobConsumer.AsyncHandler asyncHandler =
                                         new JobConsumer.AsyncHandler() {
 
                                             private void check(final JobConsumer.JobResult result) {
                                                 synchronized ( asyncLock ) {
-                                                    if ( !asnycDone.get() ) {
-                                                        asnycDone.set(true);
-                                                        finishedJob(job.getId(), result);
+                                                    if ( !asyncDone.get() ) {
+                                                        asyncDone.set(true);
+                                                        finishedJob(job.getId(), result, true);
                                                     } else {
                                                         throw new IllegalStateException("Job is already marked as processed");
                                                     }
@@ -539,7 +550,6 @@ public abstract class AbstractJobQueue
                                             @Override
                                             public void ok() {
                                                 this.check(JobConsumer.JobResult.OK);
-                                                finishedJob(job.getId(), JobConsumer.JobResult.OK);
                                             }
 
                                             @Override
@@ -564,18 +574,22 @@ public abstract class AbstractJobQueue
                                     currentThread.setPriority(oldPriority);
                                     currentThread.setName(oldName);
                                     if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
-                                        finishedJob(job.getId(), result);
+                                        finishedJob(job.getId(), result, false);
                                     }
                                 }
                                 if ( result == JobConsumer.JobResult.ASYNC ) {
-                                    synchronized ( asyncLock ) {
-                                        while ( !asnycDone.get() ) {
-                                            try {
-                                                asyncLock.wait();
-                                            } catch (final InterruptedException e) {
-                                                ignoreException(e);
+                                    if ( configuration.waitForAsyncJobConsumers() ) {
+                                        synchronized ( asyncLock ) {
+                                            while ( !asyncDone.get() ) {
+                                                try {
+                                                    asyncLock.wait();
+                                                } catch (final InterruptedException e) {
+                                                    ignoreException(e);
+                                                }
                                             }
                                         }
+                                    } else {
+                                        notifyFinished(null);
                                     }
                                 }
                             }

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Wed Apr 24 15:03:06 2013
@@ -80,4 +80,10 @@ public interface QueueConfiguration {
      * Get the ranking of this configuration.
      */
     int getRanking();
+
+    /**
+     * Whether the queue waits for an asynchronously processing job consumer to finish before starting new jobs.
+     * @since 1.2
+     */
+    boolean waitForAsyncJobConsumers();
 }

Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Wed Apr 24 15:03:06 2013
@@ -65,6 +65,10 @@ queue.ranking.name = Ranking
 queue.ranking.description = Integer value defining the ranking of this queue configuration. \
  If more than one queue matches a job topic, the one with the highest ranking is used.
 
+queue.waitforasync.name = Wait For Async
+queue.waitforasync.description = If a job consumer is processing a job asynchronously, this \
+ flag controls whether the queue waits for the consumer to finish before starting new jobs.
+
 #
 # Job Event Handler
 job.events.name = Apache Sling Job Default Queue