You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/01 07:07:37 UTC

svn commit: r780562 [2/3] - in /ode/branches/APACHE_ODE_1.X: axis2-war/src/main/ axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/ axis2-war/src/test/resources/TestProcessCronCleanup/ axis2-war/src/test/resources/TestSystemCronCleanup/ axis...

Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl Mon Jun  1 05:07:35 2009
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<wsdl:definitions
+    xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/" 
+    xmlns:tns="http://ws.intalio.com/TimeService/"
+    xmlns:s="http://www.w3.org/2001/XMLSchema"
+    xmlns:http="http://schemas.xmlsoap.org/wsdl/http/"
+    targetNamespace="http://ws.intalio.com/TimeService/"
+    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">
+  <wsdl:documentation xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">A sample Time service</wsdl:documentation>
+  <wsdl:types>
+    <s:schema elementFormDefault="qualified" targetNamespace="http://ws.intalio.com/TimeService/">
+      <s:element name="getUTCTime" type="s:string" />
+      <s:element name="getUTCTimeResponse">
+        <s:complexType>
+          <s:sequence>
+            <s:element minOccurs="0" maxOccurs="1" name="getUTCTimeResult" type="s:string" />
+          </s:sequence>
+        </s:complexType>
+      </s:element>
+      <s:element name="getCityTime">
+        <s:complexType>
+          <s:sequence>
+            <s:element minOccurs="0" maxOccurs="1" name="city" type="s:string" />
+          </s:sequence>
+        </s:complexType>
+      </s:element>
+      <s:element name="getCityTimeResponse">
+        <s:complexType>
+          <s:sequence>
+            <s:element minOccurs="0" maxOccurs="1" name="getCityTimeResult" type="s:string" />
+          </s:sequence>
+        </s:complexType>
+      </s:element>
+    </s:schema>
+  </wsdl:types>
+  <wsdl:message name="getUTCTimeSoapIn">
+    <wsdl:part name="parameters" element="tns:getUTCTime" />
+  </wsdl:message>
+  <wsdl:message name="getUTCTimeSoapOut">
+    <wsdl:part name="parameters" element="tns:getUTCTimeResponse" />
+  </wsdl:message>
+  <wsdl:message name="getCityTimeSoapIn">
+    <wsdl:part name="parameters" element="tns:getCityTime" />
+  </wsdl:message>
+  <wsdl:message name="getCityTimeSoapOut">
+    <wsdl:part name="parameters" element="tns:getCityTimeResponse" />
+  </wsdl:message>
+  <wsdl:portType name="TimeServiceSoap">
+    <wsdl:operation name="getUTCTime">
+      <wsdl:input message="tns:getUTCTimeSoapIn" />
+      <wsdl:output message="tns:getUTCTimeSoapOut" />
+    </wsdl:operation>
+    <wsdl:operation name="getCityTime">
+      <wsdl:input message="tns:getCityTimeSoapIn" />
+      <wsdl:output message="tns:getCityTimeSoapOut" />
+    </wsdl:operation>
+  </wsdl:portType>
+  <wsdl:binding name="TimeServiceSoap" type="tns:TimeServiceSoap">
+    <soap:binding transport="http://schemas.xmlsoap.org/soap/http" />
+    <wsdl:operation name="getUTCTime">
+      <soap:operation soapAction="http://ws.intalio.com/TimeService/getUTCTime" style="document" />
+      <wsdl:input>
+        <soap:body use="literal" />
+      </wsdl:input>
+      <wsdl:output>
+        <soap:body use="literal" />
+      </wsdl:output>
+    </wsdl:operation>
+    <wsdl:operation name="getCityTime">
+      <soap:operation soapAction="http://ws.intalio.com/TimeService/getCityTime" style="document" />
+      <wsdl:input>
+        <soap:body use="literal" />
+      </wsdl:input>
+      <wsdl:output>
+        <soap:body use="literal" />
+      </wsdl:output>
+    </wsdl:operation>
+  </wsdl:binding>
+  <wsdl:service name="TimeService">
+    <wsdl:documentation xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">A sample Time service</wsdl:documentation>
+    <wsdl:port name="TimeServiceSoap" binding="tns:TimeServiceSoap">
+      <soap:address location="http://ws.intalio.com/TimeService/" />
+    </wsdl:port>
+  </wsdl:service>
+</wsdl:definitions>
\ No newline at end of file

Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml Mon Jun  1 05:07:35 2009
@@ -0,0 +1,42 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<dd:deploy xmlns:dd="http://ode.fivesight.com/schemas/2006/06/27/dd">
+    <dd:process xmlns:dd="http://ode.fivesight.com/schemas/2006/06/27/dd"
+        xmlns:Client="http://example.com/FirstProcess/Client" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xmlns:diag="http://example.com/FirstProcess" xmlns:TimeService="http://ws.intalio.com/TimeService/"
+        xmlns:TimeServer="http://example.com/FirstProcess/TimeServer"
+        xmlns:ns="http://bpms.intalio.com/FirstProcess/Time" xmlns:xs="http://www.w3.org/2001/XMLSchema"
+        xmlns:this="http://example.com/FirstProcess/FirstProcess" name="this:FirstProcess"
+        fileName="FirstProcess-FirstProcess.bpel">
+        <dd:property name="PATH">FirstProcess</dd:property>
+        <dd:property name="SVG">FirstProcess.svg</dd:property>
+        <dd:provide partnerLink="firstProcessAndClientPlkVar">
+            <dd:service name="this:CanonicServiceForClient" port="canonicPort"></dd:service>
+        </dd:provide>
+        <dd:invoke partnerLink="timeServerAndFirstProcessForPortTimeServiceSoapPlkVar">
+            <dd:service name="TimeService:TimeService" port="TimeServiceSoap"></dd:service>
+        </dd:invoke>        
+        <dd:schedule when="* * * * * ?">
+            <dd:cleanup>
+                <dd:filter><![CDATA[last_active<-1w status=completed]]></dd:filter>
+            </dd:cleanup>
+        </dd:schedule>
+    </dd:process>
+</dd:deploy>
\ No newline at end of file

Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap Mon Jun  1 05:07:35 2009
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
+  <!-- test soap message -->
+  <SOAP-ENV:Body>
+    <TimeRequest xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsi-wsdl="http://www.intalio.com/BPMS/wsi/wsdl" xmlns:wsi-xf="http://www.intalio.com/BPMS/wsi/xforms" xmlns:xforms="http://www.w3.org/2002/xforms" xmlns:ns0="http://bpms.intalio.com/FirstProcess/Time" xmlns:xxforms="http://orbeon.org/oxf/xml/xforms" xmlns="http://bpms.intalio.com/FirstProcess/Time">                   
+        <city>New York</city>                 
+    </TimeRequest>  
+  </SOAP-ENV:Body>
+</SOAP-ENV:Envelope>

Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml Mon Jun  1 05:07:35 2009
@@ -0,0 +1,29 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<sched:schedules 
+        xmlns:sched="http://www.apache.org/ode/schemas/schedules/2009/05"
+        xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03">
+    <sched:schedule when="* * * * * ?">
+        <sched:cleanup>
+            <dd:filter><![CDATA[status=completed]]></dd:filter>
+        </sched:cleanup>
+    </sched:schedule>
+</sched:schedules>
\ No newline at end of file

Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Jun  1 05:07:35 2009
@@ -56,6 +56,7 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
 import org.apache.ode.bpel.iapi.BpelEventListener;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
@@ -106,9 +107,9 @@
 
     protected ExecutorService _executorService;
 
-    protected ExecutorService _polledRunnableExecutorService;
-
     protected Scheduler _scheduler;
+    
+    protected CronScheduler _cronScheduler;
 
     protected Database _db;
 
@@ -119,14 +120,13 @@
     private ManagementService _mgtService;
 
     protected MultiThreadedHttpConnectionManager httpConnectionManager;
-
-
+    
     public void init(ServletConfig config, AxisConfiguration axisConf) throws ServletException {
         init(config.getServletContext().getRealPath("/WEB-INF"), axisConf);
     }
 
     public void init(String contextPath, AxisConfiguration axisConf) throws ServletException {
-    	_axisConfig = axisConf;
+        _axisConfig = axisConf;
         String rootDir = System.getProperty("org.apache.ode.rootDir");
         if (rootDir != null) _appRoot = new File(rootDir);
         else _appRoot = new File(contextPath);
@@ -144,12 +144,12 @@
         _odeConfig = new ODEConfigProperties(_configRoot);
 
         try {
-        	_odeConfig.load();
+            _odeConfig.load();
         } catch (FileNotFoundException fnf) {
-        	String errmsg = __msgs.msgOdeInstallErrorCfgNotFound(_odeConfig.getFile());
-        	__log.warn(errmsg);
+            String errmsg = __msgs.msgOdeInstallErrorCfgNotFound(_odeConfig.getFile());
+            __log.warn(errmsg);
         } catch (Exception ex) {
-        	String errmsg = __msgs.msgOdeInstallErrorCfgReadError(_odeConfig.getFile());
+            String errmsg = __msgs.msgOdeInstallErrorCfgReadError(_odeConfig.getFile());
                 __log.error(errmsg, ex);
                 throw new ServletException(errmsg, ex);
             }
@@ -182,16 +182,16 @@
         _store.loadAll();
 
         try {
-        	_bpelServer.start();
+            _bpelServer.start();
         } catch (Exception ex) {
-        	String errmsg = __msgs.msgOdeBpelServerStartFailure();
-        	__log.error(errmsg, ex);
-        	throw new ServletException(errmsg, ex);
+            String errmsg = __msgs.msgOdeBpelServerStartFailure();
+            __log.error(errmsg, ex);
+            throw new ServletException(errmsg, ex);
         }
 
         _poller = getDeploymentPollerExt();
         if( _poller == null ) {
-        	_poller = new DeploymentPoller(_store.getDeployDir(), this);
+            _poller = new DeploymentPoller(_store.getDeployDir(), this);
         }
 
         _mgtService = new ManagementService();
@@ -216,36 +216,36 @@
     }
 
     @SuppressWarnings("unchecked")
-	private DeploymentPoller getDeploymentPollerExt() {
-    	DeploymentPoller poller = null;
-    	
-    	InputStream is = null;
-    	try {
-	        is = ODEServer.class.getResourceAsStream("/deploy-ext.properties");
-	        if( is != null ) {
-	        	__log.info("A deploy-ext.properties found; will use the provided class if applicable.");
-	        	try {
-	                Properties props = new Properties();
-			        props.load(is);
-			        String deploymentPollerClass = props.getProperty("deploymentPoller.class");
-			        if( deploymentPollerClass == null ) {
-			        	__log.warn("deploy-ext.properties found in the class path; however, the file does not have 'deploymentPoller.class' as one of the properties!!");
-			        } else {
-			    		Class pollerClass = Class.forName(deploymentPollerClass);
-			    		poller = (DeploymentPoller)pollerClass.getConstructor(File.class, ODEServer.class).newInstance(_store.getDeployDir(), this);
-			        	__log.info("A custom deployment poller: " + deploymentPollerClass + " has been plugged in.");
-			        }
-	        	} catch( Exception e ) {
-	        		__log.warn("Deployment poller extension class is not loadable, falling back to the default DeploymentPoller.", e);
-	        	}
-	        } else if( __log.isDebugEnabled() ) __log.debug("No deploy-ext.properties found.");
-    	} finally {
-    		try {
-    			if(is != null) is.close();
-    		} catch( IOException ie ) {
-    			// ignore
-    		}
-    	}
+    private DeploymentPoller getDeploymentPollerExt() {
+        DeploymentPoller poller = null;
+        
+        InputStream is = null;
+        try {
+            is = ODEServer.class.getResourceAsStream("/deploy-ext.properties");
+            if( is != null ) {
+                __log.info("A deploy-ext.properties found; will use the provided class if applicable.");
+                try {
+                    Properties props = new Properties();
+                    props.load(is);
+                    String deploymentPollerClass = props.getProperty("deploymentPoller.class");
+                    if( deploymentPollerClass == null ) {
+                        __log.warn("deploy-ext.properties found in the class path; however, the file does not have 'deploymentPoller.class' as one of the properties!!");
+                    } else {
+                        Class pollerClass = Class.forName(deploymentPollerClass);
+                        poller = (DeploymentPoller)pollerClass.getConstructor(File.class, ODEServer.class).newInstance(_store.getDeployDir(), this);
+                        __log.info("A custom deployment poller: " + deploymentPollerClass + " has been plugged in.");
+                    }
+                } catch( Exception e ) {
+                    __log.warn("Deployment poller extension class is not loadable, falling back to the default DeploymentPoller.", e);
+                }
+            } else if( __log.isDebugEnabled() ) __log.debug("No deploy-ext.properties found.");
+        } finally {
+            try {
+                if(is != null) is.close();
+            } catch( IOException ie ) {
+                // ignore
+            }
+        }
 
         return poller;
     }
@@ -294,6 +294,16 @@
                     __log.debug("Error stopping services.", ex);
                 }
 
+            if( _cronScheduler != null ) {
+                try {
+                    __log.debug("shutting down cron scheduler.");
+                    _cronScheduler.shutdown();
+                    _cronScheduler = null;
+                } catch (Exception ex) {
+                    __log.debug("Cron scheduler couldn't be shutdown.", ex);
+                }
+            }
+            
             if (_scheduler != null)
                 try {
                     __log.debug("shutting down scheduler.");
@@ -454,25 +464,19 @@
         else
             _executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
         
-        // executor service for long running bulk transactions
-        _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() {
-            int threadNumber = 0;
-            public Thread newThread(Runnable r) {
-                threadNumber += 1;
-                Thread t = new Thread(r, "PolledRunnable-"+threadNumber);
-                t.setDaemon(true);
-                return t;
-            }
-        });
-
         _bpelServer = new BpelServerImpl();
         _scheduler = createScheduler();
         _scheduler.setJobProcessor(_bpelServer);
         
         BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
-        polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService);
+        polledRunnableProcessor.setPolledRunnableExecutorService(_executorService);
         polledRunnableProcessor.setContexts(_bpelServer.getContexts());
         _scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
+        
+        _cronScheduler = new CronScheduler();
+        _cronScheduler.setScheduledTaskExec(_executorService);
+        _cronScheduler.setContexts(_bpelServer.getContexts());
+        _bpelServer.setCronScheduler(_cronScheduler);
 
         _bpelServer.setDaoConnectionFactory(_daoCF);
         _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
@@ -534,6 +538,10 @@
         return _appRoot;
     }
 
+    public File getConfigRoot() {
+        return _configRoot;
+    }
+    
     private void registerEventListeners() {
         String listenersStr = _odeConfig.getEventListeners();
         if (listenersStr != null) {
@@ -613,24 +621,36 @@
                 } else {
                     // we may have potentially created a lot of garbage, so,
                     // let's hope the garbage collector is configured properly.
-                	if (pconf != null) {
-	                	_bpelServer.cleanupProcess(pconf);
-                	}
+                    if (pconf != null) {
+                        _bpelServer.cleanupProcess(pconf);
+                    }
                 }
                 break;
             case DISABLED:
             case UNDEPLOYED:
                 _bpelServer.unregister(pse.pid);
-            	if (pconf != null) {
-                	_bpelServer.cleanupProcess(pconf);
-            	}
+                if (pconf != null) {
+                    _bpelServer.cleanupProcess(pconf);
+                }
                 break;
             default:
                 __log.debug("Ignoring store event: " + pse);
         }
-    }
-
+        
+        if( pconf != null ) {
+            if( pse.type == ProcessStoreEvent.Type.UNDEPLOYED) {
+                __log.debug("Cancelling all cron scheduled jobs on store event: " + pse);
+                _bpelServer.getContexts().cronScheduler.cancelProcessCronJobs(pse.pid, true);
+            }
 
+            // Except for undeploy event, we need to re-schedule process dependent jobs
+            __log.debug("(Re)scheduling cron scheduled jobs on store event: " + pse);
+            if( pse.type != ProcessStoreEvent.Type.UNDEPLOYED) {
+                _bpelServer.getContexts().cronScheduler.scheduleProcessCronJobs(pse.pid, pconf);
+            }
+        }
+    }
+    
     // Transactional debugging stuff, to track down all these little annoying bugs.
     private class DebugTxMgr implements TransactionManager {
         private TransactionManager _tm;

Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java Mon Jun  1 05:07:35 2009
@@ -41,6 +41,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
+import org.apache.ode.utils.WatchDog;
 
 import javax.xml.namespace.QName;
 import java.io.File;
@@ -48,6 +51,8 @@
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Polls a directory for the deployment of a new deployment unit.
@@ -66,8 +71,15 @@
     protected ODEServer _odeServer;
 
     private boolean _onHold = false;
+    
+    private SystemSchedulesConfig _systemSchedulesConf;
+    
+    @SuppressWarnings("unchecked")
+    private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
+    @SuppressWarnings("unchecked")
+    private WatchDog _systemCronConfigWatchDog;
 
-    /** Filter accepting directories containing a .odedd file. */
+    /** Filter accepting directories containing a ode dd file. */
     private static final FileFilter _fileFilter = new FileFilter() {
         public boolean accept(File path) {
             if (path.isDirectory()) {
@@ -88,11 +100,13 @@
         }
     };
 
-    public DeploymentPoller(File deployDir, ODEServer odeServer) {
+    public DeploymentPoller(File deployDir, final ODEServer odeServer) {
         _odeServer = odeServer;
         _deployDir = deployDir;
         if (!_deployDir.exists())
             _deployDir.mkdir();
+        _systemSchedulesConf = createSystemSchedulesConfig(odeServer.getConfigRoot());
+        _systemCronConfigWatchDog = createSystemCronConfigWatchDog(odeServer.getBpelServer().getContexts().cronScheduler);
     }
 
     public void start() {
@@ -107,51 +121,55 @@
     }
 
     protected boolean isDeploymentFromODEFileSystemAllowed() {
-    	return true;
+        return true;
     }
 
     /**
      * Scan the directory for new (or removed) files (called mainly from {@link PollingThread}) and calls whoever is in charge of
      * the actual deployment (or undeployment).
      */
+    @SuppressWarnings("unchecked")
     private void check() {
         File[] files = _deployDir.listFiles(_fileFilter);
 
         // Checking for new deployment directories
         if (isDeploymentFromODEFileSystemAllowed() && files != null) {
-	        for (File file : files) {
-	            File deployXml = new File(file, "deploy.xml");
-	            File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
-	
-	            if (!deployXml.exists()) {
-	                // Skip if deploy.xml is abset
-	                __log.debug("Not deploying " + file + " (missing deploy.xml)");
-	            }
-	
-	            if (deployedMarker.exists()) {
-	                continue;
-	            }
-	
-	            try {
-	                deployedMarker.createNewFile();
-	            } catch (IOException e1) {
-	                __log.error("Error creating deployed marker file, " + file + " will not be deployed");
-	                continue;
-	            }
-	
-	            try {
-	                _odeServer.getProcessStore().undeploy(file);
-	            } catch (Exception ex) {
-	                __log.error("Error undeploying " + file.getName());
-	            }
-	
-	            try {
-	                Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
-	                __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
-	            } catch (Exception e) {
-	                __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
-	            }
-	        }
+            for (File file : files) {
+                File deployXml = new File(file, "deploy.xml");
+                File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
+
+                if (!deployXml.exists()) {
+                    // Skip if deploy.xml is abset
+                    __log.debug("Not deploying " + file + " (missing deploy.xml)");
+                }
+
+                WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+                
+                if (deployedMarker.exists()) {
+                    checkDeployXmlWatchDog(ddWatchDog);
+                    continue;
+                }
+    
+                try {
+                    deployedMarker.createNewFile();
+                } catch (IOException e1) {
+                    __log.error("Error creating deployed marker file, " + file + " will not be deployed");
+                    continue;
+                }
+    
+                try {
+                    _odeServer.getProcessStore().undeploy(file);
+                } catch (Exception ex) {
+                    __log.error("Error undeploying " + file.getName());
+                }
+    
+                try {
+                    Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
+                    __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
+                } catch (Exception e) {
+                    __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
+                }
+            }
         }
 
         // Removing deployments that disappeared
@@ -162,10 +180,52 @@
             if (!deployDir.exists()) {
                 Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
                 file.delete();
+                disposeDeployXmlWatchDog(deployDir);
                 if (undeployed.size() > 0)
                     __log.info("Successfully undeployed " + pkg);
             }
         }
+        
+        checkSystemCronConfigWatchDog(_systemCronConfigWatchDog);
+    }
+
+    @SuppressWarnings("unchecked")
+    protected WatchDog ensureDeployXmlWatchDog(File deployFolder, File deployXml) {
+        WatchDog ddWatchDog = dDWatchDogsByPath.get(deployXml.getAbsolutePath());
+        if( ddWatchDog == null ) {
+            ddWatchDog = WatchDog.watchFile(deployXml, new DDWatchDogObserver(deployFolder.getName()));
+            dDWatchDogsByPath.put(deployXml.getAbsolutePath(), ddWatchDog);
+        }
+        
+        return ddWatchDog;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected void checkDeployXmlWatchDog(WatchDog ddWatchDog) {
+        ddWatchDog.check();
+    }
+
+    protected void disposeDeployXmlWatchDog(File deployDir) {
+        dDWatchDogsByPath.remove(new File(deployDir, "deploy.xml").getAbsolutePath());
+    }
+
+    protected SystemSchedulesConfig createSystemSchedulesConfig(File configRoot) {
+        return new SystemSchedulesConfig(configRoot);
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected WatchDog createSystemCronConfigWatchDog(final CronScheduler cronScheduler) {
+        return WatchDog.watchFile(_systemSchedulesConf.getSchedulesFile(), 
+            new WatchDog.DefaultObserver() {
+                public void init() {
+                    cronScheduler.refreshSystemCronJobs(_systemSchedulesConf);
+                }
+            });
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected void checkSystemCronConfigWatchDog(WatchDog ddWatchDog) {
+        ddWatchDog.check();
     }
 
     /**
@@ -174,6 +234,10 @@
     private class PollingThread extends Thread {
         private boolean _active = true;
 
+        public PollingThread() {
+            setName("DeploymentPoller");
+        }
+        
         /** Stop this poller, and block until it terminates. */
         void kill() {
             synchronized (this) {
@@ -226,4 +290,17 @@
         File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
         deployedMarker.delete();
     }
+
+    @SuppressWarnings("unchecked")
+    protected class DDWatchDogObserver extends WatchDog.DefaultObserver {
+        private String deploymentPakage;
+        
+        public DDWatchDogObserver(String deploymentPakage) {
+            this.deploymentPakage = deploymentPakage;
+        }
+        
+        public void init() {
+            _odeServer.getProcessStore().refreshSchedules(deploymentPakage);
+        }
+    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java Mon Jun  1 05:07:35 2009
@@ -44,10 +44,13 @@
 
   /**
    *  Pattern that matches anything like 'abcde <  fgh' or 'ijklm  =nop' using 
-   *  supported comparators 
+   *  supported comparators
+   *  <p>
+   *  The not-equal op, '<>' works only with pids.
+   *  </p>
    */
   private static final Pattern __comparatorPattern = 
-    Pattern.compile("([^=<> ]*) *(<=|>=|<|>|=) *([^=<> ]*)");
+    Pattern.compile("([^=<> ]*) *(<>|<=|>=|<|>|=) *([^=<> ]*)");
 
   protected Map<FKEY, Restriction<String>> _criteria = new HashMap<FKEY,Restriction<String>>();
 

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java Mon Jun  1 05:07:35 2009
@@ -22,10 +22,12 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.pmapi.InvalidRequestException;
 import org.apache.ode.utils.ISO8601DateParser;
+import org.apache.ode.utils.RelativeDateParser;
 
 import java.io.Serializable;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +50,9 @@
     /** If set, will filter on the process id (PID) and select all matching process definitions */
     private List<String> pids;
 
+    /** If set, the PID filter will work negatively */
+    private boolean arePidsNegative;
+
     /** If set, will filter on the process name (accepts ending with wildcard) */
     private String nameFilter;
 
@@ -105,6 +110,7 @@
         PID {
             void process(InstanceFilter filter, String key, String op, String value) {
                 filter.pids = parse(value);
+                filter.arePidsNegative = "<>".equals(op);
             }
         },
         NAME {
@@ -183,7 +189,7 @@
         if (startedDateFilter != null) {
             for (String ddf : startedDateFilter) {
                 try {
-                    ISO8601DateParser.parse(getDateWithoutOp(ddf));
+                    parseDateExpression(getDateWithoutOp(ddf));
                 } catch (ParseException e) {
                     throw new InvalidRequestException(
                             "Couldn't parse one of the filter date, please make "
@@ -195,7 +201,7 @@
         if (lastActiveDateFilter != null) {
             for (String ddf : lastActiveDateFilter) {
                 try {
-                    ISO8601DateParser.parse(getDateWithoutOp(ddf));
+                    parseDateExpression(getDateWithoutOp(ddf));
                 } catch (ParseException e) {
                     throw new InvalidRequestException(
                             "Couldn't parse one of the filter date, please make "
@@ -235,6 +241,14 @@
         this(filter, null, Integer.MAX_VALUE);
     }
 
+    private Date parseDateExpression(String date) throws ParseException {
+        if( date.toLowerCase().startsWith("-")  && date.length() > 1 ) {
+            return RelativeDateParser.parseRelativeDate(date.substring(1));
+        } else {
+            return ISO8601DateParser.parse(date);
+        }
+    }
+    
     /**
      * Converts the status filter value as given by a filter ('active',
      * 'suspended', ...) to an instance state as defined in the ProcessState
@@ -299,6 +313,10 @@
         return pids;
     }
 
+    public boolean arePidsNegative() {
+        return arePidsNegative;
+    }
+
     public List<String> getIidFilter() {
         return iids;
     }
@@ -339,7 +357,11 @@
         StringBuffer buf = new StringBuffer();
         buf.append("InstanceFilter {");
         buf.append("iids="+iids);
-        buf.append(",pids="+pids);
+        if( !arePidsNegative ) {
+            buf.append(",pids="+pids);
+        } else {
+            buf.append(",-pids="+pids);
+        }
         buf.append(",name="+nameFilter);
         buf.append(",namespace="+namespaceFilter);
         buf.append(",status="+statusFilter);

Added: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,18 @@
+package org.apache.ode.bpel.iapi;
+
+/**
+ * The interface to implement for a custom Scheduler implementation to support
+ * Clustering.
+ * 
+ * @author sean
+ *
+ */
+public interface ClusterAware {
+    /**
+     * A custom implementation should return true if the node that this method is called
+     * is the coordinator of the cluster.
+     * 
+     * @return true when the node is coordinator
+     */
+    boolean amICoordinator();
+}

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java Mon Jun  1 05:07:35 2009
@@ -21,16 +21,18 @@
 import java.io.File;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 
 import javax.wsdl.Definition;
 import javax.xml.namespace.QName;
 
 import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.utils.CronExpression;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 
@@ -177,6 +179,8 @@
     
     Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded);
 
+    List<CronJob> getCronJobs();
+    
     public enum CLEANUP_CATEGORY {
         INSTANCE,
         VARIABLES,
@@ -188,4 +192,62 @@
             return valueOf(CLEANUP_CATEGORY.class, lowerCase.toUpperCase());
         }
     }
+    
+    public class CronJob {
+        private CronExpression _cronExpression;
+        
+        private final List<Map<String,Object>> runnableDetailList = new ArrayList<Map<String,Object>>();
+        
+        public void setCronExpression(CronExpression _cronExpression) {
+            this._cronExpression = _cronExpression;
+        }
+        
+        public CronExpression getCronExpression() {
+            return _cronExpression;
+        }
+        
+        public List<Map<String,Object>> getRunnableDetailList() {
+            return runnableDetailList;
+        }
+        
+        public String toString() {
+            StringBuffer buf = new StringBuffer();
+            
+            buf.append("Cron[");
+            buf.append(_cronExpression);
+            buf.append("] ");
+            buf.append(runnableDetailList);
+            
+            return buf.toString();
+        }
+    }
+    
+    public class CleanupInfo {
+        private List<String> _filters = new ArrayList<String>();
+        
+        private final Set<CLEANUP_CATEGORY> _categories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+        
+        public void setFilters(List<String> filters) {
+            _filters = filters;
+        }
+        
+        public List<String> getFilters() {
+            return _filters;
+        }
+        
+        public Set<CLEANUP_CATEGORY> getCategories() {
+            return _categories;
+        }
+        
+        public String toString() {
+            StringBuffer buf = new StringBuffer();
+            
+            buf.append("CleanupInfo: filters=");
+            buf.append(_filters);
+            buf.append(", categories=");
+            buf.append(_categories);
+            
+            return buf.toString();
+        }
+    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java Mon Jun  1 05:07:35 2009
@@ -114,4 +114,6 @@
      * @return
      */
     long getCurrentVersion();
+    
+    void refreshSchedules(String packageName);
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java Mon Jun  1 05:07:35 2009
@@ -49,7 +49,10 @@
         DISABLED,
         
         /** A process property was changed. */
-        PROPERTY_CHANGED
+        PROPERTY_CHANGED,
+        
+        /** Cron schedule settings have been changed for the process */
+        SCHEDULE_SETTINGS_CHANGED
     }
 
     /**

Added: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,24 @@
+package org.apache.ode.bpel.dao;
+
+import java.util.Set;
+
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+
+/**
+ * An implementation of this interface provides a way to delete runtime process instances
+ * through the InstanceFilter.
+ * 
+ * @author sean
+ *
+ */
+public interface FilteredInstanceDeletable {
+    /**
+     * Deletes instance filter by the given instance filter and clean up categories.
+     * 
+     * @param filter instance filter
+     * @param categories clean up categories
+     * @return returns the number of instances that are deleted
+     */
+    int deleteInstances(InstanceFilter filter, Set<CLEANUP_CATEGORY> categories);
+}

Modified: ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Mon Jun  1 05:07:35 2009
@@ -298,4 +298,8 @@
 
     public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
     }
+
+    public boolean amICoordinator() {
+        return true;
+    }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Mon Jun  1 05:07:35 2009
@@ -38,6 +38,7 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
 import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.engine.migration.MigrationHandler;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 import org.apache.ode.bpel.evt.BpelEvent;
@@ -493,6 +494,10 @@
         _contexts.scheduler = scheduler;
     }
 
+    public void setCronScheduler(CronScheduler cronScheduler) throws BpelEngineException {
+        _contexts.cronScheduler = cronScheduler;
+    }
+
     public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
         _contexts.eprContext = eprContext;
     }

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Mon Jun  1 05:07:35 2009
@@ -26,6 +26,7 @@
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.evar.ExternalVariableModule;
 
 import java.util.HashMap;
@@ -39,11 +40,12 @@
  * integration layer.
  */
 public class Contexts {
-
     MessageExchangeContext mexContext;
 
     public Scheduler scheduler;
 
+    public CronScheduler cronScheduler;
+
     EndpointReferenceContext eprContext;
 
     BindingContext bindingContext;

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,296 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+import org.apache.ode.utils.CronExpression;
+
+public class CronScheduler {
+    static final Log __log = LogFactory.getLog(CronScheduler.class);
+
+    // minimum interval of the cron job(1 second)
+    private final long MIN_INTERVAL = 0;
+    // if the work is schedule too late from the due time, skip it
+    private final long TOLERABLE_SCHEDULE_DELAY = 0;
+
+    private ExecutorService _scheduledTaskExec;
+    
+    private Contexts _contexts;
+    
+    private final Timer _schedulerTimer = new Timer("CronScheduler", true);
+
+    private final Collection<TerminationListener> _systemTerminationListeners = new ArrayList<TerminationListener>();
+    
+    private final Map<QName, Collection<TerminationListener>> _terminationListenersByPid = new HashMap<QName, Collection<CronScheduler.TerminationListener>>();
+    
+    private volatile boolean _shuttingDown = false;
+    
+    public void setScheduledTaskExec(ExecutorService taskExec) {
+        _scheduledTaskExec = taskExec;
+    }
+    
+    public void setContexts(Contexts _contexts) {
+        this._contexts = _contexts;
+    }
+    
+    public void shutdown() {
+        _shuttingDown = true;
+        _schedulerTimer.cancel();
+    }
+    
+    public void cancelProcessCronJobs(QName pid, boolean undeployed) {
+        assert pid != null;
+        
+        if( __log.isInfoEnabled() ) __log.info("Cancelling PROCESS CRON jobs for: " + pid);
+        Collection<TerminationListener> listenersToTerminate = new ArrayList<TerminationListener>();
+
+        synchronized( _terminationListenersByPid ) {
+            Collection<TerminationListener> listeners = _terminationListenersByPid.get(pid);
+            if( listeners != null ) {
+                listenersToTerminate.addAll(listeners);
+            }
+            if( undeployed ) {
+                _terminationListenersByPid.remove(pid);
+            }
+        }
+        
+        // terminate existing cron jobs if there are
+        synchronized( pid ) {
+            for( TerminationListener listener : listenersToTerminate ) {
+                listener.terminate();
+            }
+        }
+    }
+    
+    public void scheduleProcessCronJobs(QName pid, ProcessConf pconf) {
+        if( _shuttingDown ) {
+            return;
+        }
+        assert pid != null;
+        
+        cancelProcessCronJobs(pid, false);
+        Collection<TerminationListener> newListeners = new ArrayList<TerminationListener>();
+        
+        synchronized( pid ) {
+            if( __log.isInfoEnabled() ) __log.info("Scheduling PROCESS CRON jobs for: " + pid);
+            
+            // start new cron jobs
+            for( final CronJob job : pconf.getCronJobs() ) {
+                if( __log.isDebugEnabled() ) __log.debug("Scheduling PROCESS CRON job: " + job.getCronExpression() + " for: " + pid);
+                // for each different scheduled time
+                Runnable runnable = new Runnable() {
+                    public void run() {
+                        if( __log.isDebugEnabled() ) __log.debug("Running cron cleanup with details list size: " + job.getRunnableDetailList().size());
+                        for( Map<String, Object> details : job.getRunnableDetailList() ) {
+                            try {
+                                // for each clean up for the scheduled time
+                                RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+                                cleanup.restoreFromDetailsMap(details);
+                                cleanup.setContexts(_contexts);
+                                cleanup.run();
+                                if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a PROCESS CRON job: " + cleanup);
+                            } catch(Exception re) {
+                                __log.error("Error during runtime data cleanup from a PROCESS CRON: " + details + "; check your cron settings in deploy.xml.", re);
+                                // don't sweat.. the rest of the system and other cron jobs still should work
+                            }
+                        }
+                    }
+                };
+                newListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+            }
+        }
+        
+        // make sure the pid does not get into the terminationListener map if no cron is setup 
+        if( !newListeners.isEmpty() ) {
+            synchronized( _terminationListenersByPid ) {
+                Collection<TerminationListener> oldListeners = _terminationListenersByPid.get(pid);
+                if( oldListeners == null ) {
+                    _terminationListenersByPid.put(pid, newListeners);
+                } else {
+                    oldListeners.addAll(newListeners);
+                }
+            }
+        }
+    }
+    
+    public void refreshSystemCronJobs(SystemSchedulesConfig systemSchedulesConf) {
+        if( _shuttingDown ) {
+            return;
+        }
+
+        synchronized( _systemTerminationListeners) {
+            if( __log.isInfoEnabled() ) __log.info("Refreshing SYSTEM CRON jobs.");
+
+            try {
+                // if error thrown on reading the schedules.xml, do not cancel existing cron jobs
+                List<CronJob> systemCronJobs = systemSchedulesConf.getSystemCronJobs();
+                
+                // cancel cron jobs
+                for( TerminationListener listener : _systemTerminationListeners ) {
+                    listener.terminate();
+                }
+                _systemTerminationListeners.clear();
+                
+                // start new cron jobs
+                for( final CronJob job : systemCronJobs ) {
+                    if( __log.isDebugEnabled() ) __log.debug("Scheduling SYSTEM CRON job:" + job);
+                    // for each different scheduled time
+                    Runnable runnable = new Runnable() {
+                        public void run() {
+                            for( Map<String, Object> details : job.getRunnableDetailList() ) {
+                                try {
+                                    // for now, we have only runtime data cleanup cron job defined
+                                    // for each clean up for the scheduled time
+                                    RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+                                    synchronized( _terminationListenersByPid ) {
+                                        if( !_terminationListenersByPid.isEmpty() ) {
+                                            details.put("pidsToExclude", _terminationListenersByPid.keySet());
+                                        }
+                                    }
+                                    cleanup.restoreFromDetailsMap(details);
+                                    cleanup.setContexts(_contexts);
+                                    cleanup.run();
+                                    if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a SYSTEM CRON job:" + cleanup);
+                                } catch( Exception e ) {
+                                    __log.error("Error running a runtime data cleanup from a SYSTEM CRON job: " + details + "; check your system cron setup.", e);
+                                }
+                            }
+                        }
+                    };
+                    _systemTerminationListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+                }
+            } catch( Exception e ) {
+                __log.error("Error during refreshing SYSTEM CRON schedules: ", e);
+            }
+        }
+    }
+
+    public TerminationListener schedule(final CronExpression cronExpression, 
+            final Runnable runnable, final Map<String, Object> runnableDetails, 
+            TerminationListener terminationListener) {
+        if( _shuttingDown ) {
+            __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+            return new TerminationListener() {
+                public void terminate() {
+                    // do nothing
+                }
+            };
+        }
+        
+        assert cronExpression != null;
+        assert runnable != null;
+        
+        final Date nextScheduleTime = cronExpression.getNextValidTimeAfter(new Date(
+                System.currentTimeMillis() + MIN_INTERVAL));
+        final CronScheduledJob job = new CronScheduledJob(nextScheduleTime, runnable, runnableDetails, cronExpression, terminationListener);
+        if( __log.isDebugEnabled() ) __log.debug("CRON will run in " + (nextScheduleTime.getTime() - System.currentTimeMillis()) + "ms.");
+        
+        try {
+            _schedulerTimer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    __log.debug("Cron scheduling timer kicked in: " + cronExpression);
+                    // run only if the current node is the coordinator, 
+                    // with the SimpleScheduler, the node is always the coordinator
+                    if( !(_contexts.scheduler instanceof ClusterAware) 
+                            || ((ClusterAware)_contexts.scheduler).amICoordinator() ) {
+                        // do not hold the timer thread too long, submit the work to an executorService
+                        _scheduledTaskExec.submit(job);
+                        __log.debug("CRON job scheduled " + runnable);
+                    }
+                }
+            }, nextScheduleTime);
+        } catch( IllegalStateException ise ) {
+            if( _shuttingDown ) {
+                __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+            } else {
+                throw ise;
+            }
+        }
+
+        return job.terminationListener;
+    }
+    
+    public interface TerminationListener {
+        void terminate();
+    }
+    
+    private class CronScheduledJob implements Callable<TerminationListener> {
+        private volatile boolean terminated = false;
+        private Date nextScheduleTime;
+        private Runnable runnable;
+        private Map<String, Object> runnableDetails;
+        private CronExpression cronExpression;
+        private TerminationListener terminationListener;
+        
+        public CronScheduledJob(Date nextScheduleTime,
+                Runnable runnable, Map<String, Object> runnableDetails,
+                CronExpression cronExpression, TerminationListener terminationListener) {
+            this.nextScheduleTime = nextScheduleTime;
+            this.runnable = runnable;
+            this.runnableDetails = runnableDetails;
+            this.cronExpression = cronExpression;
+            if( terminationListener == null ) {
+                terminationListener = new TerminationListener() {
+                    public void terminate() {
+                        terminated = true;
+                    }
+                };
+            }
+            this.terminationListener = terminationListener;
+        }
+
+        public TerminationListener call() throws Exception {
+            try {
+                if( TOLERABLE_SCHEDULE_DELAY == 0 ||
+                    nextScheduleTime.getTime() < System.currentTimeMillis() + TOLERABLE_SCHEDULE_DELAY) {
+                    if( runnableDetails != null && 
+                            runnable instanceof MapSerializableRunnable ) {
+                        ((MapSerializableRunnable)runnable).restoreFromDetailsMap(runnableDetails);
+                    }
+                    if (runnable instanceof ContextsAware) {
+                        ((ContextsAware) runnable).setContexts(_contexts);
+                    }
+                    if( !_shuttingDown && !terminated ) {
+                        __log.debug("Running CRON job: " + runnable + " for " + nextScheduleTime.getTime());
+                        runnable.run();
+                    }
+                } else {
+                    // ignore the scheduling.. it will be scheduled later
+                }
+            } catch( Exception e ) {
+                if( _shuttingDown ) {
+                    __log.info("A cron job threw an Exception during ODE shutdown: " + e.getMessage() + ", you can ignore the error.");
+                } else if( e instanceof RuntimeException ) {
+                    throw e;
+                } else {
+                    throw new RuntimeException("Exception during running cron scheduled job: " + runnable, e);
+                }
+            } finally {
+                if( !_shuttingDown && !terminated ) {
+                    schedule(cronExpression, runnable, runnableDetails, terminationListener);
+                }
+            }
+            
+            return terminationListener;
+        }
+    }
+}
\ No newline at end of file

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,99 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.FilteredInstanceDeletable;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+
+public class RuntimeDataCleanupRunnable implements MapSerializableRunnable, ContextsAware {
+    private final Log _log = LogFactory.getLog(RuntimeDataCleanupRunnable.class);
+
+    private static final long serialVersionUID = 1L;
+
+    private transient Contexts _contexts;
+
+    private int _transactionSize;
+    private CleanupInfo _cleanupInfo;
+    private QName _pid;
+    private Set<QName> _pidsToExclude;
+    
+    public RuntimeDataCleanupRunnable() {
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void restoreFromDetailsMap(Map<String, Object> details) {
+        _cleanupInfo = (CleanupInfo)details.get("cleanupInfo");
+        _transactionSize = (Integer)details.get("transactionSize");
+        _pid = (QName)details.get("pid");
+        _pidsToExclude = (Set<QName>)details.get("pidsToExclude");
+    }
+
+    public void storeToDetailsMap(Map<String, Object> details) {
+        // we don't serialize
+    }
+
+    public void setContexts(Contexts contexts) {
+        _contexts = contexts;
+    }
+    
+    public void run() {
+        _log.info("CleanInstances.run().");
+        for( String filter : _cleanupInfo.getFilters() ) {
+            _log.info("CleanInstances.run(" + filter + ")");
+            long numberOfDeletedInstances = 0;
+            do {
+                numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+            } while( numberOfDeletedInstances == _transactionSize );
+        }
+    }
+    
+    int cleanInstances(String filter, final Set<CLEANUP_CATEGORY> categories, int limit) {
+        if( _pid != null ) {
+            filter += " pid=" + _pid;
+        } else if( _pidsToExclude != null ) {
+            StringBuffer pids = new StringBuffer();
+            for( QName pid : _pidsToExclude ) {
+                if( pids.length() > 0 ) {
+                    pids.append("|");
+                }
+                pids.append(pid);
+            }
+            filter += " pid<>" + pids.toString();
+        }
+        
+        _log.debug("CleanInstances using filter: " + filter + ", limit: " + limit);
+        
+        final InstanceFilter instanceFilter = new InstanceFilter(filter, "", limit);
+        try {
+            if( _contexts.scheduler != null ) {
+                return _contexts.scheduler.execTransaction(new Callable<Integer>() {
+                    public Integer call() throws Exception {
+                        BpelDAOConnection con = _contexts.dao.getConnection();
+                        if( con instanceof FilteredInstanceDeletable ) {
+                            return ((FilteredInstanceDeletable)con).deleteInstances(instanceFilter, categories);
+                        }
+                        return 0;
+                    }
+                });
+            } else {
+                return 0;
+            }
+        } catch (RuntimeException re) {
+            throw re;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while listing instances: ",  e);
+        }
+    }
+}

Propchange: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,103 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.io.File;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.schedules.SchedulesDocument;
+import org.apache.ode.bpel.schedules.TSchedule;
+import org.apache.ode.bpel.dd.TCleanup;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.store.ProcessCleanupConfImpl;
+import org.apache.ode.utils.CronExpression;
+import org.apache.xmlbeans.XmlOptions;
+
+public class SystemSchedulesConfig {
+    private final static Log __log = LogFactory.getLog(SystemSchedulesConfig.class);
+    
+    public final static String SCHEDULE_CONFIG_FILE_PROP_KEY = "org.apache.ode.scheduleConfigFile";
+    private File schedulesFile;
+    
+    public SystemSchedulesConfig(File configRoot) {
+        String scheduleConfigFile = System.getProperty(SCHEDULE_CONFIG_FILE_PROP_KEY);
+        if( scheduleConfigFile != null ) {
+            schedulesFile = new File(scheduleConfigFile);
+            if( !new File(scheduleConfigFile).exists()) {
+                __log.warn("A custom location for schedules has been set. However, the file does not exist at the location: " 
+                        + schedulesFile.getAbsolutePath() + ". The file will be read when one gets created.");
+            }
+        } else {
+            assert configRoot != null;
+            schedulesFile = new File(configRoot, "schedules.xml");
+        }
+        __log.info("SYSTEM CRON configuration: " + schedulesFile.getAbsolutePath());
+    }
+    
+    public File getSchedulesFile() {
+        return schedulesFile;
+    }
+    
+    /**
+     * Returns the list of cron jobs configured for all processes. This call returns
+     * a fresh snapshot.
+     * 
+     * @return the list of cron jobs
+     */
+    public List<CronJob> getSystemCronJobs() {
+        List<CronJob> jobs = new ArrayList<CronJob>();
+        
+        if( schedulesFile != null && schedulesFile.exists() ) {
+            for(TSchedule schedule : getSystemSchedulesDocument().getSchedules().getScheduleList()) {
+                CronJob job = new CronJob();
+                try {
+                    job.setCronExpression(new CronExpression(schedule.getWhen()));
+                    for(final TCleanup aCleanup : schedule.getCleanupList()) {
+                        CleanupInfo cleanupInfo = new CleanupInfo();
+                        assert !aCleanup.getFilterList().isEmpty();
+                        cleanupInfo.setFilters(aCleanup.getFilterList());
+                        ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
+                        
+                        Map<String, Object> runnableDetails = new HashMap<String, Object>();
+                        runnableDetails.put("cleanupInfo", cleanupInfo);
+                        runnableDetails.put("transactionSize", 10);
+                        job.getRunnableDetailList().add(runnableDetails);
+                        __log.info("SYSTEM CRON configuration added a runtime data cleanup: " + runnableDetails);
+                    }
+                    jobs.add(job);
+                } catch( ParseException pe ) {
+                    __log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
+                }
+            }
+        }
+        
+        __log.info("SYSTEM CRON configuration found cron jobs: " + jobs);
+        return jobs;
+    }
+
+    @SuppressWarnings("unchecked")
+    private SchedulesDocument getSystemSchedulesDocument() {
+        SchedulesDocument sd = null;
+        
+        try {
+            XmlOptions options = new XmlOptions();
+            HashMap otherNs = new HashMap();
+
+            otherNs.put("http://ode.fivesight.com/schemas/2006/06/27/dd",
+                    "http://www.apache.org/ode/schemas/schedules/2009/05");
+            options.setLoadSubstituteNamespaces(otherNs);
+            sd = SchedulesDocument.Factory.parse(schedulesFile, options);
+        } catch (Exception e) {
+            throw new ContextException("Couldn't read schedule descriptor at location "
+                    + schedulesFile.getAbsolutePath(), e);
+        }
+    
+        return sd;
+    }
+}
\ No newline at end of file

Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java Mon Jun  1 05:07:35 2009
@@ -0,0 +1,92 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import javax.xml.namespace.QName;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.RuntimeDataCleanupRunnable;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.daohib.bpel.BpelDAOConnectionImpl;
+import org.apache.ode.utils.CronExpression;
+import org.jmock.Mock;
+import org.jmock.MockObjectTestCase;
+
+public class CronSchedulerTest extends MockObjectTestCase {
+    
+    private Contexts contexts;
+    private Mock scheduler;
+    private CronScheduler cronScheduler;
+    private ExecutorService execService;
+
+    static {
+        BasicConfigurator.configure();
+        LogManager.getRootLogger().setLevel(Level.DEBUG);
+        LogManager.getLogger(RuntimeDataCleanupRunnable.class).setLevel(Level.DEBUG);
+        LogManager.getLogger(BpelDAOConnectionImpl.class).setLevel(Level.DEBUG);
+        LogManager.getLogger("org.apache.ode").setLevel(Level.DEBUG);
+    }
+    
+    protected void setUp() throws Exception {
+        contexts = new Contexts();
+        scheduler = mock(Scheduler.class);
+        contexts.scheduler = (Scheduler)scheduler.proxy();
+        
+        cronScheduler = new CronScheduler();
+        cronScheduler.setContexts(contexts);
+        execService = Executors.newCachedThreadPool(new ThreadFactory() {
+            int threadNumber = 0;
+            public Thread newThread(Runnable r) {
+                threadNumber += 1;
+                Thread t = new Thread(r, "LongRunning-"+threadNumber);
+                t.setDaemon(true);
+                return t;
+            }
+        });
+        cronScheduler.setScheduledTaskExec(execService);
+    }
+    
+    private class NotifyingTerminationListener implements CronScheduler.TerminationListener {
+        boolean finished = false;
+        
+        public synchronized void terminate() {
+            finished = true;
+            notify();
+        }
+    }
+    
+    public void testNull() throws Exception {}
+    
+    public void _testCleanup() throws Exception {
+        CronExpression cronExpr = new CronExpression("* * * * * ?");
+        RuntimeDataCleanupRunnable runnable = new RuntimeDataCleanupRunnable();
+        
+        Map<String, Object> details = new HashMap<String, Object>();
+        details.put("pid", new QName("test"));
+        details.put("transactionSize", 10);
+        CleanupInfo cleanupInfo = new CleanupInfo();
+        cleanupInfo.getFilters().add("a=b");
+        cleanupInfo.getCategories().add(CLEANUP_CATEGORY.CORRELATIONS);
+        details.put("cleanupInfo", cleanupInfo);
+        runnable.restoreFromDetailsMap(details);
+        runnable.setContexts(contexts);
+        
+        NotifyingTerminationListener listener = new NotifyingTerminationListener();
+        cronScheduler.schedule(cronExpr, runnable, null, listener);
+        while( !listener.finished ) {
+            synchronized(listener) {
+                listener.wait();
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Mon Jun  1 05:07:35 2009
@@ -354,5 +354,9 @@
 
         public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
         }
+
+		public boolean amICoordinator() {
+			return true;
+		}
     }
 }

Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd Mon Jun  1 05:07:35 2009
@@ -23,7 +23,8 @@
            xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03"
            elementFormDefault="qualified">
 
-    <xs:element name="deploy" id="deploy" type="dd:tDeployment"/>
+    <xs:element name="deploy" id="deploy" type="dd:tDeployment">
+    </xs:element>
 
     <xs:complexType name="tDeployment">
         <xs:sequence>
@@ -93,9 +94,9 @@
                             </xs:annotation>
                         </xs:element>
 
-                        <xs:element name="cleanup"
-                            minOccurs="0" maxOccurs="3"
-                            type="dd:tCleanup" />
+                        <xs:element name="cleanup" minOccurs="0" maxOccurs="3" type="dd:tCleanup" />
+                        <xs:element name="schedule" minOccurs="0" maxOccurs="unbounded" type="dd:tSchedule">
+                        </xs:element>
                     </xs:sequence>
 
                     <xs:attribute name="name" type="xs:QName" use="required"/>
@@ -206,8 +207,9 @@
                     </xs:restriction>
                 </xs:simpleType>
             </xs:element>
+            <xs:element name="filter" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
         </xs:sequence>
-        <xs:attribute name="on" use="required">
+        <xs:attribute name="on" use="optional">
             <xs:simpleType>
                 <xs:restriction base="xs:string">
                     <xs:enumeration value="success" />
@@ -217,4 +219,12 @@
             </xs:simpleType>
         </xs:attribute>
     </xs:complexType>
+
+    <xs:complexType name="tSchedule">
+        <xs:sequence>
+            <xs:element name="cleanup" type="dd:tCleanup" minOccurs="0" maxOccurs="unbounded">
+            </xs:element>
+        </xs:sequence>
+        <xs:attribute name="when" type="xs:string" use="required"></xs:attribute>
+    </xs:complexType>
 </xs:schema>

Added: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd Mon Jun  1 05:07:35 2009
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+	targetNamespace="http://www.apache.org/ode/schemas/schedules/2009/05"
+	xmlns:schedules="http://www.apache.org/ode/schemas/schedules/2009/05"
+	elementFormDefault="qualified" xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03">
+
+	<xs:import namespace="http://www.apache.org/ode/schemas/dd/2007/03" schemaLocation="dd.xsd"></xs:import>
+	<xs:element name="schedules" id="schedules"
+		type="schedules:tSchedules" />
+
+
+	<xs:complexType name="tSchedule">
+		<xs:sequence>
+			<xs:element name="cleanup" type="dd:tCleanup"
+				minOccurs="0" maxOccurs="unbounded">
+			</xs:element>
+		</xs:sequence>
+		<xs:attribute name="when" type="xs:string" use="required"></xs:attribute>
+	</xs:complexType>
+
+
+	<xs:complexType name="tSchedules">
+		<xs:sequence maxOccurs="unbounded" minOccurs="0">
+			<xs:element name="schedule" type="schedules:tSchedule"></xs:element>
+		</xs:sequence>
+	</xs:complexType>
+</xs:schema>

Added: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig Mon Jun  1 05:07:35 2009
@@ -0,0 +1,32 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<xb:config xmlns:xb="http://xml.apache.org/xmlbeans/2004/02/xbean/config"
+           xmlns:dd="http://www.apache.org/ode/schemas/schedules/2009/05">
+
+  <xb:namespace uri="http://www.apache.org/ode/schemas/schedules/2009/05">
+    <xb:package>org.apache.ode.bpel.schedules</xb:package>
+  </xb:namespace> 
+  
+  <xb:namespace uri="##any">
+    <xb:prefix></xb:prefix>
+    <xb:suffix></xb:suffix>
+  </xb:namespace>
+
+</xb:config>

Modified: ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java Mon Jun  1 05:07:35 2009
@@ -31,60 +31,60 @@
 import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
 
 public class ProcessCleanupConfImpl {
-	protected static Log __log = LogFactory.getLog(ProcessCleanupConfImpl.class);
-	
-	private final Set<CLEANUP_CATEGORY> successCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
-	private final Set<CLEANUP_CATEGORY> failureCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
-	
-	// package default
+    protected static Log __log = LogFactory.getLog(ProcessCleanupConfImpl.class);
+    
+    private final Set<CLEANUP_CATEGORY> successCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+    private final Set<CLEANUP_CATEGORY> failureCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+    
+    // package default
     ProcessCleanupConfImpl(TDeployment.Process pinfo) {
-		for( TCleanup cleanup : pinfo.getCleanupList() ) {
-			if( cleanup.getOn() == TCleanup.On.SUCCESS || cleanup.getOn() == TCleanup.On.ALWAYS ) {
-				processACleanup(successCategories, cleanup.getCategoryList());
-			}
-			if( cleanup.getOn() == TCleanup.On.FAILURE || cleanup.getOn() == TCleanup.On.ALWAYS ) {
-				processACleanup(failureCategories, cleanup.getCategoryList());
-			}
-		}
-		
-		// validate configurations
-		Set<CLEANUP_CATEGORY> categories = getCleanupCategories(true);
-		if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
-			throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
-		}
-		categories = getCleanupCategories(false);
-		if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
-			throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
-		}
+        for( TCleanup cleanup : pinfo.getCleanupList() ) {
+            if( cleanup.getOn() == TCleanup.On.SUCCESS || cleanup.getOn() == TCleanup.On.ALWAYS ) {
+                processACleanup(successCategories, cleanup.getCategoryList());
+            }
+            if( cleanup.getOn() == TCleanup.On.FAILURE || cleanup.getOn() == TCleanup.On.ALWAYS ) {
+                processACleanup(failureCategories, cleanup.getCategoryList());
+            }
+        }
+        
+        // validate configurations
+        Set<CLEANUP_CATEGORY> categories = getCleanupCategories(true);
+        if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
+            throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
+        }
+        categories = getCleanupCategories(false);
+        if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
+            throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
+        }
     }
 
-    private void processACleanup(Set<CLEANUP_CATEGORY> categories, List<TCleanup.Category.Enum> categoryList) {
-		if( categoryList.isEmpty() ) {
-			// add all categories
-			categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
-		} else {
-			for( TCleanup.Category.Enum aCategory : categoryList ) {
-				if( aCategory == TCleanup.Category.ALL) {
-					// add all categories
-					categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
-				} else {
-					categories.add(CLEANUP_CATEGORY.fromString(aCategory.toString()));
-				}
-			}
-		}
+    public static void processACleanup(Set<CLEANUP_CATEGORY> categories, List<TCleanup.Category.Enum> categoryList) {
+        if( categoryList.isEmpty() ) {
+            // add all categories
+            categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
+        } else {
+            for( TCleanup.Category.Enum aCategory : categoryList ) {
+                if( aCategory == TCleanup.Category.ALL) {
+                    // add all categories
+                    categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
+                } else {
+                    categories.add(CLEANUP_CATEGORY.fromString(aCategory.toString()));
+                }
+            }
+        }
     }
     
     // package default
     boolean isCleanupCategoryEnabled(boolean instanceSucceeded, CLEANUP_CATEGORY category) {
-    	if( instanceSucceeded ) {
-    		return successCategories.contains(category);
-    	} else {
-    		return failureCategories.contains(category);
-    	}
+        if( instanceSucceeded ) {
+            return successCategories.contains(category);
+        } else {
+            return failureCategories.contains(category);
+        }
     }
     
     // package default
     Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded) {
-    	return instanceSucceeded ? successCategories : failureCategories;
+        return instanceSucceeded ? successCategories : failureCategories;
     }
 }