You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by nm...@apache.org on 2020/02/22 17:49:04 UTC

[ofbiz-framework] branch trunk updated: Improved: Refactoring old job process (OFBIZ-11200)

This is an automated email from the ASF dual-hosted git repository.

nmalin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ofbiz-framework.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dff226e  Improved: Refactoring old job process (OFBIZ-11200)
dff226e is described below

commit dff226ea601958608926d5623c2940232a707a93
Author: Nicolas Malin <ni...@nereide.fr>
AuthorDate: Sat Feb 22 18:48:32 2020 +0100

    Improved: Refactoring old job process
    (OFBIZ-11200)
    
    Before this commit, old jobs were purged by the job manager thread: it checked
    whether any jobs needed to run and, only when none were found, looked for old jobs to purge.
    
    I found a problem that appears when you have several job servers and more than one pool.
    When a first pool runs regular jobs and a second one receives a large volume of
    asynchronous services (through persisted calls), the servers that manage the second pool
    rarely get the chance to purge their jobs, so the JobSandbox table keeps growing.
    
    The servers that manage the first pool call the purge process often, but because of
    the table size each query takes a long time to purge only a few elements. Being both
    slow and frequent, this purge process loads the database needlessly.
    
    After analysis, I refactored the following items:
    
     * When querying the JobSandbox table for a purge, limit the query to
       the thread pool's maximum number of threads: only that many rows of
       the database result are purged anyway, so the limit spares the database
       from scanning the whole table. Another small improvement: do not sort
       the result (sorting brought no functional gain).
    
     * Purging from the pool thread works well, but each server can only purge its
       own jobs (filtered on run_by_instance_id). To help an overloaded server,
       I rehabilitated the purgeOldJobs service so that it can be run with specific
       parameters to assist that server.
    
     * Last, all job services historically lived in the class org.apache.ofbiz.service.ServiceUtil;
       I moved them all to a new dedicated class, org.apache.ofbiz.service.job.JobServices.
---
 framework/service/servicedef/services.xml          |  12 +-
 .../java/org/apache/ofbiz/service/ServiceUtil.java | 278 +--------------------
 .../org/apache/ofbiz/service/job/JobManager.java   |  39 +--
 .../org/apache/ofbiz/service/job/JobServices.java  | 187 ++++++++++++++
 .../service/job/{PurgeJob.java => JobUtil.java}    |  79 +++---
 .../org/apache/ofbiz/service/job/PurgeJob.java     |  27 +-
 6 files changed, 250 insertions(+), 372 deletions(-)

diff --git a/framework/service/servicedef/services.xml b/framework/service/servicedef/services.xml
index 15d8272..419d553 100644
--- a/framework/service/servicedef/services.xml
+++ b/framework/service/servicedef/services.xml
@@ -26,16 +26,18 @@ under the License.
 
     <!-- Service Engine Services -->
     <service name="purgeOldJobs" engine="java"
-            location="org.apache.ofbiz.service.ServiceUtil" invoke="purgeOldJobs" auth="true" use-transaction="false" semaphore="wait">
+            location="org.apache.ofbiz.service.job.JobServices" invoke="purgeOldJobs" auth="true" use-transaction="false" semaphore="wait">
         <description>Cleans out old jobs which have been around longer then what is defined in serviceengine.xml</description>
         <required-permissions join-type="AND">
             <check-permission permission="SERVICE_INVOKE_ANY"/>
         </required-permissions>
-        <!-- this service has no attributes -->
+        <attribute name="poolId" type="String" mode="IN" optional="true"/>
+        <attribute name="daysToKeep" type="Integer" mode="IN" optional="true"/>
+        <attribute name="limit" type="Integer" mode="IN" optional="true"/>
     </service>
 
     <service name="cancelScheduledJob" engine="java"
-            location="org.apache.ofbiz.service.ServiceUtil" invoke="cancelJob" auth="true">
+            location="org.apache.ofbiz.service.job.JobServices" invoke="cancelJob" auth="true">
         <description>Cancels a schedule job</description>
         <required-permissions join-type="AND">
             <check-permission permission="SERVICE_INVOKE_ANY"/>
@@ -46,7 +48,7 @@ under the License.
     </service>
 
     <service name="cancelJobRetries" engine="java"
-            location="org.apache.ofbiz.service.ServiceUtil" invoke="cancelJobRetries" auth="true">
+            location="org.apache.ofbiz.service.job.JobServices" invoke="cancelJobRetries" auth="true">
         <description>Cancels a job retry flag</description>
         <required-permissions join-type="AND">
             <check-permission permission="SERVICE_INVOKE_ANY"/>
@@ -55,7 +57,7 @@ under the License.
     </service>
 
     <service name="resetScheduledJob" engine="java"
-            location="org.apache.ofbiz.service.ServiceUtil" invoke="resetJob" auth="true">
+            location="org.apache.ofbiz.service.job.JobServices" invoke="resetJob" auth="true">
         <description>Resets a stale job so it can be re-run</description>
         <required-permissions join-type="AND">
             <check-permission permission="SERVICE_INVOKE_ANY"/>
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/ServiceUtil.java b/framework/service/src/main/java/org/apache/ofbiz/service/ServiceUtil.java
index 1e5b6b0..3da4e6c 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/ServiceUtil.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/ServiceUtil.java
@@ -18,7 +18,6 @@
  *******************************************************************************/
 package org.apache.ofbiz.service;
 
-import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -30,7 +29,6 @@ import java.util.TimeZone;
 import javax.servlet.http.HttpServletRequest;
 import javax.transaction.Transaction;
 
-import org.apache.ofbiz.base.config.GenericConfigException;
 import org.apache.ofbiz.base.util.Debug;
 import org.apache.ofbiz.base.util.UtilDateTime;
 import org.apache.ofbiz.base.util.UtilGenerics;
@@ -40,17 +38,12 @@ import org.apache.ofbiz.base.util.UtilValidate;
 import org.apache.ofbiz.entity.Delegator;
 import org.apache.ofbiz.entity.GenericEntityException;
 import org.apache.ofbiz.entity.GenericValue;
-import org.apache.ofbiz.entity.condition.EntityCondition;
-import org.apache.ofbiz.entity.condition.EntityExpr;
-import org.apache.ofbiz.entity.condition.EntityOperator;
 import org.apache.ofbiz.entity.transaction.GenericTransactionException;
 import org.apache.ofbiz.entity.transaction.TransactionUtil;
-import org.apache.ofbiz.entity.util.EntityListIterator;
 import org.apache.ofbiz.entity.util.EntityQuery;
 import org.apache.ofbiz.security.Security;
-import org.apache.ofbiz.service.config.ServiceConfigUtil;
+import org.apache.ofbiz.service.job.JobUtil;
 
-import com.ibm.icu.util.Calendar;
 
 /**
  * Generic Service Utility Class
@@ -394,233 +387,6 @@ public final class ServiceUtil {
         }
     }
 
-    public static Map<String, Object> purgeOldJobs(DispatchContext dctx, Map<String, ? extends Object> context) {
-         Locale locale = (Locale)context.get("locale");
-        Debug.logWarning("purgeOldJobs service invoked. This service is obsolete - the Job Scheduler will purge old jobs automatically.", module);
-        String sendPool = null;
-        Calendar cal = Calendar.getInstance();
-        try {
-            sendPool = ServiceConfigUtil.getServiceEngine().getThreadPool().getSendToPool();
-            int daysToKeep = ServiceConfigUtil.getServiceEngine().getThreadPool().getPurgeJobDays();
-            cal.add(Calendar.DAY_OF_YEAR, -daysToKeep);
-        } catch (GenericConfigException e) {
-            Debug.logWarning(e, "Exception thrown while getting service configuration: ", module);
-            return returnError(UtilProperties.getMessage(ServiceUtil.resource, "ServiceExceptionThrownWhileGettingServiceConfiguration", UtilMisc.toMap("errorString", e), locale));
-        }
-        Delegator delegator = dctx.getDelegator();
-
-        Timestamp purgeTime = new Timestamp(cal.getTimeInMillis());
-
-        // create the conditions to query
-        EntityCondition pool = EntityCondition.makeCondition("poolId", sendPool);
-
-        List<EntityExpr> finExp = UtilMisc.toList(EntityCondition.makeCondition("finishDateTime", EntityOperator.NOT_EQUAL, null));
-        finExp.add(EntityCondition.makeCondition("finishDateTime", EntityOperator.LESS_THAN, purgeTime));
-
-        List<EntityExpr> canExp = UtilMisc.toList(EntityCondition.makeCondition("cancelDateTime", EntityOperator.NOT_EQUAL, null));
-        canExp.add(EntityCondition.makeCondition("cancelDateTime", EntityOperator.LESS_THAN, purgeTime));
-
-        EntityCondition cancelled = EntityCondition.makeCondition(canExp);
-        EntityCondition finished = EntityCondition.makeCondition(finExp);
-
-        EntityCondition doneCond = EntityCondition.makeCondition(UtilMisc.toList(cancelled, finished), EntityOperator.OR);
-
-        // always suspend the current transaction; use the one internally
-        Transaction parent = null;
-        try {
-            if (TransactionUtil.getStatus() != TransactionUtil.STATUS_NO_TRANSACTION) {
-                parent = TransactionUtil.suspend();
-            }
-
-            // lookup the jobs - looping 1000 at a time to avoid problems with cursors
-            // also, using unique transaction to delete as many as possible even with errors
-            boolean noMoreResults = false;
-            boolean beganTx1 = false;
-            while (!noMoreResults) {
-                // current list of records
-                List<GenericValue> curList = null;
-                try {
-                    // begin this transaction
-                    beganTx1 = TransactionUtil.begin();
-                    EntityQuery eq = EntityQuery.use(delegator)
-                            .select("jobId")
-                            .from("JobSandbox")
-                            .where(EntityCondition.makeCondition(UtilMisc.toList(doneCond, pool)))
-                            .cursorScrollInsensitive()
-                            .maxRows(1000);
-
-                    try (EntityListIterator foundJobs = eq.queryIterator()) {
-                        curList = foundJobs.getPartialList(1, 1000);
-                    }
-
-                } catch (GenericEntityException e) {
-                    Debug.logError(e, "Cannot obtain job data from datasource", module);
-                    try {
-                        TransactionUtil.rollback(beganTx1, e.getMessage(), e);
-                    } catch (GenericTransactionException e1) {
-                        Debug.logWarning(e1, module);
-                    }
-                    return ServiceUtil.returnError(e.getMessage());
-                } finally {
-                    try {
-                        TransactionUtil.commit(beganTx1);
-                    } catch (GenericTransactionException e) {
-                        Debug.logWarning(e, module);
-                    }
-                }
-                // remove each from the list in its own transaction
-                if (UtilValidate.isNotEmpty(curList)) {
-                    for (GenericValue job: curList) {
-                        String jobId = job.getString("jobId");
-                        boolean beganTx2 = false;
-                        try {
-                            beganTx2 = TransactionUtil.begin();
-                            job.remove();
-                        } catch (GenericEntityException e) {
-                            Debug.logInfo("Cannot remove job data for ID: " + jobId, module);
-                            try {
-                                TransactionUtil.rollback(beganTx2, e.getMessage(), e);
-                            } catch (GenericTransactionException e1) {
-                                Debug.logWarning(e1, module);
-                            }
-                        } finally {
-                            try {
-                                TransactionUtil.commit(beganTx2);
-                            } catch (GenericTransactionException e) {
-                                Debug.logWarning(e, module);
-                            }
-                        }
-                    }
-                } else {
-                    noMoreResults = true;
-                }
-            }
-
-            // Now JobSandbox data is cleaned up. Now process Runtime data and remove the whole data in single shot that is of no need.
-            boolean beganTx3 = false;
-            GenericValue runtimeData = null;
-            List<GenericValue> runtimeDataToDelete = new LinkedList<>();
-            long jobsandBoxCount = 0;
-            try {
-                // begin this transaction
-                beganTx3 = TransactionUtil.begin();
-
-                EntityQuery eq =EntityQuery.use(delegator).select("runtimeDataId").from("RuntimeData");
-                try (EntityListIterator  runTimeDataIt = eq.queryIterator()) {
-                    while ((runtimeData = runTimeDataIt.next()) != null) {
-                        EntityCondition whereCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runtimeDataId", EntityOperator.NOT_EQUAL, null),
-                                EntityCondition.makeCondition("runtimeDataId", EntityOperator.EQUALS, runtimeData.getString("runtimeDataId"))), EntityOperator.AND);
-                        jobsandBoxCount = EntityQuery.use(delegator).from("JobSandbox").where(whereCondition).queryCount();
-                        if (BigDecimal.ZERO.compareTo(BigDecimal.valueOf(jobsandBoxCount)) == 0) {
-                            runtimeDataToDelete.add(runtimeData);
-                        }
-                    }
-                }
-                // Now we are ready to delete runtimeData, we can safely delete complete list that we have recently fetched i.e runtimeDataToDelete.
-                delegator.removeAll(runtimeDataToDelete);
-            } catch (GenericEntityException e) {
-                Debug.logError(e, "Cannot obtain runtime data from datasource", module);
-                try {
-                    TransactionUtil.rollback(beganTx3, e.getMessage(), e);
-                } catch (GenericTransactionException e1) {
-                    Debug.logWarning(e1, module);
-                }
-                return ServiceUtil.returnError(e.getMessage());
-            } finally {
-                try {
-                    TransactionUtil.commit(beganTx3);
-                } catch (GenericTransactionException e) {
-                    Debug.logWarning(e, module);
-                }
-            }
-        } catch (GenericTransactionException e) {
-            Debug.logError(e, "Unable to suspend transaction; cannot purge jobs!", module);
-            return ServiceUtil.returnError(e.getMessage());
-        } finally {
-            if (parent != null) {
-                try {
-                    TransactionUtil.resume(parent);
-                } catch (GenericTransactionException e) {
-                    Debug.logWarning(e, module);
-                }
-            }
-        }
-
-        return ServiceUtil.returnSuccess();
-    }
-
-    public static Map<String, Object> cancelJob(DispatchContext dctx, Map<String, ? extends Object> context) {
-        Delegator delegator = dctx.getDelegator();
-        Security security = dctx.getSecurity();
-        GenericValue userLogin = (GenericValue) context.get("userLogin");
-        Locale locale = getLocale(context);
-
-        if (!security.hasPermission("SERVICE_INVOKE_ANY", userLogin)) {
-            String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.no_permission_to_run", locale) + ".";
-            return ServiceUtil.returnError(errMsg);
-        }
-
-        String jobId = (String) context.get("jobId");
-        Map<String, Object> fields = UtilMisc.<String, Object>toMap("jobId", jobId);
-
-        GenericValue job = null;
-        try {
-            job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).queryOne();
-            if (job != null) {
-                job.set("cancelDateTime", UtilDateTime.nowTimestamp());
-                job.set("statusId", "SERVICE_CANCELLED");
-                job.store();
-            }
-        } catch (GenericEntityException e) {
-            Debug.logError(e, module);
-            String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.unable_to_cancel_job", locale) + " : " + fields;
-            return ServiceUtil.returnError(errMsg);
-        }
-
-        if (job != null) {
-            Timestamp cancelDate = job.getTimestamp("cancelDateTime");
-            Map<String, Object> result = ServiceUtil.returnSuccess();
-            result.put("cancelDateTime", cancelDate);
-            result.put("statusId", "SERVICE_PENDING"); // To more easily see current pending jobs and possibly cancel some others
-            return result;
-        }
-        String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.unable_to_cancel_job", locale) + " : " + null;
-        return ServiceUtil.returnError(errMsg);
-    }
-
-    public static Map<String, Object> cancelJobRetries(DispatchContext dctx, Map<String, ? extends Object> context) {
-        Delegator delegator = dctx.getDelegator();
-        Security security = dctx.getSecurity();
-        GenericValue userLogin = (GenericValue) context.get("userLogin");
-        Locale locale = getLocale(context);
-        if (!security.hasPermission("SERVICE_INVOKE_ANY", userLogin)) {
-            String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.no_permission_to_run", locale) + ".";
-            return ServiceUtil.returnError(errMsg);
-        }
-
-        String jobId = (String) context.get("jobId");
-        Map<String, Object> fields = UtilMisc.<String, Object>toMap("jobId", jobId);
-
-        GenericValue job = null;
-        try {
-            job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).queryOne();
-            if (job != null) {
-                job.set("maxRetry", 0L);
-                job.store();
-            }
-        } catch (GenericEntityException e) {
-            Debug.logError(e, module);
-            String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.unable_to_cancel_job_retries", locale) + " : " + fields;
-            return ServiceUtil.returnError(errMsg);
-        }
-
-        if (job != null) {
-            return ServiceUtil.returnSuccess();
-        }
-        String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.unable_to_cancel_job_retries", locale) + " : " + null;
-        return ServiceUtil.returnError(errMsg);
-    }
-
     public static Map<String, Object> genericDateCondition(DispatchContext dctx, Map<String, ? extends Object> context) {
         Timestamp fromDate = (Timestamp) context.get("fromDate");
         Timestamp thruDate = (Timestamp) context.get("thruDate");
@@ -655,7 +421,7 @@ public final class ServiceUtil {
         return userLogin;
     }
 
-    private static Locale getLocale(Map<String, ? extends Object> context) {
+    public static Locale getLocale(Map<String, ? extends Object> context) {
         Locale locale = (Locale) context.get("locale");
         if (locale == null) {
             locale = Locale.getDefault();
@@ -676,46 +442,6 @@ public final class ServiceUtil {
         return UtilGenerics.cast(UtilMisc.toMap(args));
     }
 
-    public static Map<String, Object> resetJob(DispatchContext dctx, Map<String, Object> context) {
-        Delegator delegator = dctx.getDelegator();
-        Security security = dctx.getSecurity();
-        GenericValue userLogin = (GenericValue) context.get("userLogin");
-        Locale locale = getLocale(context);
-
-        if (!security.hasPermission("SERVICE_INVOKE_ANY", userLogin)) {
-            String errMsg = UtilProperties.getMessage(ServiceUtil.resource, "serviceUtil.no_permission_to_run", locale) + ".";
-            return ServiceUtil.returnError(errMsg);
-        }
-
-        String jobId = (String) context.get("jobId");
-        GenericValue job;
-        try {
-            job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).cache().queryOne();
-        } catch (GenericEntityException e) {
-            Debug.logError(e, module);
-            return ServiceUtil.returnError(e.getMessage());
-        }
-
-        // update the job
-        if (job != null) {
-            job.set("statusId", "SERVICE_PENDING");
-            job.set("startDateTime", null);
-            job.set("finishDateTime", null);
-            job.set("cancelDateTime", null);
-            job.set("runByInstanceId", null);
-
-            // save the job
-            try {
-                job.store();
-            } catch (GenericEntityException e) {
-                Debug.logError(e, module);
-                return ServiceUtil.returnError(e.getMessage());
-            }
-        }
-
-        return ServiceUtil.returnSuccess();
-    }
-
     /**
      * Checks all incoming service attributes and look for fields with the same
      * name in the incoming map and copy those onto the outgoing map. Also
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
index c9173aa..d9051de 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobManager.java
@@ -20,6 +20,8 @@ package org.apache.ofbiz.service.job;
 
 import java.io.IOException;
 import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -246,19 +248,24 @@ public final class JobManager {
         }
         if (poll.isEmpty()) {
             // No jobs to run, see if there are any jobs to purge
-            Calendar cal = Calendar.getInstance();
+            Timestamp purgeTime;
             try {
                 int daysToKeep = ServiceConfigUtil.getServiceEngine().getThreadPool().getPurgeJobDays();
-                cal.add(Calendar.DAY_OF_YEAR, -daysToKeep);
+                purgeTime = Timestamp.from(Instant.now().minus(Duration.ofDays(daysToKeep)));
             } catch (GenericConfigException e) {
                 Debug.logWarning(e, "Unable to get purge job days: ", module);
                 return Collections.emptyList();
             }
-            Timestamp purgeTime = new Timestamp(cal.getTimeInMillis());
-            List<EntityExpr> finExp = UtilMisc.toList(EntityCondition.makeCondition("finishDateTime", EntityOperator.NOT_EQUAL, null), EntityCondition.makeCondition("finishDateTime", EntityOperator.LESS_THAN, purgeTime));
-            List<EntityExpr> canExp = UtilMisc.toList(EntityCondition.makeCondition("cancelDateTime", EntityOperator.NOT_EQUAL, null), EntityCondition.makeCondition("cancelDateTime", EntityOperator.LESS_THAN, purgeTime));
-            EntityCondition doneCond = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition(canExp), EntityCondition.makeCondition(finExp)), EntityOperator.OR);
-            mainCondition = EntityCondition.makeCondition(UtilMisc.toList(EntityCondition.makeCondition("runByInstanceId", instanceId), doneCond));
+            List<EntityCondition> purgeCondition = UtilMisc.toList(
+                    EntityCondition.makeCondition("runByInstanceId", instanceId),
+                    EntityCondition.makeCondition(UtilMisc.toList(
+                            EntityCondition.makeCondition(UtilMisc.toList(
+                                    EntityCondition.makeCondition("finishDateTime", EntityOperator.NOT_EQUAL, null),
+                                    EntityCondition.makeCondition("finishDateTime", EntityOperator.LESS_THAN, purgeTime))),
+                            EntityCondition.makeCondition(UtilMisc.toList(
+                                    EntityCondition.makeCondition("cancelDateTime", EntityOperator.NOT_EQUAL, null),
+                                    EntityCondition.makeCondition("cancelDateTime", EntityOperator.LESS_THAN, purgeTime)))),
+                            EntityOperator.OR));
             beganTransaction = false;
             try {
                 beganTransaction = TransactionUtil.begin();
@@ -266,18 +273,12 @@ public final class JobManager {
                     Debug.logWarning("Unable to poll JobSandbox for jobs; unable to begin transaction.", module);
                     return Collections.emptyList();
                 }
-                try (EntityListIterator jobsIterator = EntityQuery.use(delegator).from("JobSandbox").where(mainCondition).orderBy("jobId").queryIterator()) {
-                    GenericValue jobValue = jobsIterator.next();
-                    while (jobValue != null) {
-                        poll.add(new PurgeJob(jobValue));
-                        if (poll.size() == limit) {
-                            break;
-                        }
-                        jobValue = jobsIterator.next();
-                    }
-                } catch (GenericEntityException e) {
-                    Debug.logWarning(e, module);
-                }
+                List<GenericValue> jobs = EntityQuery.use(delegator).from("JobSandbox")
+                        .where(purgeCondition)
+                        .select("jobId")
+                        .maxRows(limit)
+                        .queryList();
+                jobs.forEach(jobValue -> poll.add(new PurgeJob(jobValue)));
                 TransactionUtil.commit(beganTransaction);
             } catch (Throwable t) {
                 String errMsg = "Exception thrown while polling JobSandbox: ";
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/JobServices.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobServices.java
new file mode 100644
index 0000000..f3242b3
--- /dev/null
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobServices.java
@@ -0,0 +1,187 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *******************************************************************************/
+package org.apache.ofbiz.service.job;
+
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.ofbiz.base.config.GenericConfigException;
+import org.apache.ofbiz.base.util.Debug;
+import org.apache.ofbiz.base.util.UtilDateTime;
+import org.apache.ofbiz.base.util.UtilMisc;
+import org.apache.ofbiz.base.util.UtilProperties;
+import org.apache.ofbiz.entity.Delegator;
+import org.apache.ofbiz.entity.GenericEntityException;
+import org.apache.ofbiz.entity.GenericValue;
+import org.apache.ofbiz.entity.condition.EntityCondition;
+import org.apache.ofbiz.entity.condition.EntityOperator;
+import org.apache.ofbiz.entity.util.EntityQuery;
+import org.apache.ofbiz.security.Security;
+import org.apache.ofbiz.service.DispatchContext;
+import org.apache.ofbiz.service.ServiceUtil;
+import org.apache.ofbiz.service.config.ServiceConfigUtil;
+
+public class JobServices {
+
+    public static final String module = JobServices.class.getName();
+    private static final String resource = "ServiceErrorUiLabels";
+
+    public static Map<String, Object> cancelJob(DispatchContext dctx, Map<String, ? extends Object> context) {
+        Delegator delegator = dctx.getDelegator();
+        Locale locale = ServiceUtil.getLocale(context);
+
+        String jobId = (String) context.get("jobId");
+        Map<String, Object> fields = UtilMisc.<String, Object>toMap("jobId", jobId);
+
+        GenericValue job = null;
+        try {
+            job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).queryOne();
+            if (job != null) {
+                job.set("cancelDateTime", UtilDateTime.nowTimestamp());
+                job.set("statusId", "SERVICE_CANCELLED");
+                job.store();
+            }
+        } catch (GenericEntityException e) {
+            Debug.logError(e, module);
+            String errMsg = UtilProperties.getMessage(resource, "serviceUtil.unable_to_cancel_job", locale) + " : " + fields;
+            return ServiceUtil.returnError(errMsg);
+        }
+
+        if (job != null) {
+            Timestamp cancelDate = job.getTimestamp("cancelDateTime");
+            Map<String, Object> result = ServiceUtil.returnSuccess();
+            result.put("cancelDateTime", cancelDate);
+            result.put("statusId", "SERVICE_PENDING"); // To more easily see current pending jobs and possibly cancel some others
+            return result;
+        }
+        String errMsg = UtilProperties.getMessage(resource, "serviceUtil.unable_to_cancel_job", locale) + " : " + null;
+        return ServiceUtil.returnError(errMsg);
+    }
+
+    public static Map<String, Object> cancelJobRetries(DispatchContext dctx, Map<String, ? extends Object> context) {
+        Delegator delegator = dctx.getDelegator();
+        Security security = dctx.getSecurity();
+        GenericValue userLogin = (GenericValue) context.get("userLogin");
+        Locale locale = ServiceUtil.getLocale(context);
+        if (!security.hasPermission("SERVICE_INVOKE_ANY", userLogin)) {
+            String errMsg = UtilProperties.getMessage(resource, "serviceUtil.no_permission_to_run", locale) + ".";
+            return ServiceUtil.returnError(errMsg);
+        }
+
+        String jobId = (String) context.get("jobId");
+        Map<String, Object> fields = UtilMisc.<String, Object>toMap("jobId", jobId);
+
+        GenericValue job = null;
+        try {
+            job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).queryOne();
+            if (job != null) {
+                job.set("maxRetry", 0L);
+                job.store();
+            }
+        } catch (GenericEntityException e) {
+            Debug.logError(e, module);
+            String errMsg = UtilProperties.getMessage(resource, "serviceUtil.unable_to_cancel_job_retries", locale) + " : " + fields;
+            return ServiceUtil.returnError(errMsg);
+        }
+
+        if (job != null) {
+            return ServiceUtil.returnSuccess();
+        }
+        String errMsg = UtilProperties.getMessage(resource, "serviceUtil.unable_to_cancel_job_retries", locale) + " : " + null;
+        return ServiceUtil.returnError(errMsg);
+    }
+
+    /**
+     * Purges finished or cancelled jobs of a pool that are older than the retention period; unset parameters use the thread-pool configuration.
+     * @param dctx the dispatch context supplying the delegator
+     * @param context the service context; reads the optional poolId, daysToKeep and limit
+     * @return an error when the service configuration cannot be read, success otherwise (query failures are only logged)
+     */
+    public static Map<String, Object> purgeOldJobs(DispatchContext dctx, Map<String, ? extends Object> context) {
+        Locale locale = ServiceUtil.getLocale(context);
+        String sendPool = (String) context.get("poolId");
+        Integer daysToKeep = (Integer) context.get("daysToKeep");
+        Integer limit = (Integer) context.get("limit");
+        try {
+            if (sendPool == null) sendPool = ServiceConfigUtil.getServiceEngine().getThreadPool().getSendToPool();
+            if (daysToKeep == null) daysToKeep = ServiceConfigUtil.getServiceEngine().getThreadPool().getPurgeJobDays();
+            if (limit == null) limit = ServiceConfigUtil.getServiceEngine().getThreadPool().getMaxThreads();
+        } catch (GenericConfigException e) {
+            Debug.logWarning(e, "Exception thrown while getting service configuration: ", module);
+            return ServiceUtil.returnError(UtilProperties.getMessage(resource, "ServiceExceptionThrownWhileGettingServiceConfiguration", UtilMisc.toMap("errorString", e), locale));
+        }
+        Timestamp purgeTime = Timestamp.from(Instant.now().minus(Duration.ofDays(daysToKeep)));
+        // create the conditions to query
+        List<EntityCondition> purgeCondition = UtilMisc.toList(
+                EntityCondition.makeCondition("poolId", sendPool),
+                EntityCondition.makeCondition(UtilMisc.toList(
+                        EntityCondition.makeCondition(UtilMisc.toList(
+                                EntityCondition.makeCondition("finishDateTime", EntityOperator.NOT_EQUAL, null),
+                                EntityCondition.makeCondition("finishDateTime", EntityOperator.LESS_THAN, purgeTime))),
+                        EntityCondition.makeCondition(UtilMisc.toList(
+                                EntityCondition.makeCondition("cancelDateTime", EntityOperator.NOT_EQUAL, null),
+                                EntityCondition.makeCondition("cancelDateTime", EntityOperator.LESS_THAN, purgeTime)))),
+                        EntityOperator.OR));
+        // JobUtil.removeJob resolves the RecurrenceInfo/RuntimeData relations from each value, so select their keys too
+        EntityQuery jobQuery = EntityQuery.use(dctx.getDelegator()).from("JobSandbox")
+                .where(purgeCondition).select("jobId", "recurrenceInfoId", "runtimeDataId");
+        if (limit != null) jobQuery.maxRows(limit);
+        try {
+            jobQuery.queryList().forEach(JobUtil::removeJob);
+        } catch (GenericEntityException e) {
+            Debug.logWarning(e, module);
+        }
+        return ServiceUtil.returnSuccess();
+    }
+
+    /**
+     * Resets a job to the SERVICE_PENDING status and clears its start, finish and cancel
+     * timestamps as well as its runByInstanceId.
+     * @param dctx the dispatch context supplying the delegator
+     * @param context the service context; reads jobId
+     * @return success (also when no job matches), or an error carrying the entity exception message
+     */
+    public static Map<String, Object> resetJob(DispatchContext dctx, Map<String, Object> context) {
+        Delegator delegator = dctx.getDelegator();
+        String jobId = (String) context.get("jobId");
+
+        try {
+            GenericValue job = EntityQuery.use(delegator).from("JobSandbox").where("jobId", jobId).queryOne();
+            if (job == null) {
+                // no matching job: nothing to reset
+                return ServiceUtil.returnSuccess();
+            }
+
+            job.set("statusId", "SERVICE_PENDING");
+            job.set("startDateTime", null);
+            job.set("finishDateTime", null);
+            job.set("cancelDateTime", null);
+            job.set("runByInstanceId", null);
+            job.store();
+        } catch (GenericEntityException e) {
+            Debug.logError(e, module);
+            return ServiceUtil.returnError(e.getMessage());
+        }
+
+        return ServiceUtil.returnSuccess();
+    }
+}
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobUtil.java
similarity index 55%
copy from framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
copy to framework/service/src/main/java/org/apache/ofbiz/service/job/JobUtil.java
index e8a0a72..4e6724a 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/JobUtil.java
@@ -18,37 +18,27 @@
  *******************************************************************************/
 package org.apache.ofbiz.service.job;
 
-import java.io.Serializable;
 import java.util.List;
-
+import javax.transaction.Transaction;
 import org.apache.ofbiz.base.util.Debug;
 import org.apache.ofbiz.entity.GenericEntityException;
 import org.apache.ofbiz.entity.GenericValue;
+import org.apache.ofbiz.entity.transaction.GenericTransactionException;
+import org.apache.ofbiz.entity.transaction.TransactionUtil;
 
-/**
- * Purge job - removes a JobSandbox entity value and its related values.
- */
-@SuppressWarnings("serial")
-public class PurgeJob extends AbstractJob implements Serializable {
-
-    public static final String module = PurgeJob.class.getName();
+public class JobUtil {
 
-    private final GenericValue jobValue;
+    public static final String module = JobUtil.class.getName();
 
-    public PurgeJob(GenericValue jobValue) {
-        super(jobValue.getString("jobId"), "Purge " + jobValue.getString("jobName"));
-        this.jobValue = jobValue;
-    }
-
-    @Override
-    public void exec() throws InvalidJobException {
-        if (currentState != State.QUEUED) {
-            throw new InvalidJobException("Illegal state change");
-        }
-        currentState = State.RUNNING;
+    public static void removeJob(GenericValue jobValue) {
+        // always suspend the current transaction; use the one internally
+        boolean beganTransaction = false;
+        Transaction parent = null;
         try {
-            // TODO: This might need to be in a transaction - to avoid the possibility of
-            // leaving orphaned related values.
+            if (TransactionUtil.getStatus() != TransactionUtil.STATUS_NO_TRANSACTION) {
+                parent = TransactionUtil.suspend();
+            }
+            beganTransaction = TransactionUtil.begin(60);
             jobValue.remove();
             GenericValue relatedValue = jobValue.getRelatedOne("RecurrenceInfo", false);
             if (relatedValue != null) {
@@ -65,29 +55,26 @@ public class PurgeJob extends AbstractJob implements Serializable {
                     relatedValue.remove();
                 }
             }
-            Debug.logInfo("Purged job " + getJobId(), module);
-        } catch (GenericEntityException e) {
-            Debug.logWarning(e, "Exception thrown while purging job: ", module);
-        }
-    }
-
-    @Override
-    public boolean isValid() {
-        return currentState == State.CREATED;
-    }
-
-    @Override
-    public void deQueue() throws InvalidJobException {
-        if (currentState != State.QUEUED) {
-            throw new InvalidJobException("Illegal state change");
+            TransactionUtil.commit(beganTransaction);
+            if (Debug.infoOn()) {
+                Debug.logInfo("Purged job " + jobValue.get("jobId"), module);
+            }
+        } catch (Throwable t) {
+            String errMsg = "Exception thrown while purging job: ";
+            try {
+                TransactionUtil.rollback(beganTransaction, errMsg, t);
+            } catch (GenericEntityException e) {
+                Debug.logWarning(e, "Exception thrown while rolling back transaction: ", module);
+            }
+            Debug.logWarning(errMsg, module);
+        } finally {
+            if (parent != null) {
+                try {
+                    TransactionUtil.resume(parent);
+                } catch (GenericTransactionException e) {
+                    Debug.logWarning(e, "Exception thrown while resume transaction: ", module);
+                }
+            }
         }
     }
-
-    /* 
-     * Returns JobPriority.LOW
-     */
-    @Override
-    public long getPriority() {
-        return JobPriority.LOW;
-    }
 }
diff --git a/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java b/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
index e8a0a72..e73177b 100644
--- a/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
+++ b/framework/service/src/main/java/org/apache/ofbiz/service/job/PurgeJob.java
@@ -19,10 +19,7 @@
 package org.apache.ofbiz.service.job;
 
 import java.io.Serializable;
-import java.util.List;
 
-import org.apache.ofbiz.base.util.Debug;
-import org.apache.ofbiz.entity.GenericEntityException;
 import org.apache.ofbiz.entity.GenericValue;
 
 /**
@@ -46,29 +43,7 @@ public class PurgeJob extends AbstractJob implements Serializable {
             throw new InvalidJobException("Illegal state change");
         }
         currentState = State.RUNNING;
-        try {
-            // TODO: This might need to be in a transaction - to avoid the possibility of
-            // leaving orphaned related values.
-            jobValue.remove();
-            GenericValue relatedValue = jobValue.getRelatedOne("RecurrenceInfo", false);
-            if (relatedValue != null) {
-                List<GenericValue> valueList = relatedValue.getRelated("JobSandbox", null, null, false);
-                if (valueList.isEmpty()) {
-                    relatedValue.remove();
-                    relatedValue.removeRelated("RecurrenceRule");
-                }
-            }
-            relatedValue = jobValue.getRelatedOne("RuntimeData", false);
-            if (relatedValue != null) {
-                List<GenericValue> valueList = relatedValue.getRelated("JobSandbox", null, null, false);
-                if (valueList.isEmpty()) {
-                    relatedValue.remove();
-                }
-            }
-            Debug.logInfo("Purged job " + getJobId(), module);
-        } catch (GenericEntityException e) {
-            Debug.logWarning(e, "Exception thrown while purging job: ", module);
-        }
+        JobUtil.removeJob(jobValue);
     }
 
     @Override