You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ofbiz.apache.org by Scott Gray <sc...@hotwaxmedia.com> on 2012/08/10 02:24:57 UTC
Re: svn commit: r1370437 - in /ofbiz/trunk/framework/service: config/serviceengine.xml dtd/service-config.xsd src/org/ofbiz/service/job/JobManager.java src/org/ofbiz/service/job/JobPoller.java
Hi Adrian,
I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool. The problem with doing a select then update is the following:
1. Instance A uses SELECT to gather the jobs to queue, table is locked for updates but reads are still possible
2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works its way through updating the rows
3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, potential for lock wait timeout issues for instance B (an existing and unavoidable issue)
4. Instance A finishes the updates and commits, locks are released and B can begin its updates (if the lock wait didn't time out). A sends the retrieved jobs back to the poller to be queued in memory.
5. Instance B finishes the updates and commits, sends the retrieved jobs back to the poller to be queued in memory.
I could be wrong about the above but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head if this prevents reads until the update is committed). I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately and OFBiz doesn't support it.
Regards
Scott
On 8/08/2012, at 7:11 AM, adrianc@apache.org wrote:
> Author: adrianc
> Date: Tue Aug 7 19:11:06 2012
> New Revision: 1370437
>
> URL: http://svn.apache.org/viewvc?rev=1370437&view=rev
> Log:
> More work on the Job Scheduler:
>
> 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition.
> 2. Changed min/max thread settings to more conservative values.
> 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use.
> 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously.
>
> At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit.
>
> Modified:
> ofbiz/trunk/framework/service/config/serviceengine.xml
> ofbiz/trunk/framework/service/dtd/service-config.xsd
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>
> Modified: ofbiz/trunk/framework/service/config/serviceengine.xml
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/config/serviceengine.xml (original)
> +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug 7 19:11:06 2012
> @@ -25,15 +25,16 @@ under the License.
> <!-- Name of the service to use for authorization -->
> <authorization service-name="userLogin"/>
>
> - <!-- Thread pool configuration (max/min threads, uses to live and time to live) -->
> + <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. -->
> <thread-pool send-to-pool="pool"
> purge-job-days="4"
> failed-retry-min="3"
> - ttl="18000000"
> - min-threads="5"
> - max-threads="15"
> + ttl="120000"
> + jobs="100"
> + min-threads="2"
> + max-threads="5"
> poll-enabled="true"
> - poll-db-millis="20000">
> + poll-db-millis="30000">
> <run-from-pool name="pool"/>
> </thread-pool>
>
>
> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original)
> +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug 7 19:11:06 2012
> @@ -58,38 +58,53 @@ under the License.
> <xs:element name="thread-pool">
> <xs:complexType>
> <xs:sequence>
> - <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/>
> + <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded">
> + <xs:complexType>
> + <xs:attribute type="xs:string" name="name" use="required" />
> + </xs:complexType>
> + </xs:element>
> </xs:sequence>
> - <xs:attributeGroup ref="attlist.thread-pool"/>
> + <xs:attribute type="xs:string" name="send-to-pool" use="required" />
> + <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" />
> + <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" />
> + <xs:attribute type="xs:nonNegativeInteger" name="ttl">
> + <xs:annotation>
> + <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation>
> + </xs:annotation>
> + </xs:attribute>
> + <xs:attribute type="xs:nonNegativeInteger" name="jobs">
> + <xs:annotation>
> + <xs:documentation>Job queue size. Defaults to "100".</xs:documentation>
> + </xs:annotation>
> + </xs:attribute>
> + <xs:attribute type="xs:nonNegativeInteger" name="min-threads">
> + <xs:annotation>
> + <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation>
> + </xs:annotation>
> + </xs:attribute>
> + <xs:attribute type="xs:nonNegativeInteger" name="max-threads">
> + <xs:annotation>
> + <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation>
> + </xs:annotation>
> + </xs:attribute>
> + <xs:attribute name="poll-enabled">
> + <xs:annotation>
> + <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation>
> + </xs:annotation>
> + <xs:simpleType>
> + <xs:restriction base="xs:token">
> + <xs:enumeration value="true" />
> + <xs:enumeration value="false" />
> + </xs:restriction>
> + </xs:simpleType>
> + </xs:attribute>
> + <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis">
> + <xs:annotation>
> + <xs:documentation>Database polling interval in milliseconds. Defaults to "30000" (30 seconds).</xs:documentation>
> + </xs:annotation>
> + </xs:attribute>
> </xs:complexType>
> </xs:element>
> - <xs:attributeGroup name="attlist.thread-pool">
> - <xs:attribute type="xs:string" name="send-to-pool" use="required"/>
> - <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/>
> - <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/>
> - <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/>
> - <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated -->
> - <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated -->
> - <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/>
> - <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/>
> - <xs:attribute name="poll-enabled" default="true">
> - <xs:simpleType>
> - <xs:restriction base="xs:token">
> - <xs:enumeration value="true"/>
> - <xs:enumeration value="false"/>
> - </xs:restriction>
> - </xs:simpleType>
> - </xs:attribute>
> - <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/>
> - </xs:attributeGroup>
> - <xs:element name="run-from-pool">
> - <xs:complexType>
> - <xs:attributeGroup ref="attlist.run-from-pool"/>
> - </xs:complexType>
> - </xs:element>
> - <xs:attributeGroup name="attlist.run-from-pool">
> - <xs:attribute type="xs:string" name="name" use="required"/>
> - </xs:attributeGroup>
> <xs:element name="engine">
> <xs:complexType>
> <xs:sequence>
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 19:11:06 2012
> @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.GenericValue;
> import org.ofbiz.entity.condition.EntityCondition;
> -import org.ofbiz.entity.condition.EntityConditionList;
> import org.ofbiz.entity.condition.EntityExpr;
> import org.ofbiz.entity.condition.EntityOperator;
> import org.ofbiz.entity.serialize.SerializeException;
> import org.ofbiz.entity.serialize.XmlSerializer;
> import org.ofbiz.entity.transaction.GenericTransactionException;
> import org.ofbiz.entity.transaction.TransactionUtil;
> +import org.ofbiz.entity.util.EntityListIterator;
> import org.ofbiz.service.DispatchContext;
> import org.ofbiz.service.LocalDispatcher;
> import org.ofbiz.service.ServiceContainer;
> @@ -146,7 +146,7 @@ public final class JobManager {
> * Returns an empty list if there are no jobs due to run.
> * This method is called by the {@link JobPoller} polling thread.
> */
> - protected synchronized List<Job> poll() {
> + protected List<Job> poll(int limit) {
> assertIsRunning();
> DispatchContext dctx = getDispatcher().getDispatchContext();
> if (dctx == null) {
> @@ -154,8 +154,6 @@ public final class JobManager {
> return null;
> }
> List<Job> poll = FastList.newInstance();
> - // sort the results by time
> - List<String> order = UtilMisc.toList("runTime");
> // basic query
> List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
> EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
> @@ -173,21 +171,24 @@ public final class JobManager {
> EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
> EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
> EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));
> + EntityListIterator jobsIterator = null;
> boolean beganTransaction = false;
> try {
> beganTransaction = TransactionUtil.begin();
> if (!beganTransaction) {
> - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
> + Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
> return null;
> }
> - // first update the jobs w/ this instance running information
> - delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
> - // now query all the 'queued' jobs for this instance
> - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
> - if (UtilValidate.isNotEmpty(jobEnt)) {
> - for (GenericValue v : jobEnt) {
> - poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester
> + jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
> + GenericValue jobValue = jobsIterator.next();
> + while (jobValue != null) {
> + jobValue.putAll(updateFields);
> + jobValue.store();
> + poll.add(new PersistedServiceJob(dctx, jobValue, null));
> + if (poll.size() == limit) {
> + break;
> }
> + jobValue = jobsIterator.next();
> }
> } catch (Throwable t) {
> // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
> @@ -200,6 +201,13 @@ public final class JobManager {
> Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
> }
> } finally {
> + if (jobsIterator != null) {
> + try {
> + jobsIterator.close();
> + } catch (GenericEntityException e) {
> + Debug.logWarning(e, module);
> + }
> + }
> try {
> // only commit the transaction if we started one... but make sure we try
> TransactionUtil.commit(beganTransaction);
> @@ -214,11 +222,19 @@ public final class JobManager {
>
> private void reloadCrashedJobs() {
> List<GenericValue> crashed = null;
> - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
> - exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> - EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs);
> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> + EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
> + List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
> + List<String> pools = ServiceConfigUtil.getRunPools();
> + if (pools != null) {
> + for (String poolName : pools) {
> + poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName));
> + }
> + }
> + EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
> + EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition));
> try {
> - crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false);
> + crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
> } catch (GenericEntityException e) {
> Debug.logError(e, "Unable to load crashed jobs", module);
> }
>
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 19:11:06 2012
> @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.service.config.ServiceConfigUtil;
>
> -import org.apache.commons.lang.math.NumberUtils;
> -
> /**
> * Job poller. Queues and runs jobs.
> */
> @@ -42,10 +40,11 @@ public final class JobPoller implements
>
> public static final String module = JobPoller.class.getName();
> private static final AtomicInteger created = new AtomicInteger();
> - public static final int MIN_THREADS = 1;
> - public static final int MAX_THREADS = 15;
> - public static final int POLL_WAIT = 20000;
> - public static final long THREAD_TTL = 18000000;
> + private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down.
> + private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down.
> + private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
> + private static final int QUEUE_SIZE = 100;
> + private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
>
> private final JobManager jm;
> private final ThreadPoolExecutor executor;
> @@ -61,7 +60,7 @@ public final class JobPoller implements
> public JobPoller(JobManager jm) {
> this.name = jm.getDelegator().getDelegatorName();
> this.jm = jm;
> - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
> + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
> new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
> }
>
> @@ -116,33 +115,48 @@ public final class JobPoller implements
> }
>
> private long getTTL() {
> - long ttl = THREAD_TTL;
> - try {
> - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl"));
> - } catch (NumberFormatException nfe) {
> - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module);
> + String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
> + if (!threadTTLAttr.isEmpty()) {
> + try {
> + int threadTTL = Integer.parseInt(threadTTLAttr);
> + if (threadTTL > 0) {
> + return threadTTL;
> + }
> + } catch (NumberFormatException e) {
> + Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module);
> + }
> }
> - return ttl;
> + return THREAD_TTL;
> }
>
> private int maxThreads() {
> - int max = MAX_THREADS;
> - try {
> - max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
> - } catch (NumberFormatException nfe) {
> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> + String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
> + if (!maxThreadsAttr.isEmpty()) {
> + try {
> + int maxThreads = Integer.parseInt(maxThreadsAttr);
> + if (maxThreads > 0) {
> + return maxThreads;
> + }
> + } catch (NumberFormatException e) {
> + Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
> + }
> }
> - return max;
> + return MAX_THREADS;
> }
>
> private int minThreads() {
> - int min = MIN_THREADS;
> - try {
> - min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
> - } catch (NumberFormatException nfe) {
> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> + String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
> + if (!minThreadsAttr.isEmpty()) {
> + try {
> + int minThreads = Integer.parseInt(minThreadsAttr);
> + if (minThreads > 0) {
> + return minThreads;
> + }
> + } catch (NumberFormatException e) {
> + Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
> + }
> }
> - return min;
> + return MIN_THREADS;
> }
>
> private boolean pollEnabled() {
> @@ -153,13 +167,33 @@ public final class JobPoller implements
> }
>
> private int pollWaitTime() {
> - int poll = POLL_WAIT;
> - try {
> - poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
> - } catch (NumberFormatException nfe) {
> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> + String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
> + if (!pollIntervalAttr.isEmpty()) {
> + try {
> + int pollInterval = Integer.parseInt(pollIntervalAttr);
> + if (pollInterval > 0) {
> + return pollInterval;
> + }
> + } catch (NumberFormatException e) {
> + Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module);
> + }
> + }
> + return POLL_WAIT;
> + }
> +
> + private int queueSize() {
> + String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
> + if (!queueSizeAttr.isEmpty()) {
> + try {
> + int queueSize = Integer.parseInt(queueSizeAttr);
> + if (queueSize > 0) {
> + return queueSize;
> + }
> + } catch (NumberFormatException e) {
> + Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module);
> + }
> }
> - return poll;
> + return QUEUE_SIZE;
> }
>
> /**
> @@ -172,29 +206,28 @@ public final class JobPoller implements
> this.executor.execute(new JobInvoker(job));
> }
>
> - public synchronized void run() {
> + public void run() {
> try {
> // wait 30 seconds before the first poll
> - java.lang.Thread.sleep(30000);
> - } catch (InterruptedException e) {
> - }
> - while (!executor.isShutdown()) {
> - try {
> - // grab a list of jobs to run.
> - List<Job> pollList = jm.poll();
> - for (Job job : pollList) {
> - try {
> - queueNow(job);
> - } catch (InvalidJobException e) {
> - Debug.logError(e, module);
> + Thread.sleep(30000);
> + while (!executor.isShutdown()) {
> + int remainingCapacity = executor.getQueue().remainingCapacity();
> + if (remainingCapacity > 0) {
> + List<Job> pollList = jm.poll(remainingCapacity);
> + for (Job job : pollList) {
> + try {
> + queueNow(job);
> + } catch (InvalidJobException e) {
> + Debug.logError(e, module);
> + }
> }
> }
> - // NOTE: using sleep instead of wait for stricter locking
> - java.lang.Thread.sleep(pollWaitTime());
> - } catch (InterruptedException e) {
> - Debug.logError(e, module);
> - stop();
> + Thread.sleep(pollWaitTime());
> }
> + } catch (InterruptedException e) {
> + Debug.logError(e, module);
> + stop();
> + Thread.currentThread().interrupt();
> }
> Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
> }
> @@ -234,7 +267,7 @@ public final class JobPoller implements
> }
>
> public Thread newThread(Runnable runnable) {
> - return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement());
> + return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
> }
> }
> }
>
>
Re: svn commit: r1370437 - in /ofbiz/trunk/framework/service: config/serviceengine.xml
dtd/service-config.xsd src/org/ofbiz/service/job/JobManager.java src/org/ofbiz/service/job/JobPoller.java
Posted by Adrian Crum <ad...@sandglass-software.com>.
Thanks for the review Scott!
I understand why the original code was done that way, but it is causing
problems on busy servers. I will update the new code to check for
data-race conditions.
-Adrian
On 8/10/2012 1:24 AM, Scott Gray wrote:
> Hi Adrian,
>
> I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool. The problem with doing a select then update is the following:
> 1. Instance A uses SELECT to gather the jobs to queue, table is locked for updates but reads are still possible
> 2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works its way through updating the rows
> 3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, potential for lock wait timeout issues for instance B (an existing and unavoidable issue)
> 4. Instance A finishes the updates and commits, locks are released and B can begin its updates (if the lock wait didn't time out). A sends the retrieved jobs back to the poller to be queued in memory.
> 5. Instance B finishes the updates and commits, sends the retrieved jobs back to the poller to be queued in memory.
>
> I could be wrong about the above but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head if this prevents reads until the update is committed). I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately and OFBiz doesn't support it.
>
> Regards
> Scott
>
> On 8/08/2012, at 7:11 AM, adrianc@apache.org wrote:
>
>> Author: adrianc
>> Date: Tue Aug 7 19:11:06 2012
>> New Revision: 1370437
>>
>> URL: http://svn.apache.org/viewvc?rev=1370437&view=rev
>> Log:
>> More work on the Job Scheduler:
>>
>> 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition.
>> 2. Changed min/max thread settings to more conservative values.
>> 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use.
>> 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously.
>>
>> At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit.
>>
>> Modified:
>> ofbiz/trunk/framework/service/config/serviceengine.xml
>> ofbiz/trunk/framework/service/dtd/service-config.xsd
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>
>> Modified: ofbiz/trunk/framework/service/config/serviceengine.xml
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/config/serviceengine.xml (original)
>> +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug 7 19:11:06 2012
>> @@ -25,15 +25,16 @@ under the License.
>> <!-- Name of the service to use for authorization -->
>> <authorization service-name="userLogin"/>
>>
>> - <!-- Thread pool configuration (max/min threads, uses to live and time to live) -->
>> + <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. -->
>> <thread-pool send-to-pool="pool"
>> purge-job-days="4"
>> failed-retry-min="3"
>> - ttl="18000000"
>> - min-threads="5"
>> - max-threads="15"
>> + ttl="120000"
>> + jobs="100"
>> + min-threads="2"
>> + max-threads="5"
>> poll-enabled="true"
>> - poll-db-millis="20000">
>> + poll-db-millis="30000">
>> <run-from-pool name="pool"/>
>> </thread-pool>
>>
>>
>> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original)
>> +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug 7 19:11:06 2012
>> @@ -58,38 +58,53 @@ under the License.
>> <xs:element name="thread-pool">
>> <xs:complexType>
>> <xs:sequence>
>> - <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/>
>> + <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded">
>> + <xs:complexType>
>> + <xs:attribute type="xs:string" name="name" use="required" />
>> + </xs:complexType>
>> + </xs:element>
>> </xs:sequence>
>> - <xs:attributeGroup ref="attlist.thread-pool"/>
>> + <xs:attribute type="xs:string" name="send-to-pool" use="required" />
>> + <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" />
>> + <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" />
>> + <xs:attribute type="xs:nonNegativeInteger" name="ttl">
>> + <xs:annotation>
>> + <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation>
>> + </xs:annotation>
>> + </xs:attribute>
>> + <xs:attribute type="xs:nonNegativeInteger" name="jobs">
>> + <xs:annotation>
>> + <xs:documentation>Job queue size. Defaults to "100".</xs:documentation>
>> + </xs:annotation>
>> + </xs:attribute>
>> + <xs:attribute type="xs:nonNegativeInteger" name="min-threads">
>> + <xs:annotation>
>> + <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation>
>> + </xs:annotation>
>> + </xs:attribute>
>> + <xs:attribute type="xs:nonNegativeInteger" name="max-threads">
>> + <xs:annotation>
>> + <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation>
>> + </xs:annotation>
>> + </xs:attribute>
>> + <xs:attribute name="poll-enabled">
>> + <xs:annotation>
>> + <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation>
>> + </xs:annotation>
>> + <xs:simpleType>
>> + <xs:restriction base="xs:token">
>> + <xs:enumeration value="true" />
>> + <xs:enumeration value="false" />
>> + </xs:restriction>
>> + </xs:simpleType>
>> + </xs:attribute>
>> + <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis">
>> + <xs:annotation>
>> + <xs:documentation>Database polling interval in milliseconds. Defaults to "30000" (30 seconds).</xs:documentation>
>> + </xs:annotation>
>> + </xs:attribute>
>> </xs:complexType>
>> </xs:element>
>> - <xs:attributeGroup name="attlist.thread-pool">
>> - <xs:attribute type="xs:string" name="send-to-pool" use="required"/>
>> - <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/>
>> - <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/>
>> - <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/>
>> - <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated -->
>> - <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated -->
>> - <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/>
>> - <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/>
>> - <xs:attribute name="poll-enabled" default="true">
>> - <xs:simpleType>
>> - <xs:restriction base="xs:token">
>> - <xs:enumeration value="true"/>
>> - <xs:enumeration value="false"/>
>> - </xs:restriction>
>> - </xs:simpleType>
>> - </xs:attribute>
>> - <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/>
>> - </xs:attributeGroup>
>> - <xs:element name="run-from-pool">
>> - <xs:complexType>
>> - <xs:attributeGroup ref="attlist.run-from-pool"/>
>> - </xs:complexType>
>> - </xs:element>
>> - <xs:attributeGroup name="attlist.run-from-pool">
>> - <xs:attribute type="xs:string" name="name" use="required"/>
>> - </xs:attributeGroup>
>> <xs:element name="engine">
>> <xs:complexType>
>> <xs:sequence>
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug 7 19:11:06 2012
>> @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator;
>> import org.ofbiz.entity.GenericEntityException;
>> import org.ofbiz.entity.GenericValue;
>> import org.ofbiz.entity.condition.EntityCondition;
>> -import org.ofbiz.entity.condition.EntityConditionList;
>> import org.ofbiz.entity.condition.EntityExpr;
>> import org.ofbiz.entity.condition.EntityOperator;
>> import org.ofbiz.entity.serialize.SerializeException;
>> import org.ofbiz.entity.serialize.XmlSerializer;
>> import org.ofbiz.entity.transaction.GenericTransactionException;
>> import org.ofbiz.entity.transaction.TransactionUtil;
>> +import org.ofbiz.entity.util.EntityListIterator;
>> import org.ofbiz.service.DispatchContext;
>> import org.ofbiz.service.LocalDispatcher;
>> import org.ofbiz.service.ServiceContainer;
>> @@ -146,7 +146,7 @@ public final class JobManager {
>> * Returns an empty list if there are no jobs due to run.
>> * This method is called by the {@link JobPoller} polling thread.
>> */
>> - protected synchronized List<Job> poll() {
>> + protected List<Job> poll(int limit) {
>> assertIsRunning();
>> DispatchContext dctx = getDispatcher().getDispatchContext();
>> if (dctx == null) {
>> @@ -154,8 +154,6 @@ public final class JobManager {
>> return null;
>> }
>> List<Job> poll = FastList.newInstance();
>> - // sort the results by time
>> - List<String> order = UtilMisc.toList("runTime");
>> // basic query
>> List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
>> EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
>> @@ -173,21 +171,24 @@ public final class JobManager {
>> EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
>> EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
>> EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));
>> + EntityListIterator jobsIterator = null;
>> boolean beganTransaction = false;
>> try {
>> beganTransaction = TransactionUtil.begin();
>> if (!beganTransaction) {
>> - Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
>> + Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>> return null;
>> }
>> - // first update the jobs w/ this instance running information
>> - delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
>> - // now query all the 'queued' jobs for this instance
>> - List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
>> - if (UtilValidate.isNotEmpty(jobEnt)) {
>> - for (GenericValue v : jobEnt) {
>> - poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester
>> + jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>> + GenericValue jobValue = jobsIterator.next();
>> + while (jobValue != null) {
>> + jobValue.putAll(updateFields);
>> + jobValue.store();
>> + poll.add(new PersistedServiceJob(dctx, jobValue, null));
>> + if (poll.size() == limit) {
>> + break;
>> }
>> + jobValue = jobsIterator.next();
>> }
>> } catch (Throwable t) {
>> // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>> @@ -200,6 +201,13 @@ public final class JobManager {
>> Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>> }
>> } finally {
>> + if (jobsIterator != null) {
>> + try {
>> + jobsIterator.close();
>> + } catch (GenericEntityException e) {
>> + Debug.logWarning(e, module);
>> + }
>> + }
>> try {
>> // only commit the transaction if we started one... but make sure we try
>> TransactionUtil.commit(beganTransaction);
>> @@ -214,11 +222,19 @@ public final class JobManager {
>>
>> private void reloadCrashedJobs() {
>> List<GenericValue> crashed = null;
>> - List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
>> - exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> - EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs);
>> + List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> + EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>> + List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>> + List<String> pools = ServiceConfigUtil.getRunPools();
>> + if (pools != null) {
>> + for (String poolName : pools) {
>> + poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName));
>> + }
>> + }
>> + EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
>> + EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition));
>> try {
>> - crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false);
>> + crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>> } catch (GenericEntityException e) {
>> Debug.logError(e, "Unable to load crashed jobs", module);
>> }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug 7 19:11:06 2012
>> @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi
>> import org.ofbiz.base.util.Debug;
>> import org.ofbiz.service.config.ServiceConfigUtil;
>>
>> -import org.apache.commons.lang.math.NumberUtils;
>> -
>> /**
>> * Job poller. Queues and runs jobs.
>> */
>> @@ -42,10 +40,11 @@ public final class JobPoller implements
>>
>> public static final String module = JobPoller.class.getName();
>> private static final AtomicInteger created = new AtomicInteger();
>> - public static final int MIN_THREADS = 1;
>> - public static final int MAX_THREADS = 15;
>> - public static final int POLL_WAIT = 20000;
>> - public static final long THREAD_TTL = 18000000;
>> + private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down.
>> + private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down.
>> + private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
>> + private static final int QUEUE_SIZE = 100;
>> + private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
>>
>> private final JobManager jm;
>> private final ThreadPoolExecutor executor;
>> @@ -61,7 +60,7 @@ public final class JobPoller implements
>> public JobPoller(JobManager jm) {
>> this.name = jm.getDelegator().getDelegatorName();
>> this.jm = jm;
>> - this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
>> + this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
>> new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
>> }
>>
>> @@ -116,33 +115,48 @@ public final class JobPoller implements
>> }
>>
>> private long getTTL() {
>> - long ttl = THREAD_TTL;
>> - try {
>> - ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl"));
>> - } catch (NumberFormatException nfe) {
>> - Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module);
>> + String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
>> + if (!threadTTLAttr.isEmpty()) {
>> + try {
>> + int threadTTL = Integer.parseInt(threadTTLAttr);
>> + if (threadTTL > 0) {
>> + return threadTTL;
>> + }
>> + } catch (NumberFormatException e) {
>> + Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module);
>> + }
>> }
>> - return ttl;
>> + return THREAD_TTL;
>> }
>>
>> private int maxThreads() {
>> - int max = MAX_THREADS;
>> - try {
>> - max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
>> - } catch (NumberFormatException nfe) {
>> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> + String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
>> + if (!maxThreadsAttr.isEmpty()) {
>> + try {
>> + int maxThreads = Integer.parseInt(maxThreadsAttr);
>> + if (maxThreads > 0) {
>> + return maxThreads;
>> + }
>> + } catch (NumberFormatException e) {
>> + Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
>> + }
>> }
>> - return max;
>> + return MAX_THREADS;
>> }
>>
>> private int minThreads() {
>> - int min = MIN_THREADS;
>> - try {
>> - min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
>> - } catch (NumberFormatException nfe) {
>> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> + String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
>> + if (!minThreadsAttr.isEmpty()) {
>> + try {
>> + int minThreads = Integer.parseInt(minThreadsAttr);
>> + if (minThreads > 0) {
>> + return minThreads;
>> + }
>> + } catch (NumberFormatException e) {
>> + Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
>> + }
>> }
>> - return min;
>> + return MIN_THREADS;
>> }
>>
>> private boolean pollEnabled() {
>> @@ -153,13 +167,33 @@ public final class JobPoller implements
>> }
>>
>> private int pollWaitTime() {
>> - int poll = POLL_WAIT;
>> - try {
>> - poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
>> - } catch (NumberFormatException nfe) {
>> - Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> + String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
>> + if (!pollIntervalAttr.isEmpty()) {
>> + try {
>> + int pollInterval = Integer.parseInt(pollIntervalAttr);
>> + if (pollInterval > 0) {
>> + return pollInterval;
>> + }
>> + } catch (NumberFormatException e) {
>> + Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module);
>> + }
>> + }
>> + return POLL_WAIT;
>> + }
>> +
>> + private int queueSize() {
>> + String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
>> + if (!queueSizeAttr.isEmpty()) {
>> + try {
>> + int queueSize = Integer.parseInt(queueSizeAttr);
>> + if (queueSize > 0) {
>> + return queueSize;
>> + }
>> + } catch (NumberFormatException e) {
>> + Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module);
>> + }
>> }
>> - return poll;
>> + return QUEUE_SIZE;
>> }
>>
>> /**
>> @@ -172,29 +206,28 @@ public final class JobPoller implements
>> this.executor.execute(new JobInvoker(job));
>> }
>>
>> - public synchronized void run() {
>> + public void run() {
>> try {
>> // wait 30 seconds before the first poll
>> - java.lang.Thread.sleep(30000);
>> - } catch (InterruptedException e) {
>> - }
>> - while (!executor.isShutdown()) {
>> - try {
>> - // grab a list of jobs to run.
>> - List<Job> pollList = jm.poll();
>> - for (Job job : pollList) {
>> - try {
>> - queueNow(job);
>> - } catch (InvalidJobException e) {
>> - Debug.logError(e, module);
>> + Thread.sleep(30000);
>> + while (!executor.isShutdown()) {
>> + int remainingCapacity = executor.getQueue().remainingCapacity();
>> + if (remainingCapacity > 0) {
>> + List<Job> pollList = jm.poll(remainingCapacity);
>> + for (Job job : pollList) {
>> + try {
>> + queueNow(job);
>> + } catch (InvalidJobException e) {
>> + Debug.logError(e, module);
>> + }
>> }
>> }
>> - // NOTE: using sleep instead of wait for stricter locking
>> - java.lang.Thread.sleep(pollWaitTime());
>> - } catch (InterruptedException e) {
>> - Debug.logError(e, module);
>> - stop();
>> + Thread.sleep(pollWaitTime());
>> }
>> + } catch (InterruptedException e) {
>> + Debug.logError(e, module);
>> + stop();
>> + Thread.currentThread().interrupt();
>> }
>> Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
>> }
>> @@ -234,7 +267,7 @@ public final class JobPoller implements
>> }
>>
>> public Thread newThread(Runnable runnable) {
>> - return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement());
>> + return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
>> }
>> }
>> }
>>
>>