You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/04 00:23:43 UTC

svn commit: r781611 - in /ode/trunk: ./ axis2-war/src/test/webapp/ axis2-war/src/test/webapp/WEB-INF/ axis2/src/main/java/org/apache/ode/axis2/ axis2/src/main/java/org/apache/ode/axis2/deploy/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-store...

Author: seanahn
Date: Wed Jun  3 22:23:43 2009
New Revision: 781611

URL: http://svn.apache.org/viewvc?rev=781611&view=rev
Log:
fixed system cron and process cron

Added:
    ode/trunk/axis2-war/src/test/webapp/
    ode/trunk/axis2-war/src/test/webapp/WEB-INF/
    ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
Modified:
    ode/trunk/Buildfile
    ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
    ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
    ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
    ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
    ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
    ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
    ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
    ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
    ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
    ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: ode/trunk/Buildfile
URL: http://svn.apache.org/viewvc/ode/trunk/Buildfile?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/Buildfile (original)
+++ ode/trunk/Buildfile Wed Jun  3 22:23:43 2009
@@ -253,6 +253,8 @@
     webapp_dir = "#{test.compile.target}/webapp"
     test.setup task(:prepare_webapp) do |task|
       cp_r _("src/main/webapp"), test.compile.target.to_s
+      rm_rf Dir[_(webapp_dir) + "/**/.svn"]
+      cp_r _("src/test/webapp"), test.compile.target.to_s
       cp Dir[_("src/main/webapp/WEB-INF/classes/*")], test.compile.target.to_s
       cp Dir[project("axis2").path_to("src/main/wsdl/*")], "#{webapp_dir}/WEB-INF"
       cp project("bpel-schemas").path_to("src/main/xsd/pmapi.xsd"), "#{webapp_dir}/WEB-INF"

Added: ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
URL: http://svn.apache.org/viewvc/ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml?rev=781611&view=auto
==============================================================================
--- ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml (added)
+++ ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml Wed Jun  3 22:23:43 2009
@@ -0,0 +1,29 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<sched:schedules 
+        xmlns:sched="http://www.apache.org/ode/schemas/schedules/2009/05"
+        xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03">
+    <sched:schedule when="* * * * * ?">
+        <sched:cleanup>
+            <dd:filter><![CDATA[status=completed]]></dd:filter>
+        </sched:cleanup>
+    </sched:schedule>
+</sched:schedules>
\ No newline at end of file

Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Wed Jun  3 22:23:43 2009
@@ -25,6 +25,9 @@
 import java.util.Map;
 import java.util.Iterator;
 import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -65,6 +68,7 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.evtproc.DebugBpelEventListener;
 import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
 import org.apache.ode.bpel.iapi.BpelEventListener;
@@ -107,6 +111,10 @@
     protected TransactionManager _txMgr;
     protected BpelDAOConnectionFactory _daoCF;
     protected Scheduler _scheduler;
+
+    protected ExecutorService _executorService;
+    protected CronScheduler _cronScheduler;
+
     protected Database _db;
     private DeploymentPoller _poller;
     private MultiKeyMap _services = new MultiKeyMap();
@@ -484,10 +492,34 @@
             __log.debug("ODE initializing");
         }
 
+        ThreadFactory threadFactory = new ThreadFactory() {
+            int threadNumber = 0;
+            public Thread newThread(Runnable r) {
+                threadNumber += 1;
+                Thread t = new Thread(r, "BULK-"+threadNumber);
+                t.setDaemon(true);
+                return t;
+            }
+        };
+
+        if (_odeConfig.getThreadPoolMaxSize() == 0)
+            _executorService = Executors.newCachedThreadPool(threadFactory);
+        else
+            _executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
+
         _bpelServer = new BpelServerImpl();
         _scheduler = createScheduler();
         _scheduler.setJobProcessor(_bpelServer);
 
+        _cronScheduler = new CronScheduler();
+        _cronScheduler.setScheduledTaskExec(_executorService);
+        _cronScheduler.setContexts(_bpelServer.getContexts());
+        _bpelServer.setCronScheduler(_cronScheduler);
+        BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
+        polledRunnableProcessor.setPolledRunnableExecutorService(_executorService);
+        polledRunnableProcessor.setContexts(_bpelServer.getContexts());
+        _scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
+        
         _bpelServer.setDaoConnectionFactory(_daoCF);
         _bpelServer.setEndpointReferenceContext(eprContext);
         _bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this));
@@ -541,6 +573,10 @@
         return _appRoot;
     }
 
+    public File getConfigRoot() {
+        return _configRoot;
+    }
+
     /**
      * Register event listeners configured in the configuration.
      *
@@ -655,6 +691,20 @@
             default:
                 __log.debug("Ignoring store event: " + pse);
         }
+
+        ProcessConf pconf = _store.getProcessConfiguration(pse.pid);
+        if( pconf != null ) {
+            if( pse.type == ProcessStoreEvent.Type.UNDEPLOYED) {
+                __log.debug("Cancelling all cron scheduled jobs on store event: " + pse);
+                _bpelServer.getContexts().cronScheduler.cancelProcessCronJobs(pse.pid, true);
+            }
+
+            // Except for undeploy event, we need to re-schedule process dependent jobs
+            __log.debug("(Re)scheduling cron scheduled jobs on store event: " + pse);
+            if( pse.type != ProcessStoreEvent.Type.UNDEPLOYED) {
+                _bpelServer.getContexts().cronScheduler.scheduleProcessCronJobs(pse.pid, pconf);
+            }
+        }
     }
 
     // Transactional debugging stuff, to track down all these little annoying bugs.

Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java Wed Jun  3 22:23:43 2009
@@ -41,6 +41,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
+import org.apache.ode.utils.WatchDog;
 
 import javax.xml.namespace.QName;
 import java.io.File;
@@ -48,6 +51,8 @@
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Polls a directory for the deployment of a new deployment unit.
@@ -67,6 +72,13 @@
 
     private boolean _onHold = false;
 
+    private SystemSchedulesConfig _systemSchedulesConf;
+
+    @SuppressWarnings("unchecked")
+    private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
+    @SuppressWarnings("unchecked")
+    private WatchDog _systemCronConfigWatchDog;
+
     /** Filter accepting directories containing a .odedd file. */
     private static final FileFilter _fileFilter = new FileFilter() {
         public boolean accept(File path) {
@@ -93,6 +105,8 @@
         _deployDir = deployDir;
         if (!_deployDir.exists())
             _deployDir.mkdir();
+        _systemSchedulesConf = createSystemSchedulesConfig(odeServer.getConfigRoot());
+        _systemCronConfigWatchDog = createSystemCronConfigWatchDog(odeServer.getBpelServer().getContexts().cronScheduler);
     }
 
     public void start() {
@@ -110,6 +124,7 @@
      * Scan the directory for new (or removed) files (called mainly from {@link PollingThread}) and calls whoever is in charge of
      * the actual deployment (or undeployment).
      */
+    @SuppressWarnings("unchecked")
     private void check() {
         File[] files = _deployDir.listFiles(_fileFilter);
 
@@ -123,7 +138,10 @@
                 __log.debug("Not deploying " + file + " (missing deploy.xml)");
             }
 
+            WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+
             if (deployedMarker.exists()) {
+                checkDeployXmlWatchDog(ddWatchDog);
                 continue;
             }
 
@@ -160,6 +178,47 @@
                     __log.info("Successfully undeployed " + pkg);
             }
         }
+
+        checkSystemCronConfigWatchDog(_systemCronConfigWatchDog);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected WatchDog ensureDeployXmlWatchDog(File deployFolder, File deployXml) {
+        WatchDog ddWatchDog = dDWatchDogsByPath.get(deployXml.getAbsolutePath());
+        if( ddWatchDog == null ) {
+            ddWatchDog = WatchDog.watchFile(deployXml, new DDWatchDogObserver(deployFolder.getName()));
+            dDWatchDogsByPath.put(deployXml.getAbsolutePath(), ddWatchDog);
+        }
+        
+        return ddWatchDog;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void checkDeployXmlWatchDog(WatchDog ddWatchDog) {
+        ddWatchDog.check();
+    }
+
+    protected void disposeDeployXmlWatchDog(File deployDir) {
+        dDWatchDogsByPath.remove(new File(deployDir, "deploy.xml").getAbsolutePath());
+    }
+
+    protected SystemSchedulesConfig createSystemSchedulesConfig(File configRoot) {
+        return new SystemSchedulesConfig(configRoot);
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected WatchDog createSystemCronConfigWatchDog(final CronScheduler cronScheduler) {
+        return WatchDog.watchFile(_systemSchedulesConf.getSchedulesFile(), 
+            new WatchDog.DefaultObserver() {
+                public void init() {
+                    cronScheduler.refreshSystemCronJobs(_systemSchedulesConf);
+                }
+            });
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected void checkSystemCronConfigWatchDog(WatchDog ddWatchDog) {
+        ddWatchDog.check();
     }
 
     /**
@@ -220,4 +279,17 @@
         File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
         deployedMarker.delete();
     }
+
+    @SuppressWarnings("unchecked")
+    protected class DDWatchDogObserver extends WatchDog.DefaultObserver {
+        private String deploymentPakage;
+        
+        public DDWatchDogObserver(String deploymentPakage) {
+            this.deploymentPakage = deploymentPakage;
+        }
+        
+        public void init() {
+            _odeServer.getProcessStore().refreshSchedules(deploymentPakage);
+        }
+    }
 }

Modified: ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java Wed Jun  3 22:23:43 2009
@@ -49,7 +49,10 @@
         DISABLED,
         
         /** A process property was changed. */
-        PROPERTY_CHANGED
+        PROPERTY_CHANGED,
+
+        /** Cron schedule settings have been changed for the process */
+        SCHEDULE_SETTINGS_CHANGED
     }
 
     /**

Modified: ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Wed Jun  3 22:23:43 2009
@@ -23,16 +23,15 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
 
 /**
  * The BPEL scheduler.
  */
 public interface Scheduler {
-
     void setJobProcessor(JobProcessor processor) throws ContextException;
 
+    void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor);
+
     /**
      * Schedule a persisted job. Persisted jobs MUST survive system failure.
      * They also must not be scheduled unless the transaction associated with

Modified: ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java (original)
+++ ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java Wed Jun  3 22:23:43 2009
@@ -819,4 +819,9 @@
         }
     }
     
+    public void refreshSchedules(String packageName) {
+        for( QName pid : listProcesses(packageName) ) {
+            fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.SCHEDULE_SETTINGS_CHANGED, pid, packageName));
+        }
+    }
 }

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java Wed Jun  3 22:23:43 2009
@@ -25,6 +25,7 @@
 import org.apache.ode.bpel.dao.*;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 import org.apache.ode.daohib.SessionManager;
 import org.apache.ode.daohib.bpel.hobj.*;
 import org.apache.ode.daohib.bpel.ql.HibernateInstancesQueryCompiler;
@@ -52,29 +53,30 @@
 /**
  * Hibernate-based {@link BpelDAOConnection} implementation.
  */
-public class BpelDAOConnectionImpl implements BpelDAOConnection {
+public class BpelDAOConnectionImpl implements BpelDAOConnection, FilteredInstanceDeletable {
     private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
 
-    private Session _session;
-
     protected SessionManager _sm;
 
     public BpelDAOConnectionImpl(SessionManager sm) {
         _sm = sm;
-        _session = _sm.getSession();
+    }
+
+    protected Session getSession(){
+        return _sm.getSession();
     }
 
     public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
         HMessageExchange mex = new HMessageExchange();
         mex.setMexId(mexId);
         mex.setDirection(dir);
-        _session.save(mex);
+        getSession().save(mex);
         return new MessageExchangeDaoImpl(_sm, mex);
     }
 
     public MessageExchangeDAO getMessageExchange(String mexId) {
         try {
-            org.hibernate.Query query = _session.createQuery("from HMessageExchange x where x.mexId = ?");
+            org.hibernate.Query query = getSession().createQuery("from HMessageExchange x where x.mexId = ?");
             query.setString(0, mexId);
             HMessageExchange mex = (HMessageExchange) query.uniqueResult();
             return mex == null ? null : new MessageExchangeDaoImpl(_sm, mex);
@@ -93,7 +95,7 @@
 
     public ResourceRouteDAO getResourceRoute(String url, String method) {
         try {
-            Criteria criteria = _session.createCriteria(HResourceRoute.class);
+            Criteria criteria = getSession().createCriteria(HResourceRoute.class);
             criteria.add(Expression.eq("url", url));
             criteria.add(Expression.eq("method", method));
             HResourceRoute hrr = (HResourceRoute) criteria.uniqueResult();
@@ -105,12 +107,12 @@
     }
 
     public void deleteResourceRoute(String url, String method) {
-        _session.createQuery("delete from HResourceRoute r where r.url = :url and r.method = :method")
+        getSession().createQuery("delete from HResourceRoute r where r.url = :url and r.method = :method")
                 .setString("url", url).setString("method", method).executeUpdate();
     }
 
     public List<ResourceRouteDAO> getAllResourceRoutes() {
-        List<HResourceRoute> hrr = _session.createCriteria(HResourceRoute.class).list();
+        List<HResourceRoute> hrr = getSession().createCriteria(HResourceRoute.class).list();
         ArrayList<ResourceRouteDAO> rr = new ArrayList<ResourceRouteDAO>(hrr.size());
         for (HResourceRoute hroute : hrr)
             rr.add(new ResourceRouteDaoImpl(_sm, hroute));
@@ -125,7 +127,7 @@
         process.setDeployDate(new Date());
         process.setGuid(guid);
         process.setVersion(version);
-        _session.save(process);
+        getSession().save(process);
         return new ProcessDaoImpl(_sm, process);
     }
 
@@ -139,7 +141,7 @@
     public ProcessDAO getProcess(QName processId) {
 
         try {
-            Criteria criteria = _session.createCriteria(HProcess.class);
+            Criteria criteria = getSession().createCriteria(HProcess.class);
             criteria.add(Expression.eq("processId", processId.toString()));
             // For the moment we are expecting only one result.
             HProcess hprocess = (HProcess) criteria.uniqueResult();
@@ -158,11 +160,11 @@
      * @see org.apache.ode.bpel.dao.ProcessDAO#getInstance(java.lang.Long)
      */
     public ProcessInstanceDAO getInstance(Long instanceId) {
-        return _getInstance(_sm, _session, instanceId);
+        return _getInstance(_sm, getSession(), instanceId);
     }
 
     public ScopeDAO getScope(Long siidl) {
-        return _getScope(_sm, _session, siidl);
+        return _getScope(_sm, getSession(), siidl);
     }
 
     public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
@@ -171,7 +173,7 @@
         }
         List<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>();
 
-        Iterator<HProcessInstance> iter = _instanceQuery(_session, false, criteria);
+        Iterator<HProcessInstance> iter = _instanceQuery(getSession(), false, criteria);
         while (iter.hasNext()) {
             daos.add(new ProcessInstanceDaoImpl(_sm, iter.next()));
         }
@@ -238,7 +240,7 @@
     @SuppressWarnings( { "unchecked", "deprecation" })
     public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         CriteriaBuilder cb = new CriteriaBuilder();
-        Criteria crit = _session.createCriteria(HBpelEvent.class);
+        Criteria crit = getSession().createCriteria(HBpelEvent.class);
         if (ifilter != null)
             cb.buildCriteria(crit, efilter);
         if (ifilter != null)
@@ -251,7 +253,7 @@
     @SuppressWarnings("unchecked")
     public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         CriteriaBuilder cb = new CriteriaBuilder();
-        Criteria crit = _session.createCriteria(HBpelEvent.class);
+        Criteria crit = getSession().createCriteria(HBpelEvent.class);
         if (efilter != null)
             cb.buildCriteria(crit, efilter);
         if (ifilter != null)
@@ -283,7 +285,7 @@
         HibernateInstancesQueryCompiler compiler = new HibernateInstancesQueryCompiler();
 
         CommandEvaluator<List, Session> eval = compiler.compile((Query) rootNode);
-        List<HProcessInstance> instancesList = (List<HProcessInstance>) eval.evaluate(_session);
+        List<HProcessInstance> instancesList = (List<HProcessInstance>) eval.evaluate(getSession());
 
         Collection<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>(instancesList.size());
         for (HProcessInstance instance : instancesList) {
@@ -292,6 +294,33 @@
         return result;
     }
 
+    public int deleteInstances(InstanceFilter criteria, Set<CLEANUP_CATEGORY> categories) {
+        if (criteria.getLimit() == 0) {
+            return 0;
+        }
+
+        List<HProcessInstance> instances = _instanceQueryForList(getSession(), false, criteria);
+        if( __log.isDebugEnabled() ) __log.debug("Collected " + instances.size() + " instances to delete.");
+        
+        if( !instances.isEmpty() ) {
+            ProcessDaoImpl process = (ProcessDaoImpl)createTransientProcess(instances.get(0).getProcessId());
+            return process.deleteInstances(instances, categories);
+        }
+        
+        return 0;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static List<HProcessInstance> _instanceQueryForList(Session session, boolean countOnly, InstanceFilter filter) {
+        Criteria crit = session.createCriteria(HProcessInstance.class);
+        CriteriaBuilder cb = new CriteriaBuilder();
+        cb.buildCriteria(crit, filter);
+        
+        crit.setFetchMode("fault", FetchMode.JOIN);
+
+        return crit.list();
+    }
+
     @SuppressWarnings("unchecked")
     public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
         if (instances.size() == 0) {
@@ -303,7 +332,7 @@
             iids[i] = dao.getInstanceId();
             i++;
         }
-        Collection<HCorrelationSet> csets = _session.getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", iids).list();        
+        Collection<HCorrelationSet> csets = getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", iids).list();        
         Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
         for (HCorrelationSet cset: csets) {
             Long id = cset.getInstance().getId();

Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Jun  3 22:23:43 2009
@@ -38,10 +38,12 @@
     public static final String SELECT_INSTANCES_BY_PROCESS="SELECT_INSTANCES_BY_PROCESS";
     public static final String SELECT_INSTANCES_BY_PROCESSES_AND_STATES="SELECT_INSTANCES_BY_PROCESSES_AND_STATES";
     public static final String DELETE_INSTANCES="DELETE_INSTANCES";
-	
+
     /** Foreign key to owner {@link HProcess}. */
     private HProcess _process;
 
+    private Long _processId;
+
     /** Foreign key to the instantiating {@link HCorrelator}. */
     private HCorrelator _instantiatingCorrelator;
 
@@ -201,6 +203,17 @@
     }
 
     /**
+     * @hibernate.property column="PROCESS_ID" insert="false" update="false"
+     */
+    public Long getProcessId() {
+        return _processId;
+    }
+
+    public void setProcessId(Long processId) {
+        _processId = processId;
+    }
+
+    /**
      * @hibernate.bag lazy="true" inverse="true"
      * @hibernate.collection-key column="PIID" foreign-key="none"
      * @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HScope"

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Jun  3 22:23:43 2009
@@ -41,6 +41,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 import org.apache.ode.bpel.evt.BpelEvent;
 import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
@@ -55,8 +56,6 @@
 import org.apache.ode.utils.msg.MessageBundle;
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.MemberOfFunction;
-import org.apache.ode.bpel.rapi.ProcessModel;
-import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
 
 /**
  * <p>
@@ -74,7 +73,6 @@
  * @author Matthieu Riou <mriou at apache dot org>
  */
 public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
-
     private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
 
     private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -157,6 +155,10 @@
     public BpelServerImpl() {
     }
 
+    public Contexts getContexts() {
+        return _contexts;
+    }
+    
     protected void waitForQuiessence() {
         do{
         _mngmtLock.writeLock().lock();
@@ -583,6 +585,10 @@
         _contexts.scheduler = scheduler;
     }
 
+    public void setCronScheduler(CronScheduler cronScheduler) throws BpelEngineException {
+        _contexts.cronScheduler = cronScheduler;
+    }
+
     public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
         _contexts.eprContext = eprContext;
     }

Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Wed Jun  3 22:23:43 2009
@@ -50,12 +50,18 @@
     
     public void run() {
         _log.info("CleanInstances.run().");
-        for( String filter : _cleanupInfo.getFilters() ) {
-            _log.info("CleanInstances.run(" + filter + ")");
-            long numberOfDeletedInstances = 0;
-            do {
-                numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
-            } while( numberOfDeletedInstances == _transactionSize );
+        try {
+            for( String filter : _cleanupInfo.getFilters() ) {
+                _log.info("CleanInstances.run(" + filter + ")");
+                long numberOfDeletedInstances = 0;
+                do {
+                    numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+                } while( numberOfDeletedInstances == _transactionSize );
+            }
+        } catch( RuntimeException re ) {
+            throw re;
+        } catch( Exception e ) {
+            throw new RuntimeException("", e);
         }
     }
     

Modified: ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java Wed Jun  3 22:23:43 2009
@@ -145,4 +145,7 @@
     public void jobCompleted(String jobId) {
 
     }
+
+    public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
+    }
 }

Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Jun  3 22:23:43 2009
@@ -80,6 +80,8 @@
     /** The object that actually handles the jobs. */
     volatile JobProcessor _jobProcessor;
 
+    volatile JobProcessor _polledRunnableProcessor;
+
     private SchedulerThread _todo;
 
     private DatabaseDelegate _db;
@@ -131,6 +133,10 @@
         _db = dbd;
     }
 
+    public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
+        _polledRunnableProcessor = polledRunnableProcessor;
+    }
+
     public void cancelJob(String jobId) throws ContextException {
         _todo.dequeue(new Job(0, jobId, false, null));
         try {