You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink that did not survive the plain-text conversion.)
Posted to commits@ode.apache.org by bo...@apache.org on 2008/11/15 14:52:09 UTC

svn commit: r714258 - /ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Author: boisvert
Date: Sat Nov 15 05:52:08 2008
New Revision: 714258

URL: http://svn.apache.org/viewvc?rev=714258&view=rev
Log:
ODE-424: SimpleScheduler creates duplicate jobs when job execution fails

Modified:
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=714258&r1=714257&r2=714258&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Sat Nov 15 05:52:08 2008
@@ -69,7 +69,7 @@
     long _immediateInterval = 30000;
 
     /**
-     * Jobs sccheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+     * Jobs scheduled with a time that is between (now+immediateInterval,now+nearFutureInterval) will be assigned to the current
      * node, but will not be placed on the todo queue (the promoter will pick them up).
      */
     long _nearFutureInterval = 10 * 60 * 1000;
@@ -77,6 +77,13 @@
     /** 10s of no communication and you are deemed dead. */
     long _staleInterval = 10000;
 
+    /**
+     * Estimated sustained transaction per second capacity of the system.
+     * e.g. 100 means the system can process 100 jobs per seconds, on average
+     * This number is used to determine how many jobs to load from the database at once.
+     */
+    int _tps = 100;
+
     TransactionManager _txm;
 
     ExecutorService _exec;
@@ -132,6 +139,10 @@
         _nearFutureInterval = nearFutureInterval;
     }
 
+    public void setTransactionPerSecond(int tps) {
+        _tps = tps;
+    }
+
     public void setTransactionManager(TransactionManager txm) {
         _txm = txm;
     }
@@ -227,25 +238,20 @@
 
         try {
             if (immediate) {
-                // If we have too many jobs in the queue, we don't allow any new ones
-                if (_todo.size() > _todoLimit) {
-                    __log.error("The execution queue is backed up, the engine can't keep up with the load. Either " +
-                            "increase the queue size or regulate the flow.");
-                    return null;
-                }
-
                 // Immediate scheduling means we put it in the DB for safe keeping
                 _db.insertJob(job, _nodeId, true);
+                
                 // And add it to our todo list .
-                addTodoOnCommit(job);
-
+                if (_todo.size() < _todoLimit) {
+                    addTodoOnCommit(job);
+                }
                 __log.debug("scheduled immediate job: " + job.jobId);
             } else if (nearfuture) {
                 // Near future, assign the job to ourselves (why? -- this makes it very unlikely that we
                 // would get two nodes trying to process the same instance, which causes unsightly rollbacks).
                 _db.insertJob(job, _nodeId, false);
                 __log.debug("scheduled near-future job: " + job.jobId);
-            } else /* far future */{
+            } else /* far future */ {
                 // Not the near future, we don't assign a node-id, we'll assign it later.
                 _db.insertJob(job, null, false);
                 __log.debug("scheduled far-future job: " + job.jobId);
@@ -303,18 +309,19 @@
             throw new ContextException("Error retrieving node list.", ex);
         }
 
+        long now = System.currentTimeMillis();
+        
         // Pretend we got a heartbeat...
-        for (String s : _knownNodes)
-            _lastHeartBeat.put(s, System.currentTimeMillis());
+        for (String s : _knownNodes) _lastHeartBeat.put(s, now);
 
         // schedule immediate job loading for now!
-        _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
+        _todo.enqueue(new LoadImmediateTask(now));
 
         // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) (_random.nextDouble() * _staleInterval)));
+        _todo.enqueue(new CheckStaleNodes(now + (long) (_random.nextDouble() * _staleInterval + (_staleInterval/2))));
 
         // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) (_random.nextDouble() * _immediateInterval)));
+        _todo.enqueue(new UpgradeJobsTask(now + (long) (_random.nextDouble() * _immediateInterval + (_immediateInterval/2))));
 
         _todo.start();
         _running = true;
@@ -347,10 +354,25 @@
                     try {
                         execTransaction(new Callable<Void>() {
                             public Void call() throws Exception {
-                                _jobProcessor.onScheduledJob(jobInfo);
                                 if (job.persisted)
                                     if (!_db.deleteJob(job.jobId, _nodeId))
                                         throw new JobNoLongerInDbException(job.jobId,_nodeId);
+                                    
+                                try {
+                                    _jobProcessor.onScheduledJob(jobInfo);
+                                } catch (JobProcessorException jpe) {
+                                    if (jpe.retry) {
+                                        int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+                                        if (retry <= 10) {
+                                            long delay = doRetry(job);
+                                            __log.error("Error while processing transaction, retrying in " + delay + "s");
+                                        } else {
+                                            __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+                                        }
+                                    } else {
+                                        __log.error("Error while processing transaction, no retry.", jpe);
+                                    }
+                                }
                                 return null;
                             }
                         });
@@ -358,11 +380,6 @@
                         // This may happen if two node try to do the same job... we try to avoid
                         // it the synchronization is a best-effort but not perfect.
                         __log.debug("job no longer in db forced rollback.");
-                    } catch (JobProcessorException jpe) {
-                        if (jpe.retry)
-                            __log.error("Error while processing transaction, retrying in " + doRetry(job) + "s");
-                        else
-                            __log.error("Error while processing transaction, no retry.", jpe);
                     } catch (Exception ex) {
                         __log.error("Error while executing transaction", ex);
                     }
@@ -418,21 +435,25 @@
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
+        
+        // don't load anything if we're already half-full;  we've got plenty to do already
+        if (_todo.size() > _todoLimit/2) return true;
+        
         List<Job> jobs;
         try {
-            do {
-                jobs = execTransaction(new Callable<List<Job>>() {
-                    public List<Job> call() throws Exception {
-                        return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, 10);
-                    }
-                });
-                for (Job j : jobs) {
-                    if (__log.isDebugEnabled())
-                        __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
+            final int batch = (int) (_immediateInterval * _tps / 1000);
+            jobs = execTransaction(new Callable<List<Job>>() {
+                public List<Job> call() throws Exception {
+                    return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
+                }
+            });
+            for (Job j : jobs) {
+                if (__log.isDebugEnabled())
+                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
 
+                if (_todo.size() < _todoLimit) 
                     _todo.enqueue(j);
-                }
-            } while (jobs.size() == 10);
+            }
             return true;
         } catch (Exception ex) {
             __log.error("Error loading immediate jobs from database.", ex);
@@ -540,9 +561,9 @@
                 success = doLoadImmediate();
             } finally {
                 if (success)
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .75)));
+                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * .90)));
                 else
-                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 100));
+                    _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + 1000));
             }
         }
 
@@ -576,7 +597,7 @@
             try {
                 success = doUpgrade();
             } finally {
-                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 100);
+                long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
                 _nextUpgrade.set(future);
                 _todo.enqueue(new UpgradeJobsTask(future));
                 __log.debug("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms");
@@ -599,8 +620,11 @@
             __log.debug("CHECK STALE NODES started");
             for (String nodeId : _knownNodes) {
                 Long lastSeen = _lastHeartBeat.get(nodeId);
-                if (lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+                if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+                    && !_nodeId.equals(nodeId))
+                {
                     recoverStaleNode(nodeId);
+                }
             }
         }