You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/04 23:24:28 UTC

svn commit: r1369467 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: AbstractJob.java GenericServiceJob.java Job.java JobInvoker.java JobManager.java JobPoller.java PersistedServiceJob.java

Author: adrianc
Date: Sat Aug  4 21:24:27 2012
New Revision: 1369467

URL: http://svn.apache.org/viewvc?rev=1369467&view=rev
Log:
Refactored Job implementations so they implement a state machine. This approach makes the code much simpler.

Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/AbstractJob.java Sat Aug  4 21:24:27 2012
@@ -18,68 +18,43 @@
  *******************************************************************************/
 package org.ofbiz.service.job;
 
+import org.ofbiz.base.util.Assert;
+
 /**
- * Abstract Service Job - Invokes a service
+ * Abstract Job.
  */
-@SuppressWarnings("serial")
 public abstract class AbstractJob implements Job {
 
-    public static final String module = AbstractJob.class.getName();
-
-    protected long runtime = -1;
-    protected long sequence = 0;
-    private String jobId;
-    private String jobName;
-    private boolean queued = false;
+    private final String jobId;
+    private final String jobName;
+    protected State currentState = State.CREATED;
 
     protected AbstractJob(String jobId, String jobName) {
+        Assert.notNull("jobId", jobId, "jobName", jobName);
         this.jobId = jobId;
         this.jobName = jobName;
     }
 
-    /**
-     * Returns the time to run in milliseconds.
-     */
-    public long getRuntime() {
-        return runtime;
-    }
-
-    /**
-     * Returns true if this job is still valid.
-     */
-    public boolean isValid() {
-        if (runtime > 0)
-            return true;
-        return false;
+    @Override
+    public State currentState() {
+        return currentState;
     }
 
-    /**
-     * Returns the ID of this Job.
-     */
+    @Override
     public String getJobId() {
         return this.jobId;
     }
 
-    /**
-     * Returns the name of this Job.
-     */
+    @Override
     public String getJobName() {
         return this.jobName;
     }
 
-    /**
-     * Flags this job as 'is-queued'
-     */
+    @Override
     public void queue() throws InvalidJobException {
-        this.queued = true;
-    }
-
-    /**
-     *  Executes the Job.
-     */
-    public abstract void exec() throws InvalidJobException;
-
-    public boolean isQueued() {
-        return queued;
+        if (currentState != State.CREATED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        this.currentState = State.QUEUED;
     }
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Sat Aug  4 21:24:27 2012
@@ -18,43 +18,38 @@
  *******************************************************************************/
 package org.ofbiz.service.job;
 
+import java.io.Serializable;
 import java.util.Map;
 
+import org.ofbiz.base.util.Assert;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.service.DispatchContext;
 import org.ofbiz.service.GenericRequester;
 import org.ofbiz.service.LocalDispatcher;
 import org.ofbiz.service.ServiceUtil;
+import org.ofbiz.service.job.Job.State;
 
 /**
  * Generic Service Job - A generic async-service Job.
  */
 @SuppressWarnings("serial")
-public class GenericServiceJob extends AbstractJob {
+public class GenericServiceJob extends AbstractJob implements Serializable {
 
     public static final String module = GenericServiceJob.class.getName();
 
-    protected transient GenericRequester requester = null;
-    protected transient DispatchContext dctx = null;
-
-    private String service = null;
-    private Map<String, Object> context = null;
+    protected final transient GenericRequester requester;
+    protected final transient DispatchContext dctx;
+    private final String service;
+    private final Map<String, Object> context;
+    protected long runtime = System.currentTimeMillis();
 
     public GenericServiceJob(DispatchContext dctx, String jobId, String jobName, String service, Map<String, Object> context, GenericRequester req) {
         super(jobId, jobName);
+        Assert.notNull("dctx", dctx);
         this.dctx = dctx;
         this.service = service;
         this.context = context;
         this.requester = req;
-        runtime = System.currentTimeMillis();
-    }
-
-    protected GenericServiceJob(String jobId, String jobName) {
-        super(jobId, jobName);
-        this.dctx = null;
-        this.requester = null;
-        this.service = null;
-        this.context = null;
     }
 
     /**
@@ -62,6 +57,10 @@ public class GenericServiceJob extends A
      */
     @Override
     public void exec() throws InvalidJobException {
+        if (currentState != State.QUEUED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        currentState = State.RUNNING;
         init();
 
         Map<String, Object> result = null;
@@ -105,8 +104,11 @@ public class GenericServiceJob extends A
      * Method is called after the service has finished.
      */
     protected void finish(Map<String, Object> result) throws InvalidJobException {
+        if (currentState != State.RUNNING) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        currentState = State.FINISHED;
         if (Debug.verboseOn()) Debug.logVerbose("Async-Service finished.", module);
-        runtime = 0;
     }
 
     /**
@@ -114,8 +116,11 @@ public class GenericServiceJob extends A
      * @param t Throwable
      */
     protected void failed(Throwable t) throws InvalidJobException {
+        if (currentState != State.RUNNING) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        currentState = State.FAILED;
         Debug.logError(t, "Async-Service failed.", module);
-        runtime = 0;
     }
 
     /**
@@ -133,4 +138,14 @@ public class GenericServiceJob extends A
     protected String getServiceName() throws InvalidJobException {
         return service;
     }
+
+    @Override
+    public long getRuntime() {
+        return runtime;
+    }
+
+    @Override
+    public boolean isValid() {
+        return currentState == State.CREATED;
+    }
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Sat Aug  4 21:24:27 2012
@@ -18,41 +18,46 @@
  *******************************************************************************/
 package org.ofbiz.service.job;
 
-import java.io.Serializable;
-
 /**
- * Job Interface
+ * A scheduled job.
  */
-public interface Job extends Serializable {
+public interface Job {
+
+    public static enum State {CREATED, QUEUED, RUNNING, FINISHED, FAILED};
+
+    /**
+     * Returns the current state of this job.
+     */
+    State currentState();
 
     /**
-     *  Executes the Job.
+     *  Executes this Job.
      */
-    public void exec() throws InvalidJobException;
+    void exec() throws InvalidJobException;
 
     /**
      * Returns the ID of this Job.
      */
-    public String getJobId();
+    String getJobId();
 
     /**
      * Returns the name of this Job.
      */
-    public String getJobName();
+    String getJobName();
 
     /**
      *  Returns the time to run in milliseconds.
      */
-    public long getRuntime();
+    long getRuntime();
 
     /**
-     * Returns true if this job is still valid.
+     * Returns true if this job is ready to be queued.
      */
-    public boolean isValid();
+    boolean isValid();
 
     /**
-     * Flags this job as 'is-queued'
+     * Transitions the job to the queued state.
      */
-    public void queue() throws InvalidJobException;
+    void queue() throws InvalidJobException;
 }
 

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobInvoker.java Sat Aug  4 21:24:27 2012
@@ -20,6 +20,7 @@ package org.ofbiz.service.job;
 
 import java.util.Date;
 
+import org.ofbiz.base.util.Assert;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.entity.transaction.GenericTransactionException;
 import org.ofbiz.entity.transaction.TransactionUtil;
@@ -31,18 +32,15 @@ public class JobInvoker implements Runna
 
     public static final String module = JobInvoker.class.getName();
 
-    private Date created = null;
+    private final Date created = new Date();
     private long jobStart;
-
-    private Job currentJob = null;
+    private final Job currentJob;
 
     public JobInvoker(Job job) {
-        this.created = new Date();
+        Assert.notNull("job", job);
         this.currentJob = job;
     }
 
-    protected JobInvoker() {}
-
     /**
      * Gets the time when this thread was created.
      * @return Time in milliseconds when this was created.
@@ -69,11 +67,7 @@ public class JobInvoker implements Runna
      * @return String ID of the current running job.
      */
     public String getJobId() {
-        if (this.currentJob != null) {
-            return this.currentJob.getJobId();
-        } else {
-            return "WARNING: Invalid Job!";
-        }
+        return this.currentJob.getJobId();
     }
 
     /**
@@ -81,11 +75,7 @@ public class JobInvoker implements Runna
      * @return String name of the current running job.
      */
     public String getJobName() {
-        if (this.currentJob != null) {
-            return this.currentJob.getJobName();
-        } else {
-            return "WARNING: Invalid Job!";
-        }
+        return this.currentJob.getJobName();
     }
 
     /**
@@ -93,31 +83,26 @@ public class JobInvoker implements Runna
      * @return The name of the service being run.
      */
     public String getServiceName() {
-        String serviceName = null;
-        if (this.currentJob != null) {
-            if (this.currentJob instanceof GenericServiceJob) {
-                GenericServiceJob gsj = (GenericServiceJob) this.currentJob;
-                try {
-                    serviceName = gsj.getServiceName();
-                } catch (InvalidJobException e) {
-                    Debug.logError(e, module);
-                }
+        if (this.currentJob instanceof GenericServiceJob) {
+            GenericServiceJob gsj = (GenericServiceJob) this.currentJob;
+            try {
+                return gsj.getServiceName();
+            } catch (InvalidJobException e) {
+                Debug.logWarning(e, module);
             }
         }
-        return serviceName;
+        return null;
     }
 
     public void run() {
         // setup the current job settings
         this.jobStart = System.currentTimeMillis();
-
         // execute the job
         try {
             this.currentJob.exec();
         } catch (InvalidJobException e) {
             Debug.logWarning(e.getMessage(), module);
         }
-
         // sanity check; make sure we don't have any transactions in place
         try {
             // roll back current TX first
@@ -125,7 +110,6 @@ public class JobInvoker implements Runna
                 Debug.logWarning("*** NOTICE: JobInvoker finished w/ a transaction in place! Rolling back.", module);
                 TransactionUtil.rollback();
             }
-
             // now resume/rollback any suspended txs
             if (TransactionUtil.suspendedTransactionsHeld()) {
                 int suspended = TransactionUtil.cleanSuspendedTransactions();

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Sat Aug  4 21:24:27 2012
@@ -155,13 +155,19 @@ public final class JobManager {
 
     public synchronized List<Job> poll() {
         assertIsRunning();
+        DispatchContext dctx = getDispatcher().getDispatchContext();
+        if (dctx == null) {
+            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
+            return null;
+        }
         List<Job> poll = FastList.newInstance();
         // sort the results by time
         List<String> order = UtilMisc.toList("runTime");
         // basic query
         List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
-                EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null), EntityCondition.makeCondition("cancelDateTime",
-                EntityOperator.EQUALS, null), EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null));
+                EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
+                EntityCondition.makeCondition("cancelDateTime", EntityOperator.EQUALS, null),
+                EntityCondition.makeCondition("runByInstanceId", EntityOperator.EQUALS, null));
         // limit to just defined pools
         List<String> pools = ServiceConfigUtil.getRunPools();
         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
@@ -174,62 +180,41 @@ public final class JobManager {
         EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
         EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
         EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));
-        // we will loop until we have no more to do
-        boolean pollDone = false;
-        while (!pollDone) {
-            boolean beganTransaction = false;
-            try {
-                beganTransaction = TransactionUtil.begin();
-                if (!beganTransaction) {
-                    Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
-                    return null;
-                }
-                List<Job> localPoll = FastList.newInstance();
-                // first update the jobs w/ this instance running information
-                delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
-                // now query all the 'queued' jobs for this instance
-                List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
-                // jobEnt = delegator.findByCondition("JobSandbox", mainCondition, null, order);
-                if (UtilValidate.isNotEmpty(jobEnt)) {
-                    for (GenericValue v : jobEnt) {
-                        DispatchContext dctx = getDispatcher().getDispatchContext();
-                        if (dctx == null) {
-                            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
-                            continue;
-                        }
-                        Job job = new PersistedServiceJob(dctx, v, null); // TODO fix the requester
-                        try {
-                            job.queue();
-                            localPoll.add(job);
-                        } catch (InvalidJobException e) {
-                            Debug.logError(e, module);
-                        }
-                    }
-                } else {
-                    pollDone = true;
-                }
-                // nothing should go wrong at this point, so add to the general list
-                poll.addAll(localPoll);
-            } catch (Throwable t) {
-                // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
-                String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
-                Debug.logError(t, errMsg, module);
-                try {
-                    // only rollback the transaction if we started one...
-                    TransactionUtil.rollback(beganTransaction, errMsg, t);
-                } catch (GenericEntityException e2) {
-                    Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
-                }
-            } finally {
-                try {
-                    // only commit the transaction if we started one... but make sure we try
-                    TransactionUtil.commit(beganTransaction);
-                } catch (GenericTransactionException e) {
-                    String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
-                    // we don't really want to do anything different, so just log and move on
-                    Debug.logError(e, errMsg, module);
+        boolean beganTransaction = false;
+        try {
+            beganTransaction = TransactionUtil.begin();
+            if (!beganTransaction) {
+                Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
+                return null;
+            }
+            // first update the jobs w/ this instance running information
+            delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
+            // now query all the 'queued' jobs for this instance
+            List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
+            if (UtilValidate.isNotEmpty(jobEnt)) {
+                for (GenericValue v : jobEnt) {
+                    poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester
                 }
             }
+        } catch (Throwable t) {
+            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
+            String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
+            Debug.logError(t, errMsg, module);
+            try {
+                // only rollback the transaction if we started one...
+                TransactionUtil.rollback(beganTransaction, errMsg, t);
+            } catch (GenericEntityException e2) {
+                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
+            }
+        } finally {
+            try {
+                // only commit the transaction if we started one... but make sure we try
+                TransactionUtil.commit(beganTransaction);
+            } catch (GenericTransactionException e) {
+                String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
+                // we don't really want to do anything different, so just log and move on
+                Debug.logError(e, errMsg, module);
+            }
         }
         return poll;
     }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Sat Aug  4 21:24:27 2012
@@ -161,9 +161,11 @@ public final class JobPoller implements 
 
     /**
      * Adds a job to the RUN queue.
+     * @throws InvalidJobException if the job is in an invalid state.
      * @throws RejectedExecutionException if the poller is stopped.
      */
-    public void queueNow(Job job) {
+    public void queueNow(Job job) throws InvalidJobException {
+        job.queue();
         this.executor.execute(new JobInvoker(job));
     }
 
@@ -177,11 +179,11 @@ public final class JobPoller implements 
             try {
                 // grab a list of jobs to run.
                 List<Job> pollList = jm.poll();
-                // Debug.logInfo("Received poll list from JobManager [" + pollList.size() + "]", module);
                 for (Job job : pollList) {
-                    if (job.isValid()) {
+                    try {
                         queueNow(job);
-                        // Debug.logInfo("Job [" + job.getJobId() + "] is queued", module);
+                    } catch (InvalidJobException e) {
+                        Debug.logError(e, module);
                     }
                 }
                 // NOTE: using sleep instead of wait for stricter locking

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1369467&r1=1369466&r2=1369467&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Sat Aug  4 21:24:27 2012
@@ -52,7 +52,8 @@ import org.xml.sax.SAXException;
 import org.apache.commons.lang.StringUtils;
 
 /**
- * Entity Service Job - Store => Schedule => Run
+ * A {@link Job} that is backed by the entity engine. Job data is stored
+ * in the JobSandbox entity.
  */
 @SuppressWarnings("serial")
 public class PersistedServiceJob extends GenericServiceJob {
@@ -60,7 +61,6 @@ public class PersistedServiceJob extends
     public static final String module = PersistedServiceJob.class.getName();
 
     private transient Delegator delegator = null;
-    private Timestamp storedDate = null;
     private long nextRecurrence = -1;
     private long maxRetry = -1;
     private long currentRetryCount = 0;
@@ -73,11 +73,9 @@ public class PersistedServiceJob extends
      * @param req
      */
     public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) {
-        super(jobValue.getString("jobId"), jobValue.getString("jobName"));
+        super(dctx, jobValue.getString("jobId"), jobValue.getString("jobName"), null, null, req);
         this.delegator = dctx.getDelegator();
-        this.requester = req;
-        this.dctx = dctx;
-        this.storedDate = jobValue.getTimestamp("runTime");
+        Timestamp storedDate = jobValue.getTimestamp("runTime");
         this.runtime = storedDate.getTime();
         this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1;
         Long retryCount = jobValue.getLong("currentRetryCount");
@@ -87,51 +85,36 @@ public class PersistedServiceJob extends
             // backward compatibility
             this.currentRetryCount = PersistedServiceJob.getRetries(jobValue, this.delegator);
         }
-
-        // Debug.logInfo("=============== New PersistedServiceJob, delegator from dctx is [" + dctx.getDelegator().getDelegatorName() + "] and delegator from jobValue is [" + jobValue.getDelegator().getDelegatorName() + "]", module);
     }
 
     @Override
     public void queue() throws InvalidJobException {
         super.queue();
-
         // refresh the job object
         GenericValue jobValue = null;
         try {
             jobValue = this.getJob();
             jobValue.refresh();
         } catch (GenericEntityException e) {
-            runtime = -1;
-            throw new InvalidJobException("Unable to refresh Job object", e);
+            throw new InvalidJobException("Unable to refresh JobSandbox value", e);
         }
-
-        // make sure it isn't already set/cancelled
-        if (runtime != -1) {
-            Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
-            Timestamp startTime = jobValue.getTimestamp("startDateTime");
-            if (cancelTime != null || startTime != null) {
-                // job not available
-                runtime = -1;
-                throw new InvalidJobException("Job [" + getJobId() + "] is not available");
-
-            } else {
-                // set the start time to now
-                jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
-                jobValue.set("statusId", "SERVICE_RUNNING");
-                try {
-                    jobValue.store();
-                } catch (GenericEntityException e) {
-                    runtime = -1;
-                    throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e);
-
-                }
+        Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
+        Timestamp startTime = jobValue.getTimestamp("startDateTime");
+        if (cancelTime != null || startTime != null) {
+            // job not available
+            throw new InvalidJobException("Job [" + getJobId() + "] is not available");
+        } else {
+            // set the start time to now
+            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
+            jobValue.set("statusId", "SERVICE_RUNNING");
+            try {
+                jobValue.store();
+            } catch (GenericEntityException e) {
+                throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e);
             }
         }
     }
 
-    /**
-     * @see org.ofbiz.service.job.GenericServiceJob#init()
-     */
     @Override
     protected void init() throws InvalidJobException {
         super.init();
@@ -217,19 +200,13 @@ public class PersistedServiceJob extends
         }
     }
 
-    /**
-     * @see org.ofbiz.service.job.GenericServiceJob#finish(Map)
-     */
     @Override
     protected void finish(Map<String, Object> result) throws InvalidJobException {
         super.finish(result);
 
         // set the finish date
         GenericValue job = getJob();
-        String status = job.getString("statusId");
-        if (status == null || "SERVICE_RUNNING".equals(status)) {
-            job.set("statusId", "SERVICE_FINISHED");
-        }
+        job.set("statusId", "SERVICE_FINISHED");
         job.set("finishDateTime", UtilDateTime.nowTimestamp());
         String jobResult = null;
         if (ServiceUtil.isError(result)) {
@@ -247,9 +224,6 @@ public class PersistedServiceJob extends
         }
     }
 
-    /**
-     * @see org.ofbiz.service.job.GenericServiceJob#failed(Throwable)
-     */
     @Override
     protected void failed(Throwable t) throws InvalidJobException {
         super.failed(t);
@@ -284,9 +258,6 @@ public class PersistedServiceJob extends
         }
     }
 
-    /**
-     * @see org.ofbiz.service.job.GenericServiceJob#getServiceName()
-     */
     @Override
     protected String getServiceName() throws InvalidJobException {
         GenericValue jobObj = getJob();
@@ -296,9 +267,6 @@ public class PersistedServiceJob extends
         return jobObj.getString("serviceName");
     }
 
-    /**
-     * @see org.ofbiz.service.job.GenericServiceJob#getContext()
-     */
     @Override
     protected Map<String, Object> getContext() throws InvalidJobException {
         Map<String, Object> context = null;
@@ -341,7 +309,6 @@ public class PersistedServiceJob extends
     private GenericValue getJob() throws InvalidJobException {
         try {
             GenericValue jobObj = delegator.findOne("JobSandbox", false, "jobId", getJobId());
-
             if (jobObj == null) {
                 throw new InvalidJobException("Job [" + getJobId() + "] came back null from datasource from delegator " + delegator.getDelegatorName());
             }