You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/08 14:08:47 UTC
svn commit: r1530246 - in
/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs:
JobHandler.java JobManagerImpl.java Utility.java queues/AbstractJobQueue.java
Author: cziegeler
Date: Tue Oct 8 12:08:47 2013
New Revision: 1530246
URL: http://svn.apache.org/r1530246
Log:
SLING-3139 : Provide a way to schedule jobs
SLING-3138 : Add fluent api to create new jobs
Modified:
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Tue Oct 8 12:08:47 2013
@@ -33,6 +33,8 @@ public class JobHandler {
public long queued = -1;
public long started = -1;
+ private volatile boolean isStopped = false;
+
private final JobManagerImpl jobManager;
public JobHandler(final JobImpl job, final JobManagerImpl jobManager) {
@@ -45,6 +47,7 @@ public class JobHandler {
}
public boolean startProcessing(final Queue queue) {
+ this.isStopped = false;
return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
}
@@ -79,6 +82,14 @@ public class JobHandler {
}
}
+ public boolean isStopped() {
+ return this.isStopped;
+ }
+
+ public void stop() {
+ this.isStopped = true;
+ }
+
@Override
public int hashCode() {
return this.job.getId().hashCode();
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Tue Oct 8 12:08:47 2013
@@ -108,6 +108,7 @@ import org.slf4j.LoggerFactory;
SlingConstants.TOPIC_RESOURCE_CHANGED,
SlingConstants.TOPIC_RESOURCE_REMOVED,
"org/apache/sling/event/notification/job/*",
+ Utility.TOPIC_STOP,
ResourceHelper.BUNDLE_EVENT_STARTED,
ResourceHelper.BUNDLE_EVENT_UPDATED})
})
@@ -468,6 +469,11 @@ public class JobManagerImpl
this.backgroundLoader.loadJob(path);
}
this.jobScheduler.handleEvent(event);
+ } else if ( Utility.TOPIC_STOP.equals(event.getTopic()) ) {
+ if ( !EventUtil.isLocal(event) ) {
+ final String jobId = (String) event.getProperty(Utility.PROPERTY_ID);
+ this.stopJobById(jobId, false);
+ }
} else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
|| ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
this.backgroundLoader.tryToReloadUnloadedJobs();
@@ -1427,10 +1433,30 @@ public class JobManagerImpl
*/
@Override
public void stopJobById(final String jobId) {
- // 1. check if the job is running locally - stop directly
- // 2. if running remote, send an event via event admin to stop
- // TODO not implemented yet
- throw new IllegalStateException("Not implemented yet...");
+ this.stopJobById(jobId, true);
+ }
+
+ private void stopJobById(final String jobId, final boolean forward) {
+ final JobImpl job = (JobImpl)this.getJobById(jobId);
+ if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
+ // get the queue configuration
+ final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+ final AbstractJobQueue queue;
+ synchronized ( queuesLock ) {
+ queue = this.queues.get(queueInfo.queueName);
+ }
+ boolean stopped = false;
+ if ( queue != null ) {
+ stopped = queue.stopJob(job);
+ }
+ if ( forward && !stopped ) {
+ // send remote event
+ final Map<String, Object> props = new HashMap<String, Object>();
+ props.put(Utility.PROPERTY_ID, jobId);
+ props.put(EventUtil.PROPERTY_DISTRIBUTE, "");
+ this.eventAdmin.sendEvent(new Event(Utility.TOPIC_STOP, props));
+ }
+ }
}
/**
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Tue Oct 8 12:08:47 2013
@@ -38,7 +38,9 @@ public abstract class Utility {
public static final String PROPERTY_LOCK_CREATED_APP = "lock.created.app";
public static final String RESOURCE_TYPE_LOCK = "slingevent:Lock";
- public static final String TOPIC_STOPPED = "org/apache/sling/event/impl/jobs/STOP";
+ public static final String TOPIC_STOPPED = "org/apache/sling/event/impl/jobs/STOPPED";
+ public static final String TOPIC_STOP = "org/apache/sling/event/impl/jobs/STOP";
+ public static final String PROPERTY_ID = "id";
/**
* Check the job topic.
Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct 8 12:08:47 2013
@@ -578,7 +578,7 @@ public abstract class AbstractJobQueue
@Override
public boolean isStopped() {
- return false;
+ return handler.isStopped();
}
@Override
@@ -844,5 +844,16 @@ public abstract class AbstractJobQueue
protected abstract JobHandler start(final JobHandler event);
protected abstract void notifyFinished(final JobHandler rescheduleInfo);
+
+ public boolean stopJob(final JobImpl job) {
+ final JobHandler handler;
+ synchronized ( this.processsingJobsLists ) {
+ handler = this.processsingJobsLists.get(job.getId());
+ }
+ if ( handler != null ) {
+ handler.stop();
+ }
+ return handler != null;
+ }
}