You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/04/24 17:03:07 UTC
svn commit: r1471455 - in /sling/trunk/bundles/extensions/event/src/main:
java/org/apache/sling/event/impl/jobs/config/
java/org/apache/sling/event/impl/jobs/queues/
java/org/apache/sling/event/jobs/ resources/OSGI-INF/metatype/
Author: cziegeler
Date: Wed Apr 24 15:03:06 2013
New Revision: 1471455
URL: http://svn.apache.org/r1471455
Log:
SLING-2829 : Add API for starting a job and service interface for executing a job - first shot at an async processing API
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/ConfigurationConstants.java Wed Apr 24 15:03:06 2013
@@ -38,4 +38,5 @@ public abstract class ConfigurationConst
public static final String PROP_RETRIES = "queue.retries";
public static final String PROP_RETRY_DELAY = "queue.retrydelay";
public static final String PROP_PRIORITY = "queue.priority";
+ public static final String PROP_WAIT_FOR_ASYNC = "queue.waitforasync";
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/config/InternalQueueConfiguration.java Wed Apr 24 15:03:06 2013
@@ -63,7 +63,8 @@ import org.osgi.framework.Constants;
@PropertyOption(name="MIN",value="Min"),
@PropertyOption(name="MAX",value="Max")}),
@Property(name=Constants.SERVICE_RANKING, intValue=0, propertyPrivate=false,
- label="%queue.ranking.name", description="%queue.ranking.description")
+ label="%queue.ranking.name", description="%queue.ranking.description"),
+ @Property(name=ConfigurationConstants.PROP_WAIT_FOR_ASYNC, boolValue=true)
})
public class InternalQueueConfiguration
implements QueueConfiguration, Comparable<InternalQueueConfiguration> {
@@ -98,6 +99,9 @@ public class InternalQueueConfiguration
/** Valid flag. */
private boolean valid = false;
+ /** Wait for async flag. */
+ private boolean waitForAsync = true;
+
private String pid;
/**
@@ -133,6 +137,7 @@ public class InternalQueueConfiguration
this.topics = topicsParam;
}
this.serviceRanking = PropertiesUtil.toInteger(params.get(Constants.SERVICE_RANKING), 0);
+ this.waitForAsync = PropertiesUtil.toBoolean(params.get(ConfigurationConstants.PROP_WAIT_FOR_ASYNC), true);
this.pid = (String)params.get(Constants.SERVICE_PID);
this.valid = this.checkIsValid();
}
@@ -269,6 +274,14 @@ public class InternalQueueConfiguration
return null;
}
+ /**
+ * @see org.apache.sling.event.jobs.QueueConfiguration#waitForAsyncJobConsumers()
+ */
+ @Override
+ public boolean waitForAsyncJobConsumers() {
+ return this.waitForAsync;
+ }
+
@Override
public String toString() {
return "Queue-Configuration(" + this.hashCode() + ") : {" +
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Wed Apr 24 15:03:06 2013
@@ -320,10 +320,12 @@ public abstract class AbstractJobQueue
@Override
public boolean finishedJob(final Event job, final boolean shouldReschedule) {
final String location = (String)job.getProperty(JobUtil.JOB_ID);
- return this.finishedJob(location, shouldReschedule ? JobResult.FAILED : JobResult.OK);
+ return this.finishedJob(location, shouldReschedule ? JobResult.FAILED : JobResult.OK, false);
}
- private boolean finishedJob(final String jobId, final JobConsumer.JobResult result) {
+ private boolean finishedJob(final String jobId,
+ final JobConsumer.JobResult result,
+ final boolean isAsync) {
if ( this.logger.isDebugEnabled() ) {
this.logger.debug("Received finish for job {}, result={}", jobId, result);
}
@@ -341,9 +343,7 @@ public abstract class AbstractJobQueue
}
if ( !this.running ) {
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Queue is not running anymore. Discarding finish for {}", jobId);
- }
+ this.logger.warn("Queue is not running anymore. Discarding finish for {}", jobId);
return false;
}
@@ -367,12 +367,23 @@ public abstract class AbstractJobQueue
finishSuccessful = info.reschedule();
}
- if ( !finishSuccessful || !reschedule ) {
- checkForNotify(null);
- return false;
+ if ( !isAsync || this.configuration.waitForAsyncJobConsumers() ) {
+ if ( !finishSuccessful || !reschedule ) {
+ checkForNotify(null);
+ return false;
+ }
+ checkForNotify(info);
+ return true;
+ } else {
+ // async result
+ if ( finishSuccessful && reschedule ) {
+ final JobHandler reprocessHandler = this.reschedule(info);
+ if ( reprocessHandler != null ) {
+ this.put(reprocessHandler);
+ }
+ }
+ return true;
}
- checkForNotify(info);
- return true;
}
private void checkForNotify(final JobHandler info) {
@@ -464,7 +475,7 @@ public abstract class AbstractJobQueue
}
/**
- * Process a job
+ * Execute a job
*/
protected boolean executeJob(final JobHandler handler) {
final JobImpl job = handler.getJob();
@@ -520,15 +531,15 @@ public abstract class AbstractJobQueue
}
JobConsumer.JobResult result = JobConsumer.JobResult.CANCEL;
final Object asyncLock = new Object();
- final AtomicBoolean asnycDone = new AtomicBoolean(false);
+ final AtomicBoolean asyncDone = new AtomicBoolean(false);
final JobConsumer.AsyncHandler asyncHandler =
new JobConsumer.AsyncHandler() {
private void check(final JobConsumer.JobResult result) {
synchronized ( asyncLock ) {
- if ( !asnycDone.get() ) {
- asnycDone.set(true);
- finishedJob(job.getId(), result);
+ if ( !asyncDone.get() ) {
+ asyncDone.set(true);
+ finishedJob(job.getId(), result, true);
} else {
throw new IllegalStateException("Job is already marked as processed");
}
@@ -539,7 +550,6 @@ public abstract class AbstractJobQueue
@Override
public void ok() {
this.check(JobConsumer.JobResult.OK);
- finishedJob(job.getId(), JobConsumer.JobResult.OK);
}
@Override
@@ -564,18 +574,22 @@ public abstract class AbstractJobQueue
currentThread.setPriority(oldPriority);
currentThread.setName(oldName);
if ( notifyResult && result != JobConsumer.JobResult.ASYNC ) {
- finishedJob(job.getId(), result);
+ finishedJob(job.getId(), result, false);
}
}
if ( result == JobConsumer.JobResult.ASYNC ) {
- synchronized ( asyncLock ) {
- while ( !asnycDone.get() ) {
- try {
- asyncLock.wait();
- } catch (final InterruptedException e) {
- ignoreException(e);
+ if ( configuration.waitForAsyncJobConsumers() ) {
+ synchronized ( asyncLock ) {
+ while ( !asyncDone.get() ) {
+ try {
+ asyncLock.wait();
+ } catch (final InterruptedException e) {
+ ignoreException(e);
+ }
}
}
+ } else {
+ notifyFinished(null);
}
}
}
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/jobs/QueueConfiguration.java Wed Apr 24 15:03:06 2013
@@ -80,4 +80,10 @@ public interface QueueConfiguration {
* Get the ranking of this configuration.
*/
int getRanking();
+
+ /**
+ * Wait for async job consumers
+ * @since 1.2
+ */
+ boolean waitForAsyncJobConsumers();
}
Modified: sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1471455&r1=1471454&r2=1471455&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/bundles/extensions/event/src/main/resources/OSGI-INF/metatype/metatype.properties Wed Apr 24 15:03:06 2013
@@ -65,6 +65,10 @@ queue.ranking.name = Ranking
queue.ranking.description = Integer value defining the ranking of this queue configuration. \
If more than one queue matches a job topic, the one with the highest ranking is used.
+queue.waitforasync.name = Wait For Async
+queue.waitforasync.description = If a job consumer is processing a job asynchronously, this \
+ flag controls whether the queue waits for the consumer to finish before starting new jobs.
+
#
# Job Event Handler
job.events.name = Apache Sling Job Default Queue