You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/08 00:10:28 UTC
svn commit: r1370566 - in
/ofbiz/trunk/framework/service/src/org/ofbiz/service/job:
GenericServiceJob.java Job.java JobManager.java JobPoller.java
PersistedServiceJob.java
Author: adrianc
Date: Tue Aug 7 22:10:27 2012
New Revision: 1370566
URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
Log:
Fixed the Job Scheduler so jobs are not lost during heavy server load.
Modified:
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012
@@ -144,4 +144,12 @@ public class GenericServiceJob extends A
public boolean isValid() {
return currentState == State.CREATED;
}
+
+ @Override
+ public void deQueue() throws InvalidJobException {
+ if (currentState != State.QUEUED) {
+ throw new InvalidJobException("Illegal state change");
+ }
+ throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
+ }
}
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012
@@ -60,7 +60,13 @@ public interface Job {
boolean isValid();
/**
- * Transitions the job to the queued state.
+ * Transitions this job to the pre-queued (created) state. The job manager
+ * will call this method when there was a problem adding this job to the queue.
+ */
+ void deQueue() throws InvalidJobException;
+
+ /**
+ * Transitions this job to the queued state.
*/
void queue() throws InvalidJobException;
}
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012
@@ -63,7 +63,6 @@ public final class JobManager {
public static final String module = JobManager.class.getName();
public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
- public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId, "statusId", "SERVICE_QUEUED");
private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
private static boolean isShutDown = false;
@@ -150,7 +149,7 @@ public final class JobManager {
assertIsRunning();
DispatchContext dctx = getDispatcher().getDispatchContext();
if (dctx == null) {
- Debug.logError("Unable to locate DispatchContext object; not running job!", module);
+ Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
return null;
}
List<Job> poll = FastList.newInstance();
@@ -176,13 +175,13 @@ public final class JobManager {
try {
beganTransaction = TransactionUtil.begin();
if (!beganTransaction) {
- Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
+ Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
return null;
}
jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
GenericValue jobValue = jobsIterator.next();
while (jobValue != null) {
- jobValue.putAll(updateFields);
+ jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
jobValue.store();
poll.add(new PersistedServiceJob(dctx, jobValue, null));
if (poll.size() == limit) {
@@ -191,14 +190,12 @@ public final class JobManager {
jobValue = jobsIterator.next();
}
} catch (Throwable t) {
- // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
- Debug.logError(t, errMsg, module);
+ Debug.logWarning(t, errMsg, module);
try {
- // only rollback the transaction if we started one...
TransactionUtil.rollback(beganTransaction, errMsg, t);
} catch (GenericEntityException e2) {
- Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
+ Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
}
} finally {
if (jobsIterator != null) {
@@ -209,12 +206,10 @@ public final class JobManager {
}
}
try {
- // only commit the transaction if we started one... but make sure we try
TransactionUtil.commit(beganTransaction);
} catch (GenericTransactionException e) {
String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
- // we don't really want to do anything different, so just log and move on
- Debug.logError(e, errMsg, module);
+ Debug.logWarning(e, errMsg, module);
}
}
return poll;
@@ -222,7 +217,9 @@ public final class JobManager {
private void reloadCrashedJobs() {
List<GenericValue> crashed = null;
- List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
+ List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_PENDING"),
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
+ EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
List<String> pools = ServiceConfigUtil.getRunPools();
@@ -236,13 +233,13 @@ public final class JobManager {
try {
crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
} catch (GenericEntityException e) {
- Debug.logError(e, "Unable to load crashed jobs", module);
+ Debug.logWarning(e, "Unable to load crashed jobs", module);
}
if (UtilValidate.isNotEmpty(crashed)) {
- try {
- int rescheduled = 0;
- for (GenericValue job : crashed) {
- Timestamp now = UtilDateTime.nowTimestamp();
+ int rescheduled = 0;
+ Timestamp now = UtilDateTime.nowTimestamp();
+ for (GenericValue job : crashed) {
+ try {
Debug.logInfo("Scheduling Job : " + job, module);
String pJobId = job.getString("parentJobId");
if (pJobId == null) {
@@ -261,12 +258,12 @@ public final class JobManager {
job.set("cancelDateTime", now);
delegator.store(job);
rescheduled++;
+ } catch (GenericEntityException e) {
+ Debug.logWarning(e, module);
}
- if (Debug.infoOn())
- Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
- } catch (GenericEntityException e) {
- Debug.logError(e, module);
}
+ if (Debug.infoOn())
+ Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
} else {
if (Debug.infoOn())
Debug.logInfo("No crashed jobs to re-schedule", module);
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012
@@ -203,7 +203,11 @@ public final class JobPoller implements
*/
public void queueNow(Job job) throws InvalidJobException {
job.queue();
- this.executor.execute(new JobInvoker(job));
+ try {
+ this.executor.execute(new JobInvoker(job));
+ } catch (Exception e) {
+ job.deQueue();
+ }
}
public void run() {
Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012
@@ -20,7 +20,6 @@ package org.ofbiz.service.job;
import java.io.IOException;
import java.sql.Timestamp;
-import com.ibm.icu.util.Calendar;
import java.util.Date;
import java.util.Map;
@@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
import javolution.util.FastMap;
+import org.apache.commons.lang.StringUtils;
import org.ofbiz.base.util.Debug;
import org.ofbiz.base.util.UtilDateTime;
import org.ofbiz.base.util.UtilGenerics;
import org.ofbiz.base.util.UtilProperties;
import org.ofbiz.base.util.UtilValidate;
-import org.ofbiz.service.calendar.RecurrenceInfoException;
-import org.ofbiz.service.calendar.TemporalExpression;
-import org.ofbiz.service.calendar.TemporalExpressionWorker;
import org.ofbiz.entity.Delegator;
import org.ofbiz.entity.GenericEntityException;
import org.ofbiz.entity.GenericValue;
@@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
import org.ofbiz.service.GenericRequester;
import org.ofbiz.service.ServiceUtil;
import org.ofbiz.service.calendar.RecurrenceInfo;
+import org.ofbiz.service.calendar.RecurrenceInfoException;
+import org.ofbiz.service.calendar.TemporalExpression;
+import org.ofbiz.service.calendar.TemporalExpressionWorker;
import org.ofbiz.service.config.ServiceConfigUtil;
import org.xml.sax.SAXException;
-import org.apache.commons.lang.StringUtils;
+import com.ibm.icu.util.Calendar;
/**
* A {@link Job} that is backed by the entity engine. Job data is stored
@@ -100,14 +100,15 @@ public class PersistedServiceJob extends
// job not available
throw new InvalidJobException("Job [" + getJobId() + "] is not available");
} else {
- // set the start time to now
- jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
- jobValue.set("statusId", "SERVICE_RUNNING");
+ jobValue.set("statusId", "SERVICE_QUEUED");
try {
jobValue.store();
} catch (GenericEntityException e) {
throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
}
+ if (Debug.verboseOn()) {
+ Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
+ }
}
}
@@ -129,6 +130,16 @@ public class PersistedServiceJob extends
// This condition isn't possible, but we will leave it here.
throw new InvalidJobException("Job has been accepted by a different instance!");
}
+ jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
+ jobValue.set("statusId", "SERVICE_RUNNING");
+ try {
+ jobValue.store();
+ } catch (GenericEntityException e) {
+ throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
+ }
+ if (Debug.verboseOn()) {
+ Debug.logVerbose("Job [" + getJobId() + "] running", module);
+ }
// configure any additional recurrences
long maxRecurrenceCount = -1;
long currentRecurrenceCount = 0;
@@ -331,4 +342,24 @@ public class PersistedServiceJob extends
}
return null;
}
+
+ @Override
+ public void deQueue() throws InvalidJobException {
+ if (currentState != State.QUEUED) {
+ throw new InvalidJobException("Illegal state change");
+ }
+ currentState = State.CREATED;
+ try {
+ jobValue.refresh();
+ jobValue.set("startDateTime", null);
+ jobValue.set("runByInstanceId", null);
+ jobValue.set("statusId", "SERVICE_PENDING");
+ jobValue.store();
+ } catch (GenericEntityException e) {
+ throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
+ }
+ if (Debug.verboseOn()) {
+ Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
+ }
+ }
}
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job:
GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Adrian Crum <ad...@sandglass-software.com>.
Usually, when I do an overhaul like this I reformat the code first, in a
separate commit. But I didn't start out intending to overhaul the Job
Scheduler, I was just trying to fix some bugs I encountered, then things
progressed...
I would appreciate another review. Scott's comments were helpful, and
I'm sure yours will be also.
-Adrian
On 8/11/2012 9:49 AM, Jacques Le Roux wrote:
> OK clear, I had to look at the source not only the diff.
> I must say I try to cope with the changes there, but will need more
> time to review completly
>
> Jacques
>
> From: "Jacques Le Roux" <ja...@les7arts.com>
>> I just looked at the diff, I will see in the context, maybe you are
>> right...
>>
>> Jacques
>>
>> From: "Adrian Crum" <ad...@sandglass-software.com>
>>> You are welcome to put them back in. They seemed redundant (and
>>> silly) to me. Example:
>>>
>>> // Set var to 1
>>> int var = 1;
>>> // Call someMethod, put result in var2
>>> int var2 = someMethod();
>>> // Calculate var1 * var2
>>> int result = var1 * var2
>>>
>>> The comments are not adding anything of value - they are only
>>> stating the obvious.
>>>
>>> -Adrian
>>>
>>> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>>>> Else forgot to say that I like it
>>>>
>>>> Jacques
>>>>
>>>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>>>> Hi Adrian,
>>>>>
>>>>> Why did you remove these comments ?
>>>>>> - // catch Throwable so nothing slips through the
>>>>>> cracks... this is a fairly sensitive operation
>>>>>> - // only rollback the transaction if we started
>>>>>> one...
>>>>>> - // only commit the transaction if we started
>>>>>> one... but make sure we try
>>>>>> - // we don't really want to do anything
>>>>>> different, so just log and move on
>>>>>
>>>>> It seems they add some information
>>>>>
>>>>> Jacques
>>>>>
>>>>> From: <ad...@apache.org>
>>>>>> Author: adrianc
>>>>>> Date: Tue Aug 7 22:10:27 2012
>>>>>> New Revision: 1370566
>>>>>>
>>>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>>>> Log:
>>>>>> Fixed the Job Scheduler so jobs are not lost during heavy server
>>>>>> load.
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>>>
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>>
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>>> URL:
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ==============================================================================
>>>>>>
>>>>>> ---
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>>> (original)
>>>>>> +++
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>>> Tue Aug 7 22:10:27 2012
>>>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>>>> public boolean isValid() {
>>>>>> return currentState == State.CREATED;
>>>>>> }
>>>>>> +
>>>>>> + @Override
>>>>>> + public void deQueue() throws InvalidJobException {
>>>>>> + if (currentState != State.QUEUED) {
>>>>>> + throw new InvalidJobException("Illegal state change");
>>>>>> + }
>>>>>> + throw new InvalidJobException("Unable to queue job [" +
>>>>>> getJobId() + "]");
>>>>>> + }
>>>>>> }
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> URL:
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ==============================================================================
>>>>>>
>>>>>> ---
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> (original)
>>>>>> +++
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>>> Tue Aug 7 22:10:27 2012
>>>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>>>> boolean isValid();
>>>>>>
>>>>>> /**
>>>>>> - * Transitions the job to the queued state.
>>>>>> + * Transitions this job to the pre-queued (created) state.
>>>>>> The job manager
>>>>>> + * will call this method when there was a problem adding
>>>>>> this job to the queue.
>>>>>> + */
>>>>>> + void deQueue() throws InvalidJobException;
>>>>>> +
>>>>>> + /**
>>>>>> + * Transitions this job to the queued state.
>>>>>> */
>>>>>> void queue() throws InvalidJobException;
>>>>>> }
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>>>
>>>>>> URL:
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ==============================================================================
>>>>>>
>>>>>> ---
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>>> (original)
>>>>>> +++
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>>> Tue Aug 7 22:10:27 2012
>>>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>>>
>>>>>> public static final String module = JobManager.class.getName();
>>>>>> public static final String instanceId =
>>>>>> UtilProperties.getPropertyValue("general.properties",
>>>>>> "unique.instanceId", "ofbiz0");
>>>>>> - public static final Map<String, Object> updateFields =
>>>>>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
>>>>>> "statusId", "SERVICE_QUEUED");
>>>>>> private static final ConcurrentHashMap<String, JobManager>
>>>>>> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>>>>>> private static boolean isShutDown = false;
>>>>>>
>>>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>>>> assertIsRunning();
>>>>>> DispatchContext dctx = getDispatcher().getDispatchContext();
>>>>>> if (dctx == null) {
>>>>>> - Debug.logError("Unable to locate DispatchContext
>>>>>> object; not running job!", module);
>>>>>> + Debug.logWarning("Unable to locate DispatchContext
>>>>>> object; not running job!", module);
>>>>>> return null;
>>>>>> }
>>>>>> List<Job> poll = FastList.newInstance();
>>>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>>>> try {
>>>>>> beganTransaction = TransactionUtil.begin();
>>>>>> if (!beganTransaction) {
>>>>>> - Debug.logError("Unable to poll JobSandbox for
>>>>>> jobs; transaction was not started by this process", module);
>>>>>> + Debug.logWarning("Unable to poll JobSandbox for
>>>>>> jobs; transaction was not started by this process", module);
>>>>>> return null;
>>>>>> }
>>>>>> jobsIterator = delegator.find("JobSandbox",
>>>>>> mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>>>> GenericValue jobValue = jobsIterator.next();
>>>>>> while (jobValue != null) {
>>>>>> - jobValue.putAll(updateFields);
>>>>>> + jobValue.set("runByInstanceId", instanceId); //
>>>>>> Claim ownership of this value.
>>>>>> jobValue.store();
>>>>>> poll.add(new PersistedServiceJob(dctx, jobValue,
>>>>>> null));
>>>>>> if (poll.size() == limit) {
>>>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>>>> jobValue = jobsIterator.next();
>>>>>> }
>>>>>> } catch (Throwable t) {
>>>>>> - // catch Throwable so nothing slips through the
>>>>>> cracks... this is a fairly sensitive operation
>>>>>> String errMsg = "Error in polling JobSandbox: [" +
>>>>>> t.toString() + "]. Rolling back transaction.";
>>>>>> - Debug.logError(t, errMsg, module);
>>>>>> + Debug.logWarning(t, errMsg, module);
>>>>>> try {
>>>>>> - // only rollback the transaction if we started
>>>>>> one...
>>>>>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>>>> } catch (GenericEntityException e2) {
>>>>>> - Debug.logError(e2, "[Delegator] Could not
>>>>>> rollback transaction: " + e2.toString(), module);
>>>>>> + Debug.logWarning(e2, "[Delegator] Could not
>>>>>> rollback transaction: " + e2.toString(), module);
>>>>>> }
>>>>>> } finally {
>>>>>> if (jobsIterator != null) {
>>>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>>>> }
>>>>>> }
>>>>>> try {
>>>>>> - // only commit the transaction if we started
>>>>>> one... but make sure we try
>>>>>> TransactionUtil.commit(beganTransaction);
>>>>>> } catch (GenericTransactionException e) {
>>>>>> String errMsg = "Transaction error trying to
>>>>>> commit when polling and updating the JobSandbox: " + e.toString();
>>>>>> - // we don't really want to do anything
>>>>>> different, so just log and move on
>>>>>> - Debug.logError(e, errMsg, module);
>>>>>> + Debug.logWarning(e, errMsg, module);
>>>>>> }
>>>>>> }
>>>>>> return poll;
>>>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>>>
>>>>>> private void reloadCrashedJobs() {
>>>>>> List<GenericValue> crashed = null;
>>>>>> - List<EntityExpr> statusExprList =
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId",
>>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>>>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>>>>> "SERVICE_RUNNING"));
>>>>>> + List<EntityExpr> statusExprList =
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("statusId",
>>>>>> EntityOperator.EQUALS, "SERVICE_PENDING"),
>>>>>> + EntityCondition.makeCondition("statusId",
>>>>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>>>> + EntityCondition.makeCondition("statusId",
>>>>>> EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>>> EntityCondition statusCondition =
>>>>>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>>>> List<EntityExpr> poolsExpr =
>>>>>> UtilMisc.toList(EntityCondition.makeCondition("poolId",
>>>>>> EntityOperator.EQUALS, null));
>>>>>> List<String> pools = ServiceConfigUtil.getRunPools();
>>>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>>>> try {
>>>>>> crashed = delegator.findList("JobSandbox",
>>>>>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>>>> } catch (GenericEntityException e) {
>>>>>> - Debug.logError(e, "Unable to load crashed jobs",
>>>>>> module);
>>>>>> + Debug.logWarning(e, "Unable to load crashed jobs",
>>>>>> module);
>>>>>> }
>>>>>> if (UtilValidate.isNotEmpty(crashed)) {
>>>>>> - try {
>>>>>> - int rescheduled = 0;
>>>>>> - for (GenericValue job : crashed) {
>>>>>> - Timestamp now = UtilDateTime.nowTimestamp();
>>>>>> + int rescheduled = 0;
>>>>>> + Timestamp now = UtilDateTime.nowTimestamp();
>>>>>> + for (GenericValue job : crashed) {
>>>>>> + try {
>>>>>> Debug.logInfo("Scheduling Job : " + job,
>>>>>> module);
>>>>>> String pJobId = job.getString("parentJobId");
>>>>>> if (pJobId == null) {
>>>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>>>> job.set("cancelDateTime", now);
>>>>>> delegator.store(job);
>>>>>> rescheduled++;
>>>>>> + } catch (GenericEntityException e) {
>>>>>> + Debug.logWarning(e, module);
>>>>>> }
>>>>>> - if (Debug.infoOn())
>>>>>> - Debug.logInfo("-- " + rescheduled + " jobs
>>>>>> re-scheduled", module);
>>>>>> - } catch (GenericEntityException e) {
>>>>>> - Debug.logError(e, module);
>>>>>> }
>>>>>> + if (Debug.infoOn())
>>>>>> + Debug.logInfo("-- " + rescheduled + " jobs
>>>>>> re-scheduled", module);
>>>>>> } else {
>>>>>> if (Debug.infoOn())
>>>>>> Debug.logInfo("No crashed jobs to re-schedule",
>>>>>> module);
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>>>
>>>>>> URL:
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ==============================================================================
>>>>>>
>>>>>> ---
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>>> (original)
>>>>>> +++
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>>> Tue Aug 7 22:10:27 2012
>>>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>>>> */
>>>>>> public void queueNow(Job job) throws InvalidJobException {
>>>>>> job.queue();
>>>>>> - this.executor.execute(new JobInvoker(job));
>>>>>> + try {
>>>>>> + this.executor.execute(new JobInvoker(job));
>>>>>> + } catch (Exception e) {
>>>>>> + job.deQueue();
>>>>>> + }
>>>>>> }
>>>>>>
>>>>>> public void run() {
>>>>>>
>>>>>> Modified:
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>> URL:
>>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>>> ==============================================================================
>>>>>>
>>>>>> ---
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>> (original)
>>>>>> +++
>>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>> Tue Aug 7 22:10:27 2012
>>>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.sql.Timestamp;
>>>>>> -import com.ibm.icu.util.Calendar;
>>>>>> import java.util.Date;
>>>>>> import java.util.Map;
>>>>>>
>>>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>>>
>>>>>> import javolution.util.FastMap;
>>>>>>
>>>>>> +import org.apache.commons.lang.StringUtils;
>>>>>> import org.ofbiz.base.util.Debug;
>>>>>> import org.ofbiz.base.util.UtilDateTime;
>>>>>> import org.ofbiz.base.util.UtilGenerics;
>>>>>> import org.ofbiz.base.util.UtilProperties;
>>>>>> import org.ofbiz.base.util.UtilValidate;
>>>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>>> import org.ofbiz.entity.Delegator;
>>>>>> import org.ofbiz.entity.GenericEntityException;
>>>>>> import org.ofbiz.entity.GenericValue;
>>>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>>>> import org.ofbiz.service.GenericRequester;
>>>>>> import org.ofbiz.service.ServiceUtil;
>>>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>>>> import org.xml.sax.SAXException;
>>>>>>
>>>>>> -import org.apache.commons.lang.StringUtils;
>>>>>> +import com.ibm.icu.util.Calendar;
>>>>>>
>>>>>> /**
>>>>>> * A {@link Job} that is backed by the entity engine. Job data is
>>>>>> stored
>>>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>>>> // job not available
>>>>>> throw new InvalidJobException("Job [" + getJobId() +
>>>>>> "] is not available");
>>>>>> } else {
>>>>>> - // set the start time to now
>>>>>> - jobValue.set("startDateTime",
>>>>>> UtilDateTime.nowTimestamp());
>>>>>> - jobValue.set("statusId", "SERVICE_RUNNING");
>>>>>> + jobValue.set("statusId", "SERVICE_QUEUED");
>>>>>> try {
>>>>>> jobValue.store();
>>>>>> } catch (GenericEntityException e) {
>>>>>> throw new InvalidJobException("Unable to set the
>>>>>> startDateTime and statusId on the current job [" + getJobId() +
>>>>>> "]; not running!", e);
>>>>>> }
>>>>>> + if (Debug.verboseOn()) {
>>>>>> + Debug.logVerbose("Placing job [" + getJobId() +
>>>>>> "] in queue", module);
>>>>>> + }
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>>>> // This condition isn't possible, but we will leave
>>>>>> it here.
>>>>>> throw new InvalidJobException("Job has been accepted
>>>>>> by a different instance!");
>>>>>> }
>>>>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>>> + jobValue.set("statusId", "SERVICE_RUNNING");
>>>>>> + try {
>>>>>> + jobValue.store();
>>>>>> + } catch (GenericEntityException e) {
>>>>>> + throw new InvalidJobException("Unable to set the
>>>>>> startDateTime and statusId on the current job [" + getJobId() +
>>>>>> "]; not running!", e);
>>>>>> + }
>>>>>> + if (Debug.verboseOn()) {
>>>>>> + Debug.logVerbose("Job [" + getJobId() + "] running",
>>>>>> module);
>>>>>> + }
>>>>>> // configure any additional recurrences
>>>>>> long maxRecurrenceCount = -1;
>>>>>> long currentRecurrenceCount = 0;
>>>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>>>> }
>>>>>> return null;
>>>>>> }
>>>>>> +
>>>>>> + @Override
>>>>>> + public void deQueue() throws InvalidJobException {
>>>>>> + if (currentState != State.QUEUED) {
>>>>>> + throw new InvalidJobException("Illegal state change");
>>>>>> + }
>>>>>> + currentState = State.CREATED;
>>>>>> + try {
>>>>>> + jobValue.refresh();
>>>>>> + jobValue.set("startDateTime", null);
>>>>>> + jobValue.set("runByInstanceId", null);
>>>>>> + jobValue.set("statusId", "SERVICE_PENDING");
>>>>>> + jobValue.store();
>>>>>> + } catch (GenericEntityException e) {
>>>>>> + throw new InvalidJobException("Unable to dequeue job
>>>>>> [" + getJobId() + "]", e);
>>>>>> + }
>>>>>> + if (Debug.verboseOn()) {
>>>>>> + Debug.logVerbose("Job [" + getJobId() + "] not
>>>>>> queued, rescheduling", module);
>>>>>> + }
>>>>>> + }
>>>>>> }
>>>>>>
>>>>>>
>>>
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Jacques Le Roux <ja...@les7arts.com>.
OK clear, I had to look at the source not only the diff.
I must say I try to cope with the changes there, but will need more time to review completly
Jacques
From: "Jacques Le Roux" <ja...@les7arts.com>
>I just looked at the diff, I will see in the context, maybe you are right...
>
> Jacques
>
> From: "Adrian Crum" <ad...@sandglass-software.com>
>> You are welcome to put them back in. They seemed redundant (and silly) to me. Example:
>>
>> // Set var to 1
>> int var = 1;
>> // Call someMethod, put result in var2
>> int var2 = someMethod();
>> // Calculate var1 * var2
>> int result = var1 * var2
>>
>> The comments are not adding anything of value - they are only stating the obvious.
>>
>> -Adrian
>>
>> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>>> Else forgot to say that I like it
>>>
>>> Jacques
>>>
>>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>>> Hi Adrian,
>>>>
>>>> Why did you remove these comments ?
>>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>>> - // only rollback the transaction if we started one...
>>>>> - // only commit the transaction if we started one... but make sure we try
>>>>> - // we don't really want to do anything different, so just log and move on
>>>>
>>>> It seems they add some information
>>>>
>>>> Jacques
>>>>
>>>> From: <ad...@apache.org>
>>>>> Author: adrianc
>>>>> Date: Tue Aug 7 22:10:27 2012
>>>>> New Revision: 1370566
>>>>>
>>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>>> Log:
>>>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>>>
>>>>> Modified:
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> ---
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012
>>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>>> public boolean isValid() {
>>>>> return currentState == State.CREATED;
>>>>> }
>>>>> +
>>>>> + @Override
>>>>> + public void deQueue() throws InvalidJobException {
>>>>> + if (currentState != State.QUEUED) {
>>>>> + throw new InvalidJobException("Illegal state change");
>>>>> + }
>>>>> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>>>>> + }
>>>>> }
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012
>>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>>> boolean isValid();
>>>>>
>>>>> /**
>>>>> - * Transitions the job to the queued state.
>>>>> + * Transitions this job to the pre-queued (created) state. The job manager
>>>>> + * will call this method when there was a problem adding this job to the queue.
>>>>> + */
>>>>> + void deQueue() throws InvalidJobException;
>>>>> +
>>>>> + /**
>>>>> + * Transitions this job to the queued state.
>>>>> */
>>>>> void queue() throws InvalidJobException;
>>>>> }
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> ---
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012
>>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>>
>>>>> public static final String module = JobManager.class.getName();
>>>>> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId",
>>>>> "ofbiz0");
>>>>> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
>>>>> "statusId", "SERVICE_QUEUED");
>>>>> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String,
>>>>> JobManager>();
>>>>> private static boolean isShutDown = false;
>>>>>
>>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>>> assertIsRunning();
>>>>> DispatchContext dctx = getDispatcher().getDispatchContext();
>>>>> if (dctx == null) {
>>>>> - Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>>>>> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>>>>> return null;
>>>>> }
>>>>> List<Job> poll = FastList.newInstance();
>>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>>> try {
>>>>> beganTransaction = TransactionUtil.begin();
>>>>> if (!beganTransaction) {
>>>>> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>>> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>>> return null;
>>>>> }
>>>>> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>>> GenericValue jobValue = jobsIterator.next();
>>>>> while (jobValue != null) {
>>>>> - jobValue.putAll(updateFields);
>>>>> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
>>>>> jobValue.store();
>>>>> poll.add(new PersistedServiceJob(dctx, jobValue, null));
>>>>> if (poll.size() == limit) {
>>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>>> jobValue = jobsIterator.next();
>>>>> }
>>>>> } catch (Throwable t) {
>>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>>> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>>>>> - Debug.logError(t, errMsg, module);
>>>>> + Debug.logWarning(t, errMsg, module);
>>>>> try {
>>>>> - // only rollback the transaction if we started one...
>>>>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>>> } catch (GenericEntityException e2) {
>>>>> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>>> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>>> }
>>>>> } finally {
>>>>> if (jobsIterator != null) {
>>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>>> }
>>>>> }
>>>>> try {
>>>>> - // only commit the transaction if we started one... but make sure we try
>>>>> TransactionUtil.commit(beganTransaction);
>>>>> } catch (GenericTransactionException e) {
>>>>> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " +
>>>>> e.toString();
>>>>> - // we don't really want to do anything different, so just log and move on
>>>>> - Debug.logError(e, errMsg, module);
>>>>> + Debug.logWarning(e, errMsg, module);
>>>>> }
>>>>> }
>>>>> return poll;
>>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>>
>>>>> private void reloadCrashedJobs() {
>>>>> List<GenericValue> crashed = null;
>>>>> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>>>> "SERVICE_PENDING"),
>>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>>> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>>> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>>>>> List<String> pools = ServiceConfigUtil.getRunPools();
>>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>>> try {
>>>>> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>>> } catch (GenericEntityException e) {
>>>>> - Debug.logError(e, "Unable to load crashed jobs", module);
>>>>> + Debug.logWarning(e, "Unable to load crashed jobs", module);
>>>>> }
>>>>> if (UtilValidate.isNotEmpty(crashed)) {
>>>>> - try {
>>>>> - int rescheduled = 0;
>>>>> - for (GenericValue job : crashed) {
>>>>> - Timestamp now = UtilDateTime.nowTimestamp();
>>>>> + int rescheduled = 0;
>>>>> + Timestamp now = UtilDateTime.nowTimestamp();
>>>>> + for (GenericValue job : crashed) {
>>>>> + try {
>>>>> Debug.logInfo("Scheduling Job : " + job, module);
>>>>> String pJobId = job.getString("parentJobId");
>>>>> if (pJobId == null) {
>>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>>> job.set("cancelDateTime", now);
>>>>> delegator.store(job);
>>>>> rescheduled++;
>>>>> + } catch (GenericEntityException e) {
>>>>> + Debug.logWarning(e, module);
>>>>> }
>>>>> - if (Debug.infoOn())
>>>>> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>>> - } catch (GenericEntityException e) {
>>>>> - Debug.logError(e, module);
>>>>> }
>>>>> + if (Debug.infoOn())
>>>>> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>>> } else {
>>>>> if (Debug.infoOn())
>>>>> Debug.logInfo("No crashed jobs to re-schedule", module);
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> ---
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012
>>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>>> */
>>>>> public void queueNow(Job job) throws InvalidJobException {
>>>>> job.queue();
>>>>> - this.executor.execute(new JobInvoker(job));
>>>>> + try {
>>>>> + this.executor.execute(new JobInvoker(job));
>>>>> + } catch (Exception e) {
>>>>> + job.deQueue();
>>>>> + }
>>>>> }
>>>>>
>>>>> public void run() {
>>>>>
>>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>> URL:
>>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>>> ==============================================================================
>>>>> ---
>>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012
>>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.sql.Timestamp;
>>>>> -import com.ibm.icu.util.Calendar;
>>>>> import java.util.Date;
>>>>> import java.util.Map;
>>>>>
>>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>>
>>>>> import javolution.util.FastMap;
>>>>>
>>>>> +import org.apache.commons.lang.StringUtils;
>>>>> import org.ofbiz.base.util.Debug;
>>>>> import org.ofbiz.base.util.UtilDateTime;
>>>>> import org.ofbiz.base.util.UtilGenerics;
>>>>> import org.ofbiz.base.util.UtilProperties;
>>>>> import org.ofbiz.base.util.UtilValidate;
>>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>> import org.ofbiz.entity.Delegator;
>>>>> import org.ofbiz.entity.GenericEntityException;
>>>>> import org.ofbiz.entity.GenericValue;
>>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>>> import org.ofbiz.service.GenericRequester;
>>>>> import org.ofbiz.service.ServiceUtil;
>>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>>> import org.xml.sax.SAXException;
>>>>>
>>>>> -import org.apache.commons.lang.StringUtils;
>>>>> +import com.ibm.icu.util.Calendar;
>>>>>
>>>>> /**
>>>>> * A {@link Job} that is backed by the entity engine. Job data is stored
>>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>>> // job not available
>>>>> throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>>>>> } else {
>>>>> - // set the start time to now
>>>>> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>> - jobValue.set("statusId", "SERVICE_RUNNING");
>>>>> + jobValue.set("statusId", "SERVICE_QUEUED");
>>>>> try {
>>>>> jobValue.store();
>>>>> } catch (GenericEntityException e) {
>>>>> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId()
>>>>> + "]; not running!", e);
>>>>> }
>>>>> + if (Debug.verboseOn()) {
>>>>> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>>>>> + }
>>>>> }
>>>>> }
>>>>>
>>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>>> // This condition isn't possible, but we will leave it here.
>>>>> throw new InvalidJobException("Job has been accepted by a different instance!");
>>>>> }
>>>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>>> + jobValue.set("statusId", "SERVICE_RUNNING");
>>>>> + try {
>>>>> + jobValue.store();
>>>>> + } catch (GenericEntityException e) {
>>>>> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() +
>>>>> "]; not running!", e);
>>>>> + }
>>>>> + if (Debug.verboseOn()) {
>>>>> + Debug.logVerbose("Job [" + getJobId() + "] running", module);
>>>>> + }
>>>>> // configure any additional recurrences
>>>>> long maxRecurrenceCount = -1;
>>>>> long currentRecurrenceCount = 0;
>>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>>> }
>>>>> return null;
>>>>> }
>>>>> +
>>>>> + @Override
>>>>> + public void deQueue() throws InvalidJobException {
>>>>> + if (currentState != State.QUEUED) {
>>>>> + throw new InvalidJobException("Illegal state change");
>>>>> + }
>>>>> + currentState = State.CREATED;
>>>>> + try {
>>>>> + jobValue.refresh();
>>>>> + jobValue.set("startDateTime", null);
>>>>> + jobValue.set("runByInstanceId", null);
>>>>> + jobValue.set("statusId", "SERVICE_PENDING");
>>>>> + jobValue.store();
>>>>> + } catch (GenericEntityException e) {
>>>>> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>>>>> + }
>>>>> + if (Debug.verboseOn()) {
>>>>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>>>>> + }
>>>>> + }
>>>>> }
>>>>>
>>>>>
>>
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Jacques Le Roux <ja...@les7arts.com>.
I just looked at the diff, I will see in the context, maybe you are right...
Jacques
From: "Adrian Crum" <ad...@sandglass-software.com>
> You are welcome to put them back in. They seemed redundant (and silly) to me. Example:
>
> // Set var to 1
> int var = 1;
> // Call someMethod, put result in var2
> int var2 = someMethod();
> // Calculate var1 * var2
> int result = var1 * var2
>
> The comments are not adding anything of value - they are only stating the obvious.
>
> -Adrian
>
> On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
>> Else forgot to say that I like it
>>
>> Jacques
>>
>> From: "Jacques Le Roux" <ja...@les7arts.com>
>>> Hi Adrian,
>>>
>>> Why did you remove these comments ?
>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>> - // only rollback the transaction if we started one...
>>>> - // only commit the transaction if we started one... but make sure we try
>>>> - // we don't really want to do anything different, so just log and move on
>>>
>>> It seems they add some information
>>>
>>> Jacques
>>>
>>> From: <ad...@apache.org>
>>>> Author: adrianc
>>>> Date: Tue Aug 7 22:10:27 2012
>>>> New Revision: 1370566
>>>>
>>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>>> Log:
>>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>>
>>>> Modified:
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> ---
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012
>>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>>> public boolean isValid() {
>>>> return currentState == State.CREATED;
>>>> }
>>>> +
>>>> + @Override
>>>> + public void deQueue() throws InvalidJobException {
>>>> + if (currentState != State.QUEUED) {
>>>> + throw new InvalidJobException("Illegal state change");
>>>> + }
>>>> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>>>> + }
>>>> }
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012
>>>> @@ -60,7 +60,13 @@ public interface Job {
>>>> boolean isValid();
>>>>
>>>> /**
>>>> - * Transitions the job to the queued state.
>>>> + * Transitions this job to the pre-queued (created) state. The job manager
>>>> + * will call this method when there was a problem adding this job to the queue.
>>>> + */
>>>> + void deQueue() throws InvalidJobException;
>>>> +
>>>> + /**
>>>> + * Transitions this job to the queued state.
>>>> */
>>>> void queue() throws InvalidJobException;
>>>> }
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> ---
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012
>>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>>
>>>> public static final String module = JobManager.class.getName();
>>>> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId",
>>>> "ofbiz0");
>>>> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
>>>> "statusId", "SERVICE_QUEUED");
>>>> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String,
>>>> JobManager>();
>>>> private static boolean isShutDown = false;
>>>>
>>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>>> assertIsRunning();
>>>> DispatchContext dctx = getDispatcher().getDispatchContext();
>>>> if (dctx == null) {
>>>> - Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>>>> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>>>> return null;
>>>> }
>>>> List<Job> poll = FastList.newInstance();
>>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>>> try {
>>>> beganTransaction = TransactionUtil.begin();
>>>> if (!beganTransaction) {
>>>> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>>> return null;
>>>> }
>>>> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>>> GenericValue jobValue = jobsIterator.next();
>>>> while (jobValue != null) {
>>>> - jobValue.putAll(updateFields);
>>>> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
>>>> jobValue.store();
>>>> poll.add(new PersistedServiceJob(dctx, jobValue, null));
>>>> if (poll.size() == limit) {
>>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>>> jobValue = jobsIterator.next();
>>>> }
>>>> } catch (Throwable t) {
>>>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>>>> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>>>> - Debug.logError(t, errMsg, module);
>>>> + Debug.logWarning(t, errMsg, module);
>>>> try {
>>>> - // only rollback the transaction if we started one...
>>>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>>>> } catch (GenericEntityException e2) {
>>>> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>>> }
>>>> } finally {
>>>> if (jobsIterator != null) {
>>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>>> }
>>>> }
>>>> try {
>>>> - // only commit the transaction if we started one... but make sure we try
>>>> TransactionUtil.commit(beganTransaction);
>>>> } catch (GenericTransactionException e) {
>>>> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
>>>> - // we don't really want to do anything different, so just log and move on
>>>> - Debug.logError(e, errMsg, module);
>>>> + Debug.logWarning(e, errMsg, module);
>>>> }
>>>> }
>>>> return poll;
>>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>>
>>>> private void reloadCrashedJobs() {
>>>> List<GenericValue> crashed = null;
>>>> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>>> "SERVICE_PENDING"),
>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>>> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>>> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>>>> List<String> pools = ServiceConfigUtil.getRunPools();
>>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>>> try {
>>>> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>>> } catch (GenericEntityException e) {
>>>> - Debug.logError(e, "Unable to load crashed jobs", module);
>>>> + Debug.logWarning(e, "Unable to load crashed jobs", module);
>>>> }
>>>> if (UtilValidate.isNotEmpty(crashed)) {
>>>> - try {
>>>> - int rescheduled = 0;
>>>> - for (GenericValue job : crashed) {
>>>> - Timestamp now = UtilDateTime.nowTimestamp();
>>>> + int rescheduled = 0;
>>>> + Timestamp now = UtilDateTime.nowTimestamp();
>>>> + for (GenericValue job : crashed) {
>>>> + try {
>>>> Debug.logInfo("Scheduling Job : " + job, module);
>>>> String pJobId = job.getString("parentJobId");
>>>> if (pJobId == null) {
>>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>>> job.set("cancelDateTime", now);
>>>> delegator.store(job);
>>>> rescheduled++;
>>>> + } catch (GenericEntityException e) {
>>>> + Debug.logWarning(e, module);
>>>> }
>>>> - if (Debug.infoOn())
>>>> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>> - } catch (GenericEntityException e) {
>>>> - Debug.logError(e, module);
>>>> }
>>>> + if (Debug.infoOn())
>>>> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>>>> } else {
>>>> if (Debug.infoOn())
>>>> Debug.logInfo("No crashed jobs to re-schedule", module);
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> ---
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012
>>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>>> */
>>>> public void queueNow(Job job) throws InvalidJobException {
>>>> job.queue();
>>>> - this.executor.execute(new JobInvoker(job));
>>>> + try {
>>>> + this.executor.execute(new JobInvoker(job));
>>>> + } catch (Exception e) {
>>>> + job.deQueue();
>>>> + }
>>>> }
>>>>
>>>> public void run() {
>>>>
>>>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>> URL:
>>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>>> ==============================================================================
>>>> ---
>>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012
>>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>>
>>>> import java.io.IOException;
>>>> import java.sql.Timestamp;
>>>> -import com.ibm.icu.util.Calendar;
>>>> import java.util.Date;
>>>> import java.util.Map;
>>>>
>>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>>
>>>> import javolution.util.FastMap;
>>>>
>>>> +import org.apache.commons.lang.StringUtils;
>>>> import org.ofbiz.base.util.Debug;
>>>> import org.ofbiz.base.util.UtilDateTime;
>>>> import org.ofbiz.base.util.UtilGenerics;
>>>> import org.ofbiz.base.util.UtilProperties;
>>>> import org.ofbiz.base.util.UtilValidate;
>>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>> import org.ofbiz.entity.Delegator;
>>>> import org.ofbiz.entity.GenericEntityException;
>>>> import org.ofbiz.entity.GenericValue;
>>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>>> import org.ofbiz.service.GenericRequester;
>>>> import org.ofbiz.service.ServiceUtil;
>>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>>> import org.xml.sax.SAXException;
>>>>
>>>> -import org.apache.commons.lang.StringUtils;
>>>> +import com.ibm.icu.util.Calendar;
>>>>
>>>> /**
>>>> * A {@link Job} that is backed by the entity engine. Job data is stored
>>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>>> // job not available
>>>> throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>>>> } else {
>>>> - // set the start time to now
>>>> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>> - jobValue.set("statusId", "SERVICE_RUNNING");
>>>> + jobValue.set("statusId", "SERVICE_QUEUED");
>>>> try {
>>>> jobValue.store();
>>>> } catch (GenericEntityException e) {
>>>> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId()
>>>> + "]; not running!", e);
>>>> }
>>>> + if (Debug.verboseOn()) {
>>>> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>>>> + }
>>>> }
>>>> }
>>>>
>>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>>> // This condition isn't possible, but we will leave it here.
>>>> throw new InvalidJobException("Job has been accepted by a different instance!");
>>>> }
>>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>>> + jobValue.set("statusId", "SERVICE_RUNNING");
>>>> + try {
>>>> + jobValue.store();
>>>> + } catch (GenericEntityException e) {
>>>> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() +
>>>> "]; not running!", e);
>>>> + }
>>>> + if (Debug.verboseOn()) {
>>>> + Debug.logVerbose("Job [" + getJobId() + "] running", module);
>>>> + }
>>>> // configure any additional recurrences
>>>> long maxRecurrenceCount = -1;
>>>> long currentRecurrenceCount = 0;
>>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>>> }
>>>> return null;
>>>> }
>>>> +
>>>> + @Override
>>>> + public void deQueue() throws InvalidJobException {
>>>> + if (currentState != State.QUEUED) {
>>>> + throw new InvalidJobException("Illegal state change");
>>>> + }
>>>> + currentState = State.CREATED;
>>>> + try {
>>>> + jobValue.refresh();
>>>> + jobValue.set("startDateTime", null);
>>>> + jobValue.set("runByInstanceId", null);
>>>> + jobValue.set("statusId", "SERVICE_PENDING");
>>>> + jobValue.store();
>>>> + } catch (GenericEntityException e) {
>>>> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>>>> + }
>>>> + if (Debug.verboseOn()) {
>>>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>>>> + }
>>>> + }
>>>> }
>>>>
>>>>
>
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job:
GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Adrian Crum <ad...@sandglass-software.com>.
You are welcome to put them back in. They seemed redundant (and silly)
to me. Example:
// Set var to 1
int var = 1;
// Call someMethod, put result in var2
int var2 = someMethod();
// Calculate var1 * var2
int result = var1 * var2
The comments are not adding anything of value - they are only stating
the obvious.
-Adrian
On 8/8/2012 10:07 PM, Jacques Le Roux wrote:
> Else forgot to say that I like it
>
> Jacques
>
> From: "Jacques Le Roux" <ja...@les7arts.com>
>> Hi Adrian,
>>
>> Why did you remove these comments ?
>>> - // catch Throwable so nothing slips through the
>>> cracks... this is a fairly sensitive operation
>>> - // only rollback the transaction if we started one...
>>> - // only commit the transaction if we started one...
>>> but make sure we try
>>> - // we don't really want to do anything different,
>>> so just log and move on
>>
>> It seems they add some information
>>
>> Jacques
>>
>> From: <ad...@apache.org>
>>> Author: adrianc
>>> Date: Tue Aug 7 22:10:27 2012
>>> New Revision: 1370566
>>>
>>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>>> Log:
>>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> URL:
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ==============================================================================
>>>
>>> ---
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> (original)
>>> +++
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>>> Tue Aug 7 22:10:27 2012
>>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>>> public boolean isValid() {
>>> return currentState == State.CREATED;
>>> }
>>> +
>>> + @Override
>>> + public void deQueue() throws InvalidJobException {
>>> + if (currentState != State.QUEUED) {
>>> + throw new InvalidJobException("Illegal state change");
>>> + }
>>> + throw new InvalidJobException("Unable to queue job [" +
>>> getJobId() + "]");
>>> + }
>>> }
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> URL:
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ==============================================================================
>>>
>>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> (original)
>>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>>> Tue Aug 7 22:10:27 2012
>>> @@ -60,7 +60,13 @@ public interface Job {
>>> boolean isValid();
>>>
>>> /**
>>> - * Transitions the job to the queued state.
>>> + * Transitions this job to the pre-queued (created) state. The
>>> job manager
>>> + * will call this method when there was a problem adding this
>>> job to the queue.
>>> + */
>>> + void deQueue() throws InvalidJobException;
>>> +
>>> + /**
>>> + * Transitions this job to the queued state.
>>> */
>>> void queue() throws InvalidJobException;
>>> }
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> URL:
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ==============================================================================
>>>
>>> ---
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> (original)
>>> +++
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>> Tue Aug 7 22:10:27 2012
>>> @@ -63,7 +63,6 @@ public final class JobManager {
>>>
>>> public static final String module = JobManager.class.getName();
>>> public static final String instanceId =
>>> UtilProperties.getPropertyValue("general.properties",
>>> "unique.instanceId", "ofbiz0");
>>> - public static final Map<String, Object> updateFields =
>>> UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
>>> "statusId", "SERVICE_QUEUED");
>>> private static final ConcurrentHashMap<String, JobManager>
>>> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>>> private static boolean isShutDown = false;
>>>
>>> @@ -150,7 +149,7 @@ public final class JobManager {
>>> assertIsRunning();
>>> DispatchContext dctx = getDispatcher().getDispatchContext();
>>> if (dctx == null) {
>>> - Debug.logError("Unable to locate DispatchContext
>>> object; not running job!", module);
>>> + Debug.logWarning("Unable to locate DispatchContext
>>> object; not running job!", module);
>>> return null;
>>> }
>>> List<Job> poll = FastList.newInstance();
>>> @@ -176,13 +175,13 @@ public final class JobManager {
>>> try {
>>> beganTransaction = TransactionUtil.begin();
>>> if (!beganTransaction) {
>>> - Debug.logError("Unable to poll JobSandbox for jobs;
>>> transaction was not started by this process", module);
>>> + Debug.logWarning("Unable to poll JobSandbox for
>>> jobs; transaction was not started by this process", module);
>>> return null;
>>> }
>>> jobsIterator = delegator.find("JobSandbox",
>>> mainCondition, null, null, UtilMisc.toList("runTime"), null);
>>> GenericValue jobValue = jobsIterator.next();
>>> while (jobValue != null) {
>>> - jobValue.putAll(updateFields);
>>> + jobValue.set("runByInstanceId", instanceId); //
>>> Claim ownership of this value.
>>> jobValue.store();
>>> poll.add(new PersistedServiceJob(dctx, jobValue,
>>> null));
>>> if (poll.size() == limit) {
>>> @@ -191,14 +190,12 @@ public final class JobManager {
>>> jobValue = jobsIterator.next();
>>> }
>>> } catch (Throwable t) {
>>> - // catch Throwable so nothing slips through the
>>> cracks... this is a fairly sensitive operation
>>> String errMsg = "Error in polling JobSandbox: [" +
>>> t.toString() + "]. Rolling back transaction.";
>>> - Debug.logError(t, errMsg, module);
>>> + Debug.logWarning(t, errMsg, module);
>>> try {
>>> - // only rollback the transaction if we started one...
>>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>>> } catch (GenericEntityException e2) {
>>> - Debug.logError(e2, "[Delegator] Could not rollback
>>> transaction: " + e2.toString(), module);
>>> + Debug.logWarning(e2, "[Delegator] Could not
>>> rollback transaction: " + e2.toString(), module);
>>> }
>>> } finally {
>>> if (jobsIterator != null) {
>>> @@ -209,12 +206,10 @@ public final class JobManager {
>>> }
>>> }
>>> try {
>>> - // only commit the transaction if we started one...
>>> but make sure we try
>>> TransactionUtil.commit(beganTransaction);
>>> } catch (GenericTransactionException e) {
>>> String errMsg = "Transaction error trying to commit
>>> when polling and updating the JobSandbox: " + e.toString();
>>> - // we don't really want to do anything different,
>>> so just log and move on
>>> - Debug.logError(e, errMsg, module);
>>> + Debug.logWarning(e, errMsg, module);
>>> }
>>> }
>>> return poll;
>>> @@ -222,7 +217,9 @@ public final class JobManager {
>>>
>>> private void reloadCrashedJobs() {
>>> List<GenericValue> crashed = null;
>>> - List<EntityExpr> statusExprList =
>>> UtilMisc.toList(EntityCondition.makeCondition("statusId",
>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>> EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>>> "SERVICE_RUNNING"));
>>> + List<EntityExpr> statusExprList =
>>> UtilMisc.toList(EntityCondition.makeCondition("statusId",
>>> EntityOperator.EQUALS, "SERVICE_PENDING"),
>>> + EntityCondition.makeCondition("statusId",
>>> EntityOperator.EQUALS, "SERVICE_QUEUED"),
>>> + EntityCondition.makeCondition("statusId",
>>> EntityOperator.EQUALS, "SERVICE_RUNNING"));
>>> EntityCondition statusCondition =
>>> EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>>> List<EntityExpr> poolsExpr =
>>> UtilMisc.toList(EntityCondition.makeCondition("poolId",
>>> EntityOperator.EQUALS, null));
>>> List<String> pools = ServiceConfigUtil.getRunPools();
>>> @@ -236,13 +233,13 @@ public final class JobManager {
>>> try {
>>> crashed = delegator.findList("JobSandbox",
>>> mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>> } catch (GenericEntityException e) {
>>> - Debug.logError(e, "Unable to load crashed jobs", module);
>>> + Debug.logWarning(e, "Unable to load crashed jobs",
>>> module);
>>> }
>>> if (UtilValidate.isNotEmpty(crashed)) {
>>> - try {
>>> - int rescheduled = 0;
>>> - for (GenericValue job : crashed) {
>>> - Timestamp now = UtilDateTime.nowTimestamp();
>>> + int rescheduled = 0;
>>> + Timestamp now = UtilDateTime.nowTimestamp();
>>> + for (GenericValue job : crashed) {
>>> + try {
>>> Debug.logInfo("Scheduling Job : " + job, module);
>>> String pJobId = job.getString("parentJobId");
>>> if (pJobId == null) {
>>> @@ -261,12 +258,12 @@ public final class JobManager {
>>> job.set("cancelDateTime", now);
>>> delegator.store(job);
>>> rescheduled++;
>>> + } catch (GenericEntityException e) {
>>> + Debug.logWarning(e, module);
>>> }
>>> - if (Debug.infoOn())
>>> - Debug.logInfo("-- " + rescheduled + " jobs
>>> re-scheduled", module);
>>> - } catch (GenericEntityException e) {
>>> - Debug.logError(e, module);
>>> }
>>> + if (Debug.infoOn())
>>> + Debug.logInfo("-- " + rescheduled + " jobs
>>> re-scheduled", module);
>>> } else {
>>> if (Debug.infoOn())
>>> Debug.logInfo("No crashed jobs to re-schedule",
>>> module);
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> URL:
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ==============================================================================
>>>
>>> ---
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> (original)
>>> +++
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>> Tue Aug 7 22:10:27 2012
>>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>>> */
>>> public void queueNow(Job job) throws InvalidJobException {
>>> job.queue();
>>> - this.executor.execute(new JobInvoker(job));
>>> + try {
>>> + this.executor.execute(new JobInvoker(job));
>>> + } catch (Exception e) {
>>> + job.deQueue();
>>> + }
>>> }
>>>
>>> public void run() {
>>>
>>> Modified:
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>> URL:
>>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>>> ==============================================================================
>>>
>>> ---
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>> (original)
>>> +++
>>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>> Tue Aug 7 22:10:27 2012
>>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>>
>>> import java.io.IOException;
>>> import java.sql.Timestamp;
>>> -import com.ibm.icu.util.Calendar;
>>> import java.util.Date;
>>> import java.util.Map;
>>>
>>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>>
>>> import javolution.util.FastMap;
>>>
>>> +import org.apache.commons.lang.StringUtils;
>>> import org.ofbiz.base.util.Debug;
>>> import org.ofbiz.base.util.UtilDateTime;
>>> import org.ofbiz.base.util.UtilGenerics;
>>> import org.ofbiz.base.util.UtilProperties;
>>> import org.ofbiz.base.util.UtilValidate;
>>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>>> -import org.ofbiz.service.calendar.TemporalExpression;
>>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>> import org.ofbiz.entity.Delegator;
>>> import org.ofbiz.entity.GenericEntityException;
>>> import org.ofbiz.entity.GenericValue;
>>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>>> import org.ofbiz.service.GenericRequester;
>>> import org.ofbiz.service.ServiceUtil;
>>> import org.ofbiz.service.calendar.RecurrenceInfo;
>>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>>> +import org.ofbiz.service.calendar.TemporalExpression;
>>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>>> import org.ofbiz.service.config.ServiceConfigUtil;
>>> import org.xml.sax.SAXException;
>>>
>>> -import org.apache.commons.lang.StringUtils;
>>> +import com.ibm.icu.util.Calendar;
>>>
>>> /**
>>> * A {@link Job} that is backed by the entity engine. Job data is
>>> stored
>>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>>> // job not available
>>> throw new InvalidJobException("Job [" + getJobId() + "]
>>> is not available");
>>> } else {
>>> - // set the start time to now
>>> - jobValue.set("startDateTime",
>>> UtilDateTime.nowTimestamp());
>>> - jobValue.set("statusId", "SERVICE_RUNNING");
>>> + jobValue.set("statusId", "SERVICE_QUEUED");
>>> try {
>>> jobValue.store();
>>> } catch (GenericEntityException e) {
>>> throw new InvalidJobException("Unable to set the
>>> startDateTime and statusId on the current job [" + getJobId() + "];
>>> not running!", e);
>>> }
>>> + if (Debug.verboseOn()) {
>>> + Debug.logVerbose("Placing job [" + getJobId() + "]
>>> in queue", module);
>>> + }
>>> }
>>> }
>>>
>>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>>> // This condition isn't possible, but we will leave it
>>> here.
>>> throw new InvalidJobException("Job has been accepted by
>>> a different instance!");
>>> }
>>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>>> + jobValue.set("statusId", "SERVICE_RUNNING");
>>> + try {
>>> + jobValue.store();
>>> + } catch (GenericEntityException e) {
>>> + throw new InvalidJobException("Unable to set the
>>> startDateTime and statusId on the current job [" + getJobId() + "];
>>> not running!", e);
>>> + }
>>> + if (Debug.verboseOn()) {
>>> + Debug.logVerbose("Job [" + getJobId() + "] running",
>>> module);
>>> + }
>>> // configure any additional recurrences
>>> long maxRecurrenceCount = -1;
>>> long currentRecurrenceCount = 0;
>>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>>> }
>>> return null;
>>> }
>>> +
>>> + @Override
>>> + public void deQueue() throws InvalidJobException {
>>> + if (currentState != State.QUEUED) {
>>> + throw new InvalidJobException("Illegal state change");
>>> + }
>>> + currentState = State.CREATED;
>>> + try {
>>> + jobValue.refresh();
>>> + jobValue.set("startDateTime", null);
>>> + jobValue.set("runByInstanceId", null);
>>> + jobValue.set("statusId", "SERVICE_PENDING");
>>> + jobValue.store();
>>> + } catch (GenericEntityException e) {
>>> + throw new InvalidJobException("Unable to dequeue job ["
>>> + getJobId() + "]", e);
>>> + }
>>> + if (Debug.verboseOn()) {
>>> + Debug.logVerbose("Job [" + getJobId() + "] not queued,
>>> rescheduling", module);
>>> + }
>>> + }
>>> }
>>>
>>>
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Jacques Le Roux <ja...@les7arts.com>.
Else forgot to say that I like it
Jacques
From: "Jacques Le Roux" <ja...@les7arts.com>
> Hi Adrian,
>
> Why did you remove these comments ?
>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>> - // only rollback the transaction if we started one...
>> - // only commit the transaction if we started one... but make sure we try
>> - // we don't really want to do anything different, so just log and move on
>
> It seems they add some information
>
> Jacques
>
> From: <ad...@apache.org>
>> Author: adrianc
>> Date: Tue Aug 7 22:10:27 2012
>> New Revision: 1370566
>>
>> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
>> Log:
>> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>>
>> Modified:
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
>> URL:
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012
>> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
>> public boolean isValid() {
>> return currentState == State.CREATED;
>> }
>> +
>> + @Override
>> + public void deQueue() throws InvalidJobException {
>> + if (currentState != State.QUEUED) {
>> + throw new InvalidJobException("Illegal state change");
>> + }
>> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
>> + }
>> }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
>> URL:
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012
>> @@ -60,7 +60,13 @@ public interface Job {
>> boolean isValid();
>>
>> /**
>> - * Transitions the job to the queued state.
>> + * Transitions this job to the pre-queued (created) state. The job manager
>> + * will call this method when there was a problem adding this job to the queue.
>> + */
>> + void deQueue() throws InvalidJobException;
>> +
>> + /**
>> + * Transitions this job to the queued state.
>> */
>> void queue() throws InvalidJobException;
>> }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> URL:
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012
>> @@ -63,7 +63,6 @@ public final class JobManager {
>>
>> public static final String module = JobManager.class.getName();
>> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
>> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
>> "statusId", "SERVICE_QUEUED");
>> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
>> private static boolean isShutDown = false;
>>
>> @@ -150,7 +149,7 @@ public final class JobManager {
>> assertIsRunning();
>> DispatchContext dctx = getDispatcher().getDispatchContext();
>> if (dctx == null) {
>> - Debug.logError("Unable to locate DispatchContext object; not running job!", module);
>> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
>> return null;
>> }
>> List<Job> poll = FastList.newInstance();
>> @@ -176,13 +175,13 @@ public final class JobManager {
>> try {
>> beganTransaction = TransactionUtil.begin();
>> if (!beganTransaction) {
>> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>> return null;
>> }
>> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>> GenericValue jobValue = jobsIterator.next();
>> while (jobValue != null) {
>> - jobValue.putAll(updateFields);
>> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
>> jobValue.store();
>> poll.add(new PersistedServiceJob(dctx, jobValue, null));
>> if (poll.size() == limit) {
>> @@ -191,14 +190,12 @@ public final class JobManager {
>> jobValue = jobsIterator.next();
>> }
>> } catch (Throwable t) {
>> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
>> - Debug.logError(t, errMsg, module);
>> + Debug.logWarning(t, errMsg, module);
>> try {
>> - // only rollback the transaction if we started one...
>> TransactionUtil.rollback(beganTransaction, errMsg, t);
>> } catch (GenericEntityException e2) {
>> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>> }
>> } finally {
>> if (jobsIterator != null) {
>> @@ -209,12 +206,10 @@ public final class JobManager {
>> }
>> }
>> try {
>> - // only commit the transaction if we started one... but make sure we try
>> TransactionUtil.commit(beganTransaction);
>> } catch (GenericTransactionException e) {
>> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
>> - // we don't really want to do anything different, so just log and move on
>> - Debug.logError(e, errMsg, module);
>> + Debug.logWarning(e, errMsg, module);
>> }
>> }
>> return poll;
>> @@ -222,7 +217,9 @@ public final class JobManager {
>>
>> private void reloadCrashedJobs() {
>> List<GenericValue> crashed = null;
>> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
>> "SERVICE_PENDING"),
>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
>> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>> List<String> pools = ServiceConfigUtil.getRunPools();
>> @@ -236,13 +233,13 @@ public final class JobManager {
>> try {
>> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>> } catch (GenericEntityException e) {
>> - Debug.logError(e, "Unable to load crashed jobs", module);
>> + Debug.logWarning(e, "Unable to load crashed jobs", module);
>> }
>> if (UtilValidate.isNotEmpty(crashed)) {
>> - try {
>> - int rescheduled = 0;
>> - for (GenericValue job : crashed) {
>> - Timestamp now = UtilDateTime.nowTimestamp();
>> + int rescheduled = 0;
>> + Timestamp now = UtilDateTime.nowTimestamp();
>> + for (GenericValue job : crashed) {
>> + try {
>> Debug.logInfo("Scheduling Job : " + job, module);
>> String pJobId = job.getString("parentJobId");
>> if (pJobId == null) {
>> @@ -261,12 +258,12 @@ public final class JobManager {
>> job.set("cancelDateTime", now);
>> delegator.store(job);
>> rescheduled++;
>> + } catch (GenericEntityException e) {
>> + Debug.logWarning(e, module);
>> }
>> - if (Debug.infoOn())
>> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>> - } catch (GenericEntityException e) {
>> - Debug.logError(e, module);
>> }
>> + if (Debug.infoOn())
>> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
>> } else {
>> if (Debug.infoOn())
>> Debug.logInfo("No crashed jobs to re-schedule", module);
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>> URL:
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012
>> @@ -203,7 +203,11 @@ public final class JobPoller implements
>> */
>> public void queueNow(Job job) throws InvalidJobException {
>> job.queue();
>> - this.executor.execute(new JobInvoker(job));
>> + try {
>> + this.executor.execute(new JobInvoker(job));
>> + } catch (Exception e) {
>> + job.deQueue();
>> + }
>> }
>>
>> public void run() {
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>> URL:
>> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012
>> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>>
>> import java.io.IOException;
>> import java.sql.Timestamp;
>> -import com.ibm.icu.util.Calendar;
>> import java.util.Date;
>> import java.util.Map;
>>
>> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>>
>> import javolution.util.FastMap;
>>
>> +import org.apache.commons.lang.StringUtils;
>> import org.ofbiz.base.util.Debug;
>> import org.ofbiz.base.util.UtilDateTime;
>> import org.ofbiz.base.util.UtilGenerics;
>> import org.ofbiz.base.util.UtilProperties;
>> import org.ofbiz.base.util.UtilValidate;
>> -import org.ofbiz.service.calendar.RecurrenceInfoException;
>> -import org.ofbiz.service.calendar.TemporalExpression;
>> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
>> import org.ofbiz.entity.Delegator;
>> import org.ofbiz.entity.GenericEntityException;
>> import org.ofbiz.entity.GenericValue;
>> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
>> import org.ofbiz.service.GenericRequester;
>> import org.ofbiz.service.ServiceUtil;
>> import org.ofbiz.service.calendar.RecurrenceInfo;
>> +import org.ofbiz.service.calendar.RecurrenceInfoException;
>> +import org.ofbiz.service.calendar.TemporalExpression;
>> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
>> import org.ofbiz.service.config.ServiceConfigUtil;
>> import org.xml.sax.SAXException;
>>
>> -import org.apache.commons.lang.StringUtils;
>> +import com.ibm.icu.util.Calendar;
>>
>> /**
>> * A {@link Job} that is backed by the entity engine. Job data is stored
>> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
>> // job not available
>> throw new InvalidJobException("Job [" + getJobId() + "] is not available");
>> } else {
>> - // set the start time to now
>> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>> - jobValue.set("statusId", "SERVICE_RUNNING");
>> + jobValue.set("statusId", "SERVICE_QUEUED");
>> try {
>> jobValue.store();
>> } catch (GenericEntityException e) {
>> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() +
>> "]; not running!", e);
>> }
>> + if (Debug.verboseOn()) {
>> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
>> + }
>> }
>> }
>>
>> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
>> // This condition isn't possible, but we will leave it here.
>> throw new InvalidJobException("Job has been accepted by a different instance!");
>> }
>> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
>> + jobValue.set("statusId", "SERVICE_RUNNING");
>> + try {
>> + jobValue.store();
>> + } catch (GenericEntityException e) {
>> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "];
>> not running!", e);
>> + }
>> + if (Debug.verboseOn()) {
>> + Debug.logVerbose("Job [" + getJobId() + "] running", module);
>> + }
>> // configure any additional recurrences
>> long maxRecurrenceCount = -1;
>> long currentRecurrenceCount = 0;
>> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
>> }
>> return null;
>> }
>> +
>> + @Override
>> + public void deQueue() throws InvalidJobException {
>> + if (currentState != State.QUEUED) {
>> + throw new InvalidJobException("Illegal state change");
>> + }
>> + currentState = State.CREATED;
>> + try {
>> + jobValue.refresh();
>> + jobValue.set("startDateTime", null);
>> + jobValue.set("runByInstanceId", null);
>> + jobValue.set("statusId", "SERVICE_PENDING");
>> + jobValue.store();
>> + } catch (GenericEntityException e) {
>> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
>> + }
>> + if (Debug.verboseOn()) {
>> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
>> + }
>> + }
>> }
>>
>>
Re: svn commit: r1370566 - in /ofbiz/trunk/framework/service/src/org/ofbiz/service/job: GenericServiceJob.java Job.java JobManager.java JobPoller.java PersistedServiceJob.java
Posted by Jacques Le Roux <ja...@les7arts.com>.
Hi Adrian,
Why did you remove these comments ?
> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
> - // only rollback the transaction if we started one...
> - // only commit the transaction if we started one... but make sure we try
> - // we don't really want to do anything different, so just log and move on
It seems they add some information
Jacques
From: <ad...@apache.org>
> Author: adrianc
> Date: Tue Aug 7 22:10:27 2012
> New Revision: 1370566
>
> URL: http://svn.apache.org/viewvc?rev=1370566&view=rev
> Log:
> Fixed the Job Scheduler so jobs are not lost during heavy server load.
>
> Modified:
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java
> URL:
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/GenericServiceJob.java Tue Aug 7 22:10:27 2012
> @@ -144,4 +144,12 @@ public class GenericServiceJob extends A
> public boolean isValid() {
> return currentState == State.CREATED;
> }
> +
> + @Override
> + public void deQueue() throws InvalidJobException {
> + if (currentState != State.QUEUED) {
> + throw new InvalidJobException("Illegal state change");
> + }
> + throw new InvalidJobException("Unable to queue job [" + getJobId() + "]");
> + }
> }
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java
> URL:
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/Job.java Tue Aug 7 22:10:27 2012
> @@ -60,7 +60,13 @@ public interface Job {
> boolean isValid();
>
> /**
> - * Transitions the job to the queued state.
> + * Transitions this job to the pre-queued (created) state. The job manager
> + * will call this method when there was a problem adding this job to the queue.
> + */
> + void deQueue() throws InvalidJobException;
> +
> + /**
> + * Transitions this job to the queued state.
> */
> void queue() throws InvalidJobException;
> }
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> URL:
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 22:10:27 2012
> @@ -63,7 +63,6 @@ public final class JobManager {
>
> public static final String module = JobManager.class.getName();
> public static final String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
> - public static final Map<String, Object> updateFields = UtilMisc.<String, Object>toMap("runByInstanceId", instanceId,
> "statusId", "SERVICE_QUEUED");
> private static final ConcurrentHashMap<String, JobManager> registeredManagers = new ConcurrentHashMap<String, JobManager>();
> private static boolean isShutDown = false;
>
> @@ -150,7 +149,7 @@ public final class JobManager {
> assertIsRunning();
> DispatchContext dctx = getDispatcher().getDispatchContext();
> if (dctx == null) {
> - Debug.logError("Unable to locate DispatchContext object; not running job!", module);
> + Debug.logWarning("Unable to locate DispatchContext object; not running job!", module);
> return null;
> }
> List<Job> poll = FastList.newInstance();
> @@ -176,13 +175,13 @@ public final class JobManager {
> try {
> beganTransaction = TransactionUtil.begin();
> if (!beganTransaction) {
> - Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
> + Debug.logWarning("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
> return null;
> }
> jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
> GenericValue jobValue = jobsIterator.next();
> while (jobValue != null) {
> - jobValue.putAll(updateFields);
> + jobValue.set("runByInstanceId", instanceId); // Claim ownership of this value.
> jobValue.store();
> poll.add(new PersistedServiceJob(dctx, jobValue, null));
> if (poll.size() == limit) {
> @@ -191,14 +190,12 @@ public final class JobManager {
> jobValue = jobsIterator.next();
> }
> } catch (Throwable t) {
> - // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
> String errMsg = "Error in polling JobSandbox: [" + t.toString() + "]. Rolling back transaction.";
> - Debug.logError(t, errMsg, module);
> + Debug.logWarning(t, errMsg, module);
> try {
> - // only rollback the transaction if we started one...
> TransactionUtil.rollback(beganTransaction, errMsg, t);
> } catch (GenericEntityException e2) {
> - Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
> + Debug.logWarning(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
> }
> } finally {
> if (jobsIterator != null) {
> @@ -209,12 +206,10 @@ public final class JobManager {
> }
> }
> try {
> - // only commit the transaction if we started one... but make sure we try
> TransactionUtil.commit(beganTransaction);
> } catch (GenericTransactionException e) {
> String errMsg = "Transaction error trying to commit when polling and updating the JobSandbox: " + e.toString();
> - // we don't really want to do anything different, so just log and move on
> - Debug.logError(e, errMsg, module);
> + Debug.logWarning(e, errMsg, module);
> }
> }
> return poll;
> @@ -222,7 +217,9 @@ public final class JobManager {
>
> private void reloadCrashedJobs() {
> List<GenericValue> crashed = null;
> - List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
> "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS,
> "SERVICE_PENDING"),
> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"),
> + EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
> List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
> List<String> pools = ServiceConfigUtil.getRunPools();
> @@ -236,13 +233,13 @@ public final class JobManager {
> try {
> crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
> } catch (GenericEntityException e) {
> - Debug.logError(e, "Unable to load crashed jobs", module);
> + Debug.logWarning(e, "Unable to load crashed jobs", module);
> }
> if (UtilValidate.isNotEmpty(crashed)) {
> - try {
> - int rescheduled = 0;
> - for (GenericValue job : crashed) {
> - Timestamp now = UtilDateTime.nowTimestamp();
> + int rescheduled = 0;
> + Timestamp now = UtilDateTime.nowTimestamp();
> + for (GenericValue job : crashed) {
> + try {
> Debug.logInfo("Scheduling Job : " + job, module);
> String pJobId = job.getString("parentJobId");
> if (pJobId == null) {
> @@ -261,12 +258,12 @@ public final class JobManager {
> job.set("cancelDateTime", now);
> delegator.store(job);
> rescheduled++;
> + } catch (GenericEntityException e) {
> + Debug.logWarning(e, module);
> }
> - if (Debug.infoOn())
> - Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
> - } catch (GenericEntityException e) {
> - Debug.logError(e, module);
> }
> + if (Debug.infoOn())
> + Debug.logInfo("-- " + rescheduled + " jobs re-scheduled", module);
> } else {
> if (Debug.infoOn())
> Debug.logInfo("No crashed jobs to re-schedule", module);
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> URL:
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 22:10:27 2012
> @@ -203,7 +203,11 @@ public final class JobPoller implements
> */
> public void queueNow(Job job) throws InvalidJobException {
> job.queue();
> - this.executor.execute(new JobInvoker(job));
> + try {
> + this.executor.execute(new JobInvoker(job));
> + } catch (Exception e) {
> + job.deQueue();
> + }
> }
>
> public void run() {
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
> URL:
> http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1370566&r1=1370565&r2=1370566&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Tue Aug 7 22:10:27 2012
> @@ -20,7 +20,6 @@ package org.ofbiz.service.job;
>
> import java.io.IOException;
> import java.sql.Timestamp;
> -import com.ibm.icu.util.Calendar;
> import java.util.Date;
> import java.util.Map;
>
> @@ -28,14 +27,12 @@ import javax.xml.parsers.ParserConfigura
>
> import javolution.util.FastMap;
>
> +import org.apache.commons.lang.StringUtils;
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.base.util.UtilDateTime;
> import org.ofbiz.base.util.UtilGenerics;
> import org.ofbiz.base.util.UtilProperties;
> import org.ofbiz.base.util.UtilValidate;
> -import org.ofbiz.service.calendar.RecurrenceInfoException;
> -import org.ofbiz.service.calendar.TemporalExpression;
> -import org.ofbiz.service.calendar.TemporalExpressionWorker;
> import org.ofbiz.entity.Delegator;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.GenericValue;
> @@ -47,10 +44,13 @@ import org.ofbiz.service.DispatchContext
> import org.ofbiz.service.GenericRequester;
> import org.ofbiz.service.ServiceUtil;
> import org.ofbiz.service.calendar.RecurrenceInfo;
> +import org.ofbiz.service.calendar.RecurrenceInfoException;
> +import org.ofbiz.service.calendar.TemporalExpression;
> +import org.ofbiz.service.calendar.TemporalExpressionWorker;
> import org.ofbiz.service.config.ServiceConfigUtil;
> import org.xml.sax.SAXException;
>
> -import org.apache.commons.lang.StringUtils;
> +import com.ibm.icu.util.Calendar;
>
> /**
> * A {@link Job} that is backed by the entity engine. Job data is stored
> @@ -100,14 +100,15 @@ public class PersistedServiceJob extends
> // job not available
> throw new InvalidJobException("Job [" + getJobId() + "] is not available");
> } else {
> - // set the start time to now
> - jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
> - jobValue.set("statusId", "SERVICE_RUNNING");
> + jobValue.set("statusId", "SERVICE_QUEUED");
> try {
> jobValue.store();
> } catch (GenericEntityException e) {
> throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() +
> "]; not running!", e);
> }
> + if (Debug.verboseOn()) {
> + Debug.logVerbose("Placing job [" + getJobId() + "] in queue", module);
> + }
> }
> }
>
> @@ -129,6 +130,16 @@ public class PersistedServiceJob extends
> // This condition isn't possible, but we will leave it here.
> throw new InvalidJobException("Job has been accepted by a different instance!");
> }
> + jobValue.set("startDateTime", UtilDateTime.nowTimestamp());
> + jobValue.set("statusId", "SERVICE_RUNNING");
> + try {
> + jobValue.store();
> + } catch (GenericEntityException e) {
> + throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "];
> not running!", e);
> + }
> + if (Debug.verboseOn()) {
> + Debug.logVerbose("Job [" + getJobId() + "] running", module);
> + }
> // configure any additional recurrences
> long maxRecurrenceCount = -1;
> long currentRecurrenceCount = 0;
> @@ -331,4 +342,24 @@ public class PersistedServiceJob extends
> }
> return null;
> }
> +
> + @Override
> + public void deQueue() throws InvalidJobException {
> + if (currentState != State.QUEUED) {
> + throw new InvalidJobException("Illegal state change");
> + }
> + currentState = State.CREATED;
> + try {
> + jobValue.refresh();
> + jobValue.set("startDateTime", null);
> + jobValue.set("runByInstanceId", null);
> + jobValue.set("statusId", "SERVICE_PENDING");
> + jobValue.store();
> + } catch (GenericEntityException e) {
> + throw new InvalidJobException("Unable to dequeue job [" + getJobId() + "]", e);
> + }
> + if (Debug.verboseOn()) {
> + Debug.logVerbose("Job [" + getJobId() + "] not queued, rescheduling", module);
> + }
> + }
> }
>
>