You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/08 00:10:28 UTC

svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Author: adrianc
Date: Tue Aug  7 22:10:27 2012
New Revision: 1370566

URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
Log:
Fixed the Job Scheduler so jobs are not lost during heavy server load.

Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
@@ -144,4 +144,12 @@ public class GenericServiceJob extends A
     public boolean isValid() {
         return currentState == State.CREATED;
     }
+
+    @Override
+    public void deQueue() throws InvalidJobException {
+        if (currentState != State.QUEUED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
+    }
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
@@ -60,7 +60,13 @@ public interface Job {
     boolean isValid();
 
     /**
-     * Transitions the job to the queued state.
+     * Transitions this job to the pre-queued (created) state. The job manager
+     * will call this method when there was a problem adding this job to the queue.
+     */
+    void deQueue() throws InvalidJobException;
+
+    /**
+     * Transitions this job to the queued state.
      */
     void queue() throws InvalidJobException;
 }

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
@@ -63,7 +63,6 @@ public final class JobManager {
 
     public static final String module = JobManager.class.getName();
     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
-    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
     private static boolean isShutDown = false;
 
@@ -150,7 +149,7 @@ public final class JobManager {
         assertIsRunning();
         DispatchContext dctx = getDispatcher().getDispatchContext();
         if (dctx == null) {
-            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
+            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
             return null;
         }
         List<Job> poll = FastList.newInstance();
@@ -176,13 +175,13 @@ public final class JobManager {
         try {
             beganTransaction = TransactionUtil.begin();
             if (!beganTransaction) {
-                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
+                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
                 return null;
             }
             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
             GenericValue jobValue = jobsIterator.next();
             while (jobValue != null) {
-                jobValue.putAll(updateFields);
+                jobValue.set("runByInstanceId", instanceId);  // Claim ownership of this value.
                 jobValue.store();
                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
                 if (poll.size() == limit) {
@@ -191,14 +190,12 @@ public final class JobManager {
                 jobValue = jobsIterator.next();
             }
         } catch (Throwable t) {
-            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
-            Debug.logError(t, errMsg, module);
+            Debug.logWarning(t, errMsg, module);
             try {
-                // only rollback the transaction if we started one...
                 TransactionUtil.rollback(beganTransaction, errMsg, t);
             } catch (GenericEntityException e2) {
-                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
+                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
             }
         } finally {
             if (jobsIterator != null) {
@@ -209,12 +206,10 @@ public final class JobManager {
                 }
             }
             try {
-                // only commit the transaction if we started one... but make sure we try
                 TransactionUtil.commit(beganTransaction);
             } catch (GenericTransactionException e) {
                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
-                // we don't really want to do anything different, so just log and move on
-                Debug.logError(e, errMsg, module);
+                Debug.logWarning(e, errMsg, module);
             }
         }
         return poll;
@@ -222,7 +217,9 @@ public final class JobManager {
 
     private void reloadCrashedJobs() {
         List<GenericValue> crashed = null;
-        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
+        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"),
+                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
+                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
         List<String> pools = ServiceConfigUtil.getRunPools();
@@ -236,13 +233,13 @@ public final class JobManager {
         try {
             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
         } catch (GenericEntityException e) {
-            Debug.logError(e, "Unable to load crashed jobs", module);
+            Debug.logWarning(e, "Unable to load crashed jobs", module);
         }
         if (UtilValidate.isNotEmpty(crashed)) {
-            try {
-                int rescheduled = 0;
-                for (GenericValue job : crashed) {
-                    Timestamp now = UtilDateTime.nowTimestamp();
+            int rescheduled = 0;
+            Timestamp now = UtilDateTime.nowTimestamp();
+            for (GenericValue job : crashed) {
+                try {
                     Debug.logInfo("Scheduling Job : " + job, module);
                     String pJobId = job.getString("parentJobId");
                     if (pJobId == null) {
@@ -261,12 +258,12 @@ public final class JobManager {
                     job.set("cancelDateTime", now);
                     delegator.store(job);
                     rescheduled++;
+                } catch (GenericEntityException e) {
+                    Debug.logWarning(e, module);
                 }
-                if (Debug.infoOn())
-                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
-            } catch (GenericEntityException e) {
-                Debug.logError(e, module);
             }
+            if (Debug.infoOn())
+                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
         } else {
             if (Debug.infoOn())
                 Debug.logInfo("No crashed jobs to re-schedule", module);

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
@@ -203,7 +203,11 @@ public final class JobPoller implements 
      */
     public void queueNow(Job job) throws InvalidJobException {
         job.queue();
-        this.executor.execute(new JobInvoker(job));
+        try {
+            this.executor.execute(new JobInvoker(job));
+        } catch (Exception e) {
+            job.deQueue();
+        }
     }
 
     public void run() {

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
@@ -20,7 +20,6 @@ package org.ofbiz.service.job;
 
 import java.io.IOException;
 import java.sql.Timestamp;
-import com.ibm.icu.util.Calendar;
 import java.util.Date;
 import java.util.Map;
 
@@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
 
 import javolution.util.FastMap;
 
+import org.apache.commons.lang.StringUtils;
 import org.ofbiz.base.util.Debug;
 import org.ofbiz.base.util.UtilDateTime;
 import org.ofbiz.base.util.UtilGenerics;
 import org.ofbiz.base.util.UtilProperties;
 import org.ofbiz.base.util.UtilValidate;
-import org.ofbiz.service.calendar.RecurrenceInfoException;
-import org.ofbiz.service.calendar.TemporalExpression;
-import org.ofbiz.service.calendar.TemporalExpressionWorker;
 import org.ofbiz.entity.Delegator;
 import org.ofbiz.entity.GenericEntityException;
 import org.ofbiz.entity.GenericValue;
@@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
 import org.ofbiz.service.GenericRequester;
 import org.ofbiz.service.ServiceUtil;
 import org.ofbiz.service.calendar.RecurrenceInfo;
+import org.ofbiz.service.calendar.RecurrenceInfoException;
+import org.ofbiz.service.calendar.TemporalExpression;
+import org.ofbiz.service.calendar.TemporalExpressionWorker;
 import org.ofbiz.service.config.ServiceConfigUtil;
 import org.xml.sax.SAXException;
 
-import org.apache.commons.lang.StringUtils;
+import com.ibm.icu.util.Calendar;
 
 /**
  * A {@link Job} that is backed by the entity engine. Job data is stored
@@ -100,14 +100,15 @@ public class PersistedServiceJob extends
             // job not available
             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
         } else {
-            // set the start time to now
-            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
-            jobValue.set("statusId", "SERVICE_RUNNING");
+            jobValue.set("statusId", "SERVICE_QUEUED");
             try {
                 jobValue.store();
             } catch (GenericEntityException e) {
                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
             }
+            if (Debug.verboseOn()) {
+                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
+            }
         }
     }
 
@@ -129,6 +130,16 @@ public class PersistedServiceJob extends
             // This condition isn't possible, but we will leave it here.
             throw new InvalidJobException("Job has been accepted by a different instance!");
         }
+        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("statusId", "SERVICE_RUNNING");
+        try {
+            jobValue.store();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
+        }
+        if (Debug.verboseOn()) {
+            Debug.logVerbose("Job [" + getJobId() + "] running", module);
+        }
         // configure any additional recurrences
         long maxRecurrenceCount = -1;
         long currentRecurrenceCount = 0;
@@ -331,4 +342,24 @@ public class PersistedServiceJob extends
         }
         return null;
     }
+
+    @Override
+    public void deQueue() throws InvalidJobException {
+        if (currentState != State.QUEUED) {
+            throw new InvalidJobException("Illegal state change");
+        }
+        currentState = State.CREATED;
+        try {
+            jobValue.refresh();
+            jobValue.set("startDateTime", null);
+            jobValue.set("runByInstanceId", null);
+            jobValue.set("statusId", "SERVICE_PENDING");
+            jobValue.store();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
+        }
+        if (Debug.verboseOn()) {
+            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
+        }
+    }
 }



Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Adrian Crum <ad...@sandglass-software.com>.
Usually, when I do an overhaul like this I reformat the code first, in a 
separate commit. But I didn't start out intending to overhaul the Job 
Scheduler, I was just trying to fix some bugs I encountered, then things 
progressed...

I would appreciate another review. Scott's comments were helpful, and 
I'm sure yours will be also.

-Adrian

On 8/11/2012 9:49 AM, Jacques Le Roux wrote:
> OK clear, I had to look at the source not only the diff.
> I must say I try to cope with the changes there, but will need more 
> time to review completely
>
> Jacques
>
> From: "Jacques Le Roux" <ja...@les7arts.com>
>> I just looked at the diff, I will see in the context, maybe you are 
>> right...
>>
>> Jacques
>>
>> From: "Adrian Crum" <ad...@sandglass-software.com>
>>> You are welcome to put them back in. They seemed redundant (and 
>>> silly) to me. Example:
>>>
>>> // Set var1 to 1
>>> int var1 = 1;
>>> // Call someMethod, put result in var2
>>> int var2 = someMethod();
>>> // Calculate var1 * var2
>>> int result = var1 * var2;
>>>
>>> The comments are not adding anything of value - they are only 
>>> stating the obvious.
>>>
>>> -Adrian
>>>
>>> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>>>> Else forgot to say that I like it
>>>>
>>>> Jacques
>>>>
>>>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>>>> Hi Adrian,
>>>>>
>>>>> Why did you remove these comments ?
>>>>>> -            // catch Throwable so nothing slips through the 
>>>>>> cracks... this is a fairly sensitive operation
>>>>>> -                // only rollback the transaction if we started 
>>>>>> one...
>>>>>> -                // only commit the transaction if we started 
>>>>>> one... but make sure we try
>>>>>> -                // we don't really want to do anything 
>>>>>> different, so just log and move on
>>>>>
>>>>> It seems they add some information
>>>>>
>>>>> Jacques
>>>>>
>>>>> From: <ad...@apache.org>
>>>>>> Author: adrianc
>>>>>> Date: Tue Aug  7 22:10:27 2012
>>>>>> New Revision: 1370566
>>>>>>
>>>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>>>> Log:
>>>>>> Fixed the Job Scheduler so jobs are not lost during heavy server 
>>>>>> load.
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java 
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java 
>>>>>>
>>>>>>
>>>>>> Modified: 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>>> URL: 
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ============================================================================== 
>>>>>>
>>>>>> --- 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java 
>>>>>> (original)
>>>>>> +++ 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java 
>>>>>> Tue Aug  7 22:10:27 2012
>>>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>>>>     public boolean isValid() {
>>>>>>         return currentState == State.CREATED;
>>>>>>     }
>>>>>> +
>>>>>> +    @Override
>>>>>> +    public void deQueue() throws InvalidJobException {
>>>>>> +        if (currentState != State.QUEUED) {
>>>>>> +            throw new InvalidJobException("Illegal state change");
>>>>>> +        }
>>>>>> +        throw new InvalidJobException("Unable to queue job [" + 
>>>>>> getJobId() + "]");
>>>>>> +    }
>>>>>> }
>>>>>>
>>>>>> Modified: 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> URL: 
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ============================================================================== 
>>>>>>
>>>>>> --- 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java 
>>>>>> (original)
>>>>>> +++ 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java 
>>>>>> Tue Aug  7 22:10:27 2012
>>>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>>>>     boolean isValid();
>>>>>>
>>>>>>     /**
>>>>>> -     * Transitions the job to the queued state.
>>>>>> +     * Transitions this job to the pre-queued (created) state. 
>>>>>> The job manager
>>>>>> +     * will call this method when there was a problem adding 
>>>>>> this job to the queue.
>>>>>> +     */
>>>>>> +    void deQueue() throws InvalidJobException;
>>>>>> +
>>>>>> +    /**
>>>>>> +     * Transitions this job to the queued state.
>>>>>>      */
>>>>>>     void queue() throws InvalidJobException;
>>>>>> }
>>>>>>
>>>>>> Modified: 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>>>>>
>>>>>> URL: 
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ============================================================================== 
>>>>>>
>>>>>> --- 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>>>>> (original)
>>>>>> +++ 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>>>>> Tue Aug  7 22:10:27 2012
>>>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>>>
>>>>>>     public static final String module = JobManager.class.getName();
>>>>>>     public static final String instanceId = 
>>>>>> UtilProperties.getPropertyValue("general.properties", 
>>>>>> "unique.instanceId", "ofbiz0");
>>>>>> -    public static final Map<String, Object> updateFields = 
>>>>>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
>>>>>> "statusId", "SERVICE_QUEUED");
>>>>>>     private static final ConcurrentHashMap<String, JobManager> 
>>>>>> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>>>>>>     private static boolean isShutDown = false;
>>>>>>
>>>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>>>>         assertIsRunning();
>>>>>>         DispatchContext dctx = getDispatcher().getDispatchContext();
>>>>>>         if (dctx == null) {
>>>>>> -            Debug.logError("Unable to locate DispatchContext 
>>>>>> object; not running job!", module);
>>>>>> +            Debug.logWarning("Unable to locate DispatchContext 
>>>>>> object; not running job!", module);
>>>>>>             return null;
>>>>>>         }
>>>>>>         List<Job> poll = FastList.newInstance();
>>>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>>>>         try {
>>>>>>             beganTransaction = TransactionUtil.begin();
>>>>>>             if (!beganTransaction) {
>>>>>> -                Debug.logError("Unable to poll JobSandbox for 
>>>>>> jobs; transaction was not started by this process", module);
>>>>>> +                Debug.logWarning("Unable to poll JobSandbox for 
>>>>>> jobs; transaction was not started by this process", module);
>>>>>>                 return null;
>>>>>>             }
>>>>>>             jobsIterator = delegator.find("JobSandbox", 
>>>>>> mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>>>>             GenericValue jobValue = jobsIterator.next();
>>>>>>             while (jobValue != null) {
>>>>>> -                jobValue.putAll(updateFields);
>>>>>> +                jobValue.set("runByInstanceId", instanceId); // 
>>>>>> Claim ownership of this value.
>>>>>>                 jobValue.store();
>>>>>>                 poll.add(new PersistedServiceJob(dctx, jobValue, 
>>>>>> null));
>>>>>>                 if (poll.size() == limit) {
>>>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>>>>                 jobValue = jobsIterator.next();
>>>>>>             }
>>>>>>         } catch (Throwable t) {
>>>>>> -            // catch Throwable so nothing slips through the 
>>>>>> cracks... this is a fairly sensitive operation
>>>>>>             String errMsg = "Error in polling JobSandbox: [" + 
>>>>>> t.toString() + "]. Rolling back transaction.";
>>>>>> -            Debug.logError(t, errMsg, module);
>>>>>> +            Debug.logWarning(t, errMsg, module);
>>>>>>             try {
>>>>>> -                // only rollback the transaction if we started 
>>>>>> one...
>>>>>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>>>>             } catch (GenericEntityException e2) {
>>>>>> -                Debug.logError(e2, "[Delegator] Could not 
>>>>>> rollback transaction: " + e2.toString(), module);
>>>>>> +                Debug.logWarning(e2, "[Delegator] Could not 
>>>>>> rollback transaction: " + e2.toString(), module);
>>>>>>             }
>>>>>>         } finally {
>>>>>>             if (jobsIterator != null) {
>>>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>>>>                 }
>>>>>>             }
>>>>>>             try {
>>>>>> -                // only commit the transaction if we started 
>>>>>> one... but make sure we try
>>>>>> TransactionUtil.commit(beganTransaction);
>>>>>>             } catch (GenericTransactionException e) {
>>>>>>                 String errMsg = "Transaction error trying to 
>>>>>> commit when polling and updating the JobSandbox: " + e.toString();
>>>>>> -                // we don't really want to do anything 
>>>>>> different, so just log and move on
>>>>>> -                Debug.logError(e, errMsg, module);
>>>>>> +                Debug.logWarning(e, errMsg, module);
>>>>>>             }
>>>>>>         }
>>>>>>         return poll;
>>>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>>>
>>>>>>     private void reloadCrashedJobs() {
>>>>>>         List<GenericValue> crashed = null;
>>>>>> -        List<EntityExpr> statusExprList = 
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", 
>>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"), 
>>>>>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>>>>> "SERVICE_RUNNING"));
>>>>>> +        List<EntityExpr> statusExprList = 
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", 
>>>>>> EntityOperator.EQUALS, "SERVICE_PENDING"),
>>>>>> + EntityCondition.makeCondition("statusId", 
>>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>>>> + EntityCondition.makeCondition("statusId", 
>>>>>> EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>>>         EntityCondition statusCondition = 
>>>>>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>>>>         List<EntityExpr> poolsExpr = 
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("poolId", 
>>>>>> EntityOperator.EQUALS, null));
>>>>>>         List<String> pools = ServiceConfigUtil.getRunPools();
>>>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>>>>         try {
>>>>>>             crashed = delegator.findList("JobSandbox", 
>>>>>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>>>>         } catch (GenericEntityException e) {
>>>>>> -            Debug.logError(e, "Unable to load crashed jobs", 
>>>>>> module);
>>>>>> +            Debug.logWarning(e, "Unable to load crashed jobs", 
>>>>>> module);
>>>>>>         }
>>>>>>         if (UtilValidate.isNotEmpty(crashed)) {
>>>>>> -            try {
>>>>>> -                int rescheduled = 0;
>>>>>> -                for (GenericValue job : crashed) {
>>>>>> -                    Timestamp now = UtilDateTime.nowTimestamp();
>>>>>> +            int rescheduled = 0;
>>>>>> +            Timestamp now = UtilDateTime.nowTimestamp();
>>>>>> +            for (GenericValue job : crashed) {
>>>>>> +                try {
>>>>>>                     Debug.logInfo("Scheduling Job : " + job, 
>>>>>> module);
>>>>>>                     String pJobId = job.getString("parentJobId");
>>>>>>                     if (pJobId == null) {
>>>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>>>>                     job.set("cancelDateTime", now);
>>>>>>                     delegator.store(job);
>>>>>>                     rescheduled++;
>>>>>> +                } catch (GenericEntityException e) {
>>>>>> +                    Debug.logWarning(e, module);
>>>>>>                 }
>>>>>> -                if (Debug.infoOn())
>>>>>> -                    Debug.logInfo("-- " + rescheduled + " jobs 
>>>>>> re-scheduled", module);
>>>>>> -            } catch (GenericEntityException e) {
>>>>>> -                Debug.logError(e, module);
>>>>>>             }
>>>>>> +            if (Debug.infoOn())
>>>>>> +                Debug.logInfo("-- " + rescheduled + " jobs 
>>>>>> re-scheduled", module);
>>>>>>         } else {
>>>>>>             if (Debug.infoOn())
>>>>>>                 Debug.logInfo("No crashed jobs to re-schedule", 
>>>>>> module);
>>>>>>
>>>>>> Modified: 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>>>>>
>>>>>> URL: 
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ============================================================================== 
>>>>>>
>>>>>> --- 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>>>>> (original)
>>>>>> +++ 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>>>>> Tue Aug  7 22:10:27 2012
>>>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>>>>      */
>>>>>>     public void queueNow(Job job) throws InvalidJobException {
>>>>>>         job.queue();
>>>>>> -        this.executor.execute(new JobInvoker(job));
>>>>>> +        try {
>>>>>> +            this.executor.execute(new JobInvoker(job));
>>>>>> +        } catch (Exception e) {
>>>>>> +            job.deQueue();
>>>>>> +        }
>>>>>>     }
>>>>>>
>>>>>>     public void run() {
>>>>>>
>>>>>> Modified: 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>> URL: 
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ============================================================================== 
>>>>>>
>>>>>> --- 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java 
>>>>>> (original)
>>>>>> +++ 
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java 
>>>>>> Tue Aug  7 22:10:27 2012
>>>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.sql.Timestamp;
>>>>>> -import com.ibm.icu.util.Calendar;
>>>>>> import java.util.Date;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>>>
>>>>>> import javolution.util.FastMap;
>>>>>>
>>>>>> +import org.apache.commons.lang.StringUtils;
>>>>>> import org.ofbiz.base.util.Debug;
>>>>>> import org.ofbiz.base.util.UtilDateTime;
>>>>>> import org.ofbiz.base.util.UtilGenerics;
>>>>>> import org.ofbiz.base.util.UtilProperties;
>>>>>> import org.ofbiz.base.util.UtilValidate;
>>>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>>> import org.ofbiz.entity.Delegator;
>>>>>> import org.ofbiz.entity.GenericEntityException;
>>>>>> import org.ofbiz.entity.GenericValue;
>>>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>>>> import org.ofbiz.service.GenericRequester;
>>>>>> import org.ofbiz.service.ServiceUtil;
>>>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>>>> import org.xml.sax.SAXException;
>>>>>>
>>>>>> -import org.apache.commons.lang.StringUtils;
>>>>>> +import com.ibm.icu.util.Calendar;
>>>>>>
>>>>>> /**
>>>>>>  * A {@link Job} that is backed by the entity engine. Job data is 
>>>>>> stored
>>>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>>>>             // job not available
>>>>>>             throw new InvalidJobException("Job [" + getJobId() + 
>>>>>> "] is not available");
>>>>>>         } else {
>>>>>> -            // set the start time to now
>>>>>> -            jobValue.set("startDateTime", 
>>>>>> UtilDateTime.nowTimestamp());
>>>>>> -            jobValue.set("statusId", "SERVICE_RUNNING");
>>>>>> +            jobValue.set("statusId", "SERVICE_QUEUED");
>>>>>>             try {
>>>>>>                 jobValue.store();
>>>>>>             } catch (GenericEntityException e) {
>>>>>>                 throw new InvalidJobException("Unable to set the 
>>>>>> startDateTime and statusId on the current job [" + getJobId() + 
>>>>>> "]; not running!", e);
>>>>>>             }
>>>>>> +            if (Debug.verboseOn()) {
>>>>>> +                Debug.logVerbose("Placing job [" + getJobId() + 
>>>>>> "] in queue", module);
>>>>>> +            }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>>>>             // This condition isn't possible, but we will leave 
>>>>>> it here.
>>>>>>             throw new InvalidJobException("Job has been accepted 
>>>>>> by a different instance!");
>>>>>>         }
>>>>>> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>>> +        jobValue.set("statusId", "SERVICE_RUNNING");
>>>>>> +        try {
>>>>>> +            jobValue.store();
>>>>>> +        } catch (GenericEntityException e) {
>>>>>> +            throw new InvalidJobException("Unable to set the 
>>>>>> startDateTime and statusId on the current job [" + getJobId() + 
>>>>>> "]; not running!", e);
>>>>>> +        }
>>>>>> +        if (Debug.verboseOn()) {
>>>>>> +            Debug.logVerbose("Job [" + getJobId() + "] running", 
>>>>>> module);
>>>>>> +        }
>>>>>>         // configure any additional recurrences
>>>>>>         long maxRecurrenceCount = -1;
>>>>>>         long currentRecurrenceCount = 0;
>>>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>>>>         }
>>>>>>         return null;
>>>>>>     }
>>>>>> +
>>>>>> +    @Override
>>>>>> +    public void deQueue() throws InvalidJobException {
>>>>>> +        if (currentState != State.QUEUED) {
>>>>>> +            throw new InvalidJobException("Illegal state change");
>>>>>> +        }
>>>>>> +        currentState = State.CREATED;
>>>>>> +        try {
>>>>>> +            jobValue.refresh();
>>>>>> +            jobValue.set("startDateTime", null);
>>>>>> +            jobValue.set("runByInstanceId", null);
>>>>>> +            jobValue.set("statusId", "SERVICE_PENDING");
>>>>>> +            jobValue.store();
>>>>>> +        } catch (GenericEntityException e) {
>>>>>> +            throw new InvalidJobException("Unable to dequeue job 
>>>>>> [" + getJobId() + "]", e);
>>>>>> +        }
>>>>>> +        if (Debug.verboseOn()) {
>>>>>> +            Debug.logVerbose("Job [" + getJobId() + "] not 
>>>>>> queued, rescheduling", module);
>>>>>> +        }
>>>>>> +    }
>>>>>> }
>>>>>>
>>>>>>
>>>


Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Jacques Le Roux <ja...@les7arts.com>.
OK, that is clear now; I had to look at the source, not only the diff.
I must say I am trying to cope with the changes there, but I will need more time to review them completely.

Jacques

From: "Jacques Le Roux" <ja...@les7arts.com>
>I only looked at the diff; I will look at it in context. Maybe you are right...
>
> Jacques
>
> From: "Adrian Crum" <ad...@sandglass-software.com>
>> You are welcome to put them back in. They seemed redundant (and silly) to me. Example:
>>
>> // Set var1 to 1
>> int var1 = 1;
>> // Call someMethod, put result in var2
>> int var2 = someMethod();
>> // Calculate var1 * var2
>> int result = var1 * var2;
>>
>> The comments are not adding anything of value - they are only stating the obvious.
>>
>> -Adrian
>>
>> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>>> Also, I forgot to say that I like it.
>>>
>>> Jacques
>>>
>>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>>> Hi Adrian,
>>>>
>>>> Why did you remove these comments?
>>>>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>>> -                // only rollback the transaction if we started one...
>>>>> -                // only commit the transaction if we started one... but make sure we try
>>>>> -                // we don't really want to do anything different, so just log and move on
>>>>
>>>> It seems they add some information
>>>>
>>>> Jacques
>>>>
>>>> From: <ad...@apache.org>
>>>>> Author: adrianc
>>>>> Date: Tue Aug  7 22:10:27 2012
>>>>> New Revision: 1370566
>>>>>
>>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>>> Log:
>>>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>>>
>>>>> Modified:
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>> URL: 
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- 
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
>>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>>>     public boolean isValid() {
>>>>>         return currentState == State.CREATED;
>>>>>     }
>>>>> +
>>>>> +    @Override
>>>>> +    public void deQueue() throws InvalidJobException {
>>>>> +        if (currentState != State.QUEUED) {
>>>>> +            throw new InvalidJobException("Illegal state change");
>>>>> +        }
>>>>> +        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>>>>> +    }
>>>>> }
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>> URL: 
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
>>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>>>     boolean isValid();
>>>>>
>>>>>     /**
>>>>> -     * Transitions the job to the queued state.
>>>>> +     * Transitions this job to the pre-queued (created) state. The job manager
>>>>> +     * will call this method when there was a problem adding this job to the queue.
>>>>> +     */
>>>>> +    void deQueue() throws InvalidJobException;
>>>>> +
>>>>> +    /**
>>>>> +     * Transitions this job to the queued state.
>>>>>      */
>>>>>     void queue() throws InvalidJobException;
>>>>> }
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>> URL: 
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- 
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
>>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>>
>>>>>     public static final String module = JobManager.class.getName();
>>>>>     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", 
>>>>> "ofbiz0");
>>>>> -    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
>>>>> "statusId", "SERVICE_QUEUED");
>>>>>     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, 
>>>>> JobManager>();
>>>>>     private static boolean isShutDown = false;
>>>>>
>>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>>>         assertIsRunning();
>>>>>         DispatchContext dctx = getDispatcher().getDispatchContext();
>>>>>         if (dctx == null) {
>>>>> -            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>>>>> +            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>>>>>             return null;
>>>>>         }
>>>>>         List<Job> poll = FastList.newInstance();
>>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>>>         try {
>>>>>             beganTransaction = TransactionUtil.begin();
>>>>>             if (!beganTransaction) {
>>>>> -                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>>> +                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>>>                 return null;
>>>>>             }
>>>>>             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>>>             GenericValue jobValue = jobsIterator.next();
>>>>>             while (jobValue != null) {
>>>>> -                jobValue.putAll(updateFields);
>>>>> +                jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
>>>>>                 jobValue.store();
>>>>>                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
>>>>>                 if (poll.size() == limit) {
>>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>>>                 jobValue = jobsIterator.next();
>>>>>             }
>>>>>         } catch (Throwable t) {
>>>>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>>>             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>>>>> -            Debug.logError(t, errMsg, module);
>>>>> +            Debug.logWarning(t, errMsg, module);
>>>>>             try {
>>>>> -                // only rollback the transaction if we started one...
>>>>>                 TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>>>             } catch (GenericEntityException e2) {
>>>>> -                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>>> +                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>>>             }
>>>>>         } finally {
>>>>>             if (jobsIterator != null) {
>>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>>>                 }
>>>>>             }
>>>>>             try {
>>>>> -                // only commit the transaction if we started one... but make sure we try
>>>>>                 TransactionUtil.commit(beganTransaction);
>>>>>             } catch (GenericTransactionException e) {
>>>>>                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + 
>>>>> e.toString();
>>>>> -                // we don't really want to do anything different, so just log and move on
>>>>> -                Debug.logError(e, errMsg, module);
>>>>> +                Debug.logWarning(e, errMsg, module);
>>>>>             }
>>>>>         }
>>>>>         return poll;
>>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>>
>>>>>     private void reloadCrashedJobs() {
>>>>>         List<GenericValue> crashed = null;
>>>>> -        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>>>> "SERVICE_PENDING"),
>>>>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>>         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>>>         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>>>>>         List<String> pools = ServiceConfigUtil.getRunPools();
>>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>>>         try {
>>>>>             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>>>         } catch (GenericEntityException e) {
>>>>> -            Debug.logError(e, "Unable to load crashed jobs", module);
>>>>> +            Debug.logWarning(e, "Unable to load crashed jobs", module);
>>>>>         }
>>>>>         if (UtilValidate.isNotEmpty(crashed)) {
>>>>> -            try {
>>>>> -                int rescheduled = 0;
>>>>> -                for (GenericValue job : crashed) {
>>>>> -                    Timestamp now = UtilDateTime.nowTimestamp();
>>>>> +            int rescheduled = 0;
>>>>> +            Timestamp now = UtilDateTime.nowTimestamp();
>>>>> +            for (GenericValue job : crashed) {
>>>>> +                try {
>>>>>                     Debug.logInfo("Scheduling Job : " + job, module);
>>>>>                     String pJobId = job.getString("parentJobId");
>>>>>                     if (pJobId == null) {
>>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>>>                     job.set("cancelDateTime", now);
>>>>>                     delegator.store(job);
>>>>>                     rescheduled++;
>>>>> +                } catch (GenericEntityException e) {
>>>>> +                    Debug.logWarning(e, module);
>>>>>                 }
>>>>> -                if (Debug.infoOn())
>>>>> -                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>>> -            } catch (GenericEntityException e) {
>>>>> -                Debug.logError(e, module);
>>>>>             }
>>>>> +            if (Debug.infoOn())
>>>>> +                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>>>         } else {
>>>>>             if (Debug.infoOn())
>>>>>                 Debug.logInfo("No crashed jobs to re-schedule", module);
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>> URL: 
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- 
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
>>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>>>      */
>>>>>     public void queueNow(Job job) throws InvalidJobException {
>>>>>         job.queue();
>>>>> -        this.executor.execute(new JobInvoker(job));
>>>>> +        try {
>>>>> +            this.executor.execute(new JobInvoker(job));
>>>>> +        } catch (Exception e) {
>>>>> +            job.deQueue();
>>>>> +        }
>>>>>     }
>>>>>
>>>>>     public void run() {
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>> URL: 
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- 
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
>>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.sql.Timestamp;
>>>>> -import com.ibm.icu.util.Calendar;
>>>>> import java.util.Date;
>>>>> import java.util.Map;
>>>>>
>>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>>
>>>>> import javolution.util.FastMap;
>>>>>
>>>>> +import org.apache.commons.lang.StringUtils;
>>>>> import org.ofbiz.base.util.Debug;
>>>>> import org.ofbiz.base.util.UtilDateTime;
>>>>> import org.ofbiz.base.util.UtilGenerics;
>>>>> import org.ofbiz.base.util.UtilProperties;
>>>>> import org.ofbiz.base.util.UtilValidate;
>>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>> import org.ofbiz.entity.Delegator;
>>>>> import org.ofbiz.entity.GenericEntityException;
>>>>> import org.ofbiz.entity.GenericValue;
>>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>>> import org.ofbiz.service.GenericRequester;
>>>>> import org.ofbiz.service.ServiceUtil;
>>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>>> import org.xml.sax.SAXException;
>>>>>
>>>>> -import org.apache.commons.lang.StringUtils;
>>>>> +import com.ibm.icu.util.Calendar;
>>>>>
>>>>> /**
>>>>>  * A {@link Job} that is backed by the entity engine. Job data is stored
>>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>>>             // job not available
>>>>>             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>>>>>         } else {
>>>>> -            // set the start time to now
>>>>> -            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>> -            jobValue.set("statusId", "SERVICE_RUNNING");
>>>>> +            jobValue.set("statusId", "SERVICE_QUEUED");
>>>>>             try {
>>>>>                 jobValue.store();
>>>>>             } catch (GenericEntityException e) {
>>>>>                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() 
>>>>> + "]; not running!", e);
>>>>>             }
>>>>> +            if (Debug.verboseOn()) {
>>>>> +                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>>>>> +            }
>>>>>         }
>>>>>     }
>>>>>
>>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>>>             // This condition isn't possible, but we will leave it here.
>>>>>             throw new InvalidJobException("Job has been accepted by a different instance!");
>>>>>         }
>>>>> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>> +        jobValue.set("statusId", "SERVICE_RUNNING");
>>>>> +        try {
>>>>> +            jobValue.store();
>>>>> +        } catch (GenericEntityException e) {
>>>>> +            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + 
>>>>> "]; not running!", e);
>>>>> +        }
>>>>> +        if (Debug.verboseOn()) {
>>>>> +            Debug.logVerbose("Job [" + getJobId() + "] running", module);
>>>>> +        }
>>>>>         // configure any additional recurrences
>>>>>         long maxRecurrenceCount = -1;
>>>>>         long currentRecurrenceCount = 0;
>>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>>>         }
>>>>>         return null;
>>>>>     }
>>>>> +
>>>>> +    @Override
>>>>> +    public void deQueue() throws InvalidJobException {
>>>>> +        if (currentState != State.QUEUED) {
>>>>> +            throw new InvalidJobException("Illegal state change");
>>>>> +        }
>>>>> +        currentState = State.CREATED;
>>>>> +        try {
>>>>> +            jobValue.refresh();
>>>>> +            jobValue.set("startDateTime", null);
>>>>> +            jobValue.set("runByInstanceId", null);
>>>>> +            jobValue.set("statusId", "SERVICE_PENDING");
>>>>> +            jobValue.store();
>>>>> +        } catch (GenericEntityException e) {
>>>>> +            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>>>>> +        }
>>>>> +        if (Debug.verboseOn()) {
>>>>> +            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>>>>> +        }
>>>>> +    }
>>>>> }
>>>>>
>>>>>
>> 

Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Jacques Le Roux <ja...@les7arts.com>.
I only looked at the diff; I will look at it in context. Maybe you are right...

Jacques

From: "Adrian Crum" <ad...@sandglass-software.com>
> You are welcome to put them back in. They seemed redundant (and silly) to me. Example:
>
> // Set var1 to 1
> int var1 = 1;
> // Call someMethod, put result in var2
> int var2 = someMethod();
> // Calculate var1 * var2
> int result = var1 * var2;
>
> The comments are not adding anything of value - they are only stating the obvious.
>
> -Adrian
>
> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>> Also, I forgot to say that I like it.
>>
>> Jacques
>>
>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>> Hi Adrian,
>>>
>>> Why did you remove these comments?
>>>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>> -                // only rollback the transaction if we started one...
>>>> -                // only commit the transaction if we started one... but make sure we try
>>>> -                // we don't really want to do anything different, so just log and move on
>>>
>>> It seems they add some information
>>>
>>> Jacques
>>>
>>> From: <ad...@apache.org>
>>>> Author: adrianc
>>>> Date: Tue Aug  7 22:10:27 2012
>>>> New Revision: 1370566
>>>>
>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>> Log:
>>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>>
>>>> Modified:
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>> URL: 
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- 
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>>     public boolean isValid() {
>>>>         return currentState == State.CREATED;
>>>>     }
>>>> +
>>>> +    @Override
>>>> +    public void deQueue() throws InvalidJobException {
>>>> +        if (currentState != State.QUEUED) {
>>>> +            throw new InvalidJobException("Illegal state change");
>>>> +        }
>>>> +        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>>>> +    }
>>>> }
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>> URL: 
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>>     boolean isValid();
>>>>
>>>>     /**
>>>> -     * Transitions the job to the queued state.
>>>> +     * Transitions this job to the pre-queued (created) state. The job manager
>>>> +     * will call this method when there was a problem adding this job to the queue.
>>>> +     */
>>>> +    void deQueue() throws InvalidJobException;
>>>> +
>>>> +    /**
>>>> +     * Transitions this job to the queued state.
>>>>      */
>>>>     void queue() throws InvalidJobException;
>>>> }
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>> URL: 
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- 
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>
>>>>     public static final String module = JobManager.class.getName();
>>>>     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", 
>>>> "ofbiz0");
>>>> -    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
>>>> "statusId", "SERVICE_QUEUED");
>>>>     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, 
>>>> JobManager>();
>>>>     private static boolean isShutDown = false;
>>>>
>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>>         assertIsRunning();
>>>>         DispatchContext dctx = getDispatcher().getDispatchContext();
>>>>         if (dctx == null) {
>>>> -            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>>>> +            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>>>>             return null;
>>>>         }
>>>>         List<Job> poll = FastList.newInstance();
>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>>         try {
>>>>             beganTransaction = TransactionUtil.begin();
>>>>             if (!beganTransaction) {
>>>> -                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>> +                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>>                 return null;
>>>>             }
>>>>             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>>             GenericValue jobValue = jobsIterator.next();
>>>>             while (jobValue != null) {
>>>> -                jobValue.putAll(updateFields);
>>>> +                jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
>>>>                 jobValue.store();
>>>>                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
>>>>                 if (poll.size() == limit) {
>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>>                 jobValue = jobsIterator.next();
>>>>             }
>>>>         } catch (Throwable t) {
>>>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>>             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>>>> -            Debug.logError(t, errMsg, module);
>>>> +            Debug.logWarning(t, errMsg, module);
>>>>             try {
>>>> -                // only rollback the transaction if we started one...
>>>>                 TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>>             } catch (GenericEntityException e2) {
>>>> -                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>> +                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>>             }
>>>>         } finally {
>>>>             if (jobsIterator != null) {
>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>>                 }
>>>>             }
>>>>             try {
>>>> -                // only commit the transaction if we started one... but make sure we try
>>>>                 TransactionUtil.commit(beganTransaction);
>>>>             } catch (GenericTransactionException e) {
>>>>                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
>>>> -                // we don't really want to do anything different, so just log and move on
>>>> -                Debug.logError(e, errMsg, module);
>>>> +                Debug.logWarning(e, errMsg, module);
>>>>             }
>>>>         }
>>>>         return poll;
>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>
>>>>     private void reloadCrashedJobs() {
>>>>         List<GenericValue> crashed = null;
>>>> -        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>>> "SERVICE_PENDING"),
>>>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>>         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>>>>         List<String> pools = ServiceConfigUtil.getRunPools();
>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>>         try {
>>>>             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>>         } catch (GenericEntityException e) {
>>>> -            Debug.logError(e, "Unable to load crashed jobs", module);
>>>> +            Debug.logWarning(e, "Unable to load crashed jobs", module);
>>>>         }
>>>>         if (UtilValidate.isNotEmpty(crashed)) {
>>>> -            try {
>>>> -                int rescheduled = 0;
>>>> -                for (GenericValue job : crashed) {
>>>> -                    Timestamp now = UtilDateTime.nowTimestamp();
>>>> +            int rescheduled = 0;
>>>> +            Timestamp now = UtilDateTime.nowTimestamp();
>>>> +            for (GenericValue job : crashed) {
>>>> +                try {
>>>>                     Debug.logInfo("Scheduling Job : " + job, module);
>>>>                     String pJobId = job.getString("parentJobId");
>>>>                     if (pJobId == null) {
>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>>                     job.set("cancelDateTime", now);
>>>>                     delegator.store(job);
>>>>                     rescheduled++;
>>>> +                } catch (GenericEntityException e) {
>>>> +                    Debug.logWarning(e, module);
>>>>                 }
>>>> -                if (Debug.infoOn())
>>>> -                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>> -            } catch (GenericEntityException e) {
>>>> -                Debug.logError(e, module);
>>>>             }
>>>> +            if (Debug.infoOn())
>>>> +                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>>         } else {
>>>>             if (Debug.infoOn())
>>>>                 Debug.logInfo("No crashed jobs to re-schedule", module);
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>> URL: 
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- 
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>>      */
>>>>     public void queueNow(Job job) throws InvalidJobException {
>>>>         job.queue();
>>>> -        this.executor.execute(new JobInvoker(job));
>>>> +        try {
>>>> +            this.executor.execute(new JobInvoker(job));
>>>> +        } catch (Exception e) {
>>>> +            job.deQueue();
>>>> +        }
>>>>     }
>>>>
>>>>     public void run() {
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>> URL: 
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- 
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>
>>>> import java.io.IOException;
>>>> import java.sql.Timestamp;
>>>> -import com.ibm.icu.util.Calendar;
>>>> import java.util.Date;
>>>> import java.util.Map;
>>>>
>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>
>>>> import javolution.util.FastMap;
>>>>
>>>> +import org.apache.commons.lang.StringUtils;
>>>> import org.ofbiz.base.util.Debug;
>>>> import org.ofbiz.base.util.UtilDateTime;
>>>> import org.ofbiz.base.util.UtilGenerics;
>>>> import org.ofbiz.base.util.UtilProperties;
>>>> import org.ofbiz.base.util.UtilValidate;
>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>> import org.ofbiz.entity.Delegator;
>>>> import org.ofbiz.entity.GenericEntityException;
>>>> import org.ofbiz.entity.GenericValue;
>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>> import org.ofbiz.service.GenericRequester;
>>>> import org.ofbiz.service.ServiceUtil;
>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>> import org.xml.sax.SAXException;
>>>>
>>>> -import org.apache.commons.lang.StringUtils;
>>>> +import com.ibm.icu.util.Calendar;
>>>>
>>>> /**
>>>>  * A {@link Job} that is backed by the entity engine. Job data is stored
>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>>             // job not available
>>>>             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>>>>         } else {
>>>> -            // set the start time to now
>>>> -            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>> -            jobValue.set("statusId", "SERVICE_RUNNING");
>>>> +            jobValue.set("statusId", "SERVICE_QUEUED");
>>>>             try {
>>>>                 jobValue.store();
>>>>             } catch (GenericEntityException e) {
>>>>                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() 
>>>> + "]; not running!", e);
>>>>             }
>>>> +            if (Debug.verboseOn()) {
>>>> +                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>>>> +            }
>>>>         }
>>>>     }
>>>>
>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>>             // This condition isn't possible, but we will leave it here.
>>>>             throw new InvalidJobException("Job has been accepted by a different instance!");
>>>>         }
>>>> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>> +        jobValue.set("statusId", "SERVICE_RUNNING");
>>>> +        try {
>>>> +            jobValue.store();
>>>> +        } catch (GenericEntityException e) {
>>>> +            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + 
>>>> "]; not running!", e);
>>>> +        }
>>>> +        if (Debug.verboseOn()) {
>>>> +            Debug.logVerbose("Job [" + getJobId() + "] running", module);
>>>> +        }
>>>>         // configure any additional recurrences
>>>>         long maxRecurrenceCount = -1;
>>>>         long currentRecurrenceCount = 0;
>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>>         }
>>>>         return null;
>>>>     }
>>>> +
>>>> +    @Override
>>>> +    public void deQueue() throws InvalidJobException {
>>>> +        if (currentState != State.QUEUED) {
>>>> +            throw new InvalidJobException("Illegal state change");
>>>> +        }
>>>> +        currentState = State.CREATED;
>>>> +        try {
>>>> +            jobValue.refresh();
>>>> +            jobValue.set("startDateTime", null);
>>>> +            jobValue.set("runByInstanceId", null);
>>>> +            jobValue.set("statusId", "SERVICE_PENDING");
>>>> +            jobValue.store();
>>>> +        } catch (GenericEntityException e) {
>>>> +            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>>>> +        }
>>>> +        if (Debug.verboseOn()) {
>>>> +            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>>>> +        }
>>>> +    }
>>>> }
>>>>
>>>>
> 

Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Adrian Crum <ad...@sandglass-software.com>.
You are welcome to put them back in. They seemed redundant (and silly) 
to me. Example:

// Set var to 1
int var = 1;
// Call someMethod, put result in var2
int var2 = someMethod();
// Calculate var * var2
int result = var * var2;

The comments are not adding anything of value - they are only stating 
the obvious.

-Adrian

On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
> Else forgot to say that I like it
>
> Jacques
>
> From: "Jacques Le Roux" <ja...@les7arts.com>
>> Hi Adrian,
>>
>> Why did you remove these comments ?
>>> -            // catch Throwable so nothing slips through the 
>>> cracks... this is a fairly sensitive operation
>>> -                // only rollback the transaction if we started one...
>>> -                // only commit the transaction if we started one... 
>>> but make sure we try
>>> -                // we don't really want to do anything different, 
>>> so just log and move on
>>
>> It seems they add some information
>>
>> Jacques
>>
>> From: <ad...@apache.org>
>>> Author: adrianc
>>> Date: Tue Aug  7 22:10:27 2012
>>> New Revision: 1370566
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>> Log:
>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>
>>> Modified: 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> URL: 
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ============================================================================== 
>>>
>>> --- 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java 
>>> (original)
>>> +++ 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java 
>>> Tue Aug  7 22:10:27 2012
>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>     public boolean isValid() {
>>>         return currentState == State.CREATED;
>>>     }
>>> +
>>> +    @Override
>>> +    public void deQueue() throws InvalidJobException {
>>> +        if (currentState != State.QUEUED) {
>>> +            throw new InvalidJobException("Illegal state change");
>>> +        }
>>> +        throw new InvalidJobException("Unable to queue job [" + 
>>> getJobId() + "]");
>>> +    }
>>> }
>>>
>>> Modified: 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> URL: 
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ============================================================================== 
>>>
>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java 
>>> (original)
>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java 
>>> Tue Aug  7 22:10:27 2012
>>> @@ -60,7 +60,13 @@ public interface Job {
>>>     boolean isValid();
>>>
>>>     /**
>>> -     * Transitions the job to the queued state.
>>> +     * Transitions this job to the pre-queued (created) state. The 
>>> job manager
>>> +     * will call this method when there was a problem adding this 
>>> job to the queue.
>>> +     */
>>> +    void deQueue() throws InvalidJobException;
>>> +
>>> +    /**
>>> +     * Transitions this job to the queued state.
>>>      */
>>>     void queue() throws InvalidJobException;
>>> }
>>>
>>> Modified: 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> URL: 
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ============================================================================== 
>>>
>>> --- 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>> (original)
>>> +++ 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java 
>>> Tue Aug  7 22:10:27 2012
>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>
>>>     public static final String module = JobManager.class.getName();
>>>     public static final String instanceId = 
>>> UtilProperties.getPropertyValue("general.properties", 
>>> "unique.instanceId", "ofbiz0");
>>> -    public static final Map<String, Object> updateFields = 
>>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
>>> "statusId", "SERVICE_QUEUED");
>>>     private static final ConcurrentHashMap<String, JobManager> 
>>> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>>>     private static boolean isShutDown = false;
>>>
>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>         assertIsRunning();
>>>         DispatchContext dctx = getDispatcher().getDispatchContext();
>>>         if (dctx == null) {
>>> -            Debug.logError("Unable to locate DispatchContext 
>>> object; not running job!", module);
>>> +            Debug.logWarning("Unable to locate DispatchContext 
>>> object; not running job!", module);
>>>             return null;
>>>         }
>>>         List<Job> poll = FastList.newInstance();
>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>         try {
>>>             beganTransaction = TransactionUtil.begin();
>>>             if (!beganTransaction) {
>>> -                Debug.logError("Unable to poll JobSandbox for jobs; 
>>> transaction was not started by this process", module);
>>> +                Debug.logWarning("Unable to poll JobSandbox for 
>>> jobs; transaction was not started by this process", module);
>>>                 return null;
>>>             }
>>>             jobsIterator = delegator.find("JobSandbox", 
>>> mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>             GenericValue jobValue = jobsIterator.next();
>>>             while (jobValue != null) {
>>> -                jobValue.putAll(updateFields);
>>> +                jobValue.set("runByInstanceId", instanceId); // 
>>> Claim ownership of this value.
>>>                 jobValue.store();
>>>                 poll.add(new PersistedServiceJob(dctx, jobValue, 
>>> null));
>>>                 if (poll.size() == limit) {
>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>                 jobValue = jobsIterator.next();
>>>             }
>>>         } catch (Throwable t) {
>>> -            // catch Throwable so nothing slips through the 
>>> cracks... this is a fairly sensitive operation
>>>             String errMsg = "Error in polling JobSandbox: [" + 
>>> t.toString() + "]. Rolling back transaction.";
>>> -            Debug.logError(t, errMsg, module);
>>> +            Debug.logWarning(t, errMsg, module);
>>>             try {
>>> -                // only rollback the transaction if we started one...
>>>                 TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>             } catch (GenericEntityException e2) {
>>> -                Debug.logError(e2, "[Delegator] Could not rollback 
>>> transaction: " + e2.toString(), module);
>>> +                Debug.logWarning(e2, "[Delegator] Could not 
>>> rollback transaction: " + e2.toString(), module);
>>>             }
>>>         } finally {
>>>             if (jobsIterator != null) {
>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>                 }
>>>             }
>>>             try {
>>> -                // only commit the transaction if we started one... 
>>> but make sure we try
>>>                 TransactionUtil.commit(beganTransaction);
>>>             } catch (GenericTransactionException e) {
>>>                 String errMsg = "Transaction error trying to commit 
>>> when polling and updating the JobSandbox: " + e.toString();
>>> -                // we don't really want to do anything different, 
>>> so just log and move on
>>> -                Debug.logError(e, errMsg, module);
>>> +                Debug.logWarning(e, errMsg, module);
>>>             }
>>>         }
>>>         return poll;
>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>
>>>     private void reloadCrashedJobs() {
>>>         List<GenericValue> crashed = null;
>>> -        List<EntityExpr> statusExprList = 
>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", 
>>> EntityOperator.EQUALS, "SERVICE_QUEUED"), 
>>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>>> "SERVICE_RUNNING"));
>>> +        List<EntityExpr> statusExprList = 
>>> UtilMisc.toList(EntityCondition.makeCondition("statusId", 
>>> EntityOperator.EQUALS, "SERVICE_PENDING"),
>>> +                EntityCondition.makeCondition("statusId", 
>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>> +                EntityCondition.makeCondition("statusId", 
>>> EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>         EntityCondition statusCondition = 
>>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>         List<EntityExpr> poolsExpr = 
>>> UtilMisc.toList(EntityCondition.makeCondition("poolId", 
>>> EntityOperator.EQUALS, null));
>>>         List<String> pools = ServiceConfigUtil.getRunPools();
>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>         try {
>>>             crashed = delegator.findList("JobSandbox", 
>>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>         } catch (GenericEntityException e) {
>>> -            Debug.logError(e, "Unable to load crashed jobs", module);
>>> +            Debug.logWarning(e, "Unable to load crashed jobs", 
>>> module);
>>>         }
>>>         if (UtilValidate.isNotEmpty(crashed)) {
>>> -            try {
>>> -                int rescheduled = 0;
>>> -                for (GenericValue job : crashed) {
>>> -                    Timestamp now = UtilDateTime.nowTimestamp();
>>> +            int rescheduled = 0;
>>> +            Timestamp now = UtilDateTime.nowTimestamp();
>>> +            for (GenericValue job : crashed) {
>>> +                try {
>>>                     Debug.logInfo("Scheduling Job : " + job, module);
>>>                     String pJobId = job.getString("parentJobId");
>>>                     if (pJobId == null) {
>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>                     job.set("cancelDateTime", now);
>>>                     delegator.store(job);
>>>                     rescheduled++;
>>> +                } catch (GenericEntityException e) {
>>> +                    Debug.logWarning(e, module);
>>>                 }
>>> -                if (Debug.infoOn())
>>> -                    Debug.logInfo("-- " + rescheduled + " jobs 
>>> re-scheduled", module);
>>> -            } catch (GenericEntityException e) {
>>> -                Debug.logError(e, module);
>>>             }
>>> +            if (Debug.infoOn())
>>> +                Debug.logInfo("-- " + rescheduled + " jobs 
>>> re-scheduled", module);
>>>         } else {
>>>             if (Debug.infoOn())
>>>                 Debug.logInfo("No crashed jobs to re-schedule", 
>>> module);
>>>
>>> Modified: 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> URL: 
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ============================================================================== 
>>>
>>> --- 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>> (original)
>>> +++ 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java 
>>> Tue Aug  7 22:10:27 2012
>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>      */
>>>     public void queueNow(Job job) throws InvalidJobException {
>>>         job.queue();
>>> -        this.executor.execute(new JobInvoker(job));
>>> +        try {
>>> +            this.executor.execute(new JobInvoker(job));
>>> +        } catch (Exception e) {
>>> +            job.deQueue();
>>> +        }
>>>     }
>>>
>>>     public void run() {
>>>
>>> Modified: 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>> URL: 
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ============================================================================== 
>>>
>>> --- 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java 
>>> (original)
>>> +++ 
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java 
>>> Tue Aug  7 22:10:27 2012
>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>
>>> import java.io.IOException;
>>> import java.sql.Timestamp;
>>> -import com.ibm.icu.util.Calendar;
>>> import java.util.Date;
>>> import java.util.Map;
>>>
>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>
>>> import javolution.util.FastMap;
>>>
>>> +import org.apache.commons.lang.StringUtils;
>>> import org.ofbiz.base.util.Debug;
>>> import org.ofbiz.base.util.UtilDateTime;
>>> import org.ofbiz.base.util.UtilGenerics;
>>> import org.ofbiz.base.util.UtilProperties;
>>> import org.ofbiz.base.util.UtilValidate;
>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>> import org.ofbiz.entity.Delegator;
>>> import org.ofbiz.entity.GenericEntityException;
>>> import org.ofbiz.entity.GenericValue;
>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>> import org.ofbiz.service.GenericRequester;
>>> import org.ofbiz.service.ServiceUtil;
>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>> import org.xml.sax.SAXException;
>>>
>>> -import org.apache.commons.lang.StringUtils;
>>> +import com.ibm.icu.util.Calendar;
>>>
>>> /**
>>>  * A {@link Job} that is backed by the entity engine. Job data is 
>>> stored
>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>             // job not available
>>>             throw new InvalidJobException("Job [" + getJobId() + "] 
>>> is not available");
>>>         } else {
>>> -            // set the start time to now
>>> -            jobValue.set("startDateTime", 
>>> UtilDateTime.nowTimestamp());
>>> -            jobValue.set("statusId", "SERVICE_RUNNING");
>>> +            jobValue.set("statusId", "SERVICE_QUEUED");
>>>             try {
>>>                 jobValue.store();
>>>             } catch (GenericEntityException e) {
>>>                 throw new InvalidJobException("Unable to set the 
>>> startDateTime and statusId on the current job [" + getJobId() + "]; 
>>> not running!", e);
>>>             }
>>> +            if (Debug.verboseOn()) {
>>> +                Debug.logVerbose("Placing job [" + getJobId() + "] 
>>> in queue", module);
>>> +            }
>>>         }
>>>     }
>>>
>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>             // This condition isn't possible, but we will leave it 
>>> here.
>>>             throw new InvalidJobException("Job has been accepted by 
>>> a different instance!");
>>>         }
>>> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>> +        jobValue.set("statusId", "SERVICE_RUNNING");
>>> +        try {
>>> +            jobValue.store();
>>> +        } catch (GenericEntityException e) {
>>> +            throw new InvalidJobException("Unable to set the 
>>> startDateTime and statusId on the current job [" + getJobId() + "]; 
>>> not running!", e);
>>> +        }
>>> +        if (Debug.verboseOn()) {
>>> +            Debug.logVerbose("Job [" + getJobId() + "] running", 
>>> module);
>>> +        }
>>>         // configure any additional recurrences
>>>         long maxRecurrenceCount = -1;
>>>         long currentRecurrenceCount = 0;
>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>         }
>>>         return null;
>>>     }
>>> +
>>> +    @Override
>>> +    public void deQueue() throws InvalidJobException {
>>> +        if (currentState != State.QUEUED) {
>>> +            throw new InvalidJobException("Illegal state change");
>>> +        }
>>> +        currentState = State.CREATED;
>>> +        try {
>>> +            jobValue.refresh();
>>> +            jobValue.set("startDateTime", null);
>>> +            jobValue.set("runByInstanceId", null);
>>> +            jobValue.set("statusId", "SERVICE_PENDING");
>>> +            jobValue.store();
>>> +        } catch (GenericEntityException e) {
>>> +            throw new InvalidJobException("Unable to dequeue job [" 
>>> + getJobId() + "]", e);
>>> +        }
>>> +        if (Debug.verboseOn()) {
>>> +            Debug.logVerbose("Job [" + getJobId() + "] not queued, 
>>> rescheduling", module);
>>> +        }
>>> +    }
>>> }
>>>
>>>


Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Jacques Le Roux <ja...@les7arts.com>.
Apart from that, I forgot to say that I like it.

Jacques

From: "Jacques Le Roux" <ja...@les7arts.com>
> Hi Adrian,
>
> Why did you remove these comments?
>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>> -                // only rollback the transaction if we started one...
>> -                // only commit the transaction if we started one... but make sure we try
>> -                // we don't really want to do anything different, so just log and move on
>
> It seems they add some information.
>
> Jacques
>
> From: <ad...@apache.org>
>> Author: adrianc
>> Date: Tue Aug  7 22:10:27 2012
>> New Revision: 1370566
>>
>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>> Log:
>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>
>> Modified:
>>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>> URL: 
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>     public boolean isValid() {
>>         return currentState == State.CREATED;
>>     }
>> +
>> +    @Override
>> +    public void deQueue() throws InvalidJobException {
>> +        if (currentState != State.QUEUED) {
>> +            throw new InvalidJobException("Illegal state change");
>> +        }
>> +        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>> +    }
>> }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>> URL: 
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
>> @@ -60,7 +60,13 @@ public interface Job {
>>     boolean isValid();
>>
>>     /**
>> -     * Transitions the job to the queued state.
>> +     * Transitions this job to the pre-queued (created) state. The job manager
>> +     * will call this method when there was a problem adding this job to the queue.
>> +     */
>> +    void deQueue() throws InvalidJobException;
>> +
>> +    /**
>> +     * Transitions this job to the queued state.
>>      */
>>     void queue() throws InvalidJobException;
>> }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> URL: 
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
>> @@ -63,7 +63,6 @@ public final class JobManager {
>>
>>     public static final String module = JobManager.class.getName();
>>     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
>> -    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
>> "statusId", "SERVICE_QUEUED");
>>     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>>     private static boolean isShutDown = false;
>>
>> @@ -150,7 +149,7 @@ public final class JobManager {
>>         assertIsRunning();
>>         DispatchContext dctx = getDispatcher().getDispatchContext();
>>         if (dctx == null) {
>> -            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>> +            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>>             return null;
>>         }
>>         List<Job> poll = FastList.newInstance();
>> @@ -176,13 +175,13 @@ public final class JobManager {
>>         try {
>>             beganTransaction = TransactionUtil.begin();
>>             if (!beganTransaction) {
>> -                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>> +                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>                 return null;
>>             }
>>             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>             GenericValue jobValue = jobsIterator.next();
>>             while (jobValue != null) {
>> -                jobValue.putAll(updateFields);
>> +                jobValue.set("runByInstanceId", instanceId);  // Claim ownership of this value.
>>                 jobValue.store();
>>                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
>>                 if (poll.size() == limit) {
>> @@ -191,14 +190,12 @@ public final class JobManager {
>>                 jobValue = jobsIterator.next();
>>             }
>>         } catch (Throwable t) {
>> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>> -            Debug.logError(t, errMsg, module);
>> +            Debug.logWarning(t, errMsg, module);
>>             try {
>> -                // only rollback the transaction if we started one...
>>                 TransactionUtil.rollback(beganTransaction, errMsg, t);
>>             } catch (GenericEntityException e2) {
>> -                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>> +                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>             }
>>         } finally {
>>             if (jobsIterator != null) {
>> @@ -209,12 +206,10 @@ public final class JobManager {
>>                 }
>>             }
>>             try {
>> -                // only commit the transaction if we started one... but make sure we try
>>                 TransactionUtil.commit(beganTransaction);
>>             } catch (GenericTransactionException e) {
>>                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
>> -                // we don't really want to do anything different, so just log and move on
>> -                Debug.logError(e, errMsg, module);
>> +                Debug.logWarning(e, errMsg, module);
>>             }
>>         }
>>         return poll;
>> @@ -222,7 +217,9 @@ public final class JobManager {
>>
>>     private void reloadCrashedJobs() {
>>         List<GenericValue> crashed = null;
>> -        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
>> "SERVICE_PENDING"),
>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>>         List<String> pools = ServiceConfigUtil.getRunPools();
>> @@ -236,13 +233,13 @@ public final class JobManager {
>>         try {
>>             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>         } catch (GenericEntityException e) {
>> -            Debug.logError(e, "Unable to load crashed jobs", module);
>> +            Debug.logWarning(e, "Unable to load crashed jobs", module);
>>         }
>>         if (UtilValidate.isNotEmpty(crashed)) {
>> -            try {
>> -                int rescheduled = 0;
>> -                for (GenericValue job : crashed) {
>> -                    Timestamp now = UtilDateTime.nowTimestamp();
>> +            int rescheduled = 0;
>> +            Timestamp now = UtilDateTime.nowTimestamp();
>> +            for (GenericValue job : crashed) {
>> +                try {
>>                     Debug.logInfo("Scheduling Job : " + job, module);
>>                     String pJobId = job.getString("parentJobId");
>>                     if (pJobId == null) {
>> @@ -261,12 +258,12 @@ public final class JobManager {
>>                     job.set("cancelDateTime", now);
>>                     delegator.store(job);
>>                     rescheduled++;
>> +                } catch (GenericEntityException e) {
>> +                    Debug.logWarning(e, module);
>>                 }
>> -                if (Debug.infoOn())
>> -                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>> -            } catch (GenericEntityException e) {
>> -                Debug.logError(e, module);
>>             }
>> +            if (Debug.infoOn())
>> +                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>         } else {
>>             if (Debug.infoOn())
>>                 Debug.logInfo("No crashed jobs to re-schedule", module);
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>> URL: 
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>      */
>>     public void queueNow(Job job) throws InvalidJobException {
>>         job.queue();
>> -        this.executor.execute(new JobInvoker(job));
>> +        try {
>> +            this.executor.execute(new JobInvoker(job));
>> +        } catch (Exception e) {
>> +            job.deQueue();
>> +        }
>>     }
>>
>>     public void run() {
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>> URL: 
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>
>> import java.io.IOException;
>> import java.sql.Timestamp;
>> -import com.ibm.icu.util.Calendar;
>> import java.util.Date;
>> import java.util.Map;
>>
>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>
>> import javolution.util.FastMap;
>>
>> +import org.apache.commons.lang.StringUtils;
>> import org.ofbiz.base.util.Debug;
>> import org.ofbiz.base.util.UtilDateTime;
>> import org.ofbiz.base.util.UtilGenerics;
>> import org.ofbiz.base.util.UtilProperties;
>> import org.ofbiz.base.util.UtilValidate;
>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>> -import org.ofbiz.service.calendar.TemporalExpression;
>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>> import org.ofbiz.entity.Delegator;
>> import org.ofbiz.entity.GenericEntityException;
>> import org.ofbiz.entity.GenericValue;
>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>> import org.ofbiz.service.GenericRequester;
>> import org.ofbiz.service.ServiceUtil;
>> import org.ofbiz.service.calendar.RecurrenceInfo;
>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>> +import org.ofbiz.service.calendar.TemporalExpression;
>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>> import org.ofbiz.service.config.ServiceConfigUtil;
>> import org.xml.sax.SAXException;
>>
>> -import org.apache.commons.lang.StringUtils;
>> +import com.ibm.icu.util.Calendar;
>>
>> /**
>>  * A {@link Job} that is backed by the entity engine. Job data is stored
>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>             // job not available
>>             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>>         } else {
>> -            // set the start time to now
>> -            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>> -            jobValue.set("statusId", "SERVICE_RUNNING");
>> +            jobValue.set("statusId", "SERVICE_QUEUED");
>>             try {
>>                 jobValue.store();
>>             } catch (GenericEntityException e) {
>>                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + 
>> "]; not running!", e);
>>             }
>> +            if (Debug.verboseOn()) {
>> +                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>> +            }
>>         }
>>     }
>>
>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>             // This condition isn't possible, but we will leave it here.
>>             throw new InvalidJobException("Job has been accepted by a different instance!");
>>         }
>> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>> +        jobValue.set("statusId", "SERVICE_RUNNING");
>> +        try {
>> +            jobValue.store();
>> +        } catch (GenericEntityException e) {
>> +            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; 
>> not running!", e);
>> +        }
>> +        if (Debug.verboseOn()) {
>> +            Debug.logVerbose("Job [" + getJobId() + "] running", module);
>> +        }
>>         // configure any additional recurrences
>>         long maxRecurrenceCount = -1;
>>         long currentRecurrenceCount = 0;
>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>         }
>>         return null;
>>     }
>> +
>> +    @Override
>> +    public void deQueue() throws InvalidJobException {
>> +        if (currentState != State.QUEUED) {
>> +            throw new InvalidJobException("Illegal state change");
>> +        }
>> +        currentState = State.CREATED;
>> +        try {
>> +            jobValue.refresh();
>> +            jobValue.set("startDateTime", null);
>> +            jobValue.set("runByInstanceId", null);
>> +            jobValue.set("statusId", "SERVICE_PENDING");
>> +            jobValue.store();
>> +        } catch (GenericEntityException e) {
>> +            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>> +        }
>> +        if (Debug.verboseOn()) {
>> +            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>> +        }
>> +    }
>> }
>>
>> 

Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java

Posted by Jacques Le Roux <ja...@les7arts.com>.
Hi Adrian,

Why did you remove these comments?
> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
> -                // only rollback the transaction if we started one...
> -                // only commit the transaction if we started one... but make sure we try
> -                // we don't really want to do anything different, so just log and move on

It seems they add useful information.

Jacques

From: <ad...@apache.org>
> Author: adrianc
> Date: Tue Aug  7 22:10:27 2012
> New Revision: 1370566
>
> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
> Log:
> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>
> Modified:
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
> URL: 
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug  7 22:10:27 2012
> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>     public boolean isValid() {
>         return currentState == State.CREATED;
>     }
> +
> +    @Override
> +    public void deQueue() throws InvalidJobException {
> +        if (currentState != State.QUEUED) {
> +            throw new InvalidJobException("Illegal state change");
> +        }
> +        throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
> +    }
> }
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
> URL: 
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug  7 22:10:27 2012
> @@ -60,7 +60,13 @@ public interface Job {
>     boolean isValid();
>
>     /**
> -     * Transitions the job to the queued state.
> +     * Transitions this job to the pre-queued (created) state. The job manager
> +     * will call this method when there was a problem adding this job to the queue.
> +     */
> +    void deQueue() throws InvalidJobException;
> +
> +    /**
> +     * Transitions this job to the queued state.
>      */
>     void queue() throws InvalidJobException;
> }
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> URL: 
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 22:10:27 2012
> @@ -63,7 +63,6 @@ public final class JobManager {
>
>     public static final String module = JobManager.class.getName();
>     public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
> -    public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, 
> "statusId", "SERVICE_QUEUED");
>     private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>     private static boolean isShutDown = false;
>
> @@ -150,7 +149,7 @@ public final class JobManager {
>         assertIsRunning();
>         DispatchContext dctx = getDispatcher().getDispatchContext();
>         if (dctx == null) {
> -            Debug.logError("Unable to locate DispatchContext object; not running job!", module);
> +            Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>             return null;
>         }
>         List<Job> poll = FastList.newInstance();
> @@ -176,13 +175,13 @@ public final class JobManager {
>         try {
>             beganTransaction = TransactionUtil.begin();
>             if (!beganTransaction) {
> -                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
> +                Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>                 return null;
>             }
>             jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>             GenericValue jobValue = jobsIterator.next();
>             while (jobValue != null) {
> -                jobValue.putAll(updateFields);
> +                jobValue.set("runByInstanceId", instanceId);  // Claim ownership of this value.
>                 jobValue.store();
>                 poll.add(new PersistedServiceJob(dctx, jobValue, null));
>                 if (poll.size() == limit) {
> @@ -191,14 +190,12 @@ public final class JobManager {
>                 jobValue = jobsIterator.next();
>             }
>         } catch (Throwable t) {
> -            // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>             String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
> -            Debug.logError(t, errMsg, module);
> +            Debug.logWarning(t, errMsg, module);
>             try {
> -                // only rollback the transaction if we started one...
>                 TransactionUtil.rollback(beganTransaction, errMsg, t);
>             } catch (GenericEntityException e2) {
> -                Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
> +                Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>             }
>         } finally {
>             if (jobsIterator != null) {
> @@ -209,12 +206,10 @@ public final class JobManager {
>                 }
>             }
>             try {
> -                // only commit the transaction if we started one... but make sure we try
>                 TransactionUtil.commit(beganTransaction);
>             } catch (GenericTransactionException e) {
>                 String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
> -                // we don't really want to do anything different, so just log and move on
> -                Debug.logError(e, errMsg, module);
> +                Debug.logWarning(e, errMsg, module);
>             }
>         }
>         return poll;
> @@ -222,7 +217,9 @@ public final class JobManager {
>
>     private void reloadCrashedJobs() {
>         List<GenericValue> crashed = null;
> -        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, 
> "SERVICE_PENDING"),
> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
> +                EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>         EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>         List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>         List<String> pools = ServiceConfigUtil.getRunPools();
> @@ -236,13 +233,13 @@ public final class JobManager {
>         try {
>             crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>         } catch (GenericEntityException e) {
> -            Debug.logError(e, "Unable to load crashed jobs", module);
> +            Debug.logWarning(e, "Unable to load crashed jobs", module);
>         }
>         if (UtilValidate.isNotEmpty(crashed)) {
> -            try {
> -                int rescheduled = 0;
> -                for (GenericValue job : crashed) {
> -                    Timestamp now = UtilDateTime.nowTimestamp();
> +            int rescheduled = 0;
> +            Timestamp now = UtilDateTime.nowTimestamp();
> +            for (GenericValue job : crashed) {
> +                try {
>                     Debug.logInfo("Scheduling Job : " + job, module);
>                     String pJobId = job.getString("parentJobId");
>                     if (pJobId == null) {
> @@ -261,12 +258,12 @@ public final class JobManager {
>                     job.set("cancelDateTime", now);
>                     delegator.store(job);
>                     rescheduled++;
> +                } catch (GenericEntityException e) {
> +                    Debug.logWarning(e, module);
>                 }
> -                if (Debug.infoOn())
> -                    Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
> -            } catch (GenericEntityException e) {
> -                Debug.logError(e, module);
>             }
> +            if (Debug.infoOn())
> +                Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>         } else {
>             if (Debug.infoOn())
>                 Debug.logInfo("No crashed jobs to re-schedule", module);
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> URL: 
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 22:10:27 2012
> @@ -203,7 +203,11 @@ public final class JobPoller implements
>      */
>     public void queueNow(Job job) throws InvalidJobException {
>         job.queue();
> -        this.executor.execute(new JobInvoker(job));
> +        try {
> +            this.executor.execute(new JobInvoker(job));
> +        } catch (Exception e) {
> +            job.deQueue();
> +        }
>     }
>
>     public void run() {
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
> URL: 
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug  7 22:10:27 2012
> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>
> import java.io.IOException;
> import java.sql.Timestamp;
> -import com.ibm.icu.util.Calendar;
> import java.util.Date;
> import java.util.Map;
>
> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>
> import javolution.util.FastMap;
>
> +import org.apache.commons.lang.StringUtils;
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.base.util.UtilDateTime;
> import org.ofbiz.base.util.UtilGenerics;
> import org.ofbiz.base.util.UtilProperties;
> import org.ofbiz.base.util.UtilValidate;
> -import org.ofbiz.service.calendar.RecurrenceInfoException;
> -import org.ofbiz.service.calendar.TemporalExpression;
> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
> import org.ofbiz.entity.Delegator;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.GenericValue;
> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
> import org.ofbiz.service.GenericRequester;
> import org.ofbiz.service.ServiceUtil;
> import org.ofbiz.service.calendar.RecurrenceInfo;
> +import org.ofbiz.service.calendar.RecurrenceInfoException;
> +import org.ofbiz.service.calendar.TemporalExpression;
> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
> import org.ofbiz.service.config.ServiceConfigUtil;
> import org.xml.sax.SAXException;
>
> -import org.apache.commons.lang.StringUtils;
> +import com.ibm.icu.util.Calendar;
>
> /**
>  * A {@link Job} that is backed by the entity engine. Job data is stored
> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>             // job not available
>             throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>         } else {
> -            // set the start time to now
> -            jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
> -            jobValue.set("statusId", "SERVICE_RUNNING");
> +            jobValue.set("statusId", "SERVICE_QUEUED");
>             try {
>                 jobValue.store();
>             } catch (GenericEntityException e) {
>                 throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + 
> "]; not running!", e);
>             }
> +            if (Debug.verboseOn()) {
> +                Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
> +            }
>         }
>     }
>
> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>             // This condition isn't possible, but we will leave it here.
>             throw new InvalidJobException("Job has been accepted by a different instance!");
>         }
> +        jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
> +        jobValue.set("statusId", "SERVICE_RUNNING");
> +        try {
> +            jobValue.store();
> +        } catch (GenericEntityException e) {
> +            throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; 
> not running!", e);
> +        }
> +        if (Debug.verboseOn()) {
> +            Debug.logVerbose("Job [" + getJobId() + "] running", module);
> +        }
>         // configure any additional recurrences
>         long maxRecurrenceCount = -1;
>         long currentRecurrenceCount = 0;
> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>         }
>         return null;
>     }
> +
> +    @Override
> +    public void deQueue() throws InvalidJobException {
> +        if (currentState != State.QUEUED) {
> +            throw new InvalidJobException("Illegal state change");
> +        }
> +        currentState = State.CREATED;
> +        try {
> +            jobValue.refresh();
> +            jobValue.set("startDateTime", null);
> +            jobValue.set("runByInstanceId", null);
> +            jobValue.set("statusId", "SERVICE_PENDING");
> +            jobValue.store();
> +        } catch (GenericEntityException e) {
> +            throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
> +        }
> +        if (Debug.verboseOn()) {
> +            Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
> +        }
> +    }
> }
>
>