You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ofbiz.apache.org by ad...@apache.org on 2012/08/05 10:01:29 UTC

svn commit: r1369536 - /ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Author: adrianc
Date: Sun Aug  5 08:01:29 2012
New Revision: 1369536

URL: http://svn.apache.org/viewvc?rev=1369536&view=rev
Log:
Refactored PersistedServiceJob.java so that it contains the GenericValue it represents. This improves throughput because the original code hit the JobSandbox entity about 10 times for each job, and with this change, there are only two JobSandbox hits per job.

Modified:
    ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java

Modified: ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java
URL: http://svn.apache.org/viewvc/ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java?rev=1369536&r1=1369535&r2=1369536&view=diff
==============================================================================
--- ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java (original)
+++ ofbiz/trunk/framework/service/src/org/ofbiz/service/job/PersistedServiceJob.java Sun Aug  5 08:01:29 2012
@@ -54,17 +54,19 @@ import org.apache.commons.lang.StringUti
 /**
  * A {@link Job} that is backed by the entity engine. Job data is stored
  * in the JobSandbox entity.
+ * <p>When the job is queued, this object "owns" the entity value. Any external changes
+ * are ignored except the cancelDateTime field - jobs can be canceled after they are queued.</p>
  */
 @SuppressWarnings("serial")
 public class PersistedServiceJob extends GenericServiceJob {
 
     public static final String module = PersistedServiceJob.class.getName();
 
-    private transient Delegator delegator = null;
+    private final transient Delegator delegator;
     private long nextRecurrence = -1;
-    private long maxRetry = -1;
-    private long currentRetryCount = 0;
-    private boolean warningLogged = false;
+    private final long maxRetry;
+    private final long currentRetryCount;
+    private final GenericValue jobValue;
 
     /**
      * Creates a new PersistedServiceJob
@@ -75,6 +77,7 @@ public class PersistedServiceJob extends
     public PersistedServiceJob(DispatchContext dctx, GenericValue jobValue, GenericRequester req) {
         super(dctx, jobValue.getString("jobId"), jobValue.getString("jobName"), null, null, req);
         this.delegator = dctx.getDelegator();
+        this.jobValue = jobValue;
         Timestamp storedDate = jobValue.getTimestamp("runTime");
         this.runtime = storedDate.getTime();
         this.maxRetry = jobValue.get("maxRetry") != null ? jobValue.getLong("maxRetry").longValue() : -1;
@@ -83,21 +86,13 @@ public class PersistedServiceJob extends
             this.currentRetryCount = retryCount.longValue();
         } else {
             // backward compatibility
-            this.currentRetryCount = PersistedServiceJob.getRetries(jobValue, this.delegator);
+            this.currentRetryCount = getRetries(this.delegator);
         }
     }
 
     @Override
     public void queue() throws InvalidJobException {
         super.queue();
-        // refresh the job object
-        GenericValue jobValue = null;
-        try {
-            jobValue = this.getJob();
-            jobValue.refresh();
-        } catch (GenericEntityException e) {
-            throw new InvalidJobException("Unable to refresh JobSandbox value", e);
-        }
         Timestamp cancelTime = jobValue.getTimestamp("cancelDateTime");
         Timestamp startTime = jobValue.getTimestamp("startDateTime");
         if (cancelTime != null || startTime != null) {
@@ -110,7 +105,7 @@ public class PersistedServiceJob extends
             try {
                 jobValue.store();
             } catch (GenericEntityException e) {
-                throw new InvalidJobException("Unable to set the startDateTime on the current job [" + getJobId() + "]; not running!", e);
+                throw new InvalidJobException("Unable to set the startDateTime and statusId on the current job [" + getJobId() + "]; not running!", e);
             }
         }
     }
@@ -118,45 +113,48 @@ public class PersistedServiceJob extends
     @Override
     protected void init() throws InvalidJobException {
         super.init();
-
-        // configure any addition recurrences
-        GenericValue job = this.getJob();
+        try {
+            // Job might have been canceled after it was placed in the queue.
+            jobValue.refresh();
+        } catch (GenericEntityException e) {
+            throw new InvalidJobException("Unable to refresh JobSandbox value", e);
+        }
+        if (jobValue.getTimestamp("cancelDateTime") != null) {
+            // Job cancelled
+            throw new InvalidJobException("Job [" + getJobId() + "] was cancelled");
+        }
+        String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
+        if (!instanceId.equals(jobValue.getString("runByInstanceId"))) {
+            // This condition isn't possible, but we will leave it here.
+            throw new InvalidJobException("Job has been accepted by a different instance!");
+        }
+        // configure any additional recurrences
         long maxRecurrenceCount = -1;
         long currentRecurrenceCount = 0;
         TemporalExpression expr = null;
-        RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(job);
+        RecurrenceInfo recurrence = JobManager.getRecurrenceInfo(jobValue);
         if (recurrence != null) {
-            if (!this.warningLogged) {
-                Debug.logWarning("Persisted Job [" + getJobId() + "] references a RecurrenceInfo, recommend using TemporalExpression instead", module);
-                this.warningLogged = true;
-            }
+            Debug.logWarning("Persisted Job [" + getJobId() + "] references a RecurrenceInfo, recommend using TemporalExpression instead", module);
             currentRecurrenceCount = recurrence.getCurrentCount();
             expr = RecurrenceInfo.toTemporalExpression(recurrence);
         }
-        if (expr == null && UtilValidate.isNotEmpty(job.getString("tempExprId"))) {
+        if (expr == null && UtilValidate.isNotEmpty(jobValue.getString("tempExprId"))) {
             try {
-                expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, job.getString("tempExprId"));
+                expr = TemporalExpressionWorker.getTemporalExpression(this.delegator, jobValue.getString("tempExprId"));
             } catch (GenericEntityException e) {
                 throw new RuntimeException(e.getMessage());
             }
         }
-
-        String instanceId = UtilProperties.getPropertyValue("general.properties", "unique.instanceId", "ofbiz0");
-        if (!instanceId.equals(job.getString("runByInstanceId"))) {
-            throw new InvalidJobException("Job has been accepted by a different instance!");
-        }
-
-        if (job.get("maxRecurrenceCount") != null) {
-            maxRecurrenceCount = job.getLong("maxRecurrenceCount").longValue();
+        if (jobValue.get("maxRecurrenceCount") != null) {
+            maxRecurrenceCount = jobValue.getLong("maxRecurrenceCount").longValue();
         }
-        if (job.get("currentRecurrenceCount") != null) {
-            currentRecurrenceCount = job.getLong("currentRecurrenceCount").longValue();
+        if (jobValue.get("currentRecurrenceCount") != null) {
+            currentRecurrenceCount = jobValue.getLong("currentRecurrenceCount").longValue();
         }
         if (maxRecurrenceCount != -1) {
             currentRecurrenceCount++;
-            job.set("currentRecurrenceCount", currentRecurrenceCount);
+            jobValue.set("currentRecurrenceCount", currentRecurrenceCount);
         }
-
         try {
             if (expr != null && (maxRecurrenceCount == -1 || currentRecurrenceCount <= maxRecurrenceCount)) {
                 if (recurrence != null) {
@@ -164,26 +162,25 @@ public class PersistedServiceJob extends
                 }
                 Calendar next = expr.next(Calendar.getInstance());
                 if (next != null) {
-                    createRecurrence(job, next.getTimeInMillis(), false);
+                    createRecurrence(next.getTimeInMillis(), false);
                 }
             }
         } catch (GenericEntityException e) {
-            throw new RuntimeException(e.getMessage());
+            throw new InvalidJobException(e);
         }
         if (Debug.infoOn()) Debug.logInfo("Job  [" + getJobName() + "] Id ["  + getJobId() + "] -- Next runtime: " + new Date(nextRecurrence), module);
     }
 
-    private void createRecurrence(GenericValue job, long next, boolean isRetryOnFailure) throws GenericEntityException {
+    private void createRecurrence(long next, boolean isRetryOnFailure) throws GenericEntityException {
         if (Debug.verboseOn()) Debug.logVerbose("Next runtime returned: " + next, module);
-
         if (next > runtime) {
-            String pJobId = job.getString("parentJobId");
+            String pJobId = jobValue.getString("parentJobId");
             if (pJobId == null) {
-                pJobId = job.getString("jobId");
+                pJobId = jobValue.getString("jobId");
             }
-            GenericValue newJob = GenericValue.create(job);
+            GenericValue newJob = GenericValue.create(jobValue);
             newJob.remove("jobId");
-            newJob.set("previousJobId", job.getString("jobId"));
+            newJob.set("previousJobId", jobValue.getString("jobId"));
             newJob.set("parentJobId", pJobId);
             newJob.set("statusId", "SERVICE_PENDING");
             newJob.set("startDateTime", null);
@@ -203,11 +200,9 @@ public class PersistedServiceJob extends
     @Override
     protected void finish(Map<String, Object> result) throws InvalidJobException {
         super.finish(result);
-
         // set the finish date
-        GenericValue job = getJob();
-        job.set("statusId", "SERVICE_FINISHED");
-        job.set("finishDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("statusId", "SERVICE_FINISHED");
+        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
         String jobResult = null;
         if (ServiceUtil.isError(result)) {
             jobResult = StringUtils.substring(ServiceUtil.getErrorMessage(result), 0, 255);
@@ -215,10 +210,10 @@ public class PersistedServiceJob extends
             jobResult = StringUtils.substring(ServiceUtil.makeSuccessMessage(result, "", "", "", ""), 0, 255);
         }
         if (UtilValidate.isNotEmpty(jobResult)) {
-            job.set("jobResult", jobResult);
+            jobValue.set("jobResult", jobResult);
         }
         try {
-            job.store();
+            jobValue.store();
         } catch (GenericEntityException e) {
             Debug.logError(e, "Cannot update the job [" + getJobId() + "] sandbox", module);
         }
@@ -227,20 +222,17 @@ public class PersistedServiceJob extends
     @Override
     protected void failed(Throwable t) throws InvalidJobException {
         super.failed(t);
-
-        GenericValue job = getJob();
         // if the job has not been re-scheduled; we need to re-schedule and run again
         if (nextRecurrence == -1) {
             if (this.canRetry()) {
                 // create a recurrence
                 Calendar cal = Calendar.getInstance();
-                cal.setTime(new Date());
                 cal.add(Calendar.MINUTE, ServiceConfigUtil.getFailedRetryMin());
                 long next = cal.getTimeInMillis();
                 try {
-                    createRecurrence(job, next, true);
-                } catch (GenericEntityException gee) {
-                    Debug.logError(gee, "ERROR: Unable to re-schedule job [" + getJobId() + "] to re-run : " + job, module);
+                    createRecurrence(next, true);
+                } catch (GenericEntityException e) {
+                    Debug.logError(e, "Unable to re-schedule job [" + getJobId() + "]: ", module);
                 }
                 Debug.logInfo("Persisted Job [" + getJobId() + "] Failed Re-Scheduling : " + next, module);
             } else {
@@ -248,44 +240,40 @@ public class PersistedServiceJob extends
             }
         }
         // set the failed status
-        job.set("statusId", "SERVICE_FAILED");
-        job.set("finishDateTime", UtilDateTime.nowTimestamp());
-        job.set("jobResult", StringUtils.substring(t.getMessage(), 0, 255));
+        jobValue.set("statusId", "SERVICE_FAILED");
+        jobValue.set("finishDateTime", UtilDateTime.nowTimestamp());
+        jobValue.set("jobResult", StringUtils.substring(t.getMessage(), 0, 255));
         try {
-            job.store();
+            jobValue.store();
         } catch (GenericEntityException e) {
-            Debug.logError(e, "Cannot update the job sandbox", module);
+            Debug.logError(e, "Cannot update the JobSandbox entity", module);
         }
     }
 
     @Override
     protected String getServiceName() throws InvalidJobException {
-        GenericValue jobObj = getJob();
-        if (jobObj == null || jobObj.get("serviceName") == null) {
+        if (jobValue == null || jobValue.get("serviceName") == null) {
             return null;
         }
-        return jobObj.getString("serviceName");
+        return jobValue.getString("serviceName");
     }
 
     @Override
     protected Map<String, Object> getContext() throws InvalidJobException {
         Map<String, Object> context = null;
         try {
-            GenericValue jobObj = getJob();
-            if (!UtilValidate.isEmpty(jobObj.getString("runtimeDataId"))) {
-                GenericValue contextObj = jobObj.getRelatedOne("RuntimeData", false);
+            if (!UtilValidate.isEmpty(jobValue.getString("runtimeDataId"))) {
+                GenericValue contextObj = jobValue.getRelatedOne("RuntimeData", false);
                 if (contextObj != null) {
                     context = UtilGenerics.checkMap(XmlSerializer.deserialize(contextObj.getString("runtimeInfo"), delegator), String.class, Object.class);
                 }
             }
-
             if (context == null) {
                 context = FastMap.newInstance();
             }
-
             // check the runAsUser
-            if (!UtilValidate.isEmpty(jobObj.getString("runAsUser"))) {
-                context.put("userLogin", ServiceUtil.getUserLogin(dctx, context, jobObj.getString("runAsUser")));
+            if (!UtilValidate.isEmpty(jobValue.getString("runAsUser"))) {
+                context.put("userLogin", ServiceUtil.getUserLogin(dctx, context, jobValue.getString("runAsUser")));
             }
         } catch (GenericEntityException e) {
             Debug.logError(e, "PersistedServiceJob.getContext(): Entity Exception", module);
@@ -301,42 +289,26 @@ public class PersistedServiceJob extends
         if (context == null) {
             Debug.logError("Job context is null", module);
         }
-
         return context;
     }
 
-    // gets the job value object
-    private GenericValue getJob() throws InvalidJobException {
-        try {
-            GenericValue jobObj = delegator.findOne("JobSandbox", false, "jobId", getJobId());
-            if (jobObj == null) {
-                throw new InvalidJobException("Job [" + getJobId() + "] came back null from datasource from delegator " + delegator.getDelegatorName());
-            }
-            return jobObj;
-        } catch (GenericEntityException e) {
-            throw new InvalidJobException("Cannot get job definition [" + getJobId() + "] from entity", e);
-        }
-    }
-
     // returns the number of current retries
-    private static long getRetries(GenericValue job, Delegator delegator) {
-        String pJobId = job.getString("parentJobId");
+    private long getRetries(Delegator delegator) {
+        String pJobId = jobValue.getString("parentJobId");
         if (pJobId == null) {
             return 0;
         }
-
         long count = 0;
         try {
             EntityFieldMap ecl = EntityCondition.makeConditionMap("parentJobId", pJobId, "statusId", "SERVICE_FAILED");
             count = delegator.findCountByCondition("JobSandbox", ecl, null, null);
         } catch (GenericEntityException e) {
-            Debug.logError(e, module);
+            Debug.logError(e, "Exception thrown while counting retries: ", module);
         }
-
         return count + 1; // add one for the parent
     }
 
-    private boolean canRetry() throws InvalidJobException {
+    private boolean canRetry() {
         if (maxRetry == -1) {
             return true;
         }