You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by bo...@apache.org on 2009/10/27 23:13:25 UTC
svn commit: r830365 - in
/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple:
DatabaseDelegate.java JdbcDelegate.java SimpleScheduler.java
Author: boisvert
Date: Tue Oct 27 22:13:24 2009
New Revision: 830365
URL: http://svn.apache.org/viewvc?rev=830365&view=rev
Log:
1) Fixed issue where job would not be retried if SQLException happened during db.deleteJob() during
2) Fixed issue where jobs would not be re-dispatched once they go into _processedSinceLastLoadTask because
_outstandingJobs.putIfAbsent() was checked before and has a side-effect
3) When retrying jobs, simply UPDATE the job instead of DELETE + INSERT
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/DatabaseDelegate.java Tue Oct 27 22:13:24 2009
@@ -38,6 +38,14 @@
boolean insertJob(Job job, String nodeId, boolean loaded) throws DatabaseException ;
/**
+ * Update the job in the database (only updates timestamp and details)
+ *
+ * @param job the job
+ * @throws DatabaseException in case of error
+ */
+ boolean updateJob(Job job) throws DatabaseException;
+
+ /**
* Delete a job from the database.
* @param jobid job identifier
* @param nodeId node identifier
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java Tue Oct 27 22:13:24 2009
@@ -52,6 +52,8 @@
private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = ?, scheduled = 0 where nodeid = ?";
+ private static final String UPDATE_JOB = "update ODE_JOB set ts = ?, details = ? where jobid = ?";
+
private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set nodeid = ? where nodeid is null "
+ "and mod(ts,?) = ? and ts < ?";
@@ -159,6 +161,34 @@
}
}
+ public boolean updateJob(Job job) throws DatabaseException {
+ if (__log.isDebugEnabled())
+ __log.debug("updateJob " + job.jobId + " details=" + job);
+
+ Connection con = null;
+ PreparedStatement ps = null;
+ try {
+ con = getConnection();
+ ps = con.prepareStatement(UPDATE_JOB);
+ ps.setLong(1, job.schedDate);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try {
+ StreamUtils.write(bos, (Serializable) job.detail);
+ } catch (Exception ex) {
+ __log.error("Error serializing job detail: " + job.detail);
+ throw new DatabaseException(ex);
+ }
+ ps.setBytes(2, bos.toByteArray());
+ ps.setString(3, job.jobId);
+ return ps.executeUpdate() == 1;
+ } catch (SQLException se) {
+ throw new DatabaseException(se);
+ } finally {
+ close(ps);
+ close(con);
+ }
+ }
+
@SuppressWarnings("unchecked")
public List<Job> dequeueImmediate(String nodeId, long maxtime, int maxjobs) throws DatabaseException {
ArrayList<Job> ret = new ArrayList<Job>(maxjobs);
Modified: ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=830365&r1=830364&r2=830365&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Oct 27 22:13:24 2009
@@ -478,7 +478,7 @@
final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail,
(Integer) (job.detail.get("retry") != null ? job.detail.get("retry") : 0));
if (job.transacted) {
- final boolean[] needRetry = new boolean[]{false};
+ final boolean[] needRetry = new boolean[]{true};
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
@@ -499,10 +499,9 @@
_db.insertJob(job, _nodeId, false);
}
} catch (JobProcessorException jpe) {
- if (jpe.retry) {
- needRetry[0] = true;
- } else {
- __log.error("Error while processing transaction, no retry.", jpe);
+ if (!jpe.retry) {
+ needRetry[0] = false;
+ __log.error("Error while processing job, no retry: "+job, jpe);
}
// Let execTransaction know that shit happened.
throw jpe;
@@ -515,11 +514,32 @@
// it the synchronization is a best-effort but not perfect.
__log.debug("job no longer in db forced rollback.");
} catch (final Exception ex) {
- __log.error("Error while executing transaction", ex);
+ __log.error("Error while executing job: "+job, ex);
// We only get here if the above execTransaction fails, so that transaction got
// rollbacked already
- execTransaction(new Retry(job, needRetry[0]));
+ if (job.persisted) {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ if (needRetry[0]) {
+ int retry = job.detail.get("retry") != null ? (((Integer) job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ job.detail.put("retry", retry);
+ long delay = (long)(Math.pow(5, retry));
+ job.schedDate = System.currentTimeMillis() + delay*1000;
+ _db.updateJob(job);
+ __log.error("Error while processing job, retrying in " + delay + "s");
+ } else {
+ _db.deleteJob(job.jobId, _nodeId);
+ __log.error("Error while processing job after 10 retries, no more retries:" + job);
+ }
+ } else {
+ _db.deleteJob(job.jobId, _nodeId);
+ }
+ return null;
+ }
+ });
+ }
}
} else {
processor.onScheduledJob(jobInfo);
@@ -568,44 +588,6 @@
_exec.submit(new RunJob(job, _polledRunnableProcessor));
}
- class Retry implements Callable<Void> {
- final Job job;
- final boolean needRetry;
-
- Retry(Job job, boolean needRetry) {
- this.job = job;
- this.needRetry = needRetry;
- }
-
- public Void call() throws Exception {
- if (needRetry) {
- int retry = job.detail.get("retry") != null ? (((Integer) job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while processing transaction, retrying in " + delay + "s");
- } else {
- __log.error("Error while processing transaction after 10 retries, no more retries:" + job);
- }
- }
-
- // We got rollbacked, as we schedule a retry we want to be sure the original job is
- // really deleted
- if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
-
- return null;
- }
-
- private long doRetry(Job job) throws DatabaseException {
- int retry = job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- job.detail.put("retry", retry);
- long delay = (long)(Math.pow(5, retry));
- Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, job.detail);
- _db.insertJob(jobRetry, _nodeId, false);
- return delay;
- }
-
- }
-
private void addTodoOnCommit(final Job job) {
registerSynchronizer(new Synchronizer() {
public void afterCompletion(boolean success) {
@@ -725,16 +707,18 @@
}
void enqueue(Job job) {
- boolean not_outstanding = _outstandingJobs.putIfAbsent(job.jobId, job.schedDate) == null;
- boolean not_processed = _processedSinceLastLoadTask.get(job.jobId) == null;
- if (not_outstanding && not_processed) {
- if (job.schedDate <= System.currentTimeMillis()) {
- runJob(job);
+ if (_processedSinceLastLoadTask.get(job.jobId) == null) {
+ if (_outstandingJobs.putIfAbsent(job.jobId, job.schedDate) == null) {
+ if (job.schedDate <= System.currentTimeMillis()) {
+ runJob(job);
+ } else {
+ _todo.enqueue(job);
+ }
} else {
- _todo.enqueue(job);
+ if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (outstanding job)");
}
- }else if(__log.isDebugEnabled()){
- __log.debug("Job "+job.jobId+" is already queued or processed");
+ } else {
+ if (__log.isDebugEnabled()) __log.debug("Job "+job.jobId+" is being processed (processed since last load)");
}
}