You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/02 19:42:49 UTC
svn commit: r781093 [4/5] - in /ode/trunk: ./
axis2-war/src/test/java/org/apache/ode/axis2/
axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/
axis2-war/src/test/java/org/apache/ode/dao/jpa/
axis2-war/src/test/java/org/apache/ode/daohib/bpel...
Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java Tue Jun 2 17:42:46 2009
@@ -0,0 +1,296 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+import org.apache.ode.utils.CronExpression;
+
+public class CronScheduler {
+ static final Log __log = LogFactory.getLog(CronScheduler.class);
+
+ // minimum interval of the cron job(1 second)
+ private final long MIN_INTERVAL = 0;
+ // if the work is schedule too late from the due time, skip it
+ private final long TOLERABLE_SCHEDULE_DELAY = 0;
+
+ private ExecutorService _scheduledTaskExec;
+
+ private Contexts _contexts;
+
+ private final Timer _schedulerTimer = new Timer("CronScheduler", true);
+
+ private final Collection<TerminationListener> _systemTerminationListeners = new ArrayList<TerminationListener>();
+
+ private final Map<QName, Collection<TerminationListener>> _terminationListenersByPid = new HashMap<QName, Collection<CronScheduler.TerminationListener>>();
+
+ private volatile boolean _shuttingDown = false;
+
+ public void setScheduledTaskExec(ExecutorService taskExec) {
+ _scheduledTaskExec = taskExec;
+ }
+
+ public void setContexts(Contexts _contexts) {
+ this._contexts = _contexts;
+ }
+
+ public void shutdown() {
+ _shuttingDown = true;
+ _schedulerTimer.cancel();
+ }
+
+ public void cancelProcessCronJobs(QName pid, boolean undeployed) {
+ assert pid != null;
+
+ if( __log.isInfoEnabled() ) __log.info("Cancelling PROCESS CRON jobs for: " + pid);
+ Collection<TerminationListener> listenersToTerminate = new ArrayList<TerminationListener>();
+
+ synchronized( _terminationListenersByPid ) {
+ Collection<TerminationListener> listeners = _terminationListenersByPid.get(pid);
+ if( listeners != null ) {
+ listenersToTerminate.addAll(listeners);
+ }
+ if( undeployed ) {
+ _terminationListenersByPid.remove(pid);
+ }
+ }
+
+ // terminate existing cron jobs if there are
+ synchronized( pid ) {
+ for( TerminationListener listener : listenersToTerminate ) {
+ listener.terminate();
+ }
+ }
+ }
+
+ public void scheduleProcessCronJobs(QName pid, ProcessConf pconf) {
+ if( _shuttingDown ) {
+ return;
+ }
+ assert pid != null;
+
+ cancelProcessCronJobs(pid, false);
+ Collection<TerminationListener> newListeners = new ArrayList<TerminationListener>();
+
+ synchronized( pid ) {
+ if( __log.isInfoEnabled() ) __log.info("Scheduling PROCESS CRON jobs for: " + pid);
+
+ // start new cron jobs
+ for( final CronJob job : pconf.getCronJobs() ) {
+ if( __log.isDebugEnabled() ) __log.debug("Scheduling PROCESS CRON job: " + job.getCronExpression() + " for: " + pid);
+ // for each different scheduled time
+ Runnable runnable = new Runnable() {
+ public void run() {
+ if( __log.isDebugEnabled() ) __log.debug("Running cron cleanup with details list size: " + job.getRunnableDetailList().size());
+ for( Map<String, Object> details : job.getRunnableDetailList() ) {
+ try {
+ // for each clean up for the scheduled time
+ RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+ cleanup.restoreFromDetailsMap(details);
+ cleanup.setContexts(_contexts);
+ cleanup.run();
+ if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a PROCESS CRON job: " + cleanup);
+ } catch(Exception re) {
+ __log.error("Error during runtime data cleanup from a PROCESS CRON: " + details + "; check your cron settings in deploy.xml.", re);
+ // don't sweat.. the rest of the system and other cron jobs still should work
+ }
+ }
+ }
+ };
+ newListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+ }
+ }
+
+ // make sure the pid does not get into the terminationListener map if no cron is setup
+ if( !newListeners.isEmpty() ) {
+ synchronized( _terminationListenersByPid ) {
+ Collection<TerminationListener> oldListeners = _terminationListenersByPid.get(pid);
+ if( oldListeners == null ) {
+ _terminationListenersByPid.put(pid, newListeners);
+ } else {
+ oldListeners.addAll(newListeners);
+ }
+ }
+ }
+ }
+
+ public void refreshSystemCronJobs(SystemSchedulesConfig systemSchedulesConf) {
+ if( _shuttingDown ) {
+ return;
+ }
+
+ synchronized( _systemTerminationListeners) {
+ if( __log.isInfoEnabled() ) __log.info("Refreshing SYSTEM CRON jobs.");
+
+ try {
+ // if error thrown on reading the schedules.xml, do not cancel existing cron jobs
+ List<CronJob> systemCronJobs = systemSchedulesConf.getSystemCronJobs();
+
+ // cancel cron jobs
+ for( TerminationListener listener : _systemTerminationListeners ) {
+ listener.terminate();
+ }
+ _systemTerminationListeners.clear();
+
+ // start new cron jobs
+ for( final CronJob job : systemCronJobs ) {
+ if( __log.isDebugEnabled() ) __log.debug("Scheduling SYSTEM CRON job:" + job);
+ // for each different scheduled time
+ Runnable runnable = new Runnable() {
+ public void run() {
+ for( Map<String, Object> details : job.getRunnableDetailList() ) {
+ try {
+ // for now, we have only runtime data cleanup cron job defined
+ // for each clean up for the scheduled time
+ RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+ synchronized( _terminationListenersByPid ) {
+ if( !_terminationListenersByPid.isEmpty() ) {
+ details.put("pidsToExclude", _terminationListenersByPid.keySet());
+ }
+ }
+ cleanup.restoreFromDetailsMap(details);
+ cleanup.setContexts(_contexts);
+ cleanup.run();
+ if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a SYSTEM CRON job:" + cleanup);
+ } catch( Exception e ) {
+ __log.error("Error running a runtime data cleanup from a SYSTEM CRON job: " + details + "; check your system cron setup.", e);
+ }
+ }
+ }
+ };
+ _systemTerminationListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+ }
+ } catch( Exception e ) {
+ __log.error("Error during refreshing SYSTEM CRON schedules: ", e);
+ }
+ }
+ }
+
+ public TerminationListener schedule(final CronExpression cronExpression,
+ final Runnable runnable, final Map<String, Object> runnableDetails,
+ TerminationListener terminationListener) {
+ if( _shuttingDown ) {
+ __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+ return new TerminationListener() {
+ public void terminate() {
+ // do nothing
+ }
+ };
+ }
+
+ assert cronExpression != null;
+ assert runnable != null;
+
+ final Date nextScheduleTime = cronExpression.getNextValidTimeAfter(new Date(
+ System.currentTimeMillis() + MIN_INTERVAL));
+ final CronScheduledJob job = new CronScheduledJob(nextScheduleTime, runnable, runnableDetails, cronExpression, terminationListener);
+ if( __log.isDebugEnabled() ) __log.debug("CRON will run in " + (nextScheduleTime.getTime() - System.currentTimeMillis()) + "ms.");
+
+ try {
+ _schedulerTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ __log.debug("Cron scheduling timer kicked in: " + cronExpression);
+ // run only if the current node is the coordinator,
+ // with the SimpleScheduler, the node is always the coordinator
+ if( !(_contexts.scheduler instanceof ClusterAware)
+ || ((ClusterAware)_contexts.scheduler).amICoordinator() ) {
+ // do not hold the timer thread too long, submit the work to an executorService
+ _scheduledTaskExec.submit(job);
+ __log.debug("CRON job scheduled " + runnable);
+ }
+ }
+ }, nextScheduleTime);
+ } catch( IllegalStateException ise ) {
+ if( _shuttingDown ) {
+ __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+ } else {
+ throw ise;
+ }
+ }
+
+ return job.terminationListener;
+ }
+
+ public interface TerminationListener {
+ void terminate();
+ }
+
+ private class CronScheduledJob implements Callable<TerminationListener> {
+ private volatile boolean terminated = false;
+ private Date nextScheduleTime;
+ private Runnable runnable;
+ private Map<String, Object> runnableDetails;
+ private CronExpression cronExpression;
+ private TerminationListener terminationListener;
+
+ public CronScheduledJob(Date nextScheduleTime,
+ Runnable runnable, Map<String, Object> runnableDetails,
+ CronExpression cronExpression, TerminationListener terminationListener) {
+ this.nextScheduleTime = nextScheduleTime;
+ this.runnable = runnable;
+ this.runnableDetails = runnableDetails;
+ this.cronExpression = cronExpression;
+ if( terminationListener == null ) {
+ terminationListener = new TerminationListener() {
+ public void terminate() {
+ terminated = true;
+ }
+ };
+ }
+ this.terminationListener = terminationListener;
+ }
+
+ public TerminationListener call() throws Exception {
+ try {
+ if( TOLERABLE_SCHEDULE_DELAY == 0 ||
+ nextScheduleTime.getTime() < System.currentTimeMillis() + TOLERABLE_SCHEDULE_DELAY) {
+ if( runnableDetails != null &&
+ runnable instanceof MapSerializableRunnable ) {
+ ((MapSerializableRunnable)runnable).restoreFromDetailsMap(runnableDetails);
+ }
+ if (runnable instanceof ContextsAware) {
+ ((ContextsAware) runnable).setContexts(_contexts);
+ }
+ if( !_shuttingDown && !terminated ) {
+ __log.debug("Running CRON job: " + runnable + " for " + nextScheduleTime.getTime());
+ runnable.run();
+ }
+ } else {
+ // ignore the scheduling.. it will be scheduled later
+ }
+ } catch( Exception e ) {
+ if( _shuttingDown ) {
+ __log.info("A cron job threw an Exception during ODE shutdown: " + e.getMessage() + ", you can ignore the error.");
+ } else if( e instanceof RuntimeException ) {
+ throw e;
+ } else {
+ throw new RuntimeException("Exception during running cron scheduled job: " + runnable, e);
+ }
+ } finally {
+ if( !_shuttingDown && !terminated ) {
+ schedule(cronExpression, runnable, runnableDetails, terminationListener);
+ }
+ }
+
+ return terminationListener;
+ }
+ }
+}
\ No newline at end of file
Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Tue Jun 2 17:42:46 2009
@@ -0,0 +1,99 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.FilteredInstanceDeletable;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+
+public class RuntimeDataCleanupRunnable implements MapSerializableRunnable, ContextsAware {
+ private final Log _log = LogFactory.getLog(RuntimeDataCleanupRunnable.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Contexts _contexts;
+
+ private int _transactionSize;
+ private CleanupInfo _cleanupInfo;
+ private QName _pid;
+ private Set<QName> _pidsToExclude;
+
+ public RuntimeDataCleanupRunnable() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public void restoreFromDetailsMap(Map<String, Object> details) {
+ _cleanupInfo = (CleanupInfo)details.get("cleanupInfo");
+ _transactionSize = (Integer)details.get("transactionSize");
+ _pid = (QName)details.get("pid");
+ _pidsToExclude = (Set<QName>)details.get("pidsToExclude");
+ }
+
+ public void storeToDetailsMap(Map<String, Object> details) {
+ // we don't serialize
+ }
+
+ public void setContexts(Contexts contexts) {
+ _contexts = contexts;
+ }
+
+ public void run() {
+ _log.info("CleanInstances.run().");
+ for( String filter : _cleanupInfo.getFilters() ) {
+ _log.info("CleanInstances.run(" + filter + ")");
+ long numberOfDeletedInstances = 0;
+ do {
+ numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+ } while( numberOfDeletedInstances == _transactionSize );
+ }
+ }
+
+ int cleanInstances(String filter, final Set<CLEANUP_CATEGORY> categories, int limit) {
+ if( _pid != null ) {
+ filter += " pid=" + _pid;
+ } else if( _pidsToExclude != null ) {
+ StringBuffer pids = new StringBuffer();
+ for( QName pid : _pidsToExclude ) {
+ if( pids.length() > 0 ) {
+ pids.append("|");
+ }
+ pids.append(pid);
+ }
+ filter += " pid<>" + pids.toString();
+ }
+
+ _log.debug("CleanInstances using filter: " + filter + ", limit: " + limit);
+
+ final InstanceFilter instanceFilter = new InstanceFilter(filter, "", limit);
+ try {
+ if( _contexts.scheduler != null ) {
+ return _contexts.execTransaction(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ BpelDAOConnection con = _contexts.dao.getConnection();
+ if( con instanceof FilteredInstanceDeletable ) {
+ return ((FilteredInstanceDeletable)con).deleteInstances(instanceFilter, categories);
+ }
+ return 0;
+ }
+ });
+ } else {
+ return 0;
+ }
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while listing instances: ", e);
+ }
+ }
+}
\ No newline at end of file
Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Tue Jun 2 17:42:46 2009
@@ -0,0 +1,103 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.io.File;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.schedules.SchedulesDocument;
+import org.apache.ode.bpel.schedules.TSchedule;
+import org.apache.ode.bpel.dd.TCleanup;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.store.ProcessCleanupConfImpl;
+import org.apache.ode.utils.CronExpression;
+import org.apache.xmlbeans.XmlOptions;
+
+public class SystemSchedulesConfig {
+ private final static Log __log = LogFactory.getLog(SystemSchedulesConfig.class);
+
+ public final static String SCHEDULE_CONFIG_FILE_PROP_KEY = "org.apache.ode.scheduleConfigFile";
+ private File schedulesFile;
+
+ public SystemSchedulesConfig(File configRoot) {
+ String scheduleConfigFile = System.getProperty(SCHEDULE_CONFIG_FILE_PROP_KEY);
+ if( scheduleConfigFile != null ) {
+ schedulesFile = new File(scheduleConfigFile);
+ if( !new File(scheduleConfigFile).exists()) {
+ __log.warn("A custom location for schedules has been set. However, the file does not exist at the location: "
+ + schedulesFile.getAbsolutePath() + ". The file will be read when one gets created.");
+ }
+ } else {
+ assert configRoot != null;
+ schedulesFile = new File(configRoot, "schedules.xml");
+ }
+ __log.info("SYSTEM CRON configuration: " + schedulesFile.getAbsolutePath());
+ }
+
+ public File getSchedulesFile() {
+ return schedulesFile;
+ }
+
+ /**
+ * Returns the list of cron jobs configured for all processes. This call returns
+ * a fresh snapshot.
+ *
+ * @return the list of cron jobs
+ */
+ public List<CronJob> getSystemCronJobs() {
+ List<CronJob> jobs = new ArrayList<CronJob>();
+
+ if( schedulesFile != null && schedulesFile.exists() ) {
+ for(TSchedule schedule : getSystemSchedulesDocument().getSchedules().getScheduleList()) {
+ CronJob job = new CronJob();
+ try {
+ job.setCronExpression(new CronExpression(schedule.getWhen()));
+ for(final TCleanup aCleanup : schedule.getCleanupList()) {
+ CleanupInfo cleanupInfo = new CleanupInfo();
+ assert !aCleanup.getFilterList().isEmpty();
+ cleanupInfo.setFilters(aCleanup.getFilterList());
+ ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
+
+ Map<String, Object> runnableDetails = new HashMap<String, Object>();
+ runnableDetails.put("cleanupInfo", cleanupInfo);
+ runnableDetails.put("transactionSize", 10);
+ job.getRunnableDetailList().add(runnableDetails);
+ __log.info("SYSTEM CRON configuration added a runtime data cleanup: " + runnableDetails);
+ }
+ jobs.add(job);
+ } catch( ParseException pe ) {
+ __log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
+ }
+ }
+ }
+
+ __log.info("SYSTEM CRON configuration found cron jobs: " + jobs);
+ return jobs;
+ }
+
+ @SuppressWarnings("unchecked")
+ private SchedulesDocument getSystemSchedulesDocument() {
+ SchedulesDocument sd = null;
+
+ try {
+ XmlOptions options = new XmlOptions();
+ HashMap otherNs = new HashMap();
+
+ otherNs.put("http://ode.fivesight.com/schemas/2006/06/27/dd",
+ "http://www.apache.org/ode/schemas/schedules/2009/05");
+ options.setLoadSubstituteNamespaces(otherNs);
+ sd = SchedulesDocument.Factory.parse(schedulesFile, options);
+ } catch (Exception e) {
+ throw new ContextException("Couldn't read schedule descriptor at location "
+ + schedulesFile.getAbsolutePath(), e);
+ }
+
+ return sd;
+ }
+}
\ No newline at end of file
Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Jun 2 17:42:46 2009
@@ -18,6 +18,7 @@
*/
package org.apache.ode.bpel.memdao;
+import java.io.Serializable;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@@ -68,6 +69,12 @@
return _store.get(processId);
}
+ public ProcessDAO createTransientProcess(Serializable id) {
+ ProcessDaoImpl process = new ProcessDaoImpl(this, _store, null, null, (String)id, 0);
+
+ return process;
+ }
+
public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
_store.put(pid, process);
@@ -83,6 +90,7 @@
return null;
}
+ @SuppressWarnings("unchecked")
public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
if (filter.getLimit() == 0) {
return Collections.EMPTY_LIST;
Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Tue Jun 2 17:42:46 2009
@@ -172,7 +172,7 @@
}
}
- public void delete() {
+ public void deleteProcessAndRoutes() {
_store.remove(_processId);
}
Modified: ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java Tue Jun 2 17:42:46 2009
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -88,6 +89,21 @@
return new GUID().toString();
}
+ public String scheduleMapSerializableRunnable(final MapSerializableRunnable runnable, final Date when) throws ContextException {
+ registerSynchronizer(new Synchronization() {
+ public void afterCompletion(int status) {
+ _timer.schedule(new TimerTask() {
+ public void run() {
+ runnable.run();
+ }
+ }, when != null ? when : new Date());
+ }
+ public void beforeCompletion() { }
+ });
+
+ return null;
+ }
+
public void cancelJob(String arg0) throws ContextException {
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Jun 2 17:42:46 2009
@@ -175,11 +175,27 @@
if (__log.isDebugEnabled())
__log.debug("scheduling " + jobDetail + " for " + when);
- boolean immediate = when.getTime() <= ctime + _immediateInterval;
- boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
+ return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime);
+ }
+
+ public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
+ long ctime = System.currentTimeMillis();
+ if (when == null)
+ when = new Date(ctime);
+
+ Map<String, Object> jobDetails = new HashMap<String, Object>();
+ jobDetails.put("runnable", runnable);
+ runnable.storeToDetailsMap(jobDetails);
+
+ if (__log.isDebugEnabled())
+ __log.debug("scheduling " + jobDetails + " for " + when);
- Job job = new Job(when.getTime(), true, jobDetail);
+ return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime);
+ }
+ public String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
+ boolean immediate = when.getTime() <= ctime + _immediateInterval;
+ boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
try {
if (immediate) {
// If we have too many jobs in the queue, we don't allow any new ones