You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2009/10/27 23:13:25 UTC

svn commit: r830365 - in /ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple: DatabaseDelegate.java JdbcDelegate.java SimpleScheduler.java

Author: boisvert
Date: Tue Oct 27 22:13:24 2009
New Revision: 830365

URL: http://svn.apache.org/viewvc?rev=830365&view=rev
Log:
1) Fixed issue where job would not be retried if SQLException happened during db.deleteJob() during
2) Fixed issue where jobs would not be re-dispatched once they go into _processedSinceLastLoadTask because
   _outstandingJobs.putIfAbsent() was checked before and has a side effect
3) When retrying jobs, simply UPDATE the job instead of DELETE + INSERT


Modified:
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
    ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java Tue Oct 27 22:13:24 2009
@@ -38,6 +38,14 @@
     boolean insertJob(Job job, String nodeId, boolean loaded) throws DatabaseException ;
 
     /**
+     * Update the job in the database (only updates timestamp and details)
+     *
+     * @param job the job
+     * @throws DatabaseException in case of error
+     */
+    boolean updateJob(Job job) throws DatabaseException;
+
+    /**
      * Delete a job from the database.
      * @param jobid job identifier 
      * @param nodeId node identifier

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Tue Oct 27 22:13:24 2009
@@ -52,6 +52,8 @@
 
     private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
 
+    private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, details = ? where jobid = ?";
+
     private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
             + "and mod(ts,?) = ? and ts < ?";
 
@@ -159,6 +161,34 @@
         }
     }
 
+    public boolean updateJob(Job job) throws DatabaseException {
+        if (__log.isDebugEnabled())
+            __log.debug("updateJob " + job.jobId + " details=" + job);
+
+        Connection con = null;
+        PreparedStatement ps = null;
+        try {
+            con = getConnection();
+            ps = con.prepareStatement(UPDATE_JOB);
+            ps.setLong(1, job.schedDate);
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            try {
+                StreamUtils.write(bos, (Serializable) job.detail);
+            } catch (Exception ex) {
+                __log.error("Error serializing job detail: " + job.detail);
+                throw new DatabaseException(ex);
+            }
+            ps.setBytes(2, bos.toByteArray());
+            ps.setString(3, job.jobId);
+            return ps.executeUpdate() == 1;
+        } catch (SQLException se) {
+            throw new DatabaseException(se);
+        } finally {
+            close(ps);
+            close(con);
+        }
+    }
+
     @SuppressWarnings("unchecked")
     public List<Job> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
         ArrayList<Job> ret = new ArrayList<Job>(maxjobs);

Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Oct 27 22:13:24 2009
@@ -478,7 +478,7 @@
                 final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
                         (Integer) (job.detail.get("retry") != null ? job.detail.get("retry") : 0));
                 if (job.transacted) {
-                    final boolean[] needRetry = new boolean[]{false};
+                    final boolean[] needRetry = new boolean[]{true};
                     try {
                         execTransaction(new Callable<Void>() {
                             public Void call() throws Exception {
@@ -499,10 +499,9 @@
                                         _db.insertJob(job, _nodeId, false);
                                     }
                                 } catch (JobProcessorException jpe) {
-                                    if (jpe.retry) {
-                                        needRetry[0] = true;
-                                    } else {
-                                        __log.error("Error while processing transaction, no retry.", jpe);
+                                    if (!jpe.retry) {
+                                        needRetry[0] = false;
+                                        __log.error("Error while processing job, no retry: "+job, jpe);
                                     }
                                     // Let execTransaction know that shit happened.
                                     throw jpe;
@@ -515,11 +514,32 @@
                         // it the synchronization is a best-effort but not perfect.
                         __log.debug("job no longer in db forced rollback.");
                     } catch (final Exception ex) {
-                        __log.error("Error while executing transaction", ex);
+                        __log.error("Error while executing job: "+job, ex);
 
                         // We only get here if the above execTransaction fails, so that transaction got
                         // rollbacked already
-                        execTransaction(new Retry(job, needRetry[0]));
+                        if (job.persisted) {
+                            execTransaction(new Callable<Void>() {
+                                public Void call() throws Exception {
+                                    if (needRetry[0]) {
+                                        int retry = job.detail.get("retry") != null ? (((Integer) job.detail.get("retry")) + 1) : 0;
+                                        if (retry <= 10) {
+                                            job.detail.put("retry", retry);
+                                            long delay = (long)(Math.pow(5, retry));
+                                            job.schedDate = System.currentTimeMillis() + delay*1000;
+                                            _db.updateJob(job);
+                                            __log.error("Error while processing job, retrying in " + delay + "s");
+                                        } else {
+                                            _db.deleteJob(job.jobId, _nodeId);
+                                            __log.error("Error while processing job after 10 retries, no more retries:" + job);
+                                        }
+                                    } else {
+                                        _db.deleteJob(job.jobId, _nodeId);
+                                    }
+                                    return null;
+                                }
+                            });
+                        }
                     }
                 } else {
                     processor.onScheduledJob(jobInfo);
@@ -568,44 +588,6 @@
          _exec.submit(new RunJob(job, _polledRunnableProcessor));
     }
 
-    class Retry implements Callable<Void> {
-        final Job job;
-        final boolean needRetry;
-
-        Retry(Job job, boolean needRetry) {
-            this.job = job;
-            this.needRetry = needRetry;
-        }
-
-        public Void call() throws Exception {
-            if (needRetry) {
-                int retry = job.detail.get("retry") != null ? (((Integer) job.detail.get("retry")) + 1) : 0;
-                if (retry <= 10) {
-                    long delay = doRetry(job);
-                    __log.error("Error while processing transaction, retrying in " + delay + "s");
-                } else {
-                    __log.error("Error while processing transaction after 10 retries, no more retries:" + job);
-                }
-            }
-
-            // We got rollbacked, as we schedule a retry we want to be sure the original job is
-            // really deleted
-            if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
-
-            return null;
-        }
-
-        private long doRetry(Job job) throws DatabaseException {
-          int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-          job.detail.put("retry", retry);
-          long delay = (long)(Math.pow(5, retry));
-          Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail);
-          _db.insertJob(jobRetry, _nodeId, false);
-          return delay;
-        }
-
-    }
-
     private void addTodoOnCommit(final Job job) {
         registerSynchronizer(new Synchronizer() {
             public void afterCompletion(boolean success) {
@@ -725,16 +707,18 @@
     }
 
     void enqueue(Job job) {
-        boolean not_outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate) == null;
-        boolean not_processed = _processedSinceLastLoadTask.get(job.jobId) == null;
-        if (not_outstanding && not_processed) {
-            if (job.schedDate <= System.currentTimeMillis()) {
-                runJob(job);
+        if (_processedSinceLastLoadTask.get(job.jobId) == null) {
+            if (_outstandingJobs.putIfAbsent(job.jobId, job.schedDate) == null) {
+                if (job.schedDate <= System.currentTimeMillis()) {
+                    runJob(job);
+                } else {
+                    _todo.enqueue(job);
+                }
             } else {
-                _todo.enqueue(job);
+              if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (outstanding job)");
             }
-        }else if(__log.isDebugEnabled()){
-            __log.debug("Job "+job.jobId+" is already queued or processed");
+        } else {
+            if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (processed since last load)");
         }
     }