You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2009/09/24 00:09:51 UTC

svn commit: r818279 - /ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Author: boisvert
Date: Wed Sep 23 22:09:50 2009
New Revision: 818279

URL: http://svn.apache.org/viewvc?rev=818279&view=rev
Log:
-Fix issue with retry logic and rollback 
-Fix issue where a job could be dispatched twice (concurrently) under load
-Improve stability of scheduler under load by preventing overload 
(with contributions from Alexis Midon and Matthieu Riou)

Modified:
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=818279&r1=818278&r2=818279&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Sep 23 22:09:50 2009
@@ -38,24 +38,25 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.log4j.helpers.AbsoluteTimeDateFormat;
 
 /**
- * A reliable and relatively simple scheduler that uses a database to persist information about 
+ * A reliable and relatively simple scheduler that uses a database to persist information about
  * scheduled tasks.
- * 
+ *
  * The challenge is to achieve high performance in a small memory footprint without loss of reliability
  * while supporting distributed/clustered configurations.
- * 
- * The design is based around three time horizons: "immediate", "near future", and "everything else". 
+ *
+ * The design is based around three time horizons: "immediate", "near future", and "everything else".
  * Immediate jobs (i.e. jobs that are about to be up) are written to the database and kept in
  * an in-memory priority queue. When they execute, they are removed from the database. Near future
  * jobs are placed in the database and assigned to the current node, however they are not stored in
  * memory. Periodically jobs are "upgraded" from near-future to immediate status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in the database without a 
+ * get loaded into memory. Jobs that are further out in time, are placed in the database without a
  * node identifer; when they are ready to be "upgraded" to near-future jobs they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being 
- * reassigned to known good nodes.       
- * 
+ * of the known live nodes. Recovery is rather straighforward, with stale node identifiers being
+ * reassigned to known good nodes.
+ *
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
@@ -77,6 +78,9 @@
     /** 10s of no communication and you are deemed dead. */
     long _staleInterval = 10000;
 
+    /** Duration used to log a warning if a job scheduled at a date D is queued at D'>D+_warningDelay */
+    long _warningDelay = 5*60*1000;
+
     /**
      * Estimated sustained transaction per second capacity of the system.
      * e.g. 100 means the system can process 100 jobs per seconds, on average
@@ -97,7 +101,7 @@
     volatile JobProcessor _jobProcessor;
 
     volatile JobProcessor _polledRunnableProcessor;
-    
+
     private SchedulerThread _todo;
 
     private DatabaseDelegate _db;
@@ -121,7 +125,7 @@
     private Random _random = new Random();
 
     private long _pollIntervalForPolledRunnable = Long.getLong("org.apache.ode.polledRunnable.pollInterval", 10 * 60 * 1000);
-    
+
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
         _nodeId = nodeId;
         _db = del;
@@ -130,9 +134,10 @@
         _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
         _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", _staleInterval);
         _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", _tps);
+        _warningDelay =  getLongProperty(conf, "ode.scheduler.warningDelay", _warningDelay);
         _todo = new SchedulerThread(this);
     }
-    
+
     public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
         _pollIntervalForPolledRunnable = pollIntervalForPolledRunnable;
     }
@@ -148,7 +153,7 @@
         if (s != null) return Long.parseLong(s);
         else return defaultValue;
     }
-        
+
     public void setNodeId(String nodeId) {
         _nodeId = nodeId;
     }
@@ -218,7 +223,7 @@
             String errmsg = "Internal Error, could not begin transaction.";
             throw new ContextException(errmsg, ex);
         }
-        
+
         boolean success = false;
         try {
             T retval = transaction.call();
@@ -278,7 +283,7 @@
         Map<String, Object> jobDetails = new HashMap<String, Object>();
         jobDetails.put("runnable", runnable);
         runnable.storeToDetailsMap(jobDetails);
-        
+
         if (__log.isDebugEnabled())
             __log.debug("scheduling " + jobDetails + " for " + when);
 
@@ -292,9 +297,9 @@
             if (immediate) {
                 // Immediate scheduling means we put it in the DB for safe keeping
                 _db.insertJob(job, _nodeId, true);
-                
+
                 // And add it to our todo list .
-                if (_todo.size() < _todoLimit) {
+                if (_outstandingJobs.size() < _todoLimit) {
                     addTodoOnCommit(job);
                 }
                 __log.debug("scheduled immediate job: " + job.jobId);
@@ -362,7 +367,7 @@
         }
 
         long now = System.currentTimeMillis();
-        
+
         // Pretend we got a heartbeat...
         for (String s : _knownNodes) _lastHeartBeat.put(s, now);
 
@@ -378,11 +383,11 @@
         _todo.start();
         _running = true;
     }
-    
+
     private long randomMean(long mean) {
         return (long) _random.nextDouble() * mean + (mean/2);
     }
-        
+
     public synchronized void stop() {
         if (!_running)
             return;
@@ -393,6 +398,10 @@
         _todo.clearTasks(CheckStaleNodes.class);
         _outstandingJobs.clear();
 
+        // disable because this is not the right way to do it
+        // will be fixed by ODE-595
+        // graceful shutdown; any new submits will throw RejectedExecutionExceptions
+//        _exec.shutdown();
         _running = false;
     }
 
@@ -409,24 +418,18 @@
             public Void call() throws Exception {
                 try {
                     if (job.transacted) {
+                        final Object[] needRetry = new Object[] { false };
                         try {
                             execTransaction(new Callable<Void>() {
                                 public Void call() throws Exception {
                                     if (job.persisted)
                                         if (!_db.deleteJob(job.jobId, _nodeId))
                                             throw new JobNoLongerInDbException(job.jobId,_nodeId);
-                                        
                                     try {
                                         _jobProcessor.onScheduledJob(jobInfo);
                                     } catch (JobProcessorException jpe) {
                                         if (jpe.retry) {
-                                            int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-                                            if (retry <= 10) {
-                                                long delay = doRetry(job);
-                                                __log.error("Error while processing transaction, retrying in " + delay + "s");
-                                            } else {
-                                                __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
-                                            }
+                                            needRetry[0] = true;
                                         } else {
                                             __log.error("Error while processing transaction, no retry.", jpe);
                                         }
@@ -440,15 +443,36 @@
                             // This may happen if two node try to do the same job... we try to avoid
                             // it the synchronization is a best-effort but not perfect.
                             __log.debug("job no longer in db forced rollback.");
-                        } catch (Exception ex) {
-                            __log.error("Error while executing transaction", ex);
+                        } catch (final Exception ex) {
+                            // We only get here if the above execTransaction fails, so that transaction was
+                            // already rolled back
+                            execTransaction(new Callable<Void>() {
+                                public Void call() throws Exception {
+                                    if ((Boolean)needRetry[0]) {
+                                        int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+                                        if (retry <= 10) {
+                                            long delay = doRetry(job);
+                                            __log.error("Error while processing transaction, retrying in " + delay + "s");
+                                        } else {
+                                            __log.error("Error while processing transaction after 10 retries, no more retries:"+job);
+                                        }
+                                    }
+
+                                    // We were rolled back; as we schedule a retry we want to be sure the original job is
+                                    // really deleted
+                                    if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
+
+                                    __log.error("Error while executing transaction", ex);
+                                    return null;
+                                }
+                            });
                         }
                     } else {
                         _jobProcessor.onScheduledJob(jobInfo);
                     }
                     return null;
                 } finally {
-                     _outstandingJobs.remove(job.jobId);
+                    _outstandingJobs.remove(job.jobId);
                 }
             }
         });
@@ -469,7 +493,7 @@
      * <li>5. System powered off and restarts; the poller job does not know what the status
      * of the runnable. This is handled just like the case #1.</li>
      * </ul>
-     * 
+     *
      * There is at least one re-scheduling of the poller job. Since, the runnable's state is
      * not persisted, and the same runnable may be tried again after system failure,
      * the runnable that's used with this polling should be repeatable.
@@ -487,7 +511,7 @@
                         public Void call() throws Exception {
                             if (!_db.deleteJob(job.jobId, _nodeId))
                                 throw new JobNoLongerInDbException(job.jobId,_nodeId);
-                            
+
                             try {
                                 _polledRunnableProcessor.onScheduledJob(jobInfo);
                                 if( !"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status"))) ) {
@@ -530,7 +554,7 @@
             }
         });
     }
-    
+
     private void addTodoOnCommit(final Job job) {
         registerSynchronizer(new Synchronizer() {
             public void afterCompletion(boolean success) {
@@ -578,27 +602,48 @@
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
-        
+
         // don't load anything if we're already half-full;  we've got plenty to do already
-        if (_todo.size() > _todoLimit/2) return true;
-        
+        if (_outstandingJobs.size() > _todoLimit/2) return true;
+
         List<Job> jobs;
         try {
-            final int batch = (int) (_immediateInterval * _tps / 1000);
+            // don't load more than we can chew
+            final int batch = Math.min((int) (_immediateInterval * _tps / 1000), _todoLimit-_outstandingJobs.size());
+
+            // jobs might have been enqueued by #addTodoOnCommit meanwhile
+            if (batch<=0) {
+                if (__log.isDebugEnabled()) __log.debug("Max capacity reached: "+_outstandingJobs.size()+" jobs dispacthed i.e. queued or being executed");
+                return true;
+            }
+
+            if (__log.isDebugEnabled()) __log.debug("loading "+batch+" jobs from db");
             jobs = execTransaction(new Callable<List<Job>>() {
                 public List<Job> call() throws Exception {
                     return _db.dequeueImmediate(_nodeId, System.currentTimeMillis() + _immediateInterval, batch);
                 }
             });
+            long delayedTime = System.currentTimeMillis() - _warningDelay;
+            int delayedCount = 0;
+            boolean runningLate;
+            AbsoluteTimeDateFormat f = new AbsoluteTimeDateFormat();
             for (Job j : jobs) {
-                if (__log.isDebugEnabled())
-                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate);
-
-                if (_todo.size() >= _todoLimit)
+                // jobs might have been enqueued by #addTodoOnCommit meanwhile
+                if (_outstandingJobs.size() >= _todoLimit){
+                    if (__log.isDebugEnabled()) __log.debug("Max capacity reached: "+_outstandingJobs.size()+" jobs dispacthed i.e. queued or being executed");
                     break;
-                
+                }
+                runningLate = j.schedDate <= delayedTime;
+                if (runningLate) {
+                    delayedCount++;
+                }
+                if (__log.isDebugEnabled())
+                    __log.debug("todo.enqueue job from db: " + j.jobId + " for " + j.schedDate + "(" + f.format(j.schedDate)+") "+(runningLate?" delayed=true":""));
                 enqueue(j);
             }
+            if (delayedCount > 0) {
+                __log.warn("Dispatching jobs with more than "+(_warningDelay/60000)+" minutes delay. Either the server was down for some time or the job load is greater than available capacity");
+            }
             return true;
         } catch (Exception ex) {
             __log.error("Error loading immediate jobs from database.", ex);
@@ -609,16 +654,13 @@
     }
 
     void enqueue(Job job) {
-        Long outstanding = _outstandingJobs.get(job.jobId);
-        if (outstanding != null && System.currentTimeMillis()-outstanding.longValue() > 60*60*1000) {
-            __log.error("Stale outstanding job: "+job.jobId);
-            outstanding = null;
-        }
+        Long outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate);
         if (outstanding == null) {
-            _outstandingJobs.put(job.jobId, System.currentTimeMillis());
-            _todo.enqueue(job);
-        } else {
-            __log.info("Outstanding job: "+job.jobId);
+            if (job.schedDate <= System.currentTimeMillis()) {
+                runJob(job);
+            } else {
+                _todo.enqueue(job);
+            }
         }
     }