You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by le...@apache.org on 2019/04/07 07:39:47 UTC
svn commit: r1857071 - in /ofbiz/ofbiz-framework/trunk:
applications/product/minilang/product/product/ framework/service/entitydef/
framework/service/src/main/java/org/apache/ofbiz/service/engine/
framework/service/src/main/java/org/apache/ofbiz/service/job/
Author: lektran
Date: Sun Apr 7 07:39:47 2019
New Revision: 1857071
URL: http://svn.apache.org/viewvc?rev=1857071&view=rev
Log:
Implemented: Allow Jobs to specify a priority and be queued accordingly by the
JobPoller so that important jobs can be prioritized over normal jobs, and low
priority jobs can be left until last. [OFBIZ-10865]
Changes are as follows:
* Add a "priority" field to JobSandbox entity (numeric/Long)
* Add JobPriority constants class containing fields LOW (0), NORMAL (50) and HIGH (100)
* Add getPriority method to the Job interface and implement it in AbstractJob (returns NORMAL), PersistedServiceJob (returns JobSandbox.priority, falling back to NORMAL when the field is empty) and PurgeJob (returns LOW)
* Change the JobPoller executor's queue to use PriorityBlockingQueue (unbounded) instead of LinkedBlockingQueue (bounded)
* Implement custom Comparator for the priority queue to sort by priority descending and then runTime ascending
* Change the number of jobs requested per poll to be (queueSize() - queue.size()) instead of queue.remainingCapacity() because the new queue is unbounded
* I've also opted to limit the database poll query to the poll size using maxRows() because it seemed dangerous to me to use an unconstrained query on this table
* Ensured recurring jobs that have no priority set receive the default (NORMAL) priority when being rescheduled so that they're sorted correctly the next time they show up in the database poll
* Ensured jobs generated at runtime are given a default priority of NORMAL
Added:
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
Modified:
ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
Modified: ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml (original)
+++ ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml Sun Apr 7 07:39:47 2019
@@ -889,6 +889,7 @@ under the License.
<create-value value-field="runtimeData"/>
<!-- Create Job For ProductGroupOrder -->
+ <!-- FIXME: Jobs should not be manually created -->
<make-value entity-name="JobSandbox" value-field="jobSandbox"/>
<sequenced-id sequence-name="JobSandbox" field="jobSandbox.jobId"/>
<set field="jobId" from-field="jobSandbox.jobId"/>
@@ -900,6 +901,7 @@ under the License.
<set field="jobSandbox.runAsUser" value="system"/>
<set field="jobSandbox.runtimeDataId" from-field="runtimeDataId"/>
<set field="jobSandbox.maxRecurrenceCount" value="1" type="Long"/>
+ <set field="jobSandbox.priority" value="50" type="Long"/>
<create-value value-field="jobSandbox"/>
<set field="productGroupOrder.jobId" from-field="jobId"/>
Modified: ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml Sun Apr 7 07:39:47 2019
@@ -44,6 +44,7 @@ under the License.
<field name="jobId" type="id"></field>
<field name="jobName" type="name"></field>
<field name="runTime" type="date-time"></field>
+ <field name="priority" type="numeric"></field>
<field name="poolId" type="name"></field>
<field name="statusId" type="id"></field>
<field name="parentJobId" type="id"></field>
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java Sun Apr 7 07:39:47 2019
@@ -41,6 +41,7 @@ import org.apache.ofbiz.service.job.Gene
import org.apache.ofbiz.service.job.Job;
import org.apache.ofbiz.service.job.JobManager;
import org.apache.ofbiz.service.job.JobManagerException;
+import org.apache.ofbiz.service.job.JobPriority;
/**
* Generic Asynchronous Engine
@@ -112,6 +113,7 @@ public abstract class GenericAsyncEngine
jFields.put("loaderName", localName);
jFields.put("maxRetry", (long) modelService.maxRetry);
jFields.put("runtimeDataId", dataId);
+ jFields.put("priority", JobPriority.NORMAL);
if (UtilValidate.isNotEmpty(authUserLoginId)) {
jFields.put("authUserLoginId", authUserLoginId);
}
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java Sun Apr 7 07:39:47 2019
@@ -115,4 +115,12 @@ public abstract class AbstractJob implem
public Date getStartTime() {
return (Date) startTime.clone();
}
+
+ /*
+ * Returns JobPriority.NORMAL, the default setting
+ */
+ @Override
+ public long getPriority() {
+ return JobPriority.NORMAL;
+ }
}
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java Sun Apr 7 07:39:47 2019
@@ -72,5 +72,10 @@ public interface Job extends Runnable {
* Returns the time this job is scheduled to start.
*/
Date getStartTime();
+
+ /**
+ * Returns the priority of this job, higher the number the higher the priority
+ */
+ long getPriority();
}
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java Sun Apr 7 07:39:47 2019
@@ -213,7 +213,10 @@ public final class JobManager {
Debug.logWarning("Unable to poll JobSandbox for jobs; unable to begin transaction.", module);
return poll;
}
- try (EntityListIterator jobsIterator = EntityQuery.use(delegator).from("JobSandbox").where(mainCondition).orderBy("runTime").queryIterator()) {
+ try (EntityListIterator jobsIterator = EntityQuery.use(delegator)
+ .from("JobSandbox").where(mainCondition)
+ .orderBy("priority DESC NULLS LAST", "runTime")
+ .maxRows(limit).queryIterator()) {
GenericValue jobValue = jobsIterator.next();
while (jobValue != null) {
// Claim ownership of this value. Using storeByCondition to avoid a race condition.
@@ -546,7 +549,8 @@ public final class JobManager {
jobName = Long.toString((new Date().getTime()));
}
Map<String, Object> jFields = UtilMisc.<String, Object> toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime),
- "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId);
+ "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId,
+ "priority", JobPriority.NORMAL);
// set the pool ID
if (UtilValidate.isNotEmpty(poolName)) {
jFields.put("poolId", poolName);
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java Sun Apr 7 07:39:47 2019
@@ -20,13 +20,14 @@ package org.apache.ofbiz.service.job;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -63,15 +64,48 @@ public final class JobPoller implements
private static ThreadPoolExecutor createThreadPoolExecutor() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
- return new ThreadPoolExecutor(threadPool.getMinThreads(), threadPool.getMaxThreads(), threadPool.getTtl(),
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPool.getJobs()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+ return new ThreadPoolExecutor(
+ threadPool.getMinThreads(),
+ threadPool.getMaxThreads(),
+ threadPool.getTtl(),
+ TimeUnit.MILLISECONDS,
+ new PriorityBlockingQueue<>(threadPool.getJobs(), createPriorityComparator()),
+ new JobInvokerThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
} catch (GenericConfigException e) {
Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
- return new ThreadPoolExecutor(ThreadPool.MIN_THREADS, ThreadPool.MAX_THREADS, ThreadPool.THREAD_TTL,
- TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(ThreadPool.QUEUE_SIZE), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+ return new ThreadPoolExecutor(
+ ThreadPool.MIN_THREADS,
+ ThreadPool.MAX_THREADS,
+ ThreadPool.THREAD_TTL,
+ TimeUnit.MILLISECONDS,
+ new PriorityBlockingQueue<>(ThreadPool.QUEUE_SIZE, createPriorityComparator()),
+ new JobInvokerThreadFactory(),
+ new ThreadPoolExecutor.AbortPolicy());
}
}
+ private static Comparator<Runnable> createPriorityComparator() {
+ return new Comparator<Runnable>() {
+
+ /**
+ * Sorts jobs by priority then by start time
+ */
+ @Override
+ public int compare(Runnable o1, Runnable o2) {
+ Job j1 = (Job) o1;
+ Job j2 = (Job) o2;
+ // Descending priority (higher number returns -1)
+ int priorityCompare = Long.compare(j2.getPriority(), j1.getPriority());
+ if (priorityCompare != 0) {
+ return priorityCompare;
+ }
+ // Ascending start time (earlier time returns -1)
+ return Long.compare(j1.getStartTime().getTime(), j2.getStartTime().getTime());
+ }
+ };
+ }
+
private static int pollWaitTime() {
try {
ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
@@ -82,6 +116,16 @@ public final class JobPoller implements
}
}
+ static int queueSize() {
+ try {
+ ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
+ return threadPool.getJobs();
+ } catch (GenericConfigException e) {
+ Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
+ return ThreadPool.QUEUE_SIZE;
+ }
+ }
+
/**
* Register a {@link JobManager} with the job poller.
*
@@ -170,6 +214,7 @@ public final class JobPoller implements
try {
executor.execute(job);
} catch (Exception e) {
+ Debug.logError(e, module);
job.deQueue();
}
}
@@ -197,6 +242,7 @@ public final class JobPoller implements
private static class JobInvokerThreadFactory implements ThreadFactory {
+ @Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
}
@@ -214,7 +260,7 @@ public final class JobPoller implements
Thread.sleep(1000);
}
while (!executor.isShutdown()) {
- int remainingCapacity = executor.getQueue().remainingCapacity();
+ int remainingCapacity = queueSize() - executor.getQueue().size();
if (remainingCapacity > 0) {
// Build "list of lists"
Collection<JobManager> jmCollection = jobManagers.values();
Added: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java?rev=1857071&view=auto
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java (added)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java Sun Apr 7 07:39:47 2019
@@ -0,0 +1,7 @@
+package org.apache.ofbiz.service.job;
+
+public final class JobPriority {
+ public static final long LOW = 0;
+ public static final long NORMAL = 50;
+ public static final long HIGH = 100;
+}
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java Sun Apr 7 07:39:47 2019
@@ -211,6 +211,10 @@ public class PersistedServiceJob extends
newJob.set("currentRetryCount", 0L);
}
nextRecurrence = next;
+ // Set priority if missing
+ if (newJob.getLong("priority") == null) {
+ newJob.set("priority", JobPriority.NORMAL);
+ }
delegator.createSetNextSeqId(newJob);
if (Debug.verboseOn()) {
Debug.logVerbose("Created next job entry: " + newJob, module);
@@ -379,4 +383,17 @@ public class PersistedServiceJob extends
public Date getStartTime() {
return new Date(startTime);
}
+
+ /*
+ * Returns the priority stored in the JobSandbox.priority field, if no value is present
+ * then it defaults to AbstractJob.getPriority()
+ */
+ @Override
+ public long getPriority() {
+ Long priority = jobValue.getLong("priority");
+ if (priority == null) {
+ return super.getPriority();
+ }
+ return priority;
+ }
}
Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java Sun Apr 7 07:39:47 2019
@@ -82,4 +82,12 @@ public class PurgeJob extends AbstractJo
throw new InvalidJobException("Illegal state change");
}
}
+
+ /*
+ * Returns JobPriority.LOW
+ */
+ @Override
+ public long getPriority() {
+ return JobPriority.LOW;
+ }
}