You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/01 07:07:37 UTC
svn commit: r780562 [2/3] - in /ode/branches/APACHE_ODE_1.X:
axis2-war/src/main/
axis2-war/src/test/java/org/apache/ode/axis2/instancecleanup/
axis2-war/src/test/resources/TestProcessCronCleanup/
axis2-war/src/test/resources/TestSystemCronCleanup/ axis...
Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/TimeService.wsdl Mon Jun 1 05:07:35 2009
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<wsdl:definitions
+ xmlns:soap="http://schemas.xmlsoap.org/wsdl/soap/"
+ xmlns:tns="http://ws.intalio.com/TimeService/"
+ xmlns:s="http://www.w3.org/2001/XMLSchema"
+ xmlns:http="http://schemas.xmlsoap.org/wsdl/http/"
+ targetNamespace="http://ws.intalio.com/TimeService/"
+ xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">
+ <wsdl:documentation xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">A sample Time service</wsdl:documentation>
+ <wsdl:types>
+ <s:schema elementFormDefault="qualified" targetNamespace="http://ws.intalio.com/TimeService/">
+ <s:element name="getUTCTime" type="s:string" />
+ <s:element name="getUTCTimeResponse">
+ <s:complexType>
+ <s:sequence>
+ <s:element minOccurs="0" maxOccurs="1" name="getUTCTimeResult" type="s:string" />
+ </s:sequence>
+ </s:complexType>
+ </s:element>
+ <s:element name="getCityTime">
+ <s:complexType>
+ <s:sequence>
+ <s:element minOccurs="0" maxOccurs="1" name="city" type="s:string" />
+ </s:sequence>
+ </s:complexType>
+ </s:element>
+ <s:element name="getCityTimeResponse">
+ <s:complexType>
+ <s:sequence>
+ <s:element minOccurs="0" maxOccurs="1" name="getCityTimeResult" type="s:string" />
+ </s:sequence>
+ </s:complexType>
+ </s:element>
+ </s:schema>
+ </wsdl:types>
+ <wsdl:message name="getUTCTimeSoapIn">
+ <wsdl:part name="parameters" element="tns:getUTCTime" />
+ </wsdl:message>
+ <wsdl:message name="getUTCTimeSoapOut">
+ <wsdl:part name="parameters" element="tns:getUTCTimeResponse" />
+ </wsdl:message>
+ <wsdl:message name="getCityTimeSoapIn">
+ <wsdl:part name="parameters" element="tns:getCityTime" />
+ </wsdl:message>
+ <wsdl:message name="getCityTimeSoapOut">
+ <wsdl:part name="parameters" element="tns:getCityTimeResponse" />
+ </wsdl:message>
+ <wsdl:portType name="TimeServiceSoap">
+ <wsdl:operation name="getUTCTime">
+ <wsdl:input message="tns:getUTCTimeSoapIn" />
+ <wsdl:output message="tns:getUTCTimeSoapOut" />
+ </wsdl:operation>
+ <wsdl:operation name="getCityTime">
+ <wsdl:input message="tns:getCityTimeSoapIn" />
+ <wsdl:output message="tns:getCityTimeSoapOut" />
+ </wsdl:operation>
+ </wsdl:portType>
+ <wsdl:binding name="TimeServiceSoap" type="tns:TimeServiceSoap">
+ <soap:binding transport="http://schemas.xmlsoap.org/soap/http" />
+ <wsdl:operation name="getUTCTime">
+ <soap:operation soapAction="http://ws.intalio.com/TimeService/getUTCTime" style="document" />
+ <wsdl:input>
+ <soap:body use="literal" />
+ </wsdl:input>
+ <wsdl:output>
+ <soap:body use="literal" />
+ </wsdl:output>
+ </wsdl:operation>
+ <wsdl:operation name="getCityTime">
+ <soap:operation soapAction="http://ws.intalio.com/TimeService/getCityTime" style="document" />
+ <wsdl:input>
+ <soap:body use="literal" />
+ </wsdl:input>
+ <wsdl:output>
+ <soap:body use="literal" />
+ </wsdl:output>
+ </wsdl:operation>
+ </wsdl:binding>
+ <wsdl:service name="TimeService">
+ <wsdl:documentation xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/">A sample Time service</wsdl:documentation>
+ <wsdl:port name="TimeServiceSoap" binding="tns:TimeServiceSoap">
+ <soap:address location="http://ws.intalio.com/TimeService/" />
+ </wsdl:port>
+ </wsdl:service>
+</wsdl:definitions>
\ No newline at end of file
Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/deploy.xml Mon Jun 1 05:07:35 2009
@@ -0,0 +1,42 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<dd:deploy xmlns:dd="http://ode.fivesight.com/schemas/2006/06/27/dd">
+ <dd:process xmlns:dd="http://ode.fivesight.com/schemas/2006/06/27/dd"
+ xmlns:Client="http://example.com/FirstProcess/Client" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:diag="http://example.com/FirstProcess" xmlns:TimeService="http://ws.intalio.com/TimeService/"
+ xmlns:TimeServer="http://example.com/FirstProcess/TimeServer"
+ xmlns:ns="http://bpms.intalio.com/FirstProcess/Time" xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:this="http://example.com/FirstProcess/FirstProcess" name="this:FirstProcess"
+ fileName="FirstProcess-FirstProcess.bpel">
+ <dd:property name="PATH">FirstProcess</dd:property>
+ <dd:property name="SVG">FirstProcess.svg</dd:property>
+ <dd:provide partnerLink="firstProcessAndClientPlkVar">
+ <dd:service name="this:CanonicServiceForClient" port="canonicPort"></dd:service>
+ </dd:provide>
+ <dd:invoke partnerLink="timeServerAndFirstProcessForPortTimeServiceSoapPlkVar">
+ <dd:service name="TimeService:TimeService" port="TimeServiceSoap"></dd:service>
+ </dd:invoke>
+ <dd:schedule when="* * * * * ?">
+ <dd:cleanup>
+ <dd:filter><![CDATA[last_active<-1w status=completed]]></dd:filter>
+ </dd:cleanup>
+ </dd:schedule>
+ </dd:process>
+</dd:deploy>
\ No newline at end of file
Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/resources/TestSystemCronCleanup_exclude/testRequest.soap Mon Jun 1 05:07:35 2009
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<SOAP-ENV:Envelope xmlns:SOAP-ENV="http://schemas.xmlsoap.org/soap/envelope/">
+ <!-- test soap message -->
+ <SOAP-ENV:Body>
+ <TimeRequest xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsi-wsdl="http://www.intalio.com/BPMS/wsi/wsdl" xmlns:wsi-xf="http://www.intalio.com/BPMS/wsi/xforms" xmlns:xforms="http://www.w3.org/2002/xforms" xmlns:ns0="http://bpms.intalio.com/FirstProcess/Time" xmlns:xxforms="http://orbeon.org/oxf/xml/xforms" xmlns="http://bpms.intalio.com/FirstProcess/Time">
+ <city>New York</city>
+ </TimeRequest>
+ </SOAP-ENV:Body>
+</SOAP-ENV:Envelope>
Added: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml (added)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml Mon Jun 1 05:07:35 2009
@@ -0,0 +1,29 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<sched:schedules
+ xmlns:sched="http://www.apache.org/ode/schemas/schedules/2009/05"
+ xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03">
+ <sched:schedule when="* * * * * ?">
+ <sched:cleanup>
+ <dd:filter><![CDATA[status=completed]]></dd:filter>
+ </sched:cleanup>
+ </sched:schedule>
+</sched:schedules>
\ No newline at end of file
Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Jun 1 05:07:35 2009
@@ -56,6 +56,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
@@ -106,9 +107,9 @@
protected ExecutorService _executorService;
- protected ExecutorService _polledRunnableExecutorService;
-
protected Scheduler _scheduler;
+
+ protected CronScheduler _cronScheduler;
protected Database _db;
@@ -119,14 +120,13 @@
private ManagementService _mgtService;
protected MultiThreadedHttpConnectionManager httpConnectionManager;
-
-
+
public void init(ServletConfig config, AxisConfiguration axisConf) throws ServletException {
init(config.getServletContext().getRealPath("/WEB-INF"), axisConf);
}
public void init(String contextPath, AxisConfiguration axisConf) throws ServletException {
- _axisConfig = axisConf;
+ _axisConfig = axisConf;
String rootDir = System.getProperty("org.apache.ode.rootDir");
if (rootDir != null) _appRoot = new File(rootDir);
else _appRoot = new File(contextPath);
@@ -144,12 +144,12 @@
_odeConfig = new ODEConfigProperties(_configRoot);
try {
- _odeConfig.load();
+ _odeConfig.load();
} catch (FileNotFoundException fnf) {
- String errmsg = __msgs.msgOdeInstallErrorCfgNotFound(_odeConfig.getFile());
- __log.warn(errmsg);
+ String errmsg = __msgs.msgOdeInstallErrorCfgNotFound(_odeConfig.getFile());
+ __log.warn(errmsg);
} catch (Exception ex) {
- String errmsg = __msgs.msgOdeInstallErrorCfgReadError(_odeConfig.getFile());
+ String errmsg = __msgs.msgOdeInstallErrorCfgReadError(_odeConfig.getFile());
__log.error(errmsg, ex);
throw new ServletException(errmsg, ex);
}
@@ -182,16 +182,16 @@
_store.loadAll();
try {
- _bpelServer.start();
+ _bpelServer.start();
} catch (Exception ex) {
- String errmsg = __msgs.msgOdeBpelServerStartFailure();
- __log.error(errmsg, ex);
- throw new ServletException(errmsg, ex);
+ String errmsg = __msgs.msgOdeBpelServerStartFailure();
+ __log.error(errmsg, ex);
+ throw new ServletException(errmsg, ex);
}
_poller = getDeploymentPollerExt();
if( _poller == null ) {
- _poller = new DeploymentPoller(_store.getDeployDir(), this);
+ _poller = new DeploymentPoller(_store.getDeployDir(), this);
}
_mgtService = new ManagementService();
@@ -216,36 +216,36 @@
}
@SuppressWarnings("unchecked")
- private DeploymentPoller getDeploymentPollerExt() {
- DeploymentPoller poller = null;
-
- InputStream is = null;
- try {
- is = ODEServer.class.getResourceAsStream("/deploy-ext.properties");
- if( is != null ) {
- __log.info("A deploy-ext.properties found; will use the provided class if applicable.");
- try {
- Properties props = new Properties();
- props.load(is);
- String deploymentPollerClass = props.getProperty("deploymentPoller.class");
- if( deploymentPollerClass == null ) {
- __log.warn("deploy-ext.properties found in the class path; however, the file does not have 'deploymentPoller.class' as one of the properties!!");
- } else {
- Class pollerClass = Class.forName(deploymentPollerClass);
- poller = (DeploymentPoller)pollerClass.getConstructor(File.class, ODEServer.class).newInstance(_store.getDeployDir(), this);
- __log.info("A custom deployment poller: " + deploymentPollerClass + " has been plugged in.");
- }
- } catch( Exception e ) {
- __log.warn("Deployment poller extension class is not loadable, falling back to the default DeploymentPoller.", e);
- }
- } else if( __log.isDebugEnabled() ) __log.debug("No deploy-ext.properties found.");
- } finally {
- try {
- if(is != null) is.close();
- } catch( IOException ie ) {
- // ignore
- }
- }
+ private DeploymentPoller getDeploymentPollerExt() {
+ DeploymentPoller poller = null;
+
+ InputStream is = null;
+ try {
+ is = ODEServer.class.getResourceAsStream("/deploy-ext.properties");
+ if( is != null ) {
+ __log.info("A deploy-ext.properties found; will use the provided class if applicable.");
+ try {
+ Properties props = new Properties();
+ props.load(is);
+ String deploymentPollerClass = props.getProperty("deploymentPoller.class");
+ if( deploymentPollerClass == null ) {
+ __log.warn("deploy-ext.properties found in the class path; however, the file does not have 'deploymentPoller.class' as one of the properties!!");
+ } else {
+ Class pollerClass = Class.forName(deploymentPollerClass);
+ poller = (DeploymentPoller)pollerClass.getConstructor(File.class, ODEServer.class).newInstance(_store.getDeployDir(), this);
+ __log.info("A custom deployment poller: " + deploymentPollerClass + " has been plugged in.");
+ }
+ } catch( Exception e ) {
+ __log.warn("Deployment poller extension class is not loadable, falling back to the default DeploymentPoller.", e);
+ }
+ } else if( __log.isDebugEnabled() ) __log.debug("No deploy-ext.properties found.");
+ } finally {
+ try {
+ if(is != null) is.close();
+ } catch( IOException ie ) {
+ // ignore
+ }
+ }
return poller;
}
@@ -294,6 +294,16 @@
__log.debug("Error stopping services.", ex);
}
+ if( _cronScheduler != null ) {
+ try {
+ __log.debug("shutting down cron scheduler.");
+ _cronScheduler.shutdown();
+ _cronScheduler = null;
+ } catch (Exception ex) {
+ __log.debug("Cron scheduler couldn't be shutdown.", ex);
+ }
+ }
+
if (_scheduler != null)
try {
__log.debug("shutting down scheduler.");
@@ -454,25 +464,19 @@
else
_executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
- // executor service for long running bulk transactions
- _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() {
- int threadNumber = 0;
- public Thread newThread(Runnable r) {
- threadNumber += 1;
- Thread t = new Thread(r, "PolledRunnable-"+threadNumber);
- t.setDaemon(true);
- return t;
- }
- });
-
_bpelServer = new BpelServerImpl();
_scheduler = createScheduler();
_scheduler.setJobProcessor(_bpelServer);
BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
- polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService);
+ polledRunnableProcessor.setPolledRunnableExecutorService(_executorService);
polledRunnableProcessor.setContexts(_bpelServer.getContexts());
_scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
+
+ _cronScheduler = new CronScheduler();
+ _cronScheduler.setScheduledTaskExec(_executorService);
+ _cronScheduler.setContexts(_bpelServer.getContexts());
+ _bpelServer.setCronScheduler(_cronScheduler);
_bpelServer.setDaoConnectionFactory(_daoCF);
_bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
@@ -534,6 +538,10 @@
return _appRoot;
}
+ public File getConfigRoot() { // accessor for the configuration root; DeploymentPoller's constructor passes it to createSystemSchedulesConfig
+ return _configRoot; // the same File this class hands to ODEConfigProperties
+ }
+
private void registerEventListeners() {
String listenersStr = _odeConfig.getEventListeners();
if (listenersStr != null) {
@@ -613,24 +621,36 @@
} else {
// we may have potentially created a lot of garbage, so,
// let's hope the garbage collector is configured properly.
- if (pconf != null) {
- _bpelServer.cleanupProcess(pconf);
- }
+ if (pconf != null) {
+ _bpelServer.cleanupProcess(pconf);
+ }
}
break;
case DISABLED:
case UNDEPLOYED:
_bpelServer.unregister(pse.pid);
- if (pconf != null) {
- _bpelServer.cleanupProcess(pconf);
- }
+ if (pconf != null) {
+ _bpelServer.cleanupProcess(pconf);
+ }
break;
default:
__log.debug("Ignoring store event: " + pse);
}
- }
-
+
+ if( pconf != null ) {
+ if( pse.type == ProcessStoreEvent.Type.UNDEPLOYED) {
+ __log.debug("Cancelling all cron scheduled jobs on store event: " + pse);
+ _bpelServer.getContexts().cronScheduler.cancelProcessCronJobs(pse.pid, true);
+ }
+ // Except for undeploy event, we need to re-schedule process dependent jobs
+ __log.debug("(Re)scheduling cron scheduled jobs on store event: " + pse);
+ if( pse.type != ProcessStoreEvent.Type.UNDEPLOYED) {
+ _bpelServer.getContexts().cronScheduler.scheduleProcessCronJobs(pse.pid, pconf);
+ }
+ }
+ }
+
// Transactional debugging stuff, to track down all these little annoying bugs.
private class DebugTxMgr implements TransactionManager {
private TransactionManager _tm;
Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java Mon Jun 1 05:07:35 2009
@@ -41,6 +41,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
+import org.apache.ode.utils.WatchDog;
import javax.xml.namespace.QName;
import java.io.File;
@@ -48,6 +51,8 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
/**
* Polls a directory for the deployment of a new deployment unit.
@@ -66,8 +71,15 @@
protected ODEServer _odeServer;
private boolean _onHold = false;
+
+ private SystemSchedulesConfig _systemSchedulesConf;
+
+ @SuppressWarnings("unchecked")
+ private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
+ @SuppressWarnings("unchecked")
+ private WatchDog _systemCronConfigWatchDog;
- /** Filter accepting directories containing a .odedd file. */
+ /** Filter accepting directories containing an ODE dd file. */
private static final FileFilter _fileFilter = new FileFilter() {
public boolean accept(File path) {
if (path.isDirectory()) {
@@ -88,11 +100,13 @@
}
};
- public DeploymentPoller(File deployDir, ODEServer odeServer) {
+ public DeploymentPoller(File deployDir, final ODEServer odeServer) {
_odeServer = odeServer;
_deployDir = deployDir;
if (!_deployDir.exists())
_deployDir.mkdir();
+ _systemSchedulesConf = createSystemSchedulesConfig(odeServer.getConfigRoot());
+ _systemCronConfigWatchDog = createSystemCronConfigWatchDog(odeServer.getBpelServer().getContexts().cronScheduler);
}
public void start() {
@@ -107,51 +121,55 @@
}
protected boolean isDeploymentFromODEFileSystemAllowed() {
- return true;
+ return true;
}
/**
* Scan the directory for new (or removed) files (called mainly from {@link PollingThread}) and calls whoever is in charge of
* the actual deployment (or undeployment).
*/
+ @SuppressWarnings("unchecked")
private void check() {
File[] files = _deployDir.listFiles(_fileFilter);
// Checking for new deployment directories
if (isDeploymentFromODEFileSystemAllowed() && files != null) {
- for (File file : files) {
- File deployXml = new File(file, "deploy.xml");
- File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
-
- if (!deployXml.exists()) {
- // Skip if deploy.xml is abset
- __log.debug("Not deploying " + file + " (missing deploy.xml)");
- }
-
- if (deployedMarker.exists()) {
- continue;
- }
-
- try {
- deployedMarker.createNewFile();
- } catch (IOException e1) {
- __log.error("Error creating deployed marker file, " + file + " will not be deployed");
- continue;
- }
-
- try {
- _odeServer.getProcessStore().undeploy(file);
- } catch (Exception ex) {
- __log.error("Error undeploying " + file.getName());
- }
-
- try {
- Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
- __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
- } catch (Exception e) {
- __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
- }
- }
+ for (File file : files) {
+ File deployXml = new File(file, "deploy.xml");
+ File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
+
+ if (!deployXml.exists()) {
+ // deploy.xml is absent. NOTE(review): this branch only logs; there is no 'continue', so the loop body below still runs
+ __log.debug("Not deploying " + file + " (missing deploy.xml)");
+ }
+
+ WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+
+ if (deployedMarker.exists()) {
+ checkDeployXmlWatchDog(ddWatchDog);
+ continue;
+ }
+
+ try {
+ deployedMarker.createNewFile();
+ } catch (IOException e1) {
+ __log.error("Error creating deployed marker file, " + file + " will not be deployed");
+ continue;
+ }
+
+ try {
+ _odeServer.getProcessStore().undeploy(file);
+ } catch (Exception ex) {
+ __log.error("Error undeploying " + file.getName());
+ }
+
+ try {
+ Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
+ __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
+ } catch (Exception e) {
+ __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
+ }
+ }
}
// Removing deployments that disappeared
@@ -162,10 +180,52 @@
if (!deployDir.exists()) {
Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
file.delete();
+ disposeDeployXmlWatchDog(deployDir);
if (undeployed.size() > 0)
__log.info("Successfully undeployed " + pkg);
}
}
+
+ checkSystemCronConfigWatchDog(_systemCronConfigWatchDog);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected WatchDog ensureDeployXmlWatchDog(File deployFolder, File deployXml) { // returns the WatchDog for this deploy.xml, creating and caching it on first use
+ WatchDog ddWatchDog = dDWatchDogsByPath.get(deployXml.getAbsolutePath()); // cache is keyed by deploy.xml's absolute path
+ if( ddWatchDog == null ) {
+ ddWatchDog = WatchDog.watchFile(deployXml, new DDWatchDogObserver(deployFolder.getName())); // observer is given the deployment folder's name
+ dDWatchDogsByPath.put(deployXml.getAbsolutePath(), ddWatchDog);
+ }
+
+ return ddWatchDog;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void checkDeployXmlWatchDog(WatchDog ddWatchDog) { // called from check() for packages that already have a .deployed marker
+ ddWatchDog.check(); // NOTE(review): presumably fires the observer when deploy.xml has changed -- confirm against WatchDog
+ }
+
+ protected void disposeDeployXmlWatchDog(File deployDir) { // drops the cached WatchDog; check() calls this after undeploying a directory that no longer exists
+ dDWatchDogsByPath.remove(new File(deployDir, "deploy.xml").getAbsolutePath()); // same key form as ensureDeployXmlWatchDog: absolute path of <deployDir>/deploy.xml
+ }
+
+ protected SystemSchedulesConfig createSystemSchedulesConfig(File configRoot) { // factory invoked by the constructor with odeServer.getConfigRoot(); protected, hence overridable by subclasses
+ return new SystemSchedulesConfig(configRoot);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected WatchDog createSystemCronConfigWatchDog(final CronScheduler cronScheduler) { // builds the WatchDog over the system schedules file; the constructor calls this after setting _systemSchedulesConf
+ return WatchDog.watchFile(_systemSchedulesConf.getSchedulesFile(),
+ new WatchDog.DefaultObserver() {
+ public void init() { // NOTE(review): presumably invoked by WatchDog when the watched file is (re)loaded -- confirm
+ cronScheduler.refreshSystemCronJobs(_systemSchedulesConf); // hands the schedules config to the cron scheduler to refresh system cron jobs
+ }
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void checkSystemCronConfigWatchDog(WatchDog ddWatchDog) { // called at the end of check() with _systemCronConfigWatchDog
+ ddWatchDog.check(); // NOTE(review): presumably triggers the observer when the schedules file has changed -- confirm against WatchDog
+ }
/**
@@ -174,6 +234,10 @@
private class PollingThread extends Thread {
private boolean _active = true;
+ public PollingThread() { // gives the polling thread a fixed name instead of the JVM default
+ setName("DeploymentPoller"); // presumably to make the thread easy to spot in logs and thread dumps
+ }
+
/** Stop this poller, and block until it terminates. */
void kill() {
synchronized (this) {
@@ -226,4 +290,17 @@
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
deployedMarker.delete();
}
+
+ @SuppressWarnings("unchecked")
+ protected class DDWatchDogObserver extends WatchDog.DefaultObserver { // observer attached to one package's deploy.xml by ensureDeployXmlWatchDog
+ private String deploymentPakage; // name of the deployment folder (ensureDeployXmlWatchDog passes deployFolder.getName())
+
+ public DDWatchDogObserver(String deploymentPakage) {
+ this.deploymentPakage = deploymentPakage;
+ }
+
+ public void init() { // NOTE(review): presumably invoked by WatchDog when deploy.xml is (re)loaded -- confirm
+ _odeServer.getProcessStore().refreshSchedules(deploymentPakage); // asks the process store to refresh the schedules of this package
+ }
+ }
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/Filter.java Mon Jun 1 05:07:35 2009
@@ -44,10 +44,13 @@
/**
* Pattern that matches anything like 'abcde < fgh' or 'ijklm =nop' using
- * supported comparators
+ * supported comparators
+ * <p>
+ * The not-equal operator, '<>', is supported only for pid filters.
+ * </p>
*/
private static final Pattern __comparatorPattern =
- Pattern.compile("([^=<> ]*) *(<=|>=|<|>|=) *([^=<> ]*)");
+ Pattern.compile("([^=<> ]*) *(<>|<=|>=|<|>|=) *([^=<> ]*)");
protected Map<FKEY, Restriction<String>> _criteria = new HashMap<FKEY,Restriction<String>>();
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/common/InstanceFilter.java Mon Jun 1 05:07:35 2009
@@ -22,10 +22,12 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.pmapi.InvalidRequestException;
import org.apache.ode.utils.ISO8601DateParser;
+import org.apache.ode.utils.RelativeDateParser;
import java.io.Serializable;
import java.text.ParseException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,6 +50,9 @@
/** If set, will filter on the process id (PID) and select all matching process definitions */
private List<String> pids;
+ /** If set, the PID filter will work negatively */
+ private boolean arePidsNegative;
+
/** If set, will filter on the process name (accepts ending with wildcard) */
private String nameFilter;
@@ -105,6 +110,7 @@
PID {
void process(InstanceFilter filter, String key, String op, String value) {
filter.pids = parse(value);
+ filter.arePidsNegative = "<>".equals(op);
}
},
NAME {
@@ -183,7 +189,7 @@
if (startedDateFilter != null) {
for (String ddf : startedDateFilter) {
try {
- ISO8601DateParser.parse(getDateWithoutOp(ddf));
+ parseDateExpression(getDateWithoutOp(ddf));
} catch (ParseException e) {
throw new InvalidRequestException(
"Couldn't parse one of the filter date, please make "
@@ -195,7 +201,7 @@
if (lastActiveDateFilter != null) {
for (String ddf : lastActiveDateFilter) {
try {
- ISO8601DateParser.parse(getDateWithoutOp(ddf));
+ parseDateExpression(getDateWithoutOp(ddf));
} catch (ParseException e) {
throw new InvalidRequestException(
"Couldn't parse one of the filter date, please make "
@@ -235,6 +241,14 @@
this(filter, null, Integer.MAX_VALUE);
}
+ private Date parseDateExpression(String date) throws ParseException { // parses a filter date: "-<expr>" goes to RelativeDateParser, anything else is parsed as ISO 8601
+ if( date.toLowerCase().startsWith("-") && date.length() > 1 ) { // leading '-' marks a relative date (e.g. "-1w"); NOTE(review): toLowerCase() has no effect on this prefix check
+ return RelativeDateParser.parseRelativeDate(date.substring(1)); // the '-' marker is stripped before parsing
+ } else {
+ return ISO8601DateParser.parse(date); // a lone "-" also ends up here
+ }
+ }
+
/**
* Converts the status filter value as given by a filter ('active',
* 'suspended', ...) to an instance state as defined in the ProcessState
@@ -299,6 +313,10 @@
return pids;
}
+ public boolean arePidsNegative() { // true when the pid criterion was given with the '<>' operator (set in PID.process)
+ return arePidsNegative;
+ }
+
public List<String> getIidFilter() {
return iids;
}
@@ -339,7 +357,11 @@
StringBuffer buf = new StringBuffer();
buf.append("InstanceFilter {");
buf.append("iids="+iids);
- buf.append(",pids="+pids);
+ if( !arePidsNegative ) {
+ buf.append(",pids="+pids);
+ } else {
+ buf.append(",-pids="+pids);
+ }
buf.append(",name="+nameFilter);
buf.append(",namespace="+namespaceFilter);
buf.append(",status="+statusFilter);
Added: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ClusterAware.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,18 @@
+package org.apache.ode.bpel.iapi;
+
+/**
+ * The interface that a custom Scheduler implementation implements in order to
+ * support clustering.
+ *
+ * @author sean
+ *
+ */
+public interface ClusterAware {
+ /**
+ * A custom implementation should return true if the node on which this method
+ * is called is the coordinator of the cluster.
+ *
+ * @return true when the node is the coordinator
+ */
+ boolean amICoordinator();
+}
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessConf.java Mon Jun 1 05:07:35 2009
@@ -21,16 +21,18 @@
import java.io.File;
import java.io.InputStream;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import javax.wsdl.Definition;
import javax.xml.namespace.QName;
import org.apache.ode.bpel.evt.BpelEvent;
+import org.apache.ode.utils.CronExpression;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -177,6 +179,8 @@
Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded);
+ List<CronJob> getCronJobs();
+
public enum CLEANUP_CATEGORY {
INSTANCE,
VARIABLES,
@@ -188,4 +192,62 @@
return valueOf(CLEANUP_CATEGORY.class, lowerCase.toUpperCase());
}
}
+
+ /**
+ * A cron expression paired with the list of detail maps describing the
+ * runnables that belong to it.
+ */
+ public class CronJob {
+ /** One details map per runnable; the list instance is never replaced. */
+ private final List<Map<String,Object>> runnableDetailList = new ArrayList<Map<String,Object>>();
+
+ private CronExpression _cronExpression;
+
+ public CronExpression getCronExpression() {
+ return _cronExpression;
+ }
+
+ public void setCronExpression(CronExpression _cronExpression) {
+ this._cronExpression = _cronExpression;
+ }
+
+ /**
+ * @return the internal, mutable list of runnable details; callers add to it directly
+ */
+ public List<Map<String,Object>> getRunnableDetailList() {
+ return runnableDetailList;
+ }
+
+ /**
+ * @return "Cron[" + expression + "] " followed by the runnable details
+ */
+ public String toString() {
+ return "Cron[" + _cronExpression + "] " + runnableDetailList;
+ }
+ }
+
+ /**
+ * Holds the filter strings and the cleanup categories of one cleanup
+ * definition.
+ */
+ public class CleanupInfo {
+ /** Starts out empty; callers add categories through getCategories(). */
+ private final Set<CLEANUP_CATEGORY> _categories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+
+ private List<String> _filters = new ArrayList<String>();
+
+ public List<String> getFilters() {
+ return _filters;
+ }
+
+ /**
+ * @param filters the list to use from now on; it is stored as given, not copied
+ */
+ public void setFilters(List<String> filters) {
+ this._filters = filters;
+ }
+
+ /**
+ * @return the internal, mutable set of categories
+ */
+ public Set<CLEANUP_CATEGORY> getCategories() {
+ return _categories;
+ }
+
+ public String toString() {
+ return "CleanupInfo: filters=" + _filters + ", categories=" + _categories;
+ }
+ }
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStore.java Mon Jun 1 05:07:35 2009
@@ -114,4 +114,6 @@
* @return
*/
long getCurrentVersion();
+
+ void refreshSchedules(String packageName);
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java Mon Jun 1 05:07:35 2009
@@ -49,7 +49,10 @@
DISABLED,
/** A process property was changed. */
- PROPERTY_CHANGED
+ PROPERTY_CHANGED,
+
+ /** Cron schedule settings have been changed for the process */
+ SCHEDULE_SETTINGS_CHANGED
}
/**
Added: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/FilteredInstanceDeletable.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,24 @@
+package org.apache.ode.bpel.dao;
+
+import java.util.Set;
+
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+
+/**
+ * An implementation of this interface provides a way to delete runtime process
+ * instances that are selected through an InstanceFilter.
+ *
+ * @author sean
+ *
+ */
+public interface FilteredInstanceDeletable {
+ /**
+ * Deletes the instances selected by the given instance filter, applying the
+ * given cleanup categories.
+ *
+ * @param filter instance filter
+ * @param categories cleanup categories
+ * @return the number of instances that were deleted
+ */
+ int deleteInstances(InstanceFilter filter, Set<CLEANUP_CATEGORY> categories);
+}
Modified: ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Mon Jun 1 05:07:35 2009
@@ -298,4 +298,8 @@
public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
}
+
+ /**
+ * Always reports this scheduler as the coordinator.
+ *
+ * @return true, unconditionally
+ */
+ public boolean amICoordinator() {
+ return true;
+ }
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Mon Jun 1 05:07:35 2009
@@ -38,6 +38,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.engine.migration.MigrationHandler;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import org.apache.ode.bpel.evt.BpelEvent;
@@ -493,6 +494,10 @@
_contexts.scheduler = scheduler;
}
+ /**
+ * Stores the given cron scheduler in the server's contexts.
+ *
+ * @param cronScheduler the cron scheduler to publish through the contexts
+ * @throws BpelEngineException declared by the signature; nothing in this method throws it
+ */
+ public void setCronScheduler(CronScheduler cronScheduler) throws BpelEngineException {
+ _contexts.cronScheduler = cronScheduler;
+ }
+
public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
_contexts.eprContext = eprContext;
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java Mon Jun 1 05:07:35 2009
@@ -26,6 +26,7 @@
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import java.util.HashMap;
@@ -39,11 +40,12 @@
* integration layer.
*/
public class Contexts {
-
MessageExchangeContext mexContext;
public Scheduler scheduler;
+ public CronScheduler cronScheduler;
+
EndpointReferenceContext eprContext;
BindingContext bindingContext;
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,296 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ClusterAware;
+import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+import org.apache.ode.utils.CronExpression;
+
+public class CronScheduler {
+ static final Log __log = LogFactory.getLog(CronScheduler.class);
+
+ // minimum interval of the cron job, in milliseconds (it is added to
+ // System.currentTimeMillis() before the next fire time is looked up); currently 0
+ private final long MIN_INTERVAL = 0;
+ // if the work is scheduled too late from the due time, skip it;
+ // call() bypasses this check entirely while the value is 0
+ private final long TOLERABLE_SCHEDULE_DELAY = 0;
+
+ private ExecutorService _scheduledTaskExec;
+
+ private Contexts _contexts;
+
+ private final Timer _schedulerTimer = new Timer("CronScheduler", true);
+
+ private final Collection<TerminationListener> _systemTerminationListeners = new ArrayList<TerminationListener>();
+
+ private final Map<QName, Collection<TerminationListener>> _terminationListenersByPid = new HashMap<QName, Collection<CronScheduler.TerminationListener>>();
+
+ private volatile boolean _shuttingDown = false;
+
+ /**
+ * Sets the executor service to which due cron jobs are submitted by the timer task.
+ *
+ * @param taskExec the executor service that runs the cron jobs
+ */
+ public void setScheduledTaskExec(ExecutorService taskExec) {
+ _scheduledTaskExec = taskExec;
+ }
+
+ /**
+ * Sets the contexts; they are passed on to the cleanup runnables and their
+ * scheduler is consulted to decide whether this node is the coordinator.
+ *
+ * @param _contexts the engine contexts
+ */
+ public void setContexts(Contexts _contexts) {
+ this._contexts = _contexts;
+ }
+
+ /**
+ * Marks the scheduler as shutting down and cancels the scheduler timer.
+ * The flag is set first so that scheduling requests arriving afterwards are ignored.
+ */
+ public void shutdown() {
+ _shuttingDown = true;
+ _schedulerTimer.cancel();
+ }
+
+ /**
+ * Calls terminate() on every termination listener registered for the given process.
+ *
+ * The registered listeners are copied while the lock on the listener map is held
+ * and are terminated afterwards, outside of that lock.
+ *
+ * @param pid id of the process whose cron jobs are cancelled; must not be null
+ * @param undeployed when true, the entry of the pid is also removed from the listener
+ * map; when false the entry, with its listeners, stays in the map
+ */
+ public void cancelProcessCronJobs(QName pid, boolean undeployed) {
+ assert pid != null;
+
+ if( __log.isInfoEnabled() ) __log.info("Cancelling PROCESS CRON jobs for: " + pid);
+ Collection<TerminationListener> listenersToTerminate = new ArrayList<TerminationListener>();
+
+ synchronized( _terminationListenersByPid ) {
+ Collection<TerminationListener> listeners = _terminationListenersByPid.get(pid);
+ if( listeners != null ) {
+ listenersToTerminate.addAll(listeners);
+ }
+ if( undeployed ) {
+ _terminationListenersByPid.remove(pid);
+ }
+ }
+
+ // terminate the existing cron jobs, if there are any
+ // NOTE(review): this locks on the QName instance that was passed in, so callers
+ // holding different but equal QName instances do not exclude each other
+ synchronized( pid ) {
+ for( TerminationListener listener : listenersToTerminate ) {
+ listener.terminate();
+ }
+ }
+ }
+
+ /**
+ * Cancels the cron jobs currently registered for the process and schedules the
+ * cron jobs found in the given process configuration.
+ *
+ * Each scheduled job runs one RuntimeDataCleanupRunnable per details map of the
+ * cron job. A failing cleanup is logged and does not stop the remaining cleanups.
+ * Nothing happens when the scheduler is shutting down.
+ *
+ * @param pid id of the process; must not be null
+ * @param pconf process configuration that supplies the cron jobs
+ */
+ public void scheduleProcessCronJobs(QName pid, ProcessConf pconf) {
+ if( _shuttingDown ) {
+ return;
+ }
+ assert pid != null;
+
+ // existing jobs are terminated, but the pid stays in the listener map (undeployed == false)
+ cancelProcessCronJobs(pid, false);
+ Collection<TerminationListener> newListeners = new ArrayList<TerminationListener>();
+
+ synchronized( pid ) {
+ if( __log.isInfoEnabled() ) __log.info("Scheduling PROCESS CRON jobs for: " + pid);
+
+ // start new cron jobs
+ for( final CronJob job : pconf.getCronJobs() ) {
+ if( __log.isDebugEnabled() ) __log.debug("Scheduling PROCESS CRON job: " + job.getCronExpression() + " for: " + pid);
+ // for each different scheduled time
+ Runnable runnable = new Runnable() {
+ public void run() {
+ if( __log.isDebugEnabled() ) __log.debug("Running cron cleanup with details list size: " + job.getRunnableDetailList().size());
+ for( Map<String, Object> details : job.getRunnableDetailList() ) {
+ try {
+ // for each clean up for the scheduled time
+ RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+ cleanup.restoreFromDetailsMap(details);
+ cleanup.setContexts(_contexts);
+ cleanup.run();
+ if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a PROCESS CRON job: " + cleanup);
+ } catch(Exception re) {
+ __log.error("Error during runtime data cleanup from a PROCESS CRON: " + details + "; check your cron settings in deploy.xml.", re);
+ // don't sweat.. the rest of the system and other cron jobs still should work
+ }
+ }
+ }
+ };
+ // no details and no listener are passed: schedule() creates the termination listener
+ newListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+ }
+ }
+
+ // make sure the pid does not get into the terminationListener map if no cron is setup
+ if( !newListeners.isEmpty() ) {
+ synchronized( _terminationListenersByPid ) {
+ Collection<TerminationListener> oldListeners = _terminationListenersByPid.get(pid);
+ if( oldListeners == null ) {
+ _terminationListenersByPid.put(pid, newListeners);
+ } else {
+ // the listeners registered earlier are kept; the new ones are appended
+ oldListeners.addAll(newListeners);
+ }
+ }
+ }
+ }
+
+ public void refreshSystemCronJobs(SystemSchedulesConfig systemSchedulesConf) {
+ if( _shuttingDown ) {
+ return;
+ }
+
+ synchronized( _systemTerminationListeners) {
+ if( __log.isInfoEnabled() ) __log.info("Refreshing SYSTEM CRON jobs.");
+
+ try {
+ // if error thrown on reading the schedules.xml, do not cancel existing cron jobs
+ List<CronJob> systemCronJobs = systemSchedulesConf.getSystemCronJobs();
+
+ // cancel cron jobs
+ for( TerminationListener listener : _systemTerminationListeners ) {
+ listener.terminate();
+ }
+ _systemTerminationListeners.clear();
+
+ // start new cron jobs
+ for( final CronJob job : systemCronJobs ) {
+ if( __log.isDebugEnabled() ) __log.debug("Scheduling SYSTEM CRON job:" + job);
+ // for each different scheduled time
+ Runnable runnable = new Runnable() {
+ public void run() {
+ for( Map<String, Object> details : job.getRunnableDetailList() ) {
+ try {
+ // for now, we have only runtime data cleanup cron job defined
+ // for each clean up for the scheduled time
+ RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
+ synchronized( _terminationListenersByPid ) {
+ if( !_terminationListenersByPid.isEmpty() ) {
+ details.put("pidsToExclude", _terminationListenersByPid.keySet());
+ }
+ }
+ cleanup.restoreFromDetailsMap(details);
+ cleanup.setContexts(_contexts);
+ cleanup.run();
+ if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a SYSTEM CRON job:" + cleanup);
+ } catch( Exception e ) {
+ __log.error("Error running a runtime data cleanup from a SYSTEM CRON job: " + details + "; check your system cron setup.", e);
+ }
+ }
+ }
+ };
+ _systemTerminationListeners.add(schedule(job.getCronExpression(), runnable, null, null));
+ }
+ } catch( Exception e ) {
+ __log.error("Error during refreshing SYSTEM CRON schedules: ", e);
+ }
+ }
+ }
+
+ /**
+ * Schedules the runnable for the next time the cron expression fires. After
+ * every firing the job is scheduled again for the following fire time, until
+ * it is terminated or the scheduler shuts down.
+ *
+ * The runnable is only executed on a node whose scheduler is not ClusterAware
+ * or reports itself as the coordinator. On any other node the firing is
+ * skipped, but the job is still scheduled again so that it starts running
+ * should the node become the coordinator later.
+ *
+ * @param cronExpression decides the fire times; must not be null
+ * @param runnable the work to run; must not be null
+ * @param runnableDetails when not null and the runnable is a MapSerializableRunnable,
+ * the runnable is restored from this map before each run
+ * @param terminationListener when null, a listener is created whose terminate()
+ * stops all further runs of the job; a listener supplied by the caller is
+ * kept and returned as is, and calling it does not stop the job
+ * @return the termination listener of the job, or a listener that does nothing
+ * when the scheduler is shutting down
+ */
+ public TerminationListener schedule(final CronExpression cronExpression,
+ final Runnable runnable, final Map<String, Object> runnableDetails,
+ TerminationListener terminationListener) {
+ if( _shuttingDown ) {
+ __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+ return new TerminationListener() {
+ public void terminate() {
+ // do nothing
+ }
+ };
+ }
+
+ assert cronExpression != null;
+ assert runnable != null;
+
+ final Date nextScheduleTime = cronExpression.getNextValidTimeAfter(new Date(
+ System.currentTimeMillis() + MIN_INTERVAL));
+ final CronScheduledJob job = new CronScheduledJob(nextScheduleTime, runnable, runnableDetails, cronExpression, terminationListener);
+ if( nextScheduleTime == null ) {
+ // NOTE(review): presumably getNextValidTimeAfter() yields null once the
+ // expression can never fire again - confirm; the Timer cannot take a null time
+ __log.warn("CRON expression " + cronExpression + " has no upcoming fire time. The job is not scheduled.");
+ return job.terminationListener;
+ }
+ if( __log.isDebugEnabled() ) __log.debug("CRON will run in " + (nextScheduleTime.getTime() - System.currentTimeMillis()) + "ms.");
+
+ try {
+ _schedulerTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ __log.debug("Cron scheduling timer kicked in: " + cronExpression);
+ // run only if the current node is the coordinator,
+ // with the SimpleScheduler, the node is always the coordinator
+ if( !(_contexts.scheduler instanceof ClusterAware)
+ || ((ClusterAware)_contexts.scheduler).amICoordinator() ) {
+ // do not hold the timer thread too long, submit the work to an executorService
+ _scheduledTaskExec.submit(job);
+ __log.debug("CRON job scheduled " + runnable);
+ } else if( !_shuttingDown && !job.isTerminated() ) {
+ // the job normally re-schedules itself after it ran; it does not run here,
+ // so keep the chain alive from the timer thread instead
+ CronScheduler.this.schedule(cronExpression, runnable, runnableDetails, job.terminationListener);
+ }
+ }
+ }, nextScheduleTime);
+ } catch( IllegalStateException ise ) {
+ if( _shuttingDown ) {
+ __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
+ } else {
+ throw ise;
+ }
+ }
+
+ return job.terminationListener;
+ }
+
+ /**
+ * Handle returned by schedule(); terminate() requests that the job stops.
+ */
+ public interface TerminationListener {
+ void terminate();
+ }
+
+ /**
+ * The termination listener created when the caller of schedule() supplies none.
+ * The flag lives in the listener, not in a single job, because the same listener
+ * is handed on to every re-scheduled job of the chain.
+ */
+ private static final class TerminationFlag implements TerminationListener {
+ private volatile boolean terminated = false;
+
+ public void terminate() {
+ terminated = true;
+ }
+ }
+
+ /**
+ * One firing of a cron job: runs the runnable and schedules the next firing.
+ */
+ private class CronScheduledJob implements Callable<TerminationListener> {
+ private final Date nextScheduleTime;
+ private final Runnable runnable;
+ private final Map<String, Object> runnableDetails;
+ private final CronExpression cronExpression;
+ private final TerminationListener terminationListener;
+
+ public CronScheduledJob(Date nextScheduleTime,
+ Runnable runnable, Map<String, Object> runnableDetails,
+ CronExpression cronExpression, TerminationListener terminationListener) {
+ this.nextScheduleTime = nextScheduleTime;
+ this.runnable = runnable;
+ this.runnableDetails = runnableDetails;
+ this.cronExpression = cronExpression;
+ if( terminationListener == null ) {
+ terminationListener = new TerminationFlag();
+ }
+ this.terminationListener = terminationListener;
+ }
+
+ /**
+ * @return true once terminate() was called on the listener created by the scheduler;
+ * always false for a listener supplied by the caller of schedule()
+ */
+ private boolean isTerminated() {
+ return terminationListener instanceof TerminationFlag
+ && ((TerminationFlag)terminationListener).terminated;
+ }
+
+ /**
+ * Runs the runnable unless the job is terminated, the scheduler is shutting down
+ * or the job starts later than the tolerable delay, and then schedules the next firing.
+ *
+ * @return the termination listener of this job
+ * @throws Exception a RuntimeException when the runnable fails while the scheduler
+ * is not shutting down
+ */
+ public TerminationListener call() throws Exception {
+ try {
+ // a delay of 0 turns the lateness check off; otherwise run only when
+ // the job starts no later than the tolerable delay after its due time
+ if( TOLERABLE_SCHEDULE_DELAY == 0 ||
+ System.currentTimeMillis() - nextScheduleTime.getTime() <= TOLERABLE_SCHEDULE_DELAY ) {
+ if( runnableDetails != null &&
+ runnable instanceof MapSerializableRunnable ) {
+ ((MapSerializableRunnable)runnable).restoreFromDetailsMap(runnableDetails);
+ }
+ if (runnable instanceof ContextsAware) {
+ ((ContextsAware) runnable).setContexts(_contexts);
+ }
+ if( !_shuttingDown && !isTerminated() ) {
+ __log.debug("Running CRON job: " + runnable + " for " + nextScheduleTime.getTime());
+ runnable.run();
+ }
+ } else {
+ // ignore the scheduling.. it will be scheduled later
+ }
+ } catch( Exception e ) {
+ if( _shuttingDown ) {
+ __log.info("A cron job threw an Exception during ODE shutdown: " + e.getMessage() + ", you can ignore the error.");
+ } else if( e instanceof RuntimeException ) {
+ throw e;
+ } else {
+ throw new RuntimeException("Exception during running cron scheduled job: " + runnable, e);
+ }
+ } finally {
+ // the listener travels with the chain, so terminating it stops every later firing
+ if( !_shuttingDown && !isTerminated() ) {
+ schedule(cronExpression, runnable, runnableDetails, terminationListener);
+ }
+ }
+
+ return terminationListener;
+ }
+ }
+}
\ No newline at end of file
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,99 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import javax.xml.namespace.QName;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.common.InstanceFilter;
+import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.FilteredInstanceDeletable;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+
+public class RuntimeDataCleanupRunnable implements MapSerializableRunnable, ContextsAware {
+ private final Log _log = LogFactory.getLog(RuntimeDataCleanupRunnable.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private transient Contexts _contexts;
+
+ private int _transactionSize;
+ private CleanupInfo _cleanupInfo;
+ private QName _pid;
+ private Set<QName> _pidsToExclude;
+
+ public RuntimeDataCleanupRunnable() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public void restoreFromDetailsMap(Map<String, Object> details) {
+ _cleanupInfo = (CleanupInfo)details.get("cleanupInfo");
+ _transactionSize = (Integer)details.get("transactionSize");
+ _pid = (QName)details.get("pid");
+ _pidsToExclude = (Set<QName>)details.get("pidsToExclude");
+ }
+
+ public void storeToDetailsMap(Map<String, Object> details) {
+ // we don't serialize
+ }
+
+ public void setContexts(Contexts contexts) {
+ _contexts = contexts;
+ }
+
+ public void run() {
+ _log.info("CleanInstances.run().");
+ for( String filter : _cleanupInfo.getFilters() ) {
+ _log.info("CleanInstances.run(" + filter + ")");
+ long numberOfDeletedInstances = 0;
+ do {
+ numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+ } while( numberOfDeletedInstances == _transactionSize );
+ }
+ }
+
+ int cleanInstances(String filter, final Set<CLEANUP_CATEGORY> categories, int limit) {
+ if( _pid != null ) {
+ filter += " pid=" + _pid;
+ } else if( _pidsToExclude != null ) {
+ StringBuffer pids = new StringBuffer();
+ for( QName pid : _pidsToExclude ) {
+ if( pids.length() > 0 ) {
+ pids.append("|");
+ }
+ pids.append(pid);
+ }
+ filter += " pid<>" + pids.toString();
+ }
+
+ _log.debug("CleanInstances using filter: " + filter + ", limit: " + limit);
+
+ final InstanceFilter instanceFilter = new InstanceFilter(filter, "", limit);
+ try {
+ if( _contexts.scheduler != null ) {
+ return _contexts.scheduler.execTransaction(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ BpelDAOConnection con = _contexts.dao.getConnection();
+ if( con instanceof FilteredInstanceDeletable ) {
+ return ((FilteredInstanceDeletable)con).deleteInstances(instanceFilter, categories);
+ }
+ return 0;
+ }
+ });
+ } else {
+ return 0;
+ }
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while listing instances: ", e);
+ }
+ }
+}
Propchange: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
------------------------------------------------------------------------------
svn:mergeinfo =
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,103 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.io.File;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.schedules.SchedulesDocument;
+import org.apache.ode.bpel.schedules.TSchedule;
+import org.apache.ode.bpel.dd.TCleanup;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
+import org.apache.ode.store.ProcessCleanupConfImpl;
+import org.apache.ode.utils.CronExpression;
+import org.apache.xmlbeans.XmlOptions;
+
+public class SystemSchedulesConfig {
+ private final static Log __log = LogFactory.getLog(SystemSchedulesConfig.class);
+
+ public final static String SCHEDULE_CONFIG_FILE_PROP_KEY = "org.apache.ode.scheduleConfigFile";
+ private File schedulesFile;
+
+ public SystemSchedulesConfig(File configRoot) {
+ String scheduleConfigFile = System.getProperty(SCHEDULE_CONFIG_FILE_PROP_KEY);
+ if( scheduleConfigFile != null ) {
+ schedulesFile = new File(scheduleConfigFile);
+ if( !new File(scheduleConfigFile).exists()) {
+ __log.warn("A custom location for schedules has been set. However, the file does not exist at the location: "
+ + schedulesFile.getAbsolutePath() + ". The file will be read when one gets created.");
+ }
+ } else {
+ assert configRoot != null;
+ schedulesFile = new File(configRoot, "schedules.xml");
+ }
+ __log.info("SYSTEM CRON configuration: " + schedulesFile.getAbsolutePath());
+ }
+
+ public File getSchedulesFile() {
+ return schedulesFile;
+ }
+
+ /**
+ * Returns the list of cron jobs configured for all processes. This call returns
+ * a fresh snapshot.
+ *
+ * @return the list of cron jobs
+ */
+ public List<CronJob> getSystemCronJobs() {
+ List<CronJob> jobs = new ArrayList<CronJob>();
+
+ if( schedulesFile != null && schedulesFile.exists() ) {
+ for(TSchedule schedule : getSystemSchedulesDocument().getSchedules().getScheduleList()) {
+ CronJob job = new CronJob();
+ try {
+ job.setCronExpression(new CronExpression(schedule.getWhen()));
+ for(final TCleanup aCleanup : schedule.getCleanupList()) {
+ CleanupInfo cleanupInfo = new CleanupInfo();
+ assert !aCleanup.getFilterList().isEmpty();
+ cleanupInfo.setFilters(aCleanup.getFilterList());
+ ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());
+
+ Map<String, Object> runnableDetails = new HashMap<String, Object>();
+ runnableDetails.put("cleanupInfo", cleanupInfo);
+ runnableDetails.put("transactionSize", 10);
+ job.getRunnableDetailList().add(runnableDetails);
+ __log.info("SYSTEM CRON configuration added a runtime data cleanup: " + runnableDetails);
+ }
+ jobs.add(job);
+ } catch( ParseException pe ) {
+ __log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
+ }
+ }
+ }
+
+ __log.info("SYSTEM CRON configuration found cron jobs: " + jobs);
+ return jobs;
+ }
+
+ @SuppressWarnings("unchecked")
+ private SchedulesDocument getSystemSchedulesDocument() {
+ SchedulesDocument sd = null;
+
+ try {
+ XmlOptions options = new XmlOptions();
+ HashMap otherNs = new HashMap();
+
+ otherNs.put("http://ode.fivesight.com/schemas/2006/06/27/dd",
+ "http://www.apache.org/ode/schemas/schedules/2009/05");
+ options.setLoadSubstituteNamespaces(otherNs);
+ sd = SchedulesDocument.Factory.parse(schedulesFile, options);
+ } catch (Exception e) {
+ throw new ContextException("Couldn't read schedule descriptor at location "
+ + schedulesFile.getAbsolutePath(), e);
+ }
+
+ return sd;
+ }
+}
\ No newline at end of file
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/engine/cron/CronSchedulerTest.java Mon Jun 1 05:07:35 2009
@@ -0,0 +1,92 @@
+package org.apache.ode.bpel.engine.cron;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+import javax.xml.namespace.QName;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.ode.bpel.engine.Contexts;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.RuntimeDataCleanupRunnable;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
+import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
+import org.apache.ode.daohib.bpel.BpelDAOConnectionImpl;
+import org.apache.ode.utils.CronExpression;
+import org.jmock.Mock;
+import org.jmock.MockObjectTestCase;
+
+/**
+ * Test harness for the CronScheduler. The only real test, _testCleanup, is
+ * disabled through its name; testNull keeps the test case from being empty.
+ */
+public class CronSchedulerTest extends MockObjectTestCase {
+
+ private Contexts contexts;
+ private Mock scheduler;
+ private CronScheduler cronScheduler;
+ private ExecutorService execService;
+
+ static {
+ BasicConfigurator.configure();
+ LogManager.getRootLogger().setLevel(Level.DEBUG);
+ LogManager.getLogger(RuntimeDataCleanupRunnable.class).setLevel(Level.DEBUG);
+ LogManager.getLogger(BpelDAOConnectionImpl.class).setLevel(Level.DEBUG);
+ LogManager.getLogger("org.apache.ode").setLevel(Level.DEBUG);
+ }
+
+ protected void setUp() throws Exception {
+ contexts = new Contexts();
+ scheduler = mock(Scheduler.class);
+ contexts.scheduler = (Scheduler)scheduler.proxy();
+
+ cronScheduler = new CronScheduler();
+ cronScheduler.setContexts(contexts);
+ execService = Executors.newCachedThreadPool(new ThreadFactory() {
+ int threadNumber = 0;
+ public Thread newThread(Runnable r) {
+ threadNumber += 1;
+ Thread t = new Thread(r, "LongRunning-"+threadNumber);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ cronScheduler.setScheduledTaskExec(execService);
+ }
+
+ /**
+ * Stops the timer and the worker threads started by setUp() so that they
+ * do not outlive the test.
+ */
+ protected void tearDown() throws Exception {
+ if( cronScheduler != null ) {
+ cronScheduler.shutdown();
+ }
+ if( execService != null ) {
+ execService.shutdownNow();
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Termination listener that wakes up the threads waiting on it once it is terminated.
+ */
+ private class NotifyingTerminationListener implements CronScheduler.TerminationListener {
+ // read and written only while holding the monitor of this listener
+ boolean finished = false;
+
+ public synchronized void terminate() {
+ finished = true;
+ notifyAll();
+ }
+ }
+
+ public void testNull() throws Exception {}
+
+ public void _testCleanup() throws Exception {
+ CronExpression cronExpr = new CronExpression("* * * * * ?");
+ RuntimeDataCleanupRunnable runnable = new RuntimeDataCleanupRunnable();
+
+ Map<String, Object> details = new HashMap<String, Object>();
+ details.put("pid", new QName("test"));
+ details.put("transactionSize", 10);
+ CleanupInfo cleanupInfo = new CleanupInfo();
+ cleanupInfo.getFilters().add("a=b");
+ cleanupInfo.getCategories().add(CLEANUP_CATEGORY.CORRELATIONS);
+ details.put("cleanupInfo", cleanupInfo);
+ runnable.restoreFromDetailsMap(details);
+ runnable.setContexts(contexts);
+
+ NotifyingTerminationListener listener = new NotifyingTerminationListener();
+ cronScheduler.schedule(cronExpr, runnable, null, listener);
+ // check the flag while holding the monitor: checked outside of it, terminate()
+ // could slip in between the check and wait() and the wake-up would be lost
+ synchronized(listener) {
+ while( !listener.finished ) {
+ listener.wait();
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Mon Jun 1 05:07:35 2009
@@ -354,5 +354,9 @@
public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
}
+
+ public boolean amICoordinator() { // Mock implementation: unconditionally answers true.
+ return true;
+ }
}
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/dd.xsd Mon Jun 1 05:07:35 2009
@@ -23,7 +23,8 @@
xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03"
elementFormDefault="qualified">
- <xs:element name="deploy" id="deploy" type="dd:tDeployment"/>
+ <xs:element name="deploy" id="deploy" type="dd:tDeployment">
+ </xs:element>
<xs:complexType name="tDeployment">
<xs:sequence>
@@ -93,9 +94,9 @@
</xs:annotation>
</xs:element>
- <xs:element name="cleanup"
- minOccurs="0" maxOccurs="3"
- type="dd:tCleanup" />
+ <xs:element name="cleanup" minOccurs="0" maxOccurs="3" type="dd:tCleanup" />
+ <xs:element name="schedule" minOccurs="0" maxOccurs="unbounded" type="dd:tSchedule">
+ </xs:element>
</xs:sequence>
<xs:attribute name="name" type="xs:QName" use="required"/>
@@ -206,8 +207,9 @@
</xs:restriction>
</xs:simpleType>
</xs:element>
+ <xs:element name="filter" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
</xs:sequence>
- <xs:attribute name="on" use="required">
+ <xs:attribute name="on" use="optional">
<xs:simpleType>
<xs:restriction base="xs:string">
<xs:enumeration value="success" />
@@ -217,4 +219,12 @@
</xs:simpleType>
</xs:attribute>
</xs:complexType>
+
+ <xs:complexType name="tSchedule"> <!-- A schedule: any number of cleanup entries plus a required "when" attribute. -->
+ <xs:sequence>
+ <xs:element name="cleanup" type="dd:tCleanup" minOccurs="0" maxOccurs="unbounded"> <!-- zero or more cleanup definitions, same type as the process-level cleanup element -->
+ </xs:element>
+ </xs:sequence>
+ <xs:attribute name="when" type="xs:string" use="required"></xs:attribute> <!-- NOTE(review): presumably a cron expression; the schema only constrains it to xs:string - confirm -->
+ </xs:complexType>
</xs:schema>
Added: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsd Mon Jun 1 05:07:35 2009
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ targetNamespace="http://www.apache.org/ode/schemas/schedules/2009/05"
+ xmlns:schedules="http://www.apache.org/ode/schemas/schedules/2009/05"
+ elementFormDefault="qualified" xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03"> <!-- Declares the "schedules" element and its types in the schedules namespace. -->
+
+ <xs:import namespace="http://www.apache.org/ode/schemas/dd/2007/03" schemaLocation="dd.xsd"></xs:import> <!-- makes dd:tCleanup available to this schema -->
+ <xs:element name="schedules" id="schedules"
+ type="schedules:tSchedules" /> <!-- the only global element declared by this schema -->
+
+
+ <xs:complexType name="tSchedule"> <!-- One schedule: any number of cleanup entries plus a required "when" attribute. -->
+ <xs:sequence>
+ <xs:element name="cleanup" type="dd:tCleanup"
+ minOccurs="0" maxOccurs="unbounded"> <!-- zero or more cleanup definitions, typed by the imported dd schema -->
+ </xs:element>
+ </xs:sequence>
+ <xs:attribute name="when" type="xs:string" use="required"></xs:attribute> <!-- NOTE(review): presumably a cron expression; the schema only constrains it to xs:string - confirm -->
+ </xs:complexType>
+
+
+ <xs:complexType name="tSchedules"> <!-- Container type: a possibly empty list of schedule elements. -->
+ <xs:sequence maxOccurs="unbounded" minOccurs="0"> <!-- the occurrence bounds sit on the sequence, not on the element -->
+ <xs:element name="schedule" type="schedules:tSchedule"></xs:element>
+ </xs:sequence>
+ </xs:complexType>
+</xs:schema>
Added: ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig?rev=780562&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-schemas/src/main/xsd/schedules.xsdconfig Mon Jun 1 05:07:35 2009
@@ -0,0 +1,32 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<xb:config xmlns:xb="http://xml.apache.org/xmlbeans/2004/02/xbean/config"
+ xmlns:dd="http://www.apache.org/ode/schemas/schedules/2009/05"> <!-- NOTE(review): the "dd" prefix is bound to the schedules namespace here and is not used below. -->
+
+ <xb:namespace uri="http://www.apache.org/ode/schemas/schedules/2009/05"> <!-- Java package for types generated from the schedules namespace -->
+ <xb:package>org.apache.ode.bpel.schedules</xb:package>
+ </xb:namespace>
+
+ <xb:namespace uri="##any"> <!-- applies to every namespace: empty class-name prefix and suffix -->
+ <xb:prefix></xb:prefix>
+ <xb:suffix></xb:suffix>
+ </xb:namespace>
+
+</xb:config>
Modified: ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java?rev=780562&r1=780561&r2=780562&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-store/src/main/java/org/apache/ode/store/ProcessCleanupConfImpl.java Mon Jun 1 05:07:35 2009
@@ -31,60 +31,60 @@
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
public class ProcessCleanupConfImpl {
- protected static Log __log = LogFactory.getLog(ProcessCleanupConfImpl.class);
-
- private final Set<CLEANUP_CATEGORY> successCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
- private final Set<CLEANUP_CATEGORY> failureCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
-
- // package default
+ protected static Log __log = LogFactory.getLog(ProcessCleanupConfImpl.class);
+
+ private final Set<CLEANUP_CATEGORY> successCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+ private final Set<CLEANUP_CATEGORY> failureCategories = EnumSet.noneOf(CLEANUP_CATEGORY.class);
+
+ // package default
ProcessCleanupConfImpl(TDeployment.Process pinfo) {
- for( TCleanup cleanup : pinfo.getCleanupList() ) {
- if( cleanup.getOn() == TCleanup.On.SUCCESS || cleanup.getOn() == TCleanup.On.ALWAYS ) {
- processACleanup(successCategories, cleanup.getCategoryList());
- }
- if( cleanup.getOn() == TCleanup.On.FAILURE || cleanup.getOn() == TCleanup.On.ALWAYS ) {
- processACleanup(failureCategories, cleanup.getCategoryList());
- }
- }
-
- // validate configurations
- Set<CLEANUP_CATEGORY> categories = getCleanupCategories(true);
- if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
- throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
- }
- categories = getCleanupCategories(false);
- if( categories.contains(CLEANUP_CATEGORY.INSTANCE) && !categories.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
- throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
- }
+     // Feed every cleanup entry into the success set, the failure set, or both ("always").
+     for (TCleanup entry : pinfo.getCleanupList()) {
+         boolean always = entry.getOn() == TCleanup.On.ALWAYS;
+         if (always || entry.getOn() == TCleanup.On.SUCCESS) {
+             processACleanup(successCategories, entry.getCategoryList());
+         }
+         if (always || entry.getOn() == TCleanup.On.FAILURE) {
+             processACleanup(failureCategories, entry.getCategoryList());
+         }
+     }
+
+     // Validate the success set first, then the failure set: the instance category is
+     // only accepted together with both correlations and variables.
+     for (boolean succeeded : new boolean[] { true, false }) {
+         Set<CLEANUP_CATEGORY> configured = getCleanupCategories(succeeded);
+         if (!configured.contains(CLEANUP_CATEGORY.INSTANCE)) {
+             continue;
+         }
+         if (!configured.containsAll(EnumSet.of(CLEANUP_CATEGORY.CORRELATIONS, CLEANUP_CATEGORY.VARIABLES))) {
+             throw new ContextException("Cleanup configuration error: the instance category requires both the correlations and variables categories specified together!!!");
+         }
+     }
}
- private void processACleanup(Set<CLEANUP_CATEGORY> categories, List<TCleanup.Category.Enum> categoryList) {
- if( categoryList.isEmpty() ) {
- // add all categories
- categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
- } else {
- for( TCleanup.Category.Enum aCategory : categoryList ) {
- if( aCategory == TCleanup.Category.ALL) {
- // add all categories
- categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
- } else {
- categories.add(CLEANUP_CATEGORY.fromString(aCategory.toString()));
- }
- }
- }
+ /**
+  * Adds the cleanup categories named by {@code categoryList} to {@code categories}.
+  * An empty list, or an entry equal to {@code TCleanup.Category.ALL}, adds every
+  * {@code CLEANUP_CATEGORY}; any other entry is converted through
+  * {@code CLEANUP_CATEGORY.fromString}.
+  *
+  * @param categories   the set to add to (modified in place)
+  * @param categoryList the category values read from a cleanup element
+  */
+ public static void processACleanup(Set<CLEANUP_CATEGORY> categories, List<TCleanup.Category.Enum> categoryList) {
+     if (categoryList.isEmpty()) {
+         // nothing listed means everything
+         categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
+         return;
+     }
+     for (TCleanup.Category.Enum requested : categoryList) {
+         if (requested != TCleanup.Category.ALL) {
+             categories.add(CLEANUP_CATEGORY.fromString(requested.toString()));
+             continue;
+         }
+         // explicit "all" entry
+         categories.addAll(EnumSet.allOf(CLEANUP_CATEGORY.class));
+     }
}
// package default
boolean isCleanupCategoryEnabled(boolean instanceSucceeded, CLEANUP_CATEGORY category) {
- if( instanceSucceeded ) {
- return successCategories.contains(category);
- } else {
- return failureCategories.contains(category);
- }
+     // Pick the set that matches the instance outcome, then test membership.
+     Set<CLEANUP_CATEGORY> applicable = instanceSucceeded ? successCategories : failureCategories;
+     return applicable.contains(category);
}
// package default
Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded) {
- return instanceSucceeded ? successCategories : failureCategories;
+     // Returns the internal set itself (not a copy) for the given instance outcome.
+     if (instanceSucceeded) {
+         return successCategories;
+     }
+     return failureCategories;
}
}