You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/10/08 14:08:47 UTC

svn commit: r1530246 - in /sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs: JobHandler.java JobManagerImpl.java Utility.java queues/AbstractJobQueue.java

Author: cziegeler
Date: Tue Oct  8 12:08:47 2013
New Revision: 1530246

URL: http://svn.apache.org/r1530246
Log:
SLING-3139 : Provide a way to schedule jobs
SLING-3138 : Add fluent api to create new jobs

Modified:
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
    sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobHandler.java Tue Oct  8 12:08:47 2013
@@ -33,6 +33,8 @@ public class JobHandler {
     public long queued = -1;
     public long started = -1;
 
+    private volatile boolean isStopped = false;
+
     private final JobManagerImpl jobManager;
 
     public JobHandler(final JobImpl job, final JobManagerImpl jobManager) {
@@ -45,6 +47,7 @@ public class JobHandler {
     }
 
     public boolean startProcessing(final Queue queue) {
+        this.isStopped = false;
         return this.jobManager.persistJobProperties(this.job, this.job.prepare(queue));
     }
 
@@ -79,6 +82,14 @@ public class JobHandler {
         }
     }
 
+    public boolean isStopped() {
+        return this.isStopped;
+    }
+
+    public void stop() {
+        this.isStopped = true;
+    }
+
     @Override
     public int hashCode() {
         return this.job.getId().hashCode();

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/JobManagerImpl.java Tue Oct  8 12:08:47 2013
@@ -108,6 +108,7 @@ import org.slf4j.LoggerFactory;
                      SlingConstants.TOPIC_RESOURCE_CHANGED,
                      SlingConstants.TOPIC_RESOURCE_REMOVED,
                      "org/apache/sling/event/notification/job/*",
+                     Utility.TOPIC_STOP,
                      ResourceHelper.BUNDLE_EVENT_STARTED,
                      ResourceHelper.BUNDLE_EVENT_UPDATED})
 })
@@ -468,6 +469,11 @@ public class JobManagerImpl
                 this.backgroundLoader.loadJob(path);
             }
             this.jobScheduler.handleEvent(event);
+        } else if ( Utility.TOPIC_STOP.equals(event.getTopic()) ) {
+            if ( !EventUtil.isLocal(event) ) {
+                final String jobId = (String) event.getProperty(Utility.PROPERTY_ID);
+                this.stopJobById(jobId, false);
+            }
         } else if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
                  || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
             this.backgroundLoader.tryToReloadUnloadedJobs();
@@ -1427,10 +1433,30 @@ public class JobManagerImpl
      */
     @Override
     public void stopJobById(final String jobId) {
-        // 1. check if the job is running locally - stop directly
-        // 2. if running remote, send an event via event admin to stop
-        // TODO not implemented yet
-        throw new IllegalStateException("Not implemented yet...");
+        this.stopJobById(jobId, true);
+    }
+
+    private void stopJobById(final String jobId, final boolean forward) {
+        final JobImpl job = (JobImpl)this.getJobById(jobId);
+        if ( job != null && !this.configuration.isStoragePath(job.getResourcePath()) ) {
+            // get the queue configuration
+            final QueueInfo queueInfo = queueConfigManager.getQueueInfo(job.getTopic());
+            final AbstractJobQueue queue;
+            synchronized ( queuesLock ) {
+                queue = this.queues.get(queueInfo.queueName);
+            }
+            boolean stopped = false;
+            if ( queue != null ) {
+                stopped = queue.stopJob(job);
+            }
+            if ( forward && !stopped ) {
+                // send remote event
+                final Map<String, Object> props = new HashMap<String, Object>();
+                props.put(Utility.PROPERTY_ID, jobId);
+                props.put(EventUtil.PROPERTY_DISTRIBUTE, "");
+                this.eventAdmin.sendEvent(new Event(Utility.TOPIC_STOP, props));
+            }
+        }
     }
 
     /**

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/Utility.java Tue Oct  8 12:08:47 2013
@@ -38,7 +38,9 @@ public abstract class Utility {
     public static final String PROPERTY_LOCK_CREATED_APP = "lock.created.app";
     public static final String RESOURCE_TYPE_LOCK = "slingevent:Lock";
 
-    public static final String TOPIC_STOPPED = "org/apache/sling/event/impl/jobs/STOP";
+    public static final String TOPIC_STOPPED = "org/apache/sling/event/impl/jobs/STOPPED";
+    public static final String TOPIC_STOP = "org/apache/sling/event/impl/jobs/STOP";
+    public static final String PROPERTY_ID = "id";
 
     /**
      * Check the job topic.

Modified: sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java?rev=1530246&r1=1530245&r2=1530246&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java (original)
+++ sling/trunk/bundles/extensions/event/src/main/java/org/apache/sling/event/impl/jobs/queues/AbstractJobQueue.java Tue Oct  8 12:08:47 2013
@@ -578,7 +578,7 @@ public abstract class AbstractJobQueue
 
                                             @Override
                                             public boolean isStopped() {
-                                                return false;
+                                                return handler.isStopped();
                                             }
 
                                             @Override
@@ -844,5 +844,16 @@ public abstract class AbstractJobQueue
     protected abstract JobHandler start(final JobHandler event);
 
     protected abstract void notifyFinished(final JobHandler rescheduleInfo);
+
+    public boolean stopJob(final JobImpl job) {
+        final JobHandler handler;
+        synchronized ( this.processsingJobsLists ) {
+            handler = this.processsingJobsLists.get(job.getId());
+        }
+        if ( handler != null ) {
+            handler.stop();
+        }
+        return handler != null;
+    }
 }