You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by le...@apache.org on 2019/04/07 07:39:47 UTC

svn commit: r1857071 - in /ofbiz/ofbiz-framework/trunk: applications/product/minilang/product/product/ framework/service/entitydef/ framework/service/src/main/java/org/apache/ofbiz/service/engine/ framework/service/src/main/java/org/apache/ofbiz/servic...

Author: lektran
Date: Sun Apr  7 07:39:47 2019
New Revision: 1857071

URL: http://svn.apache.org/viewvc?rev=1857071&view=rev
Log:
Implemented: Allow Jobs to specify a priority and be queued accordingly by the 
JobPoller so that important jobs can be prioritized over normal jobs, and low 
priority jobs can be left until last. [OFBIZ-10865]

Changes are as follows:
* Add a "priority" field to JobSandbox entity (numeric/Long)
* Add JobPriority constants class containing fields LOW (0), NORMAL (50) and HIGH (100)
* Add getPriority method to the Job interface and implement methods for AbstractJob (returns NORMAL), PersistedServiceJob (returns JobSandbox.priority) and PurgeJob (returns LOW)
* Change the JobPoller executor's queue to use PriorityBlockingQueue (unbounded) instead of LinkedBlockingQueue (bounded)
* Implement custom Comparator for the priority queue to sort by priority descending and then runTime ascending
* Change the number of jobs requested per poll to be (queueSize() - queue.size()) instead of queue.remainingCapacity(), because the new queue is unbounded and its remainingCapacity() always returns Integer.MAX_VALUE
* I've also opted to limit the database poll query to the poll size using maxRows() because it seemed dangerous to me to use an unconstrained query on this table
* Ensured recurring jobs receive the default (NORMAL) priority when being rescheduled if they don't already have one, so that they're sorted correctly the next time they show up in the database poll
* Ensured jobs generated at runtime are given a default priority of NORMAL


Added:
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
Modified:
    ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
    ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
    ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java

Modified: ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml (original)
+++ ofbiz/ofbiz-framework/trunk/applications/product/minilang/product/product/ProductServices.xml Sun Apr  7 07:39:47 2019
@@ -889,6 +889,7 @@ under the License.
             <create-value value-field="runtimeData"/>
 
              <!-- Create Job For ProductGroupOrder -->
+             <!-- FIXME: Jobs should not be manually created -->
             <make-value entity-name="JobSandbox" value-field="jobSandbox"/>
             <sequenced-id sequence-name="JobSandbox" field="jobSandbox.jobId"/>
             <set field="jobId" from-field="jobSandbox.jobId"/>
@@ -900,6 +901,7 @@ under the License.
             <set field="jobSandbox.runAsUser" value="system"/>
             <set field="jobSandbox.runtimeDataId" from-field="runtimeDataId"/>
             <set field="jobSandbox.maxRecurrenceCount" value="1" type="Long"/>
+            <set field="jobSandbox.priority" value="50" type="Long"/>
             <create-value value-field="jobSandbox"/>
 
             <set field="productGroupOrder.jobId" from-field="jobId"/>

Modified: ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/entitydef/entitymodel.xml Sun Apr  7 07:39:47 2019
@@ -44,6 +44,7 @@ under the License.
         <field name="jobId" type="id"></field>
         <field name="jobName" type="name"></field>
         <field name="runTime" type="date-time"></field>
+        <field name="priority" type="numeric"></field>
         <field name="poolId" type="name"></field>
         <field name="statusId" type="id"></field>
         <field name="parentJobId" type="id"></field>

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/engine/GenericAsyncEngine.java Sun Apr  7 07:39:47 2019
@@ -41,6 +41,7 @@ import org.apache.ofbiz.service.job.Gene
 import org.apache.ofbiz.service.job.Job;
 import org.apache.ofbiz.service.job.JobManager;
 import org.apache.ofbiz.service.job.JobManagerException;
+import org.apache.ofbiz.service.job.JobPriority;
 
 /**
  * Generic Asynchronous Engine
@@ -112,6 +113,7 @@ public abstract class GenericAsyncEngine
                 jFields.put("loaderName", localName);
                 jFields.put("maxRetry", (long) modelService.maxRetry);
                 jFields.put("runtimeDataId", dataId);
+                jFields.put("priority", JobPriority.NORMAL);
                 if (UtilValidate.isNotEmpty(authUserLoginId)) {
                     jFields.put("authUserLoginId", authUserLoginId);
                 }

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/AbstractJob.java Sun Apr  7 07:39:47 2019
@@ -115,4 +115,12 @@ public abstract class AbstractJob implem
     public Date getStartTime() {
         return (Date) startTime.clone();
     }
+
+    /**
+     * Returns {@link JobPriority#NORMAL}, the default priority for jobs that do not override this method.
+     */
+    @Override
+    public long getPriority() {
+        return JobPriority.NORMAL;
+    }
 }

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/Job.java Sun Apr  7 07:39:47 2019
@@ -72,5 +72,10 @@ public interface Job extends Runnable {
      * Returns the time this job is scheduled to start.
      */
     Date getStartTime();
+
+    /**
+     * Returns the priority of this job; the higher the number, the higher the priority.
+     */
+    long getPriority();
 }
 

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java Sun Apr  7 07:39:47 2019
@@ -213,7 +213,10 @@ public final class JobManager {
                 Debug.logWarning("Unable to poll JobSandbox for jobs; unable to begin transaction.", module);
                 return poll;
             }
-            try (EntityListIterator jobsIterator = EntityQuery.use(delegator).from("JobSandbox").where(mainCondition).orderBy("runTime").queryIterator()) {
+            try (EntityListIterator jobsIterator = EntityQuery.use(delegator)
+                    .from("JobSandbox").where(mainCondition)
+                    .orderBy("priority DESC NULLS LAST", "runTime")
+                    .maxRows(limit).queryIterator()) {
                 GenericValue jobValue = jobsIterator.next();
                 while (jobValue != null) {
                     // Claim ownership of this value. Using storeByCondition to avoid a race condition.
@@ -546,7 +549,8 @@ public final class JobManager {
             jobName = Long.toString((new Date().getTime()));
         }
         Map<String, Object> jFields = UtilMisc.<String, Object> toMap("jobName", jobName, "runTime", new java.sql.Timestamp(startTime),
-                "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId);
+                "serviceName", serviceName, "statusId", "SERVICE_PENDING", "recurrenceInfoId", infoId, "runtimeDataId", dataId,
+                "priority", JobPriority.NORMAL);
         // set the pool ID
         if (UtilValidate.isNotEmpty(poolName)) {
             jFields.put("poolId", poolName);

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPoller.java Sun Apr  7 07:39:47 2019
@@ -20,13 +20,14 @@ package org.apache.ofbiz.service.job;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -63,15 +64,48 @@ public final class JobPoller implements
     private static ThreadPoolExecutor createThreadPoolExecutor() {
         try {
             ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
-            return new ThreadPoolExecutor(threadPool.getMinThreads(), threadPool.getMaxThreads(), threadPool.getTtl(),
-                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(threadPool.getJobs()), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+            return new ThreadPoolExecutor(
+                    threadPool.getMinThreads(), 
+                    threadPool.getMaxThreads(), 
+                    threadPool.getTtl(),
+                    TimeUnit.MILLISECONDS, 
+                    new PriorityBlockingQueue<>(threadPool.getJobs(), createPriorityComparator()), 
+                    new JobInvokerThreadFactory(), 
+                    new ThreadPoolExecutor.AbortPolicy());
         } catch (GenericConfigException e) {
             Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
-            return new ThreadPoolExecutor(ThreadPool.MIN_THREADS, ThreadPool.MAX_THREADS, ThreadPool.THREAD_TTL,
-                    TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(ThreadPool.QUEUE_SIZE), new JobInvokerThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
+            return new ThreadPoolExecutor(
+                    ThreadPool.MIN_THREADS, 
+                    ThreadPool.MAX_THREADS, 
+                    ThreadPool.THREAD_TTL,
+                    TimeUnit.MILLISECONDS, 
+                    new PriorityBlockingQueue<>(ThreadPool.QUEUE_SIZE, createPriorityComparator()), 
+                    new JobInvokerThreadFactory(), 
+                    new ThreadPoolExecutor.AbortPolicy());
         }
     }
 
+    private static Comparator<Runnable> createPriorityComparator() {
+        return new Comparator<Runnable>() {
+
+            /**
+             * Orders jobs by priority (highest first), then by start time (earliest first).
+             */
+            @Override
+            public int compare(Runnable o1, Runnable o2) {
+                Job j1 = (Job) o1; // both arguments are cast to Job; a non-Job Runnable would throw ClassCastException
+                Job j2 = (Job) o2;
+                // Descending priority: the arguments are swapped so the higher-priority job sorts first
+                int priorityCompare = Long.compare(j2.getPriority(), j1.getPriority());
+                if (priorityCompare != 0) {
+                    return priorityCompare;
+                }
+                // Ascending start time: on equal priority, the job with the earlier start time sorts first
+                return Long.compare(j1.getStartTime().getTime(), j2.getStartTime().getTime());
+            }
+        };
+    }
+
     private static int pollWaitTime() {
         try {
             ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
@@ -82,6 +116,16 @@ public final class JobPoller implements
         }
     }
 
+    static int queueSize() { // returns the job queue size from the <thread-pool> configuration
+        try {
+            ThreadPool threadPool = ServiceConfigUtil.getServiceEngine(ServiceConfigUtil.getEngine()).getThreadPool();
+            return threadPool.getJobs();
+        } catch (GenericConfigException e) {
+            Debug.logError(e, "Exception thrown while getting <thread-pool> model, using default <thread-pool> values: ", module);
+            return ThreadPool.QUEUE_SIZE; // fall back to the built-in default when the configuration cannot be read
+        }
+    }
+
     /**
      * Register a {@link JobManager} with the job poller.
      *
@@ -170,6 +214,7 @@ public final class JobPoller implements
         try {
             executor.execute(job);
         } catch (Exception e) {
+            Debug.logError(e, module);
             job.deQueue();
         }
     }
@@ -197,6 +242,7 @@ public final class JobPoller implements
 
     private static class JobInvokerThreadFactory implements ThreadFactory {
 
+        @Override
         public Thread newThread(Runnable runnable) {
             return new Thread(runnable, "OFBiz-JobQueue-" + created.getAndIncrement());
         }
@@ -214,7 +260,7 @@ public final class JobPoller implements
                     Thread.sleep(1000);
                 }
                 while (!executor.isShutdown()) {
-                    int remainingCapacity = executor.getQueue().remainingCapacity();
+                    int remainingCapacity = queueSize() - executor.getQueue().size();
                     if (remainingCapacity > 0) {
                         // Build "list of lists"
                         Collection<JobManager> jmCollection = jobManagers.values();

Added: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java?rev=1857071&view=auto
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java (added)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/JobPriority.java Sun Apr  7 07:39:47 2019
@@ -0,0 +1,7 @@
+package org.apache.ofbiz.service.job;
+
+public final class JobPriority { // priority constants for Job.getPriority(); a higher number means a higher priority
+    public static final long LOW = 0; // returned by PurgeJob.getPriority()
+    public static final long NORMAL = 50; // default, returned by AbstractJob.getPriority()
+    public static final long HIGH = 100;
+}

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PersistedServiceJob.java Sun Apr  7 07:39:47 2019
@@ -211,6 +211,10 @@ public class PersistedServiceJob extends
                 newJob.set("currentRetryCount", 0L);
             }
             nextRecurrence = next;
+            // Set priority if missing
+            if (newJob.getLong("priority") == null) {
+                newJob.set("priority", JobPriority.NORMAL);
+            }
             delegator.createSetNextSeqId(newJob);
             if (Debug.verboseOn()) {
                 Debug.logVerbose("Created next job entry: " + newJob, module);
@@ -379,4 +383,17 @@ public class PersistedServiceJob extends
     public Date getStartTime() {
         return new Date(startTime);
     }
+
+    /**
+     * Returns the priority stored in the JobSandbox.priority field; if no value is present,
+     * falls back to the superclass implementation of getPriority().
+     */
+    @Override
+    public long getPriority() {
+        Long priority = jobValue.getLong("priority");
+        if (priority == null) {
+            return super.getPriority();
+        }
+        return priority;
+    }
 }

Modified: ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
URL: http://svn.apache.org/viewvc/ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java?rev=1857071&r1=1857070&r2=1857071&view=diff
==============================================================================
--- ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java (original)
+++ ofbiz/ofbiz-framework/trunk/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java Sun Apr  7 07:39:47 2019
@@ -82,4 +82,12 @@ public class PurgeJob extends AbstractJo
             throw new InvalidJobException("Illegal state change");
         }
     }
+
+    /**
+     * Returns {@link JobPriority#LOW}.
+     */
+    @Override
+    public long getPriority() {
+        return JobPriority.LOW;
+    }
 }