You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/02 19:42:49 UTC

svn commit: r781093 [4/5] - in /ode/trunk: ./ axis2-war/src/test/java/org/apache/ode/axis2/ axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/ axis2-war/src/test/java/org/apache/ode/dao/jpa/ axis2-war/src/test/java/org/apache/ode/daohib/bpel...

Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java Tue Jun  2 17:42:46 2009
@@ -0,0 +1,296 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+import org.apache.ode.utils.CronExpression;
+
+public class CronScheduler {
+    static final Log __log = LogFactory.getLog(CronScheduler.class);
+
+    // minimum interval of the cron job(1 second)
+    private final long MIN_INTERVAL = 0;
+    // if the work is schedule too late from the due time, skip it
+    private final long TOLERABLE_SCHEDULE_DELAY = 0;
+
+    private ExecutorService _scheduledTaskExec;
+    
+    private Contexts _contexts;
+    
+    private final Timer _schedulerTimer = new Timer("CronScheduler", true);
+
+    private final Collection<TerminationListener> _systemTerminationListeners = new ArrayList<TerminationListener>();
+    
+    private final Map<QName, Collection<TerminationListener>> _terminationListenersByPid = new HashMap<QName, Collection<CronScheduler.TerminationListener>>();
+    
+    private volatile boolean _shuttingDown = false;
+    
+    public void setScheduledTaskExec(ExecutorService taskExec) {
+        _scheduledTaskExec = taskExec;
+    }
+    
+    public void setContexts(Contexts _contexts) {
+        this._contexts = _contexts;
+    }
+    
+    public void shutdown() {
+        _shuttingDown = true;
+        _schedulerTimer.cancel();
+    }
+    
+    public void cancelProcessCronJobs(QName pid, boolean undeployed) {
+        assert pid != null;
+        
+        if( __log.isInfoEnabled() ) __log.info("Cancelling PROCESS CRON jobs for: " + pid);
+        Collection<TerminationListener> listenersToTerminate = new ArrayList<TerminationListener>();
+
+        synchronized( _terminationListenersByPid ) {
+            Collection<TerminationListener> listeners = _terminationListenersByPid.get(pid);
+            if( listeners != null ) {
+                listenersToTerminate.addAll(listeners);
+            }
+            if( undeployed ) {
+                _terminationListenersByPid.remove(pid);
+            }
+        }
+        
+        // terminate existing cron jobs if there are
+        synchronized( pid ) {
+            for( TerminationListener listener : listenersToTerminate ) {
+                listener.terminate();
+            }
+        }
+    }
+    
+    public void scheduleProcessCronJobs(QName pid, ProcessConf pconf) {
+        if( _shuttingDown ) {
+            return;
+        }
+        assert pid != null;
+        
+        cancelProcessCronJobs(pid, false);
+        Collection<TerminationListener> newListeners = new ArrayList<TerminationListener>();
+        
+        synchronized( pid ) {
+            if( __log.isInfoEnabled() ) __log.info("Scheduling PROCESS CRON jobs for: " + pid);
+            
+            // start new cron jobs
+            for( final CronJob job : pconf.getCronJobs() ) {
+                if( __log.isDebugEnabled() ) __log.debug("Scheduling PROCESS CRON job: " + job.getCronExpression() + " for: " + pid);
+                // for each different scheduled time
+                Runnable runnable = new Runnable() {
+                    public void run() {
+                        if( __log.isDebugEnabled() ) __log.debug("Running cron cleanup with details list size: " + job.getRunnableDetailList().size());
+                        for( Map<String, Object> details : job.getRunnableDetailList() ) {
+                            try {
+                                // for each clean up for the scheduled time
+                                RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+                                cleanup.restoreFromDetailsMap(details);
+                                cleanup.setContexts(_contexts);
+                                cleanup.run();
+                                if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a PROCESS CRON job: " + cleanup);
+                            } catch(Exception re) {
+                                __log.error("Error during runtime data cleanup from a PROCESS CRON: " + details + "; check your cron settings in deploy.xml.", re);
+                                // don't sweat.. the rest of the system and other cron jobs still should work
+                            }
+                        }
+                    }
+                };
+                newListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+            }
+        }
+        
+        // make sure the pid does not get into the terminationListener map if no cron is setup 
+        if( !newListeners.isEmpty() ) {
+            synchronized( _terminationListenersByPid ) {
+                Collection<TerminationListener> oldListeners = _terminationListenersByPid.get(pid);
+                if( oldListeners == null ) {
+                    _terminationListenersByPid.put(pid, newListeners);
+                } else {
+                    oldListeners.addAll(newListeners);
+                }
+            }
+        }
+    }
+    
+    public void refreshSystemCronJobs(SystemSchedulesConfig systemSchedulesConf) {
+        if( _shuttingDown ) {
+            return;
+        }
+
+        synchronized( _systemTerminationListeners) {
+            if( __log.isInfoEnabled() ) __log.info("Refreshing SYSTEM CRON jobs.");
+
+            try {
+                // if error thrown on reading the schedules.xml, do not cancel existing cron jobs
+                List<CronJob> systemCronJobs = systemSchedulesConf.getSystemCronJobs();
+                
+                // cancel cron jobs
+                for( TerminationListener listener : _systemTerminationListeners ) {
+                    listener.terminate();
+                }
+                _systemTerminationListeners.clear();
+                
+                // start new cron jobs
+                for( final CronJob job : systemCronJobs ) {
+                    if( __log.isDebugEnabled() ) __log.debug("Scheduling SYSTEM CRON job:" + job);
+                    // for each different scheduled time
+                    Runnable runnable = new Runnable() {
+                        public void run() {
+                            for( Map<String, Object> details : job.getRunnableDetailList() ) {
+                                try {
+                                    // for now, we have only runtime data cleanup cron job defined
+                                    // for each clean up for the scheduled time
+                                    RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+                                    synchronized( _terminationListenersByPid ) {
+                                        if( !_terminationListenersByPid.isEmpty() ) {
+                                            details.put("pidsToExclude", _terminationListenersByPid.keySet());
+                                        }
+                                    }
+                                    cleanup.restoreFromDetailsMap(details);
+                                    cleanup.setContexts(_contexts);
+                                    cleanup.run();
+                                    if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a SYSTEM CRON job:" + cleanup);
+                                } catch( Exception e ) {
+                                    __log.error("Error running a runtime data cleanup from a SYSTEM CRON job: " + details + "; check your system cron setup.", e);
+                                }
+                            }
+                        }
+                    };
+                    _systemTerminationListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+                }
+            } catch( Exception e ) {
+                __log.error("Error during refreshing SYSTEM CRON schedules: ", e);
+            }
+        }
+    }
+
+    public TerminationListener schedule(final CronExpression cronExpression, 
+            final Runnable runnable, final Map<String, Object> runnableDetails, 
+            TerminationListener terminationListener) {
+        if( _shuttingDown ) {
+            __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+            return new TerminationListener() {
+                public void terminate() {
+                    // do nothing
+                }
+            };
+        }
+        
+        assert cronExpression != null;
+        assert runnable != null;
+        
+        final Date nextScheduleTime = cronExpression.getNextValidTimeAfter(new Date(
+                System.currentTimeMillis() + MIN_INTERVAL));
+        final CronScheduledJob job = new CronScheduledJob(nextScheduleTime, runnable, runnableDetails, cronExpression, terminationListener);
+        if( __log.isDebugEnabled() ) __log.debug("CRON will run in " + (nextScheduleTime.getTime() - System.currentTimeMillis()) + "ms.");
+        
+        try {
+            _schedulerTimer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    __log.debug("Cron scheduling timer kicked in: " + cronExpression);
+                    // run only if the current node is the coordinator, 
+                    // with the SimpleScheduler, the node is always the coordinator
+                    if( !(_contexts.scheduler instanceof ClusterAware) 
+                            || ((ClusterAware)_contexts.scheduler).amICoordinator() ) {
+                        // do not hold the timer thread too long, submit the work to an executorService
+                        _scheduledTaskExec.submit(job);
+                        __log.debug("CRON job scheduled " + runnable);
+                    }
+                }
+            }, nextScheduleTime);
+        } catch( IllegalStateException ise ) {
+            if( _shuttingDown ) {
+                __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+            } else {
+                throw ise;
+            }
+        }
+
+        return job.terminationListener;
+    }
+    
+    public interface TerminationListener {
+        void terminate();
+    }
+    
+    private class CronScheduledJob implements Callable<TerminationListener> {
+        private volatile boolean terminated = false;
+        private Date nextScheduleTime;
+        private Runnable runnable;
+        private Map<String, Object> runnableDetails;
+        private CronExpression cronExpression;
+        private TerminationListener terminationListener;
+        
+        public CronScheduledJob(Date nextScheduleTime,
+                Runnable runnable, Map<String, Object> runnableDetails,
+                CronExpression cronExpression, TerminationListener terminationListener) {
+            this.nextScheduleTime = nextScheduleTime;
+            this.runnable = runnable;
+            this.runnableDetails = runnableDetails;
+            this.cronExpression = cronExpression;
+            if( terminationListener == null ) {
+                terminationListener = new TerminationListener() {
+                    public void terminate() {
+                        terminated = true;
+                    }
+                };
+            }
+            this.terminationListener = terminationListener;
+        }
+
+        public TerminationListener call() throws Exception {
+            try {
+                if( TOLERABLE_SCHEDULE_DELAY == 0 ||
+                    nextScheduleTime.getTime() < System.currentTimeMillis() + TOLERABLE_SCHEDULE_DELAY) {
+                    if( runnableDetails != null && 
+                            runnable instanceof MapSerializableRunnable ) {
+                        ((MapSerializableRunnable)runnable).restoreFromDetailsMap(runnableDetails);
+                    }
+                    if (runnable instanceof ContextsAware) {
+                        ((ContextsAware) runnable).setContexts(_contexts);
+                    }
+                    if( !_shuttingDown && !terminated ) {
+                        __log.debug("Running CRON job: " + runnable + " for " + nextScheduleTime.getTime());
+                        runnable.run();
+                    }
+                } else {
+                    // ignore the scheduling.. it will be scheduled later
+                }
+            } catch( Exception e ) {
+                if( _shuttingDown ) {
+                    __log.info("A cron job threw an Exception during ODE shutdown: " + e.getMessage() + ", you can ignore the error.");
+                } else if( e instanceof RuntimeException ) {
+                    throw e;
+                } else {
+                    throw new RuntimeException("Exception during running cron scheduled job: " + runnable, e);
+                }
+            } finally {
+                if( !_shuttingDown && !terminated ) {
+                    schedule(cronExpression, runnable, runnableDetails, terminationListener);
+                }
+            }
+            
+            return terminationListener;
+        }
+    }
+}
\ No newline at end of file

Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Tue Jun  2 17:42:46 2009
@@ -0,0 +1,99 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.FilteredInstanceDeletable;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+
+public class RuntimeDataCleanupRunnable implements MapSerializableRunnable, ContextsAware {
+    private final Log _log = LogFactory.getLog(RuntimeDataCleanupRunnable.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private transient Contexts _contexts;
+
+    private int _transactionSize;
+    private CleanupInfo _cleanupInfo;
+    private QName _pid;
+    private Set<QName> _pidsToExclude;
+    
+    public RuntimeDataCleanupRunnable() {
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void restoreFromDetailsMap(Map<String, Object> details) {
+        _cleanupInfo = (CleanupInfo)details.get("cleanupInfo");
+        _transactionSize = (Integer)details.get("transactionSize");
+        _pid = (QName)details.get("pid");
+        _pidsToExclude = (Set<QName>)details.get("pidsToExclude");
+    }
+
+    public void storeToDetailsMap(Map<String, Object> details) {
+        // we don't serialize
+    }
+
+    public void setContexts(Contexts contexts) {
+        _contexts = contexts;
+    }
+    
+    public void run() {
+        _log.info("CleanInstances.run().");
+        for( String filter : _cleanupInfo.getFilters() ) {
+            _log.info("CleanInstances.run(" + filter + ")");
+            long numberOfDeletedInstances = 0;
+            do {
+                numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+            } while( numberOfDeletedInstances == _transactionSize );
+        }
+    }
+    
+    int cleanInstances(String filter, final Set<CLEANUP_CATEGORY> categories, int limit) {
+        if( _pid != null ) {
+            filter += " pid=" + _pid;
+        } else if( _pidsToExclude != null ) {
+            StringBuffer pids = new StringBuffer();
+            for( QName pid : _pidsToExclude ) {
+                if( pids.length() > 0 ) {
+                    pids.append("|");
+                }
+                pids.append(pid);
+            }
+            filter += " pid<>" + pids.toString();
+        }
+        
+        _log.debug("CleanInstances using filter: " + filter + ", limit: " + limit);
+        
+        final InstanceFilter instanceFilter = new InstanceFilter(filter, "", limit);
+        try {
+            if( _contexts.scheduler != null ) {
+                return _contexts.execTransaction(new Callable<Integer>() {
+                    public Integer call() throws Exception {
+                        BpelDAOConnection con = _contexts.dao.getConnection();
+                        if( con instanceof FilteredInstanceDeletable ) {
+                            return ((FilteredInstanceDeletable)con).deleteInstances(instanceFilter, categories);
+                        }
+                        return 0;
+                    }
+                });
+            } else {
+                return 0;
+            }
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while listing instances: ",  e);
+        }
+    }
+}
\ No newline at end of file

Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=781093&view=auto
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (added)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Tue Jun  2 17:42:46 2009
@@ -0,0 +1,103 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.io.File;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.schedules.SchedulesDocument;
+import org.apache.ode.bpel.schedules.TSchedule;
+import org.apache.ode.bpel.dd.TCleanup;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.store.ProcessCleanupConfImpl;
+import org.apache.ode.utils.CronExpression;
+import org.apache.xmlbeans.XmlOptions;
+
+public class SystemSchedulesConfig {
+    private final static Log __log = LogFactory.getLog(SystemSchedulesConfig.class);
+    
+    public final static String SCHEDULE_CONFIG_FILE_PROP_KEY = "org.apache.ode.scheduleConfigFile";
+    private File schedulesFile;
+    
+    public SystemSchedulesConfig(File configRoot) {
+        String scheduleConfigFile = System.getProperty(SCHEDULE_CONFIG_FILE_PROP_KEY);
+        if( scheduleConfigFile != null ) {
+            schedulesFile = new File(scheduleConfigFile);
+            if( !new File(scheduleConfigFile).exists()) {
+                __log.warn("A custom location for schedules has been set. However, the file does not exist at the location: " 
+                        + schedulesFile.getAbsolutePath() + ". The file will be read when one gets created.");
+            }
+        } else {
+            assert configRoot != null;
+            schedulesFile = new File(configRoot, "schedules.xml");
+        }
+        __log.info("SYSTEM CRON configuration: " + schedulesFile.getAbsolutePath());
+    }
+    
+    public File getSchedulesFile() {
+        return schedulesFile;
+    }
+    
+    /**
+     * Returns the list of cron jobs configured for all processes. This call returns
+     * a fresh snapshot.
+     * 
+     * @return the list of cron jobs
+     */
+    public List<CronJob> getSystemCronJobs() {
+        List<CronJob> jobs = new ArrayList<CronJob>();
+        
+        if( schedulesFile != null && schedulesFile.exists() ) {
+            for(TSchedule schedule : getSystemSchedulesDocument().getSchedules().getScheduleList()) {
+                CronJob job = new CronJob();
+                try {
+                    job.setCronExpression(new CronExpression(schedule.getWhen()));
+                    for(final TCleanup aCleanup : schedule.getCleanupList()) {
+                        CleanupInfo cleanupInfo = new CleanupInfo();
+                        assert !aCleanup.getFilterList().isEmpty();
+                        cleanupInfo.setFilters(aCleanup.getFilterList());
+                        ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
+                        
+                        Map<String, Object> runnableDetails = new HashMap<String, Object>();
+                        runnableDetails.put("cleanupInfo", cleanupInfo);
+                        runnableDetails.put("transactionSize", 10);
+                        job.getRunnableDetailList().add(runnableDetails);
+                        __log.info("SYSTEM CRON configuration added a runtime data cleanup: " + runnableDetails);
+                    }
+                    jobs.add(job);
+                } catch( ParseException pe ) {
+                    __log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
+                }
+            }
+        }
+        
+        __log.info("SYSTEM CRON configuration found cron jobs: " + jobs);
+        return jobs;
+    }
+
+    @SuppressWarnings("unchecked")
+    private SchedulesDocument getSystemSchedulesDocument() {
+        SchedulesDocument sd = null;
+        
+        try {
+            XmlOptions options = new XmlOptions();
+            HashMap otherNs = new HashMap();
+
+            otherNs.put("http://ode.fivesight.com/schemas/2006/06/27/dd",
+                    "http://www.apache.org/ode/schemas/schedules/2009/05");
+            options.setLoadSubstituteNamespaces(otherNs);
+            sd = SchedulesDocument.Factory.parse(schedulesFile, options);
+        } catch (Exception e) {
+            throw new ContextException("Couldn't read schedule descriptor at location "
+                    + schedulesFile.getAbsolutePath(), e);
+        }
+    
+        return sd;
+    }
+}
\ No newline at end of file

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Jun  2 17:42:46 2009
@@ -18,6 +18,7 @@
  */
 package org.apache.ode.bpel.memdao;
 
+import java.io.Serializable;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -68,6 +69,12 @@
         return _store.get(processId);
     }
 
+    public ProcessDAO createTransientProcess(Serializable id) {
+        ProcessDaoImpl process = new ProcessDaoImpl(this, _store, null, null, (String)id, 0);
+
+        return process;
+    }
+
     public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
         ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
         _store.put(pid, process);
@@ -83,6 +90,7 @@
         return null;
     }
 
+    @SuppressWarnings("unchecked")
     public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
         if (filter.getLimit() == 0) {
             return Collections.EMPTY_LIST;

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Tue Jun  2 17:42:46 2009
@@ -172,7 +172,7 @@
         }
     }
 
-    public void delete() {
+    public void deleteProcessAndRoutes() {
         _store.remove(_processId);
     }
 

Modified: ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java Tue Jun  2 17:42:46 2009
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -88,6 +89,21 @@
         return new GUID().toString();
     }
 
+    public String scheduleMapSerializableRunnable(final MapSerializableRunnable runnable, final Date when) throws ContextException {
+        registerSynchronizer(new Synchronization() {
+            public void afterCompletion(int status) {
+                _timer.schedule(new TimerTask() {
+                    public void run() {
+                        runnable.run();
+                    }
+                }, when != null ? when : new Date());
+            }
+            public void beforeCompletion() { }
+        });
+
+        return null;
+    }
+
     public void cancelJob(String arg0) throws ContextException {
         
     }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=781093&r1=781092&r2=781093&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Jun  2 17:42:46 2009
@@ -175,11 +175,27 @@
         if (__log.isDebugEnabled())
             __log.debug("scheduling " + jobDetail + " for " + when);
 
-        boolean immediate = when.getTime() <= ctime + _immediateInterval;
-        boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
+        return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime);
+    }
+
+    public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
+        long ctime = System.currentTimeMillis();
+        if (when == null)
+            when = new Date(ctime);
+
+        Map<String, Object> jobDetails = new HashMap<String, Object>();
+        jobDetails.put("runnable", runnable);
+        runnable.storeToDetailsMap(jobDetails);
+        
+        if (__log.isDebugEnabled())
+            __log.debug("scheduling " + jobDetails + " for " + when);
 
-        Job job = new Job(when.getTime(), true, jobDetail);
+        return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime);
+    }
 
+    public String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
+        boolean immediate = when.getTime() <= ctime + _immediateInterval;
+        boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval;
         try {
             if (immediate) {
                 // If we have too many jobs in the queue, we don't allow any new ones