You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/06/04 00:23:43 UTC
svn commit: r781611 - in /ode/trunk: ./ axis2-war/src/test/webapp/
axis2-war/src/test/webapp/WEB-INF/ axis2/src/main/java/org/apache/ode/axis2/
axis2/src/main/java/org/apache/ode/axis2/deploy/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-store...
Author: seanahn
Date: Wed Jun 3 22:23:43 2009
New Revision: 781611
URL: http://svn.apache.org/viewvc?rev=781611&view=rev
Log:
fixed system cron and process cron
Added:
ode/trunk/axis2-war/src/test/webapp/
ode/trunk/axis2-war/src/test/webapp/WEB-INF/
ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
Modified:
ode/trunk/Buildfile
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/trunk/Buildfile
URL: http://svn.apache.org/viewvc/ode/trunk/Buildfile?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/Buildfile (original)
+++ ode/trunk/Buildfile Wed Jun 3 22:23:43 2009
@@ -253,6 +253,8 @@
webapp_dir = "#{test.compile.target}/webapp"
test.setup task(:prepare_webapp) do |task|
cp_r _("src/main/webapp"), test.compile.target.to_s
+ rm_rf Dir[_(webapp_dir) + "/**/.svn"]
+ cp_r _("src/test/webapp"), test.compile.target.to_s
cp Dir[_("src/main/webapp/WEB-INF/classes/*")], test.compile.target.to_s
cp Dir[project("axis2").path_to("src/main/wsdl/*")], "#{webapp_dir}/WEB-INF"
cp project("bpel-schemas").path_to("src/main/xsd/pmapi.xsd"), "#{webapp_dir}/WEB-INF"
Added: ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml
URL: http://svn.apache.org/viewvc/ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml?rev=781611&view=auto
==============================================================================
--- ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml (added)
+++ ode/trunk/axis2-war/src/test/webapp/WEB-INF/test-schedules.xml Wed Jun 3 22:23:43 2009
@@ -0,0 +1,29 @@
+<?xml version='1.0' encoding='UTF-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<sched:schedules
+ xmlns:sched="http://www.apache.org/ode/schemas/schedules/2009/05"
+ xmlns:dd="http://www.apache.org/ode/schemas/dd/2007/03">
+ <sched:schedule when="* * * * * ?">
+ <sched:cleanup>
+ <dd:filter><![CDATA[status=completed]]></dd:filter>
+ </sched:cleanup>
+ </sched:schedule>
+</sched:schedules>
\ No newline at end of file
Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Wed Jun 3 22:23:43 2009
@@ -25,6 +25,9 @@
import java.util.Map;
import java.util.Iterator;
import java.util.StringTokenizer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -65,6 +68,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.evtproc.DebugBpelEventListener;
import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
import org.apache.ode.bpel.iapi.BpelEventListener;
@@ -107,6 +111,10 @@
protected TransactionManager _txMgr;
protected BpelDAOConnectionFactory _daoCF;
protected Scheduler _scheduler;
+
+ protected ExecutorService _executorService;
+ protected CronScheduler _cronScheduler;
+
protected Database _db;
private DeploymentPoller _poller;
private MultiKeyMap _services = new MultiKeyMap();
@@ -484,10 +492,34 @@
__log.debug("ODE initializing");
}
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int threadNumber = 0;
+ public Thread newThread(Runnable r) {
+ threadNumber += 1;
+ Thread t = new Thread(r, "BULK-"+threadNumber);
+ t.setDaemon(true);
+ return t;
+ }
+ };
+
+ if (_odeConfig.getThreadPoolMaxSize() == 0)
+ _executorService = Executors.newCachedThreadPool(threadFactory);
+ else
+ _executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
+
_bpelServer = new BpelServerImpl();
_scheduler = createScheduler();
_scheduler.setJobProcessor(_bpelServer);
+ _cronScheduler = new CronScheduler();
+ _cronScheduler.setScheduledTaskExec(_executorService);
+ _cronScheduler.setContexts(_bpelServer.getContexts());
+ _bpelServer.setCronScheduler(_cronScheduler);
+ BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
+ polledRunnableProcessor.setPolledRunnableExecutorService(_executorService);
+ polledRunnableProcessor.setContexts(_bpelServer.getContexts());
+ _scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
+
_bpelServer.setDaoConnectionFactory(_daoCF);
_bpelServer.setEndpointReferenceContext(eprContext);
_bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this));
@@ -541,6 +573,10 @@
return _appRoot;
}
+ public File getConfigRoot() {
+ return _configRoot;
+ }
+
/**
* Register event listeners configured in the configuration.
*
@@ -655,6 +691,20 @@
default:
__log.debug("Ignoring store event: " + pse);
}
+
+ ProcessConf pconf = _store.getProcessConfiguration(pse.pid);
+ if( pconf != null ) {
+ if( pse.type == ProcessStoreEvent.Type.UNDEPLOYED) {
+ __log.debug("Cancelling all cron scheduled jobs on store event: " + pse);
+ _bpelServer.getContexts().cronScheduler.cancelProcessCronJobs(pse.pid, true);
+ }
+
+ // Except for undeploy event, we need to re-schedule process dependent jobs
+ __log.debug("(Re)scheduling cron scheduled jobs on store event: " + pse);
+ if( pse.type != ProcessStoreEvent.Type.UNDEPLOYED) {
+ _bpelServer.getContexts().cronScheduler.scheduleProcessCronJobs(pse.pid, pconf);
+ }
+ }
}
// Transactional debugging stuff, to track down all these little annoying bugs.
Modified: ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
URL: http://svn.apache.org/viewvc/ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java (original)
+++ ode/trunk/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java Wed Jun 3 22:23:43 2009
@@ -41,6 +41,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
+import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
+import org.apache.ode.utils.WatchDog;
import javax.xml.namespace.QName;
import java.io.File;
@@ -48,6 +51,8 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
/**
* Polls a directory for the deployment of a new deployment unit.
@@ -67,6 +72,13 @@
private boolean _onHold = false;
+ private SystemSchedulesConfig _systemSchedulesConf;
+
+ @SuppressWarnings("unchecked")
+ private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
+ @SuppressWarnings("unchecked")
+ private WatchDog _systemCronConfigWatchDog;
+
/** Filter accepting directories containing a .odedd file. */
private static final FileFilter _fileFilter = new FileFilter() {
public boolean accept(File path) {
@@ -93,6 +105,8 @@
_deployDir = deployDir;
if (!_deployDir.exists())
_deployDir.mkdir();
+ _systemSchedulesConf = createSystemSchedulesConfig(odeServer.getConfigRoot());
+ _systemCronConfigWatchDog = createSystemCronConfigWatchDog(odeServer.getBpelServer().getContexts().cronScheduler);
}
public void start() {
@@ -110,6 +124,7 @@
* Scan the directory for new (or removed) files (called mainly from {@link PollingThread}) and calls whoever is in charge of
* the actual deployment (or undeployment).
*/
+ @SuppressWarnings("unchecked")
private void check() {
File[] files = _deployDir.listFiles(_fileFilter);
@@ -123,7 +138,10 @@
__log.debug("Not deploying " + file + " (missing deploy.xml)");
}
+ WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+
if (deployedMarker.exists()) {
+ checkDeployXmlWatchDog(ddWatchDog);
continue;
}
@@ -160,6 +178,47 @@
__log.info("Successfully undeployed " + pkg);
}
}
+
+ checkSystemCronConfigWatchDog(_systemCronConfigWatchDog);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected WatchDog ensureDeployXmlWatchDog(File deployFolder, File deployXml) {
+ WatchDog ddWatchDog = dDWatchDogsByPath.get(deployXml.getAbsolutePath());
+ if( ddWatchDog == null ) {
+ ddWatchDog = WatchDog.watchFile(deployXml, new DDWatchDogObserver(deployFolder.getName()));
+ dDWatchDogsByPath.put(deployXml.getAbsolutePath(), ddWatchDog);
+ }
+
+ return ddWatchDog;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void checkDeployXmlWatchDog(WatchDog ddWatchDog) {
+ ddWatchDog.check();
+ }
+
+ protected void disposeDeployXmlWatchDog(File deployDir) {
+ dDWatchDogsByPath.remove(new File(deployDir, "deploy.xml").getAbsolutePath());
+ }
+
+ protected SystemSchedulesConfig createSystemSchedulesConfig(File configRoot) {
+ return new SystemSchedulesConfig(configRoot);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected WatchDog createSystemCronConfigWatchDog(final CronScheduler cronScheduler) {
+ return WatchDog.watchFile(_systemSchedulesConf.getSchedulesFile(),
+ new WatchDog.DefaultObserver() {
+ public void init() {
+ cronScheduler.refreshSystemCronJobs(_systemSchedulesConf);
+ }
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void checkSystemCronConfigWatchDog(WatchDog ddWatchDog) {
+ ddWatchDog.check();
}
/**
@@ -220,4 +279,17 @@
File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
deployedMarker.delete();
}
+
+ @SuppressWarnings("unchecked")
+ protected class DDWatchDogObserver extends WatchDog.DefaultObserver {
+ private String deploymentPakage;
+
+ public DDWatchDogObserver(String deploymentPakage) {
+ this.deploymentPakage = deploymentPakage;
+ }
+
+ public void init() {
+ _odeServer.getProcessStore().refreshSchedules(deploymentPakage);
+ }
+ }
}
Modified: ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/ProcessStoreEvent.java Wed Jun 3 22:23:43 2009
@@ -49,7 +49,10 @@
DISABLED,
/** A process property was changed. */
- PROPERTY_CHANGED
+ PROPERTY_CHANGED,
+
+ /** Cron schedule settings have been changed for the process */
+ SCHEDULE_SETTINGS_CHANGED
}
/**
Modified: ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ ode/trunk/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Wed Jun 3 22:23:43 2009
@@ -23,16 +23,15 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
/**
* The BPEL scheduler.
*/
public interface Scheduler {
-
void setJobProcessor(JobProcessor processor) throws ContextException;
+ void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor);
+
/**
* Schedule a persisted job. Persisted jobs MUST survive system failure.
* They also must not be scheduled unless the transaction associated with
Modified: ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java (original)
+++ ode/trunk/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java Wed Jun 3 22:23:43 2009
@@ -819,4 +819,9 @@
}
}
+ public void refreshSchedules(String packageName) {
+ for( QName pid : listProcesses(packageName) ) {
+ fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.SCHEDULE_SETTINGS_CHANGED, pid, packageName));
+ }
+ }
}
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java Wed Jun 3 22:23:43 2009
@@ -25,6 +25,7 @@
import org.apache.ode.bpel.dao.*;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.evt.ScopeEvent;
+import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.daohib.SessionManager;
import org.apache.ode.daohib.bpel.hobj.*;
import org.apache.ode.daohib.bpel.ql.HibernateInstancesQueryCompiler;
@@ -52,29 +53,30 @@
/**
* Hibernate-based {@link BpelDAOConnection} implementation.
*/
-public class BpelDAOConnectionImpl implements BpelDAOConnection {
+public class BpelDAOConnectionImpl implements BpelDAOConnection, FilteredInstanceDeletable {
private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
- private Session _session;
-
protected SessionManager _sm;
public BpelDAOConnectionImpl(SessionManager sm) {
_sm = sm;
- _session = _sm.getSession();
+ }
+
+ protected Session getSession(){
+ return _sm.getSession();
}
public MessageExchangeDAO createMessageExchange(String mexId, char dir) {
HMessageExchange mex = new HMessageExchange();
mex.setMexId(mexId);
mex.setDirection(dir);
- _session.save(mex);
+ getSession().save(mex);
return new MessageExchangeDaoImpl(_sm, mex);
}
public MessageExchangeDAO getMessageExchange(String mexId) {
try {
- org.hibernate.Query query = _session.createQuery("from HMessageExchange x where x.mexId = ?");
+ org.hibernate.Query query = getSession().createQuery("from HMessageExchange x where x.mexId = ?");
query.setString(0, mexId);
HMessageExchange mex = (HMessageExchange) query.uniqueResult();
return mex == null ? null : new MessageExchangeDaoImpl(_sm, mex);
@@ -93,7 +95,7 @@
public ResourceRouteDAO getResourceRoute(String url, String method) {
try {
- Criteria criteria = _session.createCriteria(HResourceRoute.class);
+ Criteria criteria = getSession().createCriteria(HResourceRoute.class);
criteria.add(Expression.eq("url", url));
criteria.add(Expression.eq("method", method));
HResourceRoute hrr = (HResourceRoute) criteria.uniqueResult();
@@ -105,12 +107,12 @@
}
public void deleteResourceRoute(String url, String method) {
- _session.createQuery("delete from HResourceRoute r where r.url = :url and r.method = :method")
+ getSession().createQuery("delete from HResourceRoute r where r.url = :url and r.method = :method")
.setString("url", url).setString("method", method).executeUpdate();
}
public List<ResourceRouteDAO> getAllResourceRoutes() {
- List<HResourceRoute> hrr = _session.createCriteria(HResourceRoute.class).list();
+ List<HResourceRoute> hrr = getSession().createCriteria(HResourceRoute.class).list();
ArrayList<ResourceRouteDAO> rr = new ArrayList<ResourceRouteDAO>(hrr.size());
for (HResourceRoute hroute : hrr)
rr.add(new ResourceRouteDaoImpl(_sm, hroute));
@@ -125,7 +127,7 @@
process.setDeployDate(new Date());
process.setGuid(guid);
process.setVersion(version);
- _session.save(process);
+ getSession().save(process);
return new ProcessDaoImpl(_sm, process);
}
@@ -139,7 +141,7 @@
public ProcessDAO getProcess(QName processId) {
try {
- Criteria criteria = _session.createCriteria(HProcess.class);
+ Criteria criteria = getSession().createCriteria(HProcess.class);
criteria.add(Expression.eq("processId", processId.toString()));
// For the moment we are expecting only one result.
HProcess hprocess = (HProcess) criteria.uniqueResult();
@@ -158,11 +160,11 @@
* @see org.apache.ode.bpel.dao.ProcessDAO#getInstance(java.lang.Long)
*/
public ProcessInstanceDAO getInstance(Long instanceId) {
- return _getInstance(_sm, _session, instanceId);
+ return _getInstance(_sm, getSession(), instanceId);
}
public ScopeDAO getScope(Long siidl) {
- return _getScope(_sm, _session, siidl);
+ return _getScope(_sm, getSession(), siidl);
}
public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria) {
@@ -171,7 +173,7 @@
}
List<ProcessInstanceDAO> daos = new ArrayList<ProcessInstanceDAO>();
- Iterator<HProcessInstance> iter = _instanceQuery(_session, false, criteria);
+ Iterator<HProcessInstance> iter = _instanceQuery(getSession(), false, criteria);
while (iter.hasNext()) {
daos.add(new ProcessInstanceDaoImpl(_sm, iter.next()));
}
@@ -238,7 +240,7 @@
@SuppressWarnings( { "unchecked", "deprecation" })
public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
CriteriaBuilder cb = new CriteriaBuilder();
- Criteria crit = _session.createCriteria(HBpelEvent.class);
+ Criteria crit = getSession().createCriteria(HBpelEvent.class);
if (ifilter != null)
cb.buildCriteria(crit, efilter);
if (ifilter != null)
@@ -251,7 +253,7 @@
@SuppressWarnings("unchecked")
public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
CriteriaBuilder cb = new CriteriaBuilder();
- Criteria crit = _session.createCriteria(HBpelEvent.class);
+ Criteria crit = getSession().createCriteria(HBpelEvent.class);
if (efilter != null)
cb.buildCriteria(crit, efilter);
if (ifilter != null)
@@ -283,7 +285,7 @@
HibernateInstancesQueryCompiler compiler = new HibernateInstancesQueryCompiler();
CommandEvaluator<List, Session> eval = compiler.compile((Query) rootNode);
- List<HProcessInstance> instancesList = (List<HProcessInstance>) eval.evaluate(_session);
+ List<HProcessInstance> instancesList = (List<HProcessInstance>) eval.evaluate(getSession());
Collection<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>(instancesList.size());
for (HProcessInstance instance : instancesList) {
@@ -292,6 +294,33 @@
return result;
}
+ public int deleteInstances(InstanceFilter criteria, Set<CLEANUP_CATEGORY> categories) {
+ if (criteria.getLimit() == 0) {
+ return 0;
+ }
+
+ List<HProcessInstance> instances = _instanceQueryForList(getSession(), false, criteria);
+ if( __log.isDebugEnabled() ) __log.debug("Collected " + instances.size() + " instances to delete.");
+
+ if( !instances.isEmpty() ) {
+ ProcessDaoImpl process = (ProcessDaoImpl)createTransientProcess(instances.get(0).getProcessId());
+ return process.deleteInstances(instances, categories);
+ }
+
+ return 0;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static List<HProcessInstance> _instanceQueryForList(Session session, boolean countOnly, InstanceFilter filter) {
+ Criteria crit = session.createCriteria(HProcessInstance.class);
+ CriteriaBuilder cb = new CriteriaBuilder();
+ cb.buildCriteria(crit, filter);
+
+ crit.setFetchMode("fault", FetchMode.JOIN);
+
+ return crit.list();
+ }
+
@SuppressWarnings("unchecked")
public Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances) {
if (instances.size() == 0) {
@@ -303,7 +332,7 @@
iids[i] = dao.getInstanceId();
i++;
}
- Collection<HCorrelationSet> csets = _session.getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", iids).list();
+ Collection<HCorrelationSet> csets = getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_INSTANCES).setParameterList("instances", iids).list();
Map<Long, Collection<CorrelationSetDAO>> map = new HashMap<Long, Collection<CorrelationSetDAO>>();
for (HCorrelationSet cset: csets) {
Long id = cset.getInstance().getId();
Modified: ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
URL: http://svn.apache.org/viewvc/ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java (original)
+++ ode/trunk/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java Wed Jun 3 22:23:43 2009
@@ -38,10 +38,12 @@
public static final String SELECT_INSTANCES_BY_PROCESS="SELECT_INSTANCES_BY_PROCESS";
public static final String SELECT_INSTANCES_BY_PROCESSES_AND_STATES="SELECT_INSTANCES_BY_PROCESSES_AND_STATES";
public static final String DELETE_INSTANCES="DELETE_INSTANCES";
-
+
/** Foreign key to owner {@link HProcess}. */
private HProcess _process;
+ private Long _processId;
+
/** Foreign key to the instantiating {@link HCorrelator}. */
private HCorrelator _instantiatingCorrelator;
@@ -201,6 +203,17 @@
}
/**
+ * @hibernate.property column="PROCESS_ID" insert="false" update="false"
+ */
+ public Long getProcessId() {
+ return _processId;
+ }
+
+ public void setProcessId(Long processId) {
+ _processId = processId;
+ }
+
+ /**
* @hibernate.bag lazy="true" inverse="true"
* @hibernate.collection-key column="PIID" foreign-key="none"
* @hibernate.collection-one-to-many class="org.apache.ode.daohib.bpel.hobj.HScope"
Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Wed Jun 3 22:23:43 2009
@@ -41,6 +41,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.*;
+import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
@@ -55,8 +56,6 @@
import org.apache.ode.utils.msg.MessageBundle;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
-import org.apache.ode.bpel.rapi.ProcessModel;
-import org.apache.ode.bpel.extension.ExtensionBundleRuntime;
/**
* <p>
@@ -74,7 +73,6 @@
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
-
private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
@@ -157,6 +155,10 @@
public BpelServerImpl() {
}
+ public Contexts getContexts() {
+ return _contexts;
+ }
+
protected void waitForQuiessence() {
do{
_mngmtLock.writeLock().lock();
@@ -583,6 +585,10 @@
_contexts.scheduler = scheduler;
}
+ public void setCronScheduler(CronScheduler cronScheduler) throws BpelEngineException {
+ _contexts.cronScheduler = cronScheduler;
+ }
+
public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
_contexts.eprContext = eprContext;
}
Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (original)
+++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Wed Jun 3 22:23:43 2009
@@ -50,12 +50,18 @@
public void run() {
_log.info("CleanInstances.run().");
- for( String filter : _cleanupInfo.getFilters() ) {
- _log.info("CleanInstances.run(" + filter + ")");
- long numberOfDeletedInstances = 0;
- do {
- numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
- } while( numberOfDeletedInstances == _transactionSize );
+ try {
+ for( String filter : _cleanupInfo.getFilters() ) {
+ _log.info("CleanInstances.run(" + filter + ")");
+ long numberOfDeletedInstances = 0;
+ do {
+ numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
+ } while( numberOfDeletedInstances == _transactionSize );
+ }
+ } catch( RuntimeException re ) {
+ throw re;
+ } catch( Exception e ) {
+ throw new RuntimeException("", e);
}
}
Modified: ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java Wed Jun 3 22:23:43 2009
@@ -145,4 +145,7 @@
public void jobCompleted(String jobId) {
}
+
+ public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
+ }
}
Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=781611&r1=781610&r2=781611&view=diff
==============================================================================
--- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original)
+++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Wed Jun 3 22:23:43 2009
@@ -80,6 +80,8 @@
/** The object that actually handles the jobs. */
volatile JobProcessor _jobProcessor;
+ volatile JobProcessor _polledRunnableProcessor;
+
private SchedulerThread _todo;
private DatabaseDelegate _db;
@@ -131,6 +133,10 @@
_db = dbd;
}
+ public void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor) {
+ _polledRunnableProcessor = polledRunnableProcessor;
+ }
+
public void cancelJob(String jobId) throws ContextException {
_todo.dequeue(new Job(0, jobId, false, null));
try {