Posted to dev@ofbiz.apache.org by Scott Gray <sc...@hotwaxmedia.com> on 2012/08/10 02:24:57 UTC

Re: svn commit: r1370437 - in /ofbiz/trunk/framework/service: config/serviceengine.xml dtd/service-config.xsd src/org/ofbiz/service/job/JobManager.java src/org/ofbiz/service/job/JobPoller.java

Hi Adrian,

I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool.  The problem with doing a select then update is the following:
1. Instance A uses SELECT to gather the jobs to queue, table is locked for updates but reads are still possible
2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works its way through updating the rows
3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, potential for lock wait timeout issues for instance B (an existing and unavoidable issue)
4. Instance A finishes the updates and commits, locks are released and B can begin its updates (if the lock wait didn't time out).  A sends the retrieved jobs back to the poller to be queued in memory.
5. Instance B finishes the updates and commits, sends the retrieved jobs back to the poller to be queued in memory.

I could be wrong about the above but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head if this prevents reads until the update is committed).  I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately and OFBiz doesn't support it.
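
To make the interleaving above concrete, here is a minimal sketch that replays it by hand against an in-memory map. It is not OFBiz code: the class JobClaimSketch and its methods are hypothetical, and the map only stands in for the JobSandbox table (jobId mapped to runByInstanceId). It also shows one possible guard, assuming each update re-checks that the job is still unclaimed, which in SQL terms would mean repeating the "unclaimed" condition in the UPDATE's WHERE clause and looking at the affected row count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the JobSandbox table; not OFBiz code.
class JobClaimSketch {
    // jobId -> runByInstanceId; a job with no entry is still unclaimed.
    private final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();
    private final List<String> jobIds;

    JobClaimSketch(List<String> jobIds) {
        this.jobIds = new ArrayList<>(jobIds);
    }

    // The SELECT step: read the unclaimed jobs without taking any lock.
    List<String> selectUnclaimed() {
        List<String> result = new ArrayList<>();
        for (String id : jobIds) {
            if (!owners.containsKey(id)) {
                result.add(id);
            }
        }
        return result;
    }

    // Unconditional UPDATE: overwrites the owner and queues everything selected.
    List<String> blindUpdate(List<String> selected, String instanceId) {
        for (String id : selected) {
            owners.put(id, instanceId);
        }
        return selected;
    }

    // Conditional UPDATE: claims a job only if nobody owns it yet and
    // returns just the jobs this instance actually won.
    List<String> conditionalUpdate(List<String> selected, String instanceId) {
        List<String> won = new ArrayList<>();
        for (String id : selected) {
            if (owners.putIfAbsent(id, instanceId) == null) {
                won.add(id);
            }
        }
        return won;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("job1", "job2", "job3");

        // Steps 1-5 from the list above, interleaved by hand.
        JobClaimSketch blind = new JobClaimSketch(ids);
        List<String> selectedByA = blind.selectUnclaimed();
        List<String> selectedByB = blind.selectUnclaimed();
        System.out.println("blind, A queues: " + blind.blindUpdate(selectedByA, "A"));
        System.out.println("blind, B queues: " + blind.blindUpdate(selectedByB, "B"));

        // Same interleaving, but each update re-checks ownership.
        JobClaimSketch guarded = new JobClaimSketch(ids);
        selectedByA = guarded.selectUnclaimed();
        selectedByB = guarded.selectUnclaimed();
        System.out.println("guarded, A queues: " + guarded.conditionalUpdate(selectedByA, "A"));
        System.out.println("guarded, B queues: " + guarded.conditionalUpdate(selectedByB, "B"));
    }
}
```

In the first run both instances end up queuing all three jobs, which is the duplication described above. In the second run B queues nothing, because every job it selected had already been claimed by A by the time B tried to update it. The sketch ignores lock waits and transactions entirely; it only illustrates why the update needs to carry the ownership check.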

Regards
Scott

On 8/08/2012, at 7:11 AM, adrianc@apache.org wrote:

> Author: adrianc
> Date: Tue Aug  7 19:11:06 2012
> New Revision: 1370437
> 
> URL: http://svn.apache.org/viewvc?rev=1370437&view=rev
> Log:
> More work on the Job Scheduler:
> 
> 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition.
> 2. Changed min/max thread settings to more conservative values.
> 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use.
> 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously.
> 
> At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit.
> 
> Modified:
>    ofbiz/trunk/framework/service/config/serviceengine.xml
>    ofbiz/trunk/framework/service/dtd/service-config.xsd
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> 
> Modified: ofbiz/trunk/framework/service/config/serviceengine.xml
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/config/serviceengine.xml (original)
> +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug  7 19:11:06 2012
> @@ -25,15 +25,16 @@ under the License.
>         <!-- Name of the service to use for authorization -->
>         <authorization service-name="userLogin"/>
> 
> -        <!-- Thread pool configuration (max/min threads, uses to live and time to live) -->
> +        <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. -->
>         <thread-pool send-to-pool="pool"
>                      purge-job-days="4"
>                      failed-retry-min="3"
> -                     ttl="18000000"
> -                     min-threads="5"
> -                     max-threads="15"
> +                     ttl="120000"
> +                     jobs="100"
> +                     min-threads="2"
> +                     max-threads="5"
>                      poll-enabled="true"
> -                     poll-db-millis="20000">
> +                     poll-db-millis="30000">
>             <run-from-pool name="pool"/>
>         </thread-pool>
> 
> 
> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original)
> +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug  7 19:11:06 2012
> @@ -58,38 +58,53 @@ under the License.
>     <xs:element name="thread-pool">
>         <xs:complexType>
>             <xs:sequence>
> -                <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/>
> +                <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded">
> +                    <xs:complexType>
> +                        <xs:attribute type="xs:string" name="name" use="required" />
> +                    </xs:complexType>
> +                </xs:element>
>             </xs:sequence>
> -            <xs:attributeGroup ref="attlist.thread-pool"/>
> +            <xs:attribute type="xs:string" name="send-to-pool" use="required" />
> +            <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" />
> +            <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" />
> +            <xs:attribute type="xs:nonNegativeInteger" name="ttl">
> +                <xs:annotation>
> +                    <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation>
> +                </xs:annotation>
> +            </xs:attribute>
> +            <xs:attribute type="xs:nonNegativeInteger" name="jobs">
> +                <xs:annotation>
> +                    <xs:documentation>Job queue size. Defaults to "100".</xs:documentation>
> +                </xs:annotation>
> +            </xs:attribute>
> +            <xs:attribute type="xs:nonNegativeInteger" name="min-threads">
> +                <xs:annotation>
> +                    <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation>
> +                </xs:annotation>
> +            </xs:attribute>
> +            <xs:attribute type="xs:nonNegativeInteger" name="max-threads">
> +                <xs:annotation>
> +                    <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation>
> +                </xs:annotation>
> +            </xs:attribute>
> +            <xs:attribute name="poll-enabled">
> +                <xs:annotation>
> +                    <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation>
> +                </xs:annotation>
> +                <xs:simpleType>
> +                    <xs:restriction base="xs:token">
> +                        <xs:enumeration value="true" />
> +                        <xs:enumeration value="false" />
> +                    </xs:restriction>
> +                </xs:simpleType>
> +            </xs:attribute>
> +            <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis">
> +                <xs:annotation>
> +                    <xs:documentation>Database polling interval in milliseconds. Defaults to "30000" (30 seconds).</xs:documentation>
> +                </xs:annotation>
> +            </xs:attribute>
>         </xs:complexType>
>     </xs:element>
> -    <xs:attributeGroup name="attlist.thread-pool">
> -        <xs:attribute type="xs:string" name="send-to-pool" use="required"/>
> -        <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/>
> -        <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/>
> -        <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/>
> -        <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated -->
> -        <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated -->
> -        <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/>
> -        <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/>
> -        <xs:attribute name="poll-enabled" default="true">
> -            <xs:simpleType>
> -                <xs:restriction base="xs:token">
> -                    <xs:enumeration value="true"/>
> -                    <xs:enumeration value="false"/>
> -                </xs:restriction>
> -            </xs:simpleType>
> -        </xs:attribute>
> -        <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/>
> -    </xs:attributeGroup>
> -    <xs:element name="run-from-pool">
> -        <xs:complexType>
> -            <xs:attributeGroup ref="attlist.run-from-pool"/>
> -        </xs:complexType>
> -    </xs:element>
> -    <xs:attributeGroup name="attlist.run-from-pool">
> -        <xs:attribute type="xs:string" name="name" use="required"/>
> -    </xs:attributeGroup>
>     <xs:element name="engine">
>         <xs:complexType>
>             <xs:sequence>
> 
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 19:11:06 2012
> @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator;
> import org.ofbiz.entity.GenericEntityException;
> import org.ofbiz.entity.GenericValue;
> import org.ofbiz.entity.condition.EntityCondition;
> -import org.ofbiz.entity.condition.EntityConditionList;
> import org.ofbiz.entity.condition.EntityExpr;
> import org.ofbiz.entity.condition.EntityOperator;
> import org.ofbiz.entity.serialize.SerializeException;
> import org.ofbiz.entity.serialize.XmlSerializer;
> import org.ofbiz.entity.transaction.GenericTransactionException;
> import org.ofbiz.entity.transaction.TransactionUtil;
> +import org.ofbiz.entity.util.EntityListIterator;
> import org.ofbiz.service.DispatchContext;
> import org.ofbiz.service.LocalDispatcher;
> import org.ofbiz.service.ServiceContainer;
> @@ -146,7 +146,7 @@ public final class JobManager {
>      * Returns an empty list if there are no jobs due to run.
>      * This method is called by the {@link JobPoller} polling thread.
>      */
> -    protected synchronized List<Job> poll() {
> +    protected List<Job> poll(int limit) {
>         assertIsRunning();
>         DispatchContext dctx = getDispatcher().getDispatchContext();
>         if (dctx == null) {
> @@ -154,8 +154,6 @@ public final class JobManager {
>             return null;
>         }
>         List<Job> poll = FastList.newInstance();
> -        // sort the results by time
> -        List<String> order = UtilMisc.toList("runTime");
>         // basic query
>         List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
>                 EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
> @@ -173,21 +171,24 @@ public final class JobManager {
>         EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
>         EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
>         EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));
> +        EntityListIterator jobsIterator = null;
>         boolean beganTransaction = false;
>         try {
>             beganTransaction = TransactionUtil.begin();
>             if (!beganTransaction) {
> -                Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
> +                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>                 return null;
>             }
> -            // first update the jobs w/ this instance running information
> -            delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
> -            // now query all the 'queued' jobs for this instance
> -            List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
> -            if (UtilValidate.isNotEmpty(jobEnt)) {
> -                for (GenericValue v : jobEnt) {
> -                    poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester
> +            jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
> +            GenericValue jobValue = jobsIterator.next();
> +            while (jobValue != null) {
> +                jobValue.putAll(updateFields);
> +                jobValue.store();
> +                poll.add(new PersistedServiceJob(dctx, jobValue, null));
> +                if (poll.size() == limit) {
> +                    break;
>                 }
> +                jobValue = jobsIterator.next();
>             }
>         } catch (Throwable t) {
>             // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
> @@ -200,6 +201,13 @@ public final class JobManager {
>                 Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>             }
>         } finally {
> +            if (jobsIterator != null) {
> +                try {
> +                    jobsIterator.close();
> +                } catch (GenericEntityException e) {
> +                    Debug.logWarning(e, module);
> +                }
> +            }
>             try {
>                 // only commit the transaction if we started one... but make sure we try
>                 TransactionUtil.commit(beganTransaction);
> @@ -214,11 +222,19 @@ public final class JobManager {
> 
>     private void reloadCrashedJobs() {
>         List<GenericValue> crashed = null;
> -        List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
> -        exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> -        EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs);
> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
> +        EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
> +        List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
> +        List<String> pools = ServiceConfigUtil.getRunPools();
> +        if (pools != null) {
> +            for (String poolName : pools) {
> +                poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName));
> +            }
> +        }
> +        EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
> +        EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition));
>         try {
> -            crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false);
> +            crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>         } catch (GenericEntityException e) {
>             Debug.logError(e, "Unable to load crashed jobs", module);
>         }
> 
> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff
> ==============================================================================
> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 19:11:06 2012
> @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi
> import org.ofbiz.base.util.Debug;
> import org.ofbiz.service.config.ServiceConfigUtil;
> 
> -import org.apache.commons.lang.math.NumberUtils;
> -
> /**
>  * Job poller. Queues and runs jobs.
>  */
> @@ -42,10 +40,11 @@ public final class JobPoller implements 
> 
>     public static final String module = JobPoller.class.getName();
>     private static final AtomicInteger created = new AtomicInteger();
> -    public static final int MIN_THREADS = 1;
> -    public static final int MAX_THREADS = 15;
> -    public static final int POLL_WAIT = 20000;
> -    public static final long THREAD_TTL = 18000000;
> +    private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down.
> +    private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down.
> +    private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
> +    private static final int QUEUE_SIZE = 100;
> +    private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
> 
>     private final JobManager jm;
>     private final ThreadPoolExecutor executor;
> @@ -61,7 +60,7 @@ public final class JobPoller implements 
>     public JobPoller(JobManager jm) {
>         this.name = jm.getDelegator().getDelegatorName();
>         this.jm = jm;
> -        this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
> +        this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
>                 new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
>     }
> 
> @@ -116,33 +115,48 @@ public final class JobPoller implements 
>     }
> 
>     private long getTTL() {
> -        long ttl = THREAD_TTL;
> -        try {
> -            ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl"));
> -        } catch (NumberFormatException nfe) {
> -            Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module);
> +        String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
> +        if (!threadTTLAttr.isEmpty()) {
> +            try {
> +                int threadTTL = Integer.parseInt(threadTTLAttr);
> +                if (threadTTL > 0) {
> +                    return threadTTL;
> +                }
> +            } catch (NumberFormatException e) {
> +                Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module);
> +            }
>         }
> -        return ttl;
> +        return THREAD_TTL;
>     }
> 
>     private int maxThreads() {
> -        int max = MAX_THREADS;
> -        try {
> -            max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
> -        } catch (NumberFormatException nfe) {
> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> +        String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
> +        if (!maxThreadsAttr.isEmpty()) {
> +            try {
> +                int maxThreads = Integer.parseInt(maxThreadsAttr);
> +                if (maxThreads > 0) {
> +                    return maxThreads;
> +                }
> +            } catch (NumberFormatException e) {
> +                Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
> +            }
>         }
> -        return max;
> +        return MAX_THREADS;
>     }
> 
>     private int minThreads() {
> -        int min = MIN_THREADS;
> -        try {
> -            min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
> -        } catch (NumberFormatException nfe) {
> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> +        String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
> +        if (!minThreadsAttr.isEmpty()) {
> +            try {
> +                int minThreads = Integer.parseInt(minThreadsAttr);
> +                if (minThreads > 0) {
> +                    return minThreads;
> +                }
> +            } catch (NumberFormatException e) {
> +                Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
> +            }
>         }
> -        return min;
> +        return MIN_THREADS;
>     }
> 
>     private boolean pollEnabled() {
> @@ -153,13 +167,33 @@ public final class JobPoller implements 
>     }
> 
>     private int pollWaitTime() {
> -        int poll = POLL_WAIT;
> -        try {
> -            poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
> -        } catch (NumberFormatException nfe) {
> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
> +        String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
> +        if (!pollIntervalAttr.isEmpty()) {
> +            try {
> +                int pollInterval = Integer.parseInt(pollIntervalAttr);
> +                if (pollInterval > 0) {
> +                    return pollInterval;
> +                }
> +            } catch (NumberFormatException e) {
> +                Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module);
> +            }
> +        }
> +        return POLL_WAIT;
> +    }
> +
> +    private int queueSize() {
> +        String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
> +        if (!queueSizeAttr.isEmpty()) {
> +            try {
> +                int queueSize = Integer.parseInt(queueSizeAttr);
> +                if (queueSize > 0) {
> +                    return queueSize;
> +                }
> +            } catch (NumberFormatException e) {
> +                Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module);
> +            }
>         }
> -        return poll;
> +        return QUEUE_SIZE;
>     }
> 
>     /**
> @@ -172,29 +206,28 @@ public final class JobPoller implements 
>         this.executor.execute(new JobInvoker(job));
>     }
> 
> -    public synchronized void run() {
> +    public void run() {
>         try {
>             // wait 30 seconds before the first poll
> -            java.lang.Thread.sleep(30000);
> -        } catch (InterruptedException e) {
> -        }
> -        while (!executor.isShutdown()) {
> -            try {
> -                // grab a list of jobs to run.
> -                List<Job> pollList = jm.poll();
> -                for (Job job : pollList) {
> -                    try {
> -                        queueNow(job);
> -                    } catch (InvalidJobException e) {
> -                        Debug.logError(e, module);
> +            Thread.sleep(30000);
> +            while (!executor.isShutdown()) {
> +                int remainingCapacity = executor.getQueue().remainingCapacity();
> +                if (remainingCapacity > 0) {
> +                    List<Job> pollList = jm.poll(remainingCapacity);
> +                    for (Job job : pollList) {
> +                        try {
> +                            queueNow(job);
> +                        } catch (InvalidJobException e) {
> +                            Debug.logError(e, module);
> +                        }
>                     }
>                 }
> -                // NOTE: using sleep instead of wait for stricter locking
> -                java.lang.Thread.sleep(pollWaitTime());
> -            } catch (InterruptedException e) {
> -                Debug.logError(e, module);
> -                stop();
> +                Thread.sleep(pollWaitTime());
>             }
> +        } catch (InterruptedException e) {
> +            Debug.logError(e, module);
> +            stop();
> +            Thread.currentThread().interrupt();
>         }
>         Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
>     }
> @@ -234,7 +267,7 @@ public final class JobPoller implements 
>         }
> 
>         public Thread newThread(Runnable runnable) {
> -            return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement());
> +            return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
>         }
>     }
> }
> 
> 


Re: svn commit: r1370437 - in /ofbiz/trunk/framework/service: config/serviceengine.xml dtd/service-config.xsd src/org/ofbiz/service/job/JobManager.java src/org/ofbiz/service/job/JobPoller.java

Posted by Adrian Crum <ad...@sandglass-software.com>.
Thanks for the review, Scott!

I understand why the original code was done that way, but it is causing 
problems on busy servers. I will update the new code to check for 
data-race conditions.

-Adrian

On 8/10/2012 1:24 AM, Scott Gray wrote:
> Hi Adrian,
>
> I have a feeling that using a direct update to queue jobs was done on purpose to avoid duplication when multiple instances are polling for jobs in the same pool.  The problem with doing a select then update is the following:
> 1. Instance A uses SELECT to gather the jobs to queue, table is locked for updates but reads are still possible
> 2. Instance B does the same and retrieves the same rows plus any new additions, while instance A works its way through updating the rows
> 3. Instance B attempts to update the same rows but is met with a lock on those which instance A has already updated, potential for lock wait timeout issues for instance B (an existing and unavoidable issue)
> 4. Instance A finishes the updates and commits, locks are released and B can begin its updates (if the lock wait didn't time out).  A sends the retrieved jobs back to the poller to be queued in memory.
> 5. Instance B finishes the updates and commits, sends the retrieved jobs back to the poller to be queued in memory.
>
> I could be wrong about the above but I'm fairly sure the table wouldn't be locked for reading until A has made its first update (even then I can't recall off the top of my head if this prevents reads until the update is committed).  I believe SELECT FOR UPDATE is the only "select then update" strategy that would lock the table immediately and OFBiz doesn't support it.
>
> Regards
> Scott
>
> On 8/08/2012, at 7:11 AM, adrianc@apache.org wrote:
>
>> Author: adrianc
>> Date: Tue Aug  7 19:11:06 2012
>> New Revision: 1370437
>>
>> URL: http://svn.apache.org/viewvc?rev=1370437&view=rev
>> Log:
>> More work on the Job Scheduler:
>>
>> 1. Gave the job queue a fixed size so the job poller can't create an out-of-memory condition.
>> 2. Changed min/max thread settings to more conservative values.
>> 3. Changed JobSandbox polling code to accept a limit argument to control the number of records retrieved. Also used ELI to limit memory use.
>> 4. Improved JobManager reloadCrashedJobs method to recover queued jobs that were being missed previously.
>>
>> At this stage the Job Scheduler has been fixed to not crash/saturate the server, but it can still lose jobs. That will be fixed in the next commit.
>>
>> Modified:
>>     ofbiz/trunk/framework/service/config/serviceengine.xml
>>     ofbiz/trunk/framework/service/dtd/service-config.xsd
>>     ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>>     ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>>
>> Modified: ofbiz/trunk/framework/service/config/serviceengine.xml
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/config/serviceengine.xml?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/config/serviceengine.xml (original)
>> +++ ofbiz/trunk/framework/service/config/serviceengine.xml Tue Aug  7 19:11:06 2012
>> @@ -25,15 +25,16 @@ under the License.
>>          <!-- Name of the service to use for authorization -->
>>          <authorization service-name="userLogin"/>
>>
>> -        <!-- Thread pool configuration (max/min threads, uses to live and time to live) -->
>> +        <!-- Job poller configuration. Many of these attributes are set to the job poller defaults, but they are included here for convenience. -->
>>          <thread-pool send-to-pool="pool"
>>                       purge-job-days="4"
>>                       failed-retry-min="3"
>> -                     ttl="18000000"
>> -                     min-threads="5"
>> -                     max-threads="15"
>> +                     ttl="120000"
>> +                     jobs="100"
>> +                     min-threads="2"
>> +                     max-threads="5"
>>                       poll-enabled="true"
>> -                     poll-db-millis="20000">
>> +                     poll-db-millis="30000">
>>              <run-from-pool name="pool"/>
>>          </thread-pool>
>>
>>
>> Modified: ofbiz/trunk/framework/service/dtd/service-config.xsd
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/dtd/service-config.xsd?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/dtd/service-config.xsd (original)
>> +++ ofbiz/trunk/framework/service/dtd/service-config.xsd Tue Aug  7 19:11:06 2012
>> @@ -58,38 +58,53 @@ under the License.
>>      <xs:element name="thread-pool">
>>          <xs:complexType>
>>              <xs:sequence>
>> -                <xs:element minOccurs="0" maxOccurs="unbounded" ref="run-from-pool"/>
>> +                <xs:element name="run-from-pool" minOccurs="0" maxOccurs="unbounded">
>> +                    <xs:complexType>
>> +                        <xs:attribute type="xs:string" name="name" use="required" />
>> +                    </xs:complexType>
>> +                </xs:element>
>>              </xs:sequence>
>> -            <xs:attributeGroup ref="attlist.thread-pool"/>
>> +            <xs:attribute type="xs:string" name="send-to-pool" use="required" />
>> +            <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30" />
>> +            <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30" />
>> +            <xs:attribute type="xs:nonNegativeInteger" name="ttl">
>> +                <xs:annotation>
>> +                    <xs:documentation>Idle queue service thread lifespan in milliseconds. Defaults to "120000" (2 minutes).</xs:documentation>
>> +                </xs:annotation>
>> +            </xs:attribute>
>> +            <xs:attribute type="xs:nonNegativeInteger" name="jobs">
>> +                <xs:annotation>
>> +                    <xs:documentation>Job queue size. Defaults to "100".</xs:documentation>
>> +                </xs:annotation>
>> +            </xs:attribute>
>> +            <xs:attribute type="xs:nonNegativeInteger" name="min-threads">
>> +                <xs:annotation>
>> +                    <xs:documentation>Minimum number of queue service threads. Defaults to "1".</xs:documentation>
>> +                </xs:annotation>
>> +            </xs:attribute>
>> +            <xs:attribute type="xs:nonNegativeInteger" name="max-threads">
>> +                <xs:annotation>
>> +                    <xs:documentation>Maximum number of queue service threads. Defaults to "5".</xs:documentation>
>> +                </xs:annotation>
>> +            </xs:attribute>
>> +            <xs:attribute name="poll-enabled">
>> +                <xs:annotation>
>> +                    <xs:documentation>Enable database polling. Defaults to "true".</xs:documentation>
>> +                </xs:annotation>
>> +                <xs:simpleType>
>> +                    <xs:restriction base="xs:token">
>> +                        <xs:enumeration value="true" />
>> +                        <xs:enumeration value="false" />
>> +                    </xs:restriction>
>> +                </xs:simpleType>
>> +            </xs:attribute>
>> +            <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis">
>> +                <xs:annotation>
>> +                    <xs:documentation>Database polling interval in milliseconds. Defaults to "30000" (30 seconds).</xs:documentation>
>> +                </xs:annotation>
>> +            </xs:attribute>
>>          </xs:complexType>
>>      </xs:element>
>> -    <xs:attributeGroup name="attlist.thread-pool">
>> -        <xs:attribute type="xs:string" name="send-to-pool" use="required"/>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="purge-job-days" default="30"/>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="failed-retry-min" default="30"/>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="ttl" use="required"/>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="wait-millis"/> <!-- deprecated -->
>> -        <xs:attribute type="xs:nonNegativeInteger" name="jobs"/> <!-- deprecated -->
>> -        <xs:attribute type="xs:nonNegativeInteger" name="min-threads" use="required"/>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="max-threads" use="required"/>
>> -        <xs:attribute name="poll-enabled" default="true">
>> -            <xs:simpleType>
>> -                <xs:restriction base="xs:token">
>> -                    <xs:enumeration value="true"/>
>> -                    <xs:enumeration value="false"/>
>> -                </xs:restriction>
>> -            </xs:simpleType>
>> -        </xs:attribute>
>> -        <xs:attribute type="xs:nonNegativeInteger" name="poll-db-millis" use="required"/>
>> -    </xs:attributeGroup>
>> -    <xs:element name="run-from-pool">
>> -        <xs:complexType>
>> -            <xs:attributeGroup ref="attlist.run-from-pool"/>
>> -        </xs:complexType>
>> -    </xs:element>
>> -    <xs:attributeGroup name="attlist.run-from-pool">
>> -        <xs:attribute type="xs:string" name="name" use="required"/>
>> -    </xs:attributeGroup>
>>      <xs:element name="engine">
>>          <xs:complexType>
>>              <xs:sequence>
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobManager.java Tue Aug  7 19:11:06 2012
>> @@ -38,13 +38,13 @@ import org.ofbiz.entity.Delegator;
>> import org.ofbiz.entity.GenericEntityException;
>> import org.ofbiz.entity.GenericValue;
>> import org.ofbiz.entity.condition.EntityCondition;
>> -import org.ofbiz.entity.condition.EntityConditionList;
>> import org.ofbiz.entity.condition.EntityExpr;
>> import org.ofbiz.entity.condition.EntityOperator;
>> import org.ofbiz.entity.serialize.SerializeException;
>> import org.ofbiz.entity.serialize.XmlSerializer;
>> import org.ofbiz.entity.transaction.GenericTransactionException;
>> import org.ofbiz.entity.transaction.TransactionUtil;
>> +import org.ofbiz.entity.util.EntityListIterator;
>> import org.ofbiz.service.DispatchContext;
>> import org.ofbiz.service.LocalDispatcher;
>> import org.ofbiz.service.ServiceContainer;
>> @@ -146,7 +146,7 @@ public final class JobManager {
>>       * Returns an empty list if there are no jobs due to run.
>>       * This method is called by the {@link JobPoller} polling thread.
>>       */
>> -    protected synchronized List<Job> poll() {
>> +    protected List<Job> poll(int limit) {
>>          assertIsRunning();
>>          DispatchContext dctx = getDispatcher().getDispatchContext();
>>          if (dctx == null) {
>> @@ -154,8 +154,6 @@ public final class JobManager {
>>              return null;
>>          }
>>          List<Job> poll = FastList.newInstance();
>> -        // sort the results by time
>> -        List<String> order = UtilMisc.toList("runTime");
>>          // basic query
>>          List<EntityExpr> expressions = UtilMisc.toList(EntityCondition.makeCondition("runTime", EntityOperator.LESS_THAN_EQUAL_TO, UtilDateTime.nowTimestamp()),
>>                  EntityCondition.makeCondition("startDateTime", EntityOperator.EQUALS, null),
>> @@ -173,21 +171,24 @@ public final class JobManager {
>>          EntityCondition baseCondition = EntityCondition.makeCondition(expressions);
>>          EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
>>          EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(baseCondition, poolCondition));
>> +        EntityListIterator jobsIterator = null;
>>          boolean beganTransaction = false;
>>          try {
>>              beganTransaction = TransactionUtil.begin();
>>              if (!beganTransaction) {
>> -                Debug.logError("Unable to poll for jobs; transaction was not started by this process", module);
>> +                Debug.logError("Unable to poll JobSandbox for jobs; transaction was not started by this process", module);
>>                  return null;
>>              }
>> -            // first update the jobs w/ this instance running information
>> -            delegator.storeByCondition("JobSandbox", updateFields, mainCondition);
>> -            // now query all the 'queued' jobs for this instance
>> -            List<GenericValue> jobEnt = delegator.findByAnd("JobSandbox", updateFields, order, false);
>> -            if (UtilValidate.isNotEmpty(jobEnt)) {
>> -                for (GenericValue v : jobEnt) {
>> -                    poll.add(new PersistedServiceJob(dctx, v, null)); // TODO fix the requester
>> +            jobsIterator = delegator.find("JobSandbox", mainCondition, null, null, UtilMisc.toList("runTime"), null);
>> +            GenericValue jobValue = jobsIterator.next();
>> +            while (jobValue != null) {
>> +                jobValue.putAll(updateFields);
>> +                jobValue.store();
>> +                poll.add(new PersistedServiceJob(dctx, jobValue, null));
>> +                if (poll.size() == limit) {
>> +                    break;
>>                  }
>> +                jobValue = jobsIterator.next();
>>              }
>>          } catch (Throwable t) {
>>              // catch Throwable so nothing slips through the cracks... this is a fairly sensitive operation
>> @@ -200,6 +201,13 @@ public final class JobManager {
>>                  Debug.logError(e2, "[Delegator] Could not rollback transaction: " + e2.toString(), module);
>>              }
>>          } finally {
>> +            if (jobsIterator != null) {
>> +                try {
>> +                    jobsIterator.close();
>> +                } catch (GenericEntityException e) {
>> +                    Debug.logWarning(e, module);
>> +                }
>> +            }
>>              try {
>>                  // only commit the transaction if we started one... but make sure we try
>>                  TransactionUtil.commit(beganTransaction);
>> @@ -214,11 +222,19 @@ public final class JobManager {
>>
>>      private void reloadCrashedJobs() {
>>          List<GenericValue> crashed = null;
>> -        List<EntityExpr> exprs = UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId));
>> -        exprs.add(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> -        EntityConditionList<EntityExpr> ecl = EntityCondition.makeCondition(exprs);
>> +        List<EntityExpr> statusExprList = UtilMisc.toList(EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_QUEUED"), EntityCondition.makeCondition("statusId", EntityOperator.EQUALS, "SERVICE_RUNNING"));
>> +        EntityCondition statusCondition = EntityCondition.makeCondition(statusExprList, EntityOperator.OR);
>> +        List<EntityExpr> poolsExpr = UtilMisc.toList(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, null));
>> +        List<String> pools = ServiceConfigUtil.getRunPools();
>> +        if (pools != null) {
>> +            for (String poolName : pools) {
>> +                poolsExpr.add(EntityCondition.makeCondition("poolId", EntityOperator.EQUALS, poolName));
>> +            }
>> +        }
>> +        EntityCondition poolCondition = EntityCondition.makeCondition(poolsExpr, EntityOperator.OR);
>> +        EntityCondition mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), statusCondition, poolCondition));
>>          try {
>> -            crashed = delegator.findList("JobSandbox", ecl, null, UtilMisc.toList("startDateTime"), null, false);
>> +            crashed = delegator.findList("JobSandbox", mainCondition, null, UtilMisc.toList("startDateTime"), null, false);
>>          } catch (GenericEntityException e) {
>>              Debug.logError(e, "Unable to load crashed jobs", module);
>>          }
>>
>> Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java
>> URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java?rev=1370437&r1=1370436&r2=1370437&view=diff
>> ==============================================================================
>> --- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java (original)
>> +++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/JobPoller.java Tue Aug  7 19:11:06 2012
>> @@ -33,8 +33,6 @@ import java.util.concurrent.atomic.Atomi
>> import org.ofbiz.base.util.Debug;
>> import org.ofbiz.service.config.ServiceConfigUtil;
>>
>> -import org.apache.commons.lang.math.NumberUtils;
>> -
>> /**
>>   * Job poller. Queues and runs jobs.
>>   */
>> @@ -42,10 +40,11 @@ public final class JobPoller implements
>>
>>      public static final String module = JobPoller.class.getName();
>>      private static final AtomicInteger created = new AtomicInteger();
>> -    public static final int MIN_THREADS = 1;
>> -    public static final int MAX_THREADS = 15;
>> -    public static final int POLL_WAIT = 20000;
>> -    public static final long THREAD_TTL = 18000000;
>> +    private static final int MIN_THREADS = 1; // Must be no less than one or the executor will shut down.
>> +    private static final int MAX_THREADS = 5; // Values higher than 5 might slow things down.
>> +    private static final int POLL_WAIT = 30000; // Database polling interval - 30 seconds.
>> +    private static final int QUEUE_SIZE = 100;
>> +    private static final long THREAD_TTL = 120000; // Idle thread lifespan - 2 minutes.
>>
>>      private final JobManager jm;
>>      private final ThreadPoolExecutor executor;
>> @@ -61,7 +60,7 @@ public final class JobPoller implements
>>      public JobPoller(JobManager jm) {
>>          this.name = jm.getDelegator().getDelegatorName();
>>          this.jm = jm;
>> -        this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
>> +        this.executor = new ThreadPoolExecutor(minThreads(), maxThreads(), getTTL(), TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueSize()),
>>                  new JobInvokerThreadFactory(this.name), new ThreadPoolExecutor.AbortPolicy());
>>      }
>>
>> @@ -116,33 +115,48 @@ public final class JobPoller implements
>>      }
>>
>>      private long getTTL() {
>> -        long ttl = THREAD_TTL;
>> -        try {
>> -            ttl = NumberUtils.toLong(ServiceConfigUtil.getElementAttr("thread-pool", "ttl"));
>> -        } catch (NumberFormatException nfe) {
>> -            Debug.logError("Problems reading value from attribute [ttl] of element [thread-pool] in serviceengine.xml file [" + nfe.toString() + "]. Using default (" + THREAD_TTL + ").", module);
>> +        String threadTTLAttr = ServiceConfigUtil.getElementAttr("thread-pool", "ttl");
>> +        if (!threadTTLAttr.isEmpty()) {
>> +            try {
>> +                int threadTTL = Integer.parseInt(threadTTLAttr);
>> +                if (threadTTL > 0) {
>> +                    return threadTTL;
>> +                }
>> +            } catch (NumberFormatException e) {
>> +                Debug.logError("Exception thrown while parsing thread TTL from serviceengine.xml file [" + e + "]. Using default value.", module);
>> +            }
>>          }
>> -        return ttl;
>> +        return THREAD_TTL;
>>      }
>>
>>      private int maxThreads() {
>> -        int max = MAX_THREADS;
>> -        try {
>> -            max = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "max-threads"));
>> -        } catch (NumberFormatException nfe) {
>> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> +        String maxThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "max-threads");
>> +        if (!maxThreadsAttr.isEmpty()) {
>> +            try {
>> +                int maxThreads = Integer.parseInt(maxThreadsAttr);
>> +                if (maxThreads > 0) {
>> +                    return maxThreads;
>> +                }
>> +            } catch (NumberFormatException e) {
>> +                Debug.logError("Exception thrown while parsing maximum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
>> +            }
>>          }
>> -        return max;
>> +        return MAX_THREADS;
>>      }
>>
>>      private int minThreads() {
>> -        int min = MIN_THREADS;
>> -        try {
>> -            min = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "min-threads"));
>> -        } catch (NumberFormatException nfe) {
>> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> +        String minThreadsAttr = ServiceConfigUtil.getElementAttr("thread-pool", "min-threads");
>> +        if (!minThreadsAttr.isEmpty()) {
>> +            try {
>> +                int minThreads = Integer.parseInt(minThreadsAttr);
>> +                if (minThreads > 0) {
>> +                    return minThreads;
>> +                }
>> +            } catch (NumberFormatException e) {
>> +                Debug.logError("Exception thrown while parsing minimum threads from serviceengine.xml file [" + e + "]. Using default value.", module);
>> +            }
>>          }
>> -        return min;
>> +        return MIN_THREADS;
>>      }
>>
>>      private boolean pollEnabled() {
>> @@ -153,13 +167,33 @@ public final class JobPoller implements
>>      }
>>
>>      private int pollWaitTime() {
>> -        int poll = POLL_WAIT;
>> -        try {
>> -            poll = Integer.parseInt(ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis"));
>> -        } catch (NumberFormatException nfe) {
>> -            Debug.logError("Problems reading values from serviceengine.xml file [" + nfe.toString() + "]. Using defaults.", module);
>> +        String pollIntervalAttr = ServiceConfigUtil.getElementAttr("thread-pool", "poll-db-millis");
>> +        if (!pollIntervalAttr.isEmpty()) {
>> +            try {
>> +                int pollInterval = Integer.parseInt(pollIntervalAttr);
>> +                if (pollInterval > 0) {
>> +                    return pollInterval;
>> +                }
>> +            } catch (NumberFormatException e) {
>> +                Debug.logError("Exception thrown while parsing database polling interval from serviceengine.xml file [" + e + "]. Using default value.", module);
>> +            }
>> +        }
>> +        return POLL_WAIT;
>> +    }
>> +
>> +    private int queueSize() {
>> +        String queueSizeAttr = ServiceConfigUtil.getElementAttr("thread-pool", "jobs");
>> +        if (!queueSizeAttr.isEmpty()) {
>> +            try {
>> +                int queueSize = Integer.parseInt(queueSizeAttr);
>> +                if (queueSize > 0) {
>> +                    return queueSize;
>> +                }
>> +            } catch (NumberFormatException e) {
>> +                Debug.logError("Exception thrown while parsing queue size from serviceengine.xml file [" + e + "]. Using default value.", module);
>> +            }
>>          }
>> -        return poll;
>> +        return QUEUE_SIZE;
>>      }
>>
>>      /**
>> @@ -172,29 +206,28 @@ public final class JobPoller implements
>>          this.executor.execute(new JobInvoker(job));
>>      }
>>
>> -    public synchronized void run() {
>> +    public void run() {
>>          try {
>>              // wait 30 seconds before the first poll
>> -            java.lang.Thread.sleep(30000);
>> -        } catch (InterruptedException e) {
>> -        }
>> -        while (!executor.isShutdown()) {
>> -            try {
>> -                // grab a list of jobs to run.
>> -                List<Job> pollList = jm.poll();
>> -                for (Job job : pollList) {
>> -                    try {
>> -                        queueNow(job);
>> -                    } catch (InvalidJobException e) {
>> -                        Debug.logError(e, module);
>> +            Thread.sleep(30000);
>> +            while (!executor.isShutdown()) {
>> +                int remainingCapacity = executor.getQueue().remainingCapacity();
>> +                if (remainingCapacity > 0) {
>> +                    List<Job> pollList = jm.poll(remainingCapacity);
>> +                    for (Job job : pollList) {
>> +                        try {
>> +                            queueNow(job);
>> +                        } catch (InvalidJobException e) {
>> +                            Debug.logError(e, module);
>> +                        }
>>                      }
>>                  }
>> -                // NOTE: using sleep instead of wait for stricter locking
>> -                java.lang.Thread.sleep(pollWaitTime());
>> -            } catch (InterruptedException e) {
>> -                Debug.logError(e, module);
>> -                stop();
>> +                Thread.sleep(pollWaitTime());
>>              }
>> +        } catch (InterruptedException e) {
>> +            Debug.logError(e, module);
>> +            stop();
>> +            Thread.currentThread().interrupt();
>>          }
>>          Debug.logInfo("JobPoller " + this.name + " thread terminated.", module);
>>      }
>> @@ -234,7 +267,7 @@ public final class JobPoller implements
>>          }
>>
>>          public Thread newThread(Runnable runnable) {
>> -            return new Thread(runnable, "OFBiz-JobInvoker-" + poolName + "-" + created.getAndIncrement());
>> +            return new Thread(runnable, "OFBiz-JobQueue-" + poolName + "-" + created.getAndIncrement());
>>          }
>>      }
>> }
>>
>>
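For anyone reading the JobPoller changes above without an OFBiz checkout handy, here is a minimal standalone sketch of the two ideas in the diff: reading a positive integer setting with a fallback default, and polling only as many jobs as the bounded queue can still accept. It is not the OFBiz code. The class name BoundedPollSketch, the helper positiveIntOrDefault, and the in-memory "pending" list standing in for the JobSandbox query are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of OFBiz.
class BoundedPollSketch {

    // Returns the parsed value if it is a positive integer, otherwise the default.
    // Mirrors the shape of the minThreads()/maxThreads()/queueSize() methods in the diff.
    static int positiveIntOrDefault(String attr, int defaultValue) {
        if (attr != null && !attr.isEmpty()) {
            try {
                int value = Integer.parseInt(attr);
                if (value > 0) {
                    return value;
                }
            } catch (NumberFormatException e) {
                // Unparseable value: fall through and use the default.
            }
        }
        return defaultValue;
    }

    // Stand-in for JobManager.poll(int limit): removes and returns at most
    // 'limit' jobs from an in-memory list instead of querying the database.
    static List<Runnable> poll(List<Runnable> pending, int limit) {
        List<Runnable> taken = new ArrayList<>();
        while (!pending.isEmpty() && taken.size() < limit) {
            taken.add(pending.remove(0));
        }
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        // A deliberately small queue so the limit is visible.
        int queueSize = positiveIntOrDefault("3", 100);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 120000, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.AbortPolicy());

        List<Runnable> pending = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            final int id = i;
            pending.add(() -> System.out.println("ran job " + id));
        }

        // Single polling thread: ask only for what the queue can still hold,
        // so AbortPolicy never has a reason to reject a submitted job.
        while (!pending.isEmpty()) {
            int remainingCapacity = executor.getQueue().remainingCapacity();
            if (remainingCapacity > 0) {
                for (Runnable job : poll(pending, remainingCapacity)) {
                    executor.execute(job);
                }
            }
            Thread.sleep(50); // stands in for pollWaitTime()
        }
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

The sketch only covers a single poller in one JVM. It says nothing about the multi-instance case discussed at the top of this message, where two instances can select the same rows before either has updated them.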