You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by mi...@apache.org on 2009/10/14 22:05:34 UTC

svn commit: r825253 - /ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Author: midon
Date: Wed Oct 14 20:05:33 2009
New Revision: 825253

URL: http://svn.apache.org/viewvc?rev=825253&view=rev
Log:
avoid a race condition where a job was executed twice

Modified:
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=825253&r1=825252&r2=825253&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Oct 14 20:05:33 2009
@@ -116,6 +116,11 @@
         Used to avoid cases where a job would be dispatched twice if the server is under high load and
         does not fully process a job before it is reloaded from the database. */
     private ConcurrentHashMap<String, Long> _outstandingJobs = new ConcurrentHashMap<String, Long>();
+    /** Set of Jobs processed since the last LoadImmediate task.
+        This prevents a race condition where a job is processed twice. This could happen if a LoadImmediate task loads a job
+        from the db before the job is processed but puts it in the _outstandingJobs map after the job was processed.
+        In such a case the job is no longer in the _outstandingJobs map, and so it's queued again. */
+    private ConcurrentHashMap<String, Long> _processedSinceLastLoadTask = new ConcurrentHashMap<String, Long>();
 
     private boolean _running;
 
@@ -481,6 +486,8 @@
                 }
                 return null;
             } finally {
+                // the order of these 2 actions is crucial to avoid a race condition.
+                _processedSinceLastLoadTask.put(job.jobId, job.schedDate);
                 _outstandingJobs.remove(job.jobId);
             }
         }
@@ -637,6 +644,8 @@
                     return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
                 }
             });
+            if (__log.isDebugEnabled()) __log.debug("loaded "+jobs.size()+" jobs from db");
+
             long delayedTime = System.currentTimeMillis() - _warningDelay;
             int delayedCount = 0;
             boolean runningLate;
@@ -652,12 +661,15 @@
                     delayedCount++;
                 }
                 if (__log.isDebugEnabled())
-                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate + "(" + f.format(j.schedDate)+") "+(runningLate?" delayed=true":""));
+                    __log.debug("todo.enqueue job from db: " + j.jobId.trim() + " for " + j.schedDate + "(" + f.format(j.schedDate)+") "+(runningLate?" delayed=true":""));
                 enqueue(j);
             }
             if (delayedCount > 0) {
                 __log.warn("Dispatching jobs with more than "+(_warningDelay/60000)+" minutes delay. Either the server was down for some time or the job load is greater than available capacity");
             }
+
+            // clear only if the batch succeeded
+            _processedSinceLastLoadTask.clear();
             return true;
         } catch (Exception ex) {
             __log.error("Error loading immediate jobs from database.", ex);
@@ -668,13 +680,16 @@
     }
 
     void enqueue(Job job) {
-        Long outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate);
-        if (outstanding == null) {
+        boolean outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate) != null; // true: job was already registered (queued or running). Must be tested before 'processed'.
+        boolean processed = _processedSinceLastLoadTask.get(job.jobId) != null; // true: job already ran since the last load task. NOTE(review): in that case the registration made above is not undone here - confirm.
+        if (!outstanding && !processed) { // dispatch only jobs that are neither in flight nor already processed
             if (job.schedDate <= System.currentTimeMillis()) {
                 runJob(job);
             } else {
                 _todo.enqueue(job);
             }
+        }else if(__log.isDebugEnabled()){
+            __log.debug("Job "+job.jobId+" is already queued or processed");
         }
     }