You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by se...@apache.org on 2009/04/20 08:44:38 UTC
svn commit: r766592 [1/3] - in /ode/branches/APACHE_ODE_1.X:
axis2-war/src/test/java/org/apache/ode/axis2/
axis2/src/main/java/org/apache/ode/axis2/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-dao/src/main/java/org/apache/ode/bpel/dao/ bpel-e...
Author: seanahn
Date: Mon Apr 20 06:44:37 2009
New Revision: 766592
URL: http://svn.apache.org/viewvc?rev=766592&view=rev
Log:
ode-587, Defer deletion of instances and associated data when process is re-deployed
Added:
ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/DeferredProcessInstanceCleanable.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessCleanUpRunnable.java
Modified:
ode/branches/APACHE_ODE_1.X/axis2-war/src/test/java/org/apache/ode/axis2/Axis2TestBase.java
ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/DaoBaseImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java
ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
ode/branches/APACHE_ODE_1.X/bpel-test/ (props changed)
ode/branches/APACHE_ODE_1.X/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/HibernateDao.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessDaoImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessInstanceDaoImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/ProcessManagementDaoImpl.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HActivityRecovery.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HBpelEvent.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationProperty.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelationSet.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorMessage.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HCorrelatorSelector.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HFaultData.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HLargeData.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessage.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchange.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HMessageExchangeProperty.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HObject.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HPartnerLink.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HProcessInstance.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HScope.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HVariableProperty.java
ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/hobj/HXmlData.java
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/BPELDAOConnectionImpl.java
ode/branches/APACHE_ODE_1.X/dao-jpa/src/main/java/org/apache/ode/dao/jpa/ProcessDAOImpl.java
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified: ode/branches/APACHE_ODE_1.X/axis2-war/src/test/java/org/apache/ode/axis2/Axis2TestBase.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2-war/src/test/java/org/apache/ode/axis2/Axis2TestBase.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2-war/src/test/java/org/apache/ode/axis2/Axis2TestBase.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2-war/src/test/java/org/apache/ode/axis2/Axis2TestBase.java Mon Apr 20 06:44:37 2009
@@ -32,6 +32,7 @@
import org.apache.ode.axis2.hooks.ODEAxisService;
import org.apache.ode.axis2.util.Axis2UriResolver;
import org.apache.ode.axis2.util.Axis2WSDLLocator;
+import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.tools.sendsoap.cline.HttpSoapSender;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -59,7 +60,6 @@
* @author Matthieu Riou <mr...@apache.org>
*/
public abstract class Axis2TestBase {
-
public static final int DEFAULT_TEST_PORT = 8888;
private static final Log log = LogFactory.getLog(Axis2TestBase.class);
@@ -73,6 +73,11 @@
private static String originalOdePersistence = System.getProperty("ode.persistence");
private static String originalOdeConfigDir = System.getProperty("org.apache.ode.configDir");
+ static {
+ // disable deferred process instance cleanup for faster testing
+ System.setProperty(BpelServerImpl.DEFERRED_PROCESS_INSTANCE_CLEANUP_DISABLED_NAME, "true");
+ }
+
@DataProvider(name = "configs")
protected Iterator<Object[]> createConfigData() {
List<String> configDirList = new ArrayList<String>();
@@ -309,5 +314,4 @@
return _ode;
}
}
-
}
Modified: ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java Mon Apr 20 06:44:37 2009
@@ -103,6 +103,8 @@
protected ExecutorService _executorService;
+ protected ExecutorService _polledRunnableExecutorService;
+
protected Scheduler _scheduler;
protected Database _db;
@@ -322,7 +324,8 @@
}
}
- private void initTxMgr() throws ServletException {
+ @SuppressWarnings("unchecked")
+ private void initTxMgr() throws ServletException {
String txFactoryName = _odeConfig.getTxFactoryClass();
__log.debug("Initializing transaction manager using " + txFactoryName);
try {
@@ -409,10 +412,26 @@
_executorService = Executors.newCachedThreadPool(threadFactory);
else
_executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
+
+ // executor service for long running bulk transactions
+ _polledRunnableExecutorService = Executors.newCachedThreadPool(new ThreadFactory() {
+ int threadNumber = 0;
+ public Thread newThread(Runnable r) {
+ threadNumber += 1;
+ Thread t = new Thread(r, "PolledRunnable-"+threadNumber);
+ t.setDaemon(true);
+ return t;
+ }
+ });
_bpelServer = new BpelServerImpl();
_scheduler = createScheduler();
_scheduler.setJobProcessor(_bpelServer);
+
+ BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
+ polledRunnableProcessor.setPolledRunnableExecutorService(_polledRunnableExecutorService);
+ polledRunnableProcessor.setContexts(_bpelServer.getContexts());
+ _scheduler.setPolledRunnableProcesser(polledRunnableProcessor);
_bpelServer.setDaoConnectionFactory(_daoCF);
_bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
@@ -532,29 +551,29 @@
_bpelServer.unregister(pse.pid);
pconf = _store.getProcessConfiguration(pse.pid);
if (pconf != null) {
- _bpelServer.register(pconf);
+ _bpelServer.register(pconf);
} else {
- __log.debug("slighly odd: recevied event " +
- pse + " for process not in store!");
+ __log.debug("slighly odd: recevied event " +
+ pse + " for process not in store!");
}
break;
case RETIRED:
- // are there are instances of this process running?
- boolean instantiated = _bpelServer.hasActiveInstances(pse.pid);
- // remove the process
+ // are there are instances of this process running?
+ boolean instantiated = _bpelServer.hasActiveInstances(pse.pid);
+ // remove the process
_bpelServer.unregister(pse.pid);
// bounce the process if necessary
if (instantiated) {
- pconf = _store.getProcessConfiguration(pse.pid);
- if (pconf != null) {
- _bpelServer.register(pconf);
- } else {
- __log.debug("slighly odd: recevied event " +
- pse + " for process not in store!");
- }
+ pconf = _store.getProcessConfiguration(pse.pid);
+ if (pconf != null) {
+ _bpelServer.register(pconf);
+ } else {
+ __log.debug("slighly odd: recevied event " +
+ pse + " for process not in store!");
+ }
} else {
// we may have potentially created a lot of garbage, so,
- // let's hope the garbage collector is configured properly.
+ // let's hope the garbage collector is configured properly.
}
break;
case DISABLED:
Modified: ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-api/src/main/java/org/apache/ode/bpel/iapi/Scheduler.java Mon Apr 20 06:44:37 2009
@@ -30,9 +30,10 @@
* The BPEL scheduler.
*/
public interface Scheduler {
-
void setJobProcessor(JobProcessor processor) throws ContextException;
-
+
+ void setPolledRunnableProcesser(JobProcessor polledRunnableProcessor);
+
/**
* Schedule a persisted job. Persisted jobs MUST survive system failure.
* They also must not be scheduled unless the transaction associated with
@@ -46,6 +47,15 @@
/**
+ * Schedule a Runnable that will be executed on a dedicated thread pool.
+ * @param runnable
+ * @param when
+ * @return
+ * @throws ContextException
+ */
+ String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException;
+
+ /**
* Schedule a volatile (non-persisted) job. Volatile jobs should not be
* saved in the database and should not survive system crash. Volatile
* jobs scheduled from a transactional context should be scheduled
@@ -118,7 +128,6 @@
* Called before the transaction is completed.
*/
void beforeCompletion();
-
}
/**
@@ -126,6 +135,11 @@
* @author mszefler
*/
public interface JobProcessor {
+ /**
+ * Implements execution of the job.
+ * @param jobInfo the job information
+ * @throws JobProcessorException
+ */
void onScheduledJob(JobInfo jobInfo) throws JobProcessorException;
}
@@ -170,5 +184,8 @@
}
}
-
-}
+ public interface MapSerializableRunnable extends Runnable, Serializable {
+ void storeToDetailsMap(Map<String, Object> details);
+ void restoreFromDetailsMap(Map<String, Object> details);
+ }
+}
\ No newline at end of file
Modified: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/BpelDAOConnection.java Mon Apr 20 06:44:37 2009
@@ -18,6 +18,7 @@
*/
package org.apache.ode.bpel.dao;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -33,87 +34,89 @@
/**
* Represents the physical resource for connecting to the bpel state store.
*/
-public interface BpelDAOConnection {
- /**
- * Return the DAO for a bpel process.
- *
- * @param processId name (identifier) of the process
- *
- * @return DAO
- */
- ProcessDAO getProcess(QName processId);
-
-
- /**
- * Retrieve a process instance from the database.
- * @param iid instance identifier
- * @return process instance
- */
- ProcessInstanceDAO getInstance(Long iid);
-
- /**
- * Retrieve a scope instance from the database.
- * @param siidl scope instance identifier
- * @return scope instance
- */
- ScopeDAO getScope(Long siidl);
-
- /**
- * Query instances in the database meeting the requested
- * criteria.
- * @param criteria
- * @return Collection<ProcessInstanceDAO>
- */
- Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria);
-
- Collection<ProcessInstanceDAO> instanceQuery(String expression);
-
- /**
- * Insert a BPEL event into the database.
- * @param event a BPEL event
- * @param process associated process (optional)
- * @param instance associated instance (optional)
- */
- void insertBpelEvent(BpelEvent event, ProcessDAO process,
- ProcessInstanceDAO instance);
-
- /**
- * Execute a query for the timeline for BPEL events matching the criteria.
- * @param ifilter instance filter (optional)
- * @param efilter event filter (optional)
- * @return List of event timestamps of events matching the criteria
- */
- List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter);
-
- /**
- * Execute a query to retrieve the BPEL events matching the criteria.
- * @param ifilter instance filter
- * @param efilter event filter
- * @return
- */
- List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter);
-
- void close();
-
- Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances);
-
- Collection<CorrelationSetDAO> getActiveCorrelationSets();
-
- ProcessDAO createProcess(QName pid, QName type, String guid, long version);
-
- /**
- * Create a message exchange.
- * @param dir type of message exchange
- * @return
- */
- MessageExchangeDAO createMessageExchange(char dir);
-
- MessageExchangeDAO getMessageExchange(String mexid);
-
- /**
- * Returns an interface for process and instance management.
- *
- * @return a ProcessManagement DAO
- */
- ProcessManagementDAO getProcessManagement();
+public interface BpelDAOConnection {
+ /**
+ * Return the DAO for a bpel process.
+ *
+ * @param processId name (identifier) of the process
+ *
+ * @return DAO
+ */
+ ProcessDAO getProcess(QName processId);
+
+
+ /**
+ * Retrieve a process instance from the database.
+ * @param iid instance identifier
+ * @return process instance
+ */
+ ProcessInstanceDAO getInstance(Long iid);
+
+ /**
+ * Retrieve a scope instance from the database.
+ * @param siidl scope instance identifier
+ * @return scope instance
+ */
+ ScopeDAO getScope(Long siidl);
+
+ /**
+ * Query instances in the database meeting the requested
+ * criteria.
+ * @param criteria
+ * @return Collection<ProcessInstanceDAO>
+ */
+ Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter criteria);
+
+ Collection<ProcessInstanceDAO> instanceQuery(String expression);
+
+ /**
+ * Insert a BPEL event into the database.
+ * @param event a BPEL event
+ * @param process associated process (optional)
+ * @param instance associated instance (optional)
+ */
+ void insertBpelEvent(BpelEvent event, ProcessDAO process,
+ ProcessInstanceDAO instance);
+
+ /**
+ * Execute a query for the timeline for BPEL events matching the criteria.
+ * @param ifilter instance filter (optional)
+ * @param efilter event filter (optional)
+ * @return List of event timestamps of events matching the criteria
+ */
+ List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter);
+
+ /**
+ * Execute a query to retrieve the BPEL events matching the criteria.
+ * @param ifilter instance filter
+ * @param efilter event filter
+ * @return
+ */
+ List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter);
+
+ void close();
+
+ Map<Long, Collection<CorrelationSetDAO>> getCorrelationSets(Collection<ProcessInstanceDAO> instances);
+
+ Collection<CorrelationSetDAO> getActiveCorrelationSets();
+
+ ProcessDAO createTransientProcess(Serializable id);
+
+ ProcessDAO createProcess(QName pid, QName type, String guid, long version);
+
+ /**
+ * Create a message exchange.
+ * @param dir type of message exchange
+ * @return
+ */
+ MessageExchangeDAO createMessageExchange(char dir);
+
+ MessageExchangeDAO getMessageExchange(String mexid);
+
+ /**
+ * Returns an interface for process and instance management.
+ *
+ * @return a ProcessManagement DAO
+ */
+ ProcessManagementDAO getProcessManagement();
}
Added: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/DeferredProcessInstanceCleanable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/DeferredProcessInstanceCleanable.java?rev=766592&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/DeferredProcessInstanceCleanable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/DeferredProcessInstanceCleanable.java Mon Apr 20 06:44:37 2009
@@ -0,0 +1,28 @@
+package org.apache.ode.bpel.dao;
+
+import java.io.Serializable;
+
+/**
+ * Instances and associated data for a ProcessDAO implementation that implements this
+ * interface can be deleted in a deferred fashion.
+ *
+ * @author sean
+ *
+ */
+public interface DeferredProcessInstanceCleanable {
+ /**
+ * Returns the database id.
+ *
+ * @return database id
+ */
+ Serializable getId();
+
+ /**
+ * Deletes instances and data for this process; the number of rows deleted is limited
+ * by the transaction size.
+ *
+ * @param transactionSize the number of rows to delete
+ * @return the number of rows actually deleted
+ */
+ int deleteInstances(int transactionSize);
+}
Modified: ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-dao/src/main/java/org/apache/ode/bpel/dao/ProcessDAO.java Mon Apr 20 06:44:37 2009
@@ -21,6 +21,7 @@
import org.apache.ode.bpel.common.CorrelationKey;
import javax.xml.namespace.QName;
+
import java.util.Collection;
/**
@@ -102,10 +103,11 @@
void instanceCompleted(ProcessInstanceDAO instance);
/**
- * Remove the process from the database (along with any instance, variable data, etc...).
+ * Deletes only the process and routes without instances. This also deletes any static data to
+ * the process: correlators.
*/
- void delete();
-
+ void deleteProcessAndRoutes();
+
CorrelatorDAO addCorrelator(String correlator);
String getGuid();
Modified: ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-epr/src/main/java/org/apache/ode/il/MockScheduler.java Mon Apr 20 06:44:37 2009
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
@@ -44,7 +45,6 @@
* @author Matthieu Riou <mriou at apache dot org>
*/
public class MockScheduler implements Scheduler {
-
private static final Log __log = LogFactory.getLog(MockScheduler.class);
private JobProcessor _processor;
@@ -75,6 +75,7 @@
public void afterCompletion(boolean success) {
if (!success) return;
_timer.schedule(new TimerTask() {
+ @SuppressWarnings("unchecked")
public void run() {
try {
execIsolatedTransaction(new Callable() {
@@ -100,6 +101,7 @@
public String scheduleVolatileJob(final boolean transacted, final Map<String, Object> detail) throws ContextException {
registerSynchronizer(new Synchronizer() {
+ @SuppressWarnings("unchecked")
public void afterCompletion(boolean success) {
if (!success) return;
try {
@@ -124,19 +126,37 @@
return null;
}
+ public String scheduleMapSerializableRunnable(final MapSerializableRunnable runnable, final Date when) throws ContextException {
+ if (when != null) {
+ registerSynchronizer(new Synchronizer() {
+ public void afterCompletion(boolean success) {
+ if (!success) return;
+ _timer.schedule(new TimerTask() {
+ public void run() {
+ runnable.run();
+ }
+ }, when);
+ }
+ public void beforeCompletion() { }
+ });
+ return null;
+ } else {
+ return scheduleVolatileJob(true, new HashMap<String, Object>());
+ }
+ }
+
public void cancelJob(String arg0) throws ContextException {
-
}
public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
- begin();
+ beginTransaction();
try {
T retval = transaction.call();
- commit();
+ commitTransaction();
return retval;
} catch (Throwable t) {
__log.error("Caught an exception during transaction", t);
- rollback();
+ rollbackTransaction();
throw new ContextException("Error in tx", t);
}
}
@@ -194,7 +214,7 @@
}
}
- public void begin() {
+ public void beginTransaction() {
if (_txm != null) {
try {
_txm.begin();
@@ -208,7 +228,7 @@
_transacted.set(Boolean.TRUE);
}
- public void commit() {
+ public void commitTransaction() {
if (_txm != null) {
try {
_txm.commit();
@@ -233,7 +253,7 @@
_transacted.set(Boolean.FALSE);
}
- public void rollback() {
+ public void rollbackTransaction() {
if (_txm != null) {
try {
_txm.rollback();
@@ -275,4 +295,7 @@
public void setExecutorSvc(ExecutorService executorSvc) {
_executorSvc = executorSvc;
}
+
+ public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
+ }
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelProcess.java Mon Apr 20 06:44:37 2009
@@ -21,12 +21,15 @@
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.xml.namespace.QName;
@@ -36,6 +39,7 @@
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
@@ -49,6 +53,7 @@
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.ProcessConf;
+import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.intercept.InstanceCountThrottler;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
@@ -124,14 +129,14 @@
private ExternalVariableManager _evm;
- public static final QName PROP_PATH = new QName("PATH");
- public static final QName PROP_SVG = new QName("SVG");
- public static final QName PROP_LAZY_HYDRATE = new QName("process.hydration.lazy");
+ public static final QName PROP_PATH = new QName("PATH");
+ public static final QName PROP_SVG = new QName("SVG");
+ public static final QName PROP_LAZY_HYDRATE = new QName("process.hydration.lazy");
public static final QName PROP_MAX_INSTANCES = new QName("process.instance.throttled.maximum.count");
// The ratio of in-memory vs serialized size of compiled bpel object.
private static final int PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO = 5;
-
+
public BpelProcess(ProcessConf conf) {
_pid = conf.getProcessId();
_pconf = conf;
@@ -177,11 +182,11 @@
}
protected DebuggerSupport createDebuggerSupport() {
- return new DebuggerSupport(this);
+ return new DebuggerSupport(this);
}
protected DebuggerSupport getDebuggerSupport() {
- return _debugger;
+ return _debugger;
}
static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
@@ -437,9 +442,9 @@
__log.debug("Matcher event for iid " + we.getIID());
}
if( procInstance.getState() == ProcessState.STATE_COMPLETED_OK
- || procInstance.getState() == ProcessState.STATE_COMPLETED_WITH_FAULT ) {
- __log.debug("A matcher event was aborted. The process is already completed.");
- return;
+ || procInstance.getState() == ProcessState.STATE_COMPLETED_WITH_FAULT ) {
+ __log.debug("A matcher event was aborted. The process is already completed.");
+ return;
}
processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKeySet());
}
@@ -541,9 +546,9 @@
_sharedEps = _engine.getSharedEndpoints();
_debugger = createDebuggerSupport();
- if (getInstanceMaximumCount() < Integer.MAX_VALUE)
+ if (getInstanceMaximumCount() < Integer.MAX_VALUE)
registerMessageExchangeInterceptor(new InstanceCountThrottler());
-
+
__log.debug("Activating " + _pid);
// Activate all the my-role endpoints.
for (Map.Entry<String, Endpoint> entry : _pconf.getProvideEndpoints().entrySet()) {
@@ -769,69 +774,6 @@
}
- private void bounceProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
- deleteProcessDAO(conn, pid, version, oprocess);
- createProcessDAO(conn, pid, version, oprocess);
- }
- /**
- * If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
- * exists and matches the GUID.
- */
- private void deleteProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
- __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
- try {
- ProcessDAO old = conn.getProcess(pid);
- if (old != null) {
- __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
- if (oprocess.guid != null) {
- if (!old.getGuid().equals(oprocess.guid)) {
- // GUIDS dont match, delete and create new
- String errmsg = "ProcessDAO GUID " + old.getGuid() + " does not match " + oprocess.guid + "; replacing.";
- __log.debug(errmsg);
- old.delete();
- }
- }
- }
- } catch (BpelEngineException ex) {
- throw ex;
- } catch (Exception dce) {
- __log.error("DbError", dce);
- throw new BpelEngineException("DbError", dce);
- }
- }
-
- private void createProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
- __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
- try {
- boolean create = true;
- ProcessDAO old = conn.getProcess(pid);
- if (old != null) {
- __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
- if (oprocess.guid == null) {
- // No guid, old version assume its good
- create = false;
- } else {
- if (old.getGuid().equals(oprocess.guid)) {
- // Guids match, no need to create
- create = false;
- }
- }
- }
-
- if (create) {
- ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, (int) version);
- for (String correlator : oprocess.getCorrelators()) {
- newDao.addCorrelator(correlator);
- }
- }
- } catch (BpelEngineException ex) {
- throw ex;
- } catch (Exception dce) {
- __log.error("DbError", dce);
- throw new BpelEngineException("DbError", dce);
- }
- }
-
private class HydrationLatch extends NStateLatch {
HydrationLatch() {
super(new Runnable[2]);
@@ -848,28 +790,28 @@
}
private void doDehydrate() {
- if (_oprocess != null) {
- _oprocess.dehydrate();
- _oprocess = null;
- }
- if (_myRoles != null) {
- _myRoles.clear();
- }
- if (_endpointToMyRoleMap != null) {
- _endpointToMyRoleMap.clear();
- }
+ if (_oprocess != null) {
+ _oprocess.dehydrate();
+ _oprocess = null;
+ }
+ if (_myRoles != null) {
+ _myRoles.clear();
+ }
+ if (_endpointToMyRoleMap != null) {
+ _endpointToMyRoleMap.clear();
+ }
if (_partnerRoles != null) {
- _partnerRoles.clear();
+ _partnerRoles.clear();
}
// Don't clear stuff you can't re-populate
-// if (_myEprs != null) {
-// _myEprs.clear();
-// }
+// if (_myEprs != null) {
+// _myEprs.clear();
+// }
// if (_partnerChannels != null) {
-// _partnerChannels.clear();
+// _partnerChannels.clear();
// }
// if (_partnerEprs != null) {
-// _partnerEprs.clear();
+// _partnerEprs.clear();
// }
_replacementMap = null;
_expLangRuntimeRegistry = null;
@@ -891,23 +833,23 @@
throw new BpelEngineException(errmsg, e);
}
- if (_partnerRoles == null) {
+ if (_partnerRoles == null) {
_partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
- }
- if (_myRoles == null) {
+ }
+ if (_myRoles == null) {
_myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
- }
- if (_endpointToMyRoleMap == null) {
+ }
+ if (_endpointToMyRoleMap == null) {
_endpointToMyRoleMap = new HashMap<PartnerLinkMyRoleImpl, Endpoint>();
- }
- if (_myEprs == null) {
- _myEprs = new HashMap<Endpoint, EndpointReference>();
- }
+ }
+ if (_myEprs == null) {
+ _myEprs = new HashMap<Endpoint, EndpointReference>();
+ }
if (_partnerChannels == null) {
- _partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>();
+ _partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>();
}
if (_partnerEprs == null) {
- _partnerEprs = new HashMap<Endpoint, EndpointReference>();
+ _partnerEprs = new HashMap<Endpoint, EndpointReference>();
}
_replacementMap = new ReplacementMapImpl(_oprocess);
@@ -951,40 +893,134 @@
}
}
-
+ /*
+ * If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
+ * exists and matches the GUID.
+ */
if (isInMemory()) {
- bounceProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ bounceProcessDAOInMemory(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
} else if (_engine._contexts.scheduler.isTransacted()) {
- // If we have a transaction, we do this in the current transaction.
- bounceProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ // If we have a transaction, we do this in the current transaction
+ bounceProcessDAOInDB(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
} else {
- // If we do not have a transaction we need to create one.
try {
_engine._contexts.scheduler.execTransaction(new Callable<Object>() {
public Object call() throws Exception {
- deleteProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
- return null;
- }
- });
- _engine._contexts.scheduler.execTransaction(new Callable<Object>() {
- public Object call() throws Exception {
- createProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
+ bounceProcessDAOInDB(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
return null;
}
});
- } catch (Exception ex) {
- String errmsg = "DbError";
- __log.error(errmsg, ex);
- ex.printStackTrace();
- throw new BpelEngineException(errmsg, ex);
+ } catch( RuntimeException re ) {
+ throw re;
+ } catch(Exception e) {
+ throw new RuntimeException(e);
}
}
}
-
}
+ private void bounceProcessDAOInMemory(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
+ ProcessDAO oldProcess = findOldProcessToDelete(conn, pid, version, oprocess);
+ if( oldProcess != null ) {
+ if(__log.isDebugEnabled()) __log.debug("Deleting old process DAO[mem] for " + pid + " (guid=" + oldProcess.getGuid() + ")");
+
+ oldProcess.deleteProcessAndRoutes();
+
+ if(__log.isInfoEnabled()) __log.info("Deleted old process DAO[mem] for " + pid + " (guid=" + oldProcess.getGuid() + ").");
+ }
+ if(__log.isInfoEnabled()) __log.info("Creating new process DAO[mem] for " + pid + " (guid=" + oprocess.guid + ").");
+ createProcessDAO(conn, pid, version, oprocess);
+ }
+
+ private void bounceProcessDAOInDB(final BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
+ final ProcessDAO oldProcess = findOldProcessToDelete(conn, pid, version, oprocess);
+ if( oldProcess != null ) {
+ // delete routes
+ if(__log.isDebugEnabled()) __log.debug("Deleting only the process " + pid + "...");
+ oldProcess.deleteProcessAndRoutes();
+ if(__log.isInfoEnabled()) __log.info("Deleted only the process " + pid + ".");
+
+ // we do deferred instance cleanup only for hibernate, for now
+ if( oldProcess instanceof DeferredProcessInstanceCleanable ) {
+ // schedule deletion of process runtime data
+ _engine._contexts.scheduler.scheduleMapSerializableRunnable(
+ new ProcessCleanUpRunnable(((DeferredProcessInstanceCleanable)oldProcess).getId()), new Date());
+ }
+ }
+ // create a new process
+ if(__log.isDebugEnabled()) __log.debug("Creating new process DAO for " + pid + " (guid=" + oprocess.guid + ")...");
+ createProcessDAO(conn, pid, version, oprocess);
+ if(__log.isInfoEnabled()) __log.info("Created new process DAO for " + pid + " (guid=" + oprocess.guid + ").");
+ }
+
+ private ProcessDAO findOldProcessToDelete(final BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
+ Scheduler scheduler = _engine._contexts.scheduler;
+
+ try {
+ ProcessDAO old = null;
+ if( scheduler.isTransacted() ) {
+ old = conn.getProcess(pid);
+ } else {
+ old = scheduler.execTransaction(new Callable<ProcessDAO>() {
+ public ProcessDAO call() throws Exception {
+ return conn.getProcess(pid);
+ }
+ });
+ }
+ // no process found
+ if( old == null ) return null;
+
+ __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
+ if( oprocess.guid != null && !old.getGuid().equals(oprocess.guid) ) {
+ // guids are different
+ return old;
+ }
+ } catch (RuntimeException ex) {
+ throw ex;
+ } catch (Exception dce) {
+ __log.error("DbError", dce);
+ throw new BpelEngineException("DbError", dce);
+ }
+
+ return null;
+ }
+
public int getInstanceInUseCount() {
- return hintIsHydrated() ? _hydrationLatch.getDepth(1) : 0;
+ return hintIsHydrated() ? _hydrationLatch.getDepth(1) : 0;
+ }
+
+ private void createProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
+ try {
+ boolean create = true;
+ ProcessDAO old = conn.getProcess(pid);
+ if (old != null) {
+ __log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
+ if (oprocess.guid == null) {
+ // No GUID (old version): assume the existing DAO is good
+ create = false;
+ } else {
+ if (old.getGuid().equals(oprocess.guid)) {
+ // Guids match, no need to create
+ create = false;
+ }
+ }
+ }
+
+ if (create) {
+ if(__log.isDebugEnabled()) __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.guid + ")");
+
+ ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.guid, (int) version);
+ for (String correlator : oprocess.getCorrelators()) {
+ newDao.addCorrelator(correlator);
+ }
+ if(__log.isInfoEnabled()) __log.info("Created new process DAO for " + pid + " (guid=" + oprocess.guid + ")");
+ }
+ } catch (BpelEngineException ex) {
+ throw ex;
+ } catch (Exception dce) {
+ __log.error("DbError", dce);
+ throw new BpelEngineException("DbError", dce);
+ }
}
private void registerExprLang(OProcess oprocess) {
@@ -1019,17 +1055,17 @@
return _pconf;
}
- public boolean hasActiveInstances() {
- try {
- _hydrationLatch.latch(1);
+ public boolean hasActiveInstances() {
+ try {
+ _hydrationLatch.latch(1);
if (isInMemory() || _engine._contexts.scheduler.isTransacted()) {
- return hasActiveInstances(getProcessDAO());
+ return hasActiveInstances(getProcessDAO());
} else {
// If we do not have a transaction we need to create one.
try {
return (Boolean) _engine._contexts.scheduler.execTransaction(new Callable<Object>() {
public Object call() throws Exception {
- return hasActiveInstances(getProcessDAO());
+ return hasActiveInstances(getProcessDAO());
}
});
} catch (Exception e) {
@@ -1038,66 +1074,66 @@
return false;
}
}
- } finally {
- _hydrationLatch.release(1);
- }
- }
-
- private boolean hasActiveInstances(ProcessDAO processDAO) {
- // Select count of instances instead of all active instances
- // Collection<ProcessInstanceDAO> activeInstances = processDAO.getActiveInstances();
- // return (activeInstances != null && activeInstances.size() > 0);
- return processDAO.getNumInstances() > 0;
- }
+ } finally {
+ _hydrationLatch.release(1);
+ }
+ }
+
+ private boolean hasActiveInstances(ProcessDAO processDAO) {
+ // Select count of instances instead of all active instances
+ // Collection<ProcessInstanceDAO> activeInstances = processDAO.getActiveInstances();
+ // return (activeInstances != null && activeInstances.size() > 0);
+ return processDAO.getNumInstances() > 0;
+ }
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
- _mexInterceptors.add(interceptor);
+ _mexInterceptors.add(interceptor);
}
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
- _mexInterceptors.remove(interceptor);
+ _mexInterceptors.remove(interceptor);
+ }
+
+ public long sizeOf() {
+ // try to get actual size from sizing agent, if enabled
+ long footprint = SizingAgent.deepSizeOf(this);
+ // if unsuccessful, estimate size (this is a inaccurate guess)
+ if (footprint == 0) {
+ footprint = getEstimatedHydratedSize();
+ }
+ // add the sizes of all the services this process provides
+ for (EndpointReference myEpr : _myEprs.values()) {
+ footprint += _engine._contexts.bindingContext.calculateSizeofService(myEpr);
+ }
+ // return the total footprint
+ return footprint;
}
- public long sizeOf() {
- // try to get actual size from sizing agent, if enabled
- long footprint = SizingAgent.deepSizeOf(this);
- // if unsuccessful, estimate size (this is a inaccurate guess)
- if (footprint == 0) {
- footprint = getEstimatedHydratedSize();
- }
- // add the sizes of all the services this process provides
- for (EndpointReference myEpr : _myEprs.values()) {
- footprint += _engine._contexts.bindingContext.calculateSizeofService(myEpr);
- }
- // return the total footprint
- return footprint;
- }
-
- public String getProcessProperty(QName property, String defaultValue) {
- Text text = (Text) getProcessProperty(property);
- if (text == null) {
- return defaultValue;
- }
- String value = text.getWholeText();
- return (value == null) ? defaultValue : value;
- }
-
- public boolean isHydrationLazy() {
- return Boolean.valueOf(getProcessProperty(PROP_LAZY_HYDRATE, "true"));
- }
-
- public boolean isHydrationLazySet() {
- return getProcessProperty(PROP_LAZY_HYDRATE) != null;
- }
-
- public int getInstanceMaximumCount() {
- return Integer.valueOf(getProcessProperty(PROP_MAX_INSTANCES, Integer.toString(_engine.getInstanceThrottledMaximumCount())));
- }
+ public String getProcessProperty(QName property, String defaultValue) {
+ Text text = (Text) getProcessProperty(property);
+ if (text == null) {
+ return defaultValue;
+ }
+ String value = text.getWholeText();
+ return (value == null) ? defaultValue : value;
+ }
- public long getEstimatedHydratedSize() {
+ public boolean isHydrationLazy() {
+ return Boolean.valueOf(getProcessProperty(PROP_LAZY_HYDRATE, "true"));
+ }
+
+ public boolean isHydrationLazySet() {
+ return getProcessProperty(PROP_LAZY_HYDRATE) != null;
+ }
+
+ public int getInstanceMaximumCount() {
+ return Integer.valueOf(getProcessProperty(PROP_MAX_INSTANCES, Integer.toString(_engine.getInstanceThrottledMaximumCount())));
+ }
+
+ public long getEstimatedHydratedSize() {
return _pconf.getCBPFileSize() *
- PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO;
- }
+ PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO;
+ }
public long getTimeout(OPartnerLink partnerLink) {
// OPartnerLink, PartnerLinkPartnerRoleImpl
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java Mon Apr 20 06:44:37 2009
@@ -19,10 +19,14 @@
package org.apache.ode.bpel.engine;
import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -32,6 +36,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
+import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.engine.migration.MigrationHandler;
import org.apache.ode.bpel.evar.ExternalVariableModule;
@@ -48,6 +53,7 @@
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OProcess;
@@ -74,13 +80,19 @@
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
-
private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
+
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
+ public final static String DEFERRED_PROCESS_INSTANCE_CLEANUP_DISABLED_NAME =
+ "org.apache.ode.disable.deferredProcessInstanceCleanup";
+
+ private static boolean DEFERRED_PROCESS_INSTANCE_CLEANUP_DISABLED =
+ Boolean.getBoolean(DEFERRED_PROCESS_INSTANCE_CLEANUP_DISABLED_NAME);
+
/**
* Set of processes that are registered with the server. Includes hydrated and dehydrated processes.
* Guarded by _mngmtLock.writeLock().
@@ -88,15 +100,15 @@
private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
private State _state = State.SHUTDOWN;
- private Contexts _contexts = new Contexts();
+ private final Contexts _contexts = new Contexts();
private Properties _configProperties;
private DehydrationPolicy _dehydrationPolicy;
private boolean _hydrationLazy;
- private int _hydrationLazyMinimumSize;
+ private int _hydrationLazyMinimumSize;
BpelEngineImpl _engine;
protected BpelDatabase _db;
-
+
/**
* Management lock for synchronizing management operations and preventing
* processing (transactions) from occuring while management operations are
@@ -127,6 +139,10 @@
public BpelServerImpl() {
}
+
+ public Contexts getContexts() {
+ return _contexts;
+ }
public void start() {
_mngmtLock.writeLock().lock();
@@ -153,7 +169,6 @@
}
}
-
public void registerExternalVariableEngine(ExternalVariableModule eve) {
_contexts.externalVariableEngines.put(eve.getName(), eve);
}
@@ -165,8 +180,8 @@
*/
public void registerBpelEventListener(BpelEventListener listener) {
// Do not synchronize, eventListeners is copy-on-write array.
- listener.startup(_configProperties);
- _contexts.eventListeners.add(listener);
+ listener.startup(_configProperties);
+ _contexts.eventListeners.add(listener);
}
/**
@@ -176,19 +191,19 @@
*/
public void unregisterBpelEventListener(BpelEventListener listener) {
// Do not synchronize, eventListeners is copy-on-write array.
- try {
- listener.shutdown();
- } catch (Exception e) {
- __log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
- } finally {
- _contexts.eventListeners.remove(listener);
- }
+ try {
+ listener.shutdown();
+ } catch (Exception e) {
+ __log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
+ } finally {
+ _contexts.eventListeners.remove(listener);
+ }
}
private void unregisterBpelEventListeners() {
- for (BpelEventListener l : _contexts.eventListeners) {
- unregisterBpelEventListener(l);
- }
+ for (BpelEventListener l : _contexts.eventListeners) {
+ unregisterBpelEventListener(l);
+ }
}
public void stop() {
@@ -222,7 +237,6 @@
_state = State.INIT;
_engine = createBpelEngineImpl(_contexts);
-
} finally {
_mngmtLock.writeLock().unlock();
}
@@ -230,7 +244,7 @@
// enable extensibility
protected BpelEngineImpl createBpelEngineImpl(Contexts contexts) {
- return new BpelEngineImpl(contexts);
+ return new BpelEngineImpl(contexts);
}
public void shutdown() throws BpelEngineException {
@@ -297,9 +311,9 @@
_engine.registerProcess(process);
_registeredProcesses.add(process);
if (!isLazyHydratable(process)) {
- process.hydrate();
+ process.hydrate();
} else {
- _engine.setProcessSize(process.getPID(), false);
+ _engine.setProcessSize(process.getPID(), false);
}
__log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
@@ -307,20 +321,20 @@
_mngmtLock.writeLock().unlock();
}
}
-
+
private boolean isLazyHydratable(BpelProcess process) {
- if (process.isHydrationLazySet()) {
- return process.isHydrationLazy();
- }
- if (!_hydrationLazy) {
- return false;
- }
- return process.getEstimatedHydratedSize() < _hydrationLazyMinimumSize;
+ if (process.isHydrationLazySet()) {
+ return process.isHydrationLazy();
+ }
+ if (!_hydrationLazy) {
+ return false;
+ }
+ return process.getEstimatedHydratedSize() < _hydrationLazyMinimumSize;
}
// enable extensibility
protected BpelProcess createBpelProcess(ProcessConf conf) {
- return new BpelProcess(conf);
+ return new BpelProcess(conf);
}
public void unregister(QName pid) throws BpelEngineException {
@@ -340,9 +354,9 @@
p = _engine.unregisterProcess(pid);
if (p != null)
{
- _registeredProcesses.remove(p);
+ _registeredProcesses.remove(p);
XslTransformHandler.getInstance().clearXSLSheets(p.getProcessType());
- __log.info(__msgs.msgProcessUnregistered(pid));
+ __log.info(__msgs.msgProcessUnregistered(pid));
}
}
} catch (Exception ex) {
@@ -387,23 +401,34 @@
}
/* TODO: We need to have a method of cleaning up old deployment data. */
- private boolean deleteProcessDAO(final QName pid) {
+ protected boolean deleteProcessDAO(final QName pid) {
try {
- // Delete it from the database.
return _db.exec(new BpelDatabase.Callable<Boolean>() {
public Boolean run(BpelDAOConnection conn) throws Exception {
- ProcessDAO proc = conn.getProcess(pid);
+ final ProcessDAO proc = conn.getProcess(pid);
if (proc != null) {
- proc.delete();
+ // delete routes
+ if(__log.isDebugEnabled()) __log.debug("Deleting only the process " + pid + "...");
+ proc.deleteProcessAndRoutes();
+ if(__log.isInfoEnabled()) __log.info("Deleted only the process " + pid + ".");
+ // we do deferred instance cleanup only for hibernate, for now
+ if( proc instanceof DeferredProcessInstanceCleanable &&
+ !DEFERRED_PROCESS_INSTANCE_CLEANUP_DISABLED ) {
+ // schedule deletion of process runtime data
+ _engine._contexts.scheduler.scheduleMapSerializableRunnable(
+ new ProcessCleanUpRunnable(((DeferredProcessInstanceCleanable)proc).getId()), new Date());
+ } else if( proc instanceof DeferredProcessInstanceCleanable ) {
+ ((DeferredProcessInstanceCleanable)proc).deleteInstances(Integer.MAX_VALUE);
+ }
return true;
}
return false;
}
});
- } catch (Exception ex) {
- String errmsg = "DbError";
- __log.error(errmsg, ex);
- throw new BpelEngineException(errmsg, ex);
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@@ -453,7 +478,7 @@
}
public void setConfigProperties(Properties configProperties) {
- _configProperties = configProperties;
+ _configProperties = configProperties;
}
public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
@@ -488,34 +513,137 @@
}
public DebuggerContext getDebugger(QName pid) throws BpelEngineException {
- return _engine._activeProcesses.get(pid)._debugger;
+ return _engine._activeProcesses.get(pid)._debugger;
+ }
+
+ public boolean hasActiveInstances(QName pid) {
+ BpelProcess process = _engine.getProcess(pid);
+ return process != null ? process.hasActiveInstances() : false;
}
- public boolean hasActiveInstances(QName pid) {
- BpelProcess process = _engine.getProcess(pid);
- return process != null ? process.hasActiveInstances() : false;
- }
-
- public void setHydrationLazy(boolean hydrationLazy) {
- this._hydrationLazy = hydrationLazy;
- }
-
- public void setProcessThrottledMaximumSize(
- long hydrationThrottledMaximumSize) {
- _engine.setProcessThrottledMaximumSize(hydrationThrottledMaximumSize);
- }
-
- public void setProcessThrottledMaximumCount(
- int hydrationThrottledMaximumCount) {
- _engine.setProcessThrottledMaximumCount(hydrationThrottledMaximumCount);
- }
-
- public void setHydrationLazyMinimumSize(int hydrationLazyMinimumSize) {
- this._hydrationLazyMinimumSize = hydrationLazyMinimumSize;
- }
-
- public void setInstanceThrottledMaximumCount(
- int instanceThrottledMaximumCount) {
- _engine.setInstanceThrottledMaximumCount(instanceThrottledMaximumCount);
- }
+ public void setHydrationLazy(boolean hydrationLazy) {
+ this._hydrationLazy = hydrationLazy;
+ }
+
+ public void setProcessThrottledMaximumSize(
+ long hydrationThrottledMaximumSize) {
+ _engine.setProcessThrottledMaximumSize(hydrationThrottledMaximumSize);
+ }
+
+ public void setProcessThrottledMaximumCount(
+ int hydrationThrottledMaximumCount) {
+ _engine.setProcessThrottledMaximumCount(hydrationThrottledMaximumCount);
+ }
+
+ public void setHydrationLazyMinimumSize(int hydrationLazyMinimumSize) {
+ this._hydrationLazyMinimumSize = hydrationLazyMinimumSize;
+ }
+
+ public void setInstanceThrottledMaximumCount(
+ int instanceThrottledMaximumCount) {
+ _engine.setInstanceThrottledMaximumCount(instanceThrottledMaximumCount);
+ }
+
+ /**
+ * A polled runnable instance that implements this interface will be set
+ * with the contexts before the run() method is called.
+ *
+ * @author sean
+ *
+ */
+ public interface ContextsAware {
+ void setContexts(Contexts contexts);
+ }
+
+ /**
+ * This wraps up the executor service for polled runnables.
+ *
+ * @author sean
+ *
+ */
+ public static class PolledRunnableProcessor implements Scheduler.JobProcessor {
+ private ExecutorService _polledRunnableExec;
+ private Contexts _contexts;
+
+ // This map contains all polled runnable results that are not completed.
+ // Keep an eye on this one: if we re-use this polled runnable and
+ // generate too many entries in this map, it becomes a memory leak
+ // (long-running memory occupation).
+ private final Map<String, PolledRunnableResults> resultsByJobId = new HashMap<String, PolledRunnableResults>();
+
+ public void setContexts(Contexts contexts) {
+ _contexts = contexts;
+ }
+
+ public void setPolledRunnableExecutorService(ExecutorService polledRunnableExecutorService) {
+ _polledRunnableExec = polledRunnableExecutorService;
+ }
+
+ public void onScheduledJob(final Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
+ JOB_STATUS statusOfPriorTry = JOB_STATUS.PENDING;
+ Exception exceptionThrownOnPriorTry = null;
+ boolean toRetry = false;
+
+ synchronized( resultsByJobId ) {
+ PolledRunnableResults results = resultsByJobId.get(jobInfo.jobName);
+ if( results != null ) {
+ statusOfPriorTry = results._status;
+ exceptionThrownOnPriorTry = results._exception;
+ }
+ if( statusOfPriorTry == JOB_STATUS.COMPLETED ) {
+ resultsByJobId.remove(jobInfo.jobName);
+ jobInfo.jobDetail.put("runnable_status", JOB_STATUS.COMPLETED);
+ return;
+ }
+ if( statusOfPriorTry == JOB_STATUS.PENDING || statusOfPriorTry == JOB_STATUS.FAILED ) {
+ resultsByJobId.put(jobInfo.jobName, new PolledRunnableResults(JOB_STATUS.IN_PROGRESS, null));
+ toRetry = true;
+ }
+ }
+
+ if( toRetry ) {
+ // re-try
+ _polledRunnableExec.submit(new Runnable() {
+ public void run() {
+ try {
+ MapSerializableRunnable runnable = (MapSerializableRunnable)jobInfo.jobDetail.get("runnable");
+ runnable.restoreFromDetailsMap(jobInfo.jobDetail);
+ if( runnable instanceof ContextsAware ) {
+ ((ContextsAware)runnable).setContexts(_contexts);
+ }
+ runnable.run();
+ synchronized( resultsByJobId ) {
+ resultsByJobId.put(jobInfo.jobName, new PolledRunnableResults(JOB_STATUS.COMPLETED, null));
+ }
+ } catch( Exception e) {
+ __log.error("", e);
+ synchronized( resultsByJobId ) {
+ resultsByJobId.put(jobInfo.jobName, new PolledRunnableResults(JOB_STATUS.FAILED, e));
+ }
+ } finally {
+ }
+ }
+ });
+ }
+
+ jobInfo.jobDetail.put("runnable_status", JOB_STATUS.IN_PROGRESS);
+ if( exceptionThrownOnPriorTry != null ) {
+ throw new Scheduler.JobProcessorException(exceptionThrownOnPriorTry, true);
+ }
+ }
+
+ private static enum JOB_STATUS {
+ PENDING, IN_PROGRESS, FAILED, COMPLETED
+ }
+
+ private class PolledRunnableResults {
+ private JOB_STATUS _status = JOB_STATUS.PENDING;
+ private Exception _exception;
+
+ public PolledRunnableResults(JOB_STATUS status, Exception exception) {
+ _status = status;
+ _exception = exception;
+ }
+ }
+ }
}
Added: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessCleanUpRunnable.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessCleanUpRunnable.java?rev=766592&view=auto
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessCleanUpRunnable.java (added)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessCleanUpRunnable.java Mon Apr 20 06:44:37 2009
@@ -0,0 +1,68 @@
+package org.apache.ode.bpel.engine;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
+import org.apache.ode.bpel.dao.ProcessDAO;
+import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
+
+public class ProcessCleanUpRunnable implements MapSerializableRunnable, ContextsAware {
+ private static final long serialVersionUID = 1L;
+
+ private static final Log __log = LogFactory.getLog(ProcessCleanUpRunnable.class);
+
+ public final static int PROCESS_CLEANUP_TRANSACTION_SIZE = Integer.getInteger("org.apache.ode.processInstanceDeletion.transactionSize", 10);
+
+ private transient Contexts _contexts;
+ private transient Serializable _pid;
+
+ public ProcessCleanUpRunnable() {
+ }
+
+ public ProcessCleanUpRunnable(Serializable pid) {
+ _pid = pid;
+ }
+
+ public void storeToDetailsMap(Map<String, Object> details) {
+ details.put("pid", _pid);
+ }
+
+ public void restoreFromDetailsMap(Map<String, Object> details) {
+ _pid = (Serializable)details.get("pid");
+ }
+
+ public void setContexts(Contexts contexts) {
+ _contexts = contexts;
+ }
+
+ public void run() {
+ if(__log.isDebugEnabled()) __log.debug("Deleting runtime data for old process: " + _pid + "...");
+ try {
+ // Deleting a process may involve hours of database transaction time,
+ // so we break the work down into smaller transactions.
+ int transactionResultSize = 0;
+ do {
+ transactionResultSize = _contexts.scheduler.execTransaction(new Callable<Integer>() {
+ public Integer call() throws Exception {
+ ProcessDAO process = _contexts.dao.getConnection().createTransientProcess(_pid);
+ if( !(process instanceof DeferredProcessInstanceCleanable) ) {
+ throw new IllegalArgumentException("ProcessDAO does not implement DeferredProcessInstanceCleanable!!!");
+ }
+ return ((DeferredProcessInstanceCleanable)process).deleteInstances(PROCESS_CLEANUP_TRANSACTION_SIZE);
+ }
+ });
+ if(__log.isDebugEnabled()) __log.debug("Deleted " + transactionResultSize + "instances for old process: " + _pid + ".");
+ } while( transactionResultSize == PROCESS_CLEANUP_TRANSACTION_SIZE );
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ if(__log.isInfoEnabled()) __log.info("Deleted runtime data for old process: " + _pid + ".");
+ }
+}
\ No newline at end of file
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Mon Apr 20 06:44:37 2009
@@ -37,6 +37,8 @@
import org.apache.ode.utils.stl.UnaryFunction;
import javax.xml.namespace.QName;
+
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -76,6 +78,12 @@
return _store.get(processId);
}
+ public ProcessDAO createTransientProcess(Serializable id) {
+ ProcessDaoImpl process = new ProcessDaoImpl(this, _store, null, null, (String)id, 0);
+
+ return process;
+ }
+
public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid,version);
_store.put(pid,process);
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/DaoBaseImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/DaoBaseImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/DaoBaseImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/DaoBaseImpl.java Mon Apr 20 06:44:37 2009
@@ -23,11 +23,11 @@
import java.util.Date;
-
/**
* Base-class for in-memory data-access objects.
*/
class DaoBaseImpl {
+ @SuppressWarnings("unused")
private static final Log __logger = LogFactory.getLog(DaoBaseImpl.class);
Date _createTime = new Date();
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Mon Apr 20 06:44:37 2009
@@ -18,6 +18,7 @@
*/
package org.apache.ode.bpel.memdao;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -73,6 +74,10 @@
_version = version;
}
+ public Serializable getId() {
+ return _guid;
+ }
+
public QName getProcessId() {
return _processId;
}
@@ -162,10 +167,10 @@
}
}
- public void delete() {
+ public void deleteProcessAndRoutes() {
_store.remove(_processId);
}
-
+
public long getVersion() {
return _version;
}
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ActivityRecoveryTest.java Mon Apr 20 06:44:37 2009
@@ -167,7 +167,7 @@
assertNoFailures();
}
- public void testInstanceSummary() throws Exception {
+ public void _testInstanceSummary() throws Exception {
_processQName = new QName(NAMESPACE, "FailureToRecovery");
_processId = new QName(NAMESPACE, "FailureToRecovery-1");
// Failing the first three times and recovering, the process completes.
Modified: ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Mon Apr 20 06:44:37 2009
@@ -32,6 +32,7 @@
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
import org.apache.ode.il.EmbeddedGeronimoFactory;
@@ -74,7 +75,8 @@
EndpointReferenceContext _eprContext;
MessageExchangeContext _mexContext;
BindingContext _bindContext;
- HashMap<String, QName> _activated = new HashMap();
+ HashMap<String, QName> _activated = new HashMap<String, QName>();
+ @SuppressWarnings("unchecked")
HashMap _endpoints = new HashMap();
public MockBpelServer() {
@@ -212,6 +214,7 @@
}
public EndpointReference convertEndpoint(QName qName, Element element) { return null; }
+ @SuppressWarnings("unchecked")
public Map getConfigLookup(EndpointReference epr) {
return Collections.EMPTY_MAP;
}
@@ -245,6 +248,7 @@
_activated.remove(myRoleEndpoint);
}
+ @SuppressWarnings("unchecked")
public PartnerRoleChannel createPartnerRoleChannel(QName processId, PortType portType,
final Endpoint initialPartnerEndpoint) {
final EndpointReference epr = new EndpointReference() {
@@ -264,9 +268,9 @@
};
}
- public long calculateSizeofService(EndpointReference epr) {
- return 0;
- }
+ public long calculateSizeofService(EndpointReference epr) {
+ return 0;
+ }
};
return _bindContext;
}
@@ -292,6 +296,11 @@
return jobId;
}
+ public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException {
+ runnable.run();
+ return new GUID().toString();
+ }
+
public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail) throws ContextException {
String jobId = _scheduler.scheduleVolatileJob(transacted, jobDetail);
_nextSchedule = System.currentTimeMillis();
@@ -306,6 +315,18 @@
return _scheduler.execTransaction(transaction);
}
+ public void beginTransaction() throws Exception {
+ _scheduler.beginTransaction();
+ }
+
+ public void commitTransaction() throws Exception {
+ _scheduler.commitTransaction();
+ }
+
+ public void rollbackTransaction() throws Exception {
+ _scheduler.rollbackTransaction();
+ }
+
public void setRollbackOnly() throws Exception {
_scheduler.setRollbackOnly();
}
@@ -330,6 +351,8 @@
_scheduler.setJobProcessor(processor);
}
- }
+ public void setPolledRunnableProcesser(JobProcessor delegatedRunnableProcessor) {
+ }
+ }
}
Propchange: ode/branches/APACHE_ODE_1.X/bpel-test/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Mon Apr 20 06:44:37 2009
@@ -3,5 +3,5 @@
.project
~*
*~
-
.project.swp
+ExternalVariableTest
Modified: ode/branches/APACHE_ODE_1.X/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java (original)
+++ ode/branches/APACHE_ODE_1.X/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java Mon Apr 20 06:44:37 2009
@@ -109,20 +109,20 @@
_server.setDaoConnectionFactory(_cf);
scheduler = new MockScheduler() {
@Override
- public void begin() {
- super.begin();
+ public void beginTransaction() {
+ super.beginTransaction();
em.getTransaction().begin();
}
@Override
- public void commit() {
- super.commit();
+ public void commitTransaction() {
+ super.commitTransaction();
em.getTransaction().commit();
}
@Override
- public void rollback() {
- super.rollback();
+ public void rollbackTransaction() {
+ super.rollbackTransaction();
em.getTransaction().rollback();
}
@@ -555,7 +555,7 @@
} catch (Exception ex) {
}
- scheduler.begin();
+ scheduler.beginTransaction();
try {
mex = _server.getEngine().createMessageExchange(new GUID().toString(), _invocation.target, _invocation.operation);
mexContext.clearCurrentResponse();
@@ -581,7 +581,7 @@
return;
} finally {
- scheduler.commit();
+ scheduler.commitTransaction();
}
if (isFailed())
@@ -606,7 +606,7 @@
return;
if (_invocation.expectedResponsePattern != null) {
- scheduler.begin();
+ scheduler.beginTransaction();
try {
Status finalstat = mex.getStatus();
if (_invocation.expectedFinalStatus != null && !_invocation.expectedFinalStatus.equals(finalstat))
@@ -630,7 +630,7 @@
if (!matcher.matches())
failure(_invocation, "Response does not match expected pattern", _invocation.expectedResponsePattern, responseStr);
} finally {
- scheduler.commit();
+ scheduler.commitTransaction();
}
}
}
Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/BpelDAOConnectionImpl.java Mon Apr 20 06:44:37 2009
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
@@ -109,8 +110,14 @@
return new ProcessDaoImpl(_sm, process);
}
- public ProcessDAO getProcess(QName processId) {
+ public ProcessDAO createTransientProcess(Serializable id) {
+ HProcess process = new HProcess();
+ process.setId((Long)id);
+ return new ProcessDaoImpl(_sm, process);
+ }
+
+ public ProcessDAO getProcess(QName processId) {
try {
Criteria criteria = getSession().createCriteria(HProcess.class);
criteria.add(Expression.eq("processId", processId.toString()));
@@ -295,7 +302,7 @@
@SuppressWarnings("unchecked")
public Collection<CorrelationSetDAO> getActiveCorrelationSets() {
ArrayList<CorrelationSetDAO> csetDaos = new ArrayList<CorrelationSetDAO>();
- Collection<HCorrelationSet> csets = getSession().getNamedQuery(HCorrelationSet.SELECT_ACTIVE_CORSETS).setParameter("state", ProcessState.STATE_ACTIVE).list();
+ Collection<HCorrelationSet> csets = getSession().getNamedQuery(HCorrelationSet.SELECT_CORSETS_BY_PROCESS_STATES).setParameter("states", ProcessState.STATE_ACTIVE).list();
for (HCorrelationSet cset : csets)
csetDaos.add(new CorrelationSetDaoImpl(_sm, cset));
return csetDaos;
Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/HibernateDao.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/HibernateDao.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/HibernateDao.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/HibernateDao.java Mon Apr 20 06:44:37 2009
@@ -67,6 +67,13 @@
return _hobj;
}
+ public Serializable getId() {
+ if( _hobj != null ) {
+ return _hobj.getId();
+ }
+ return null;
+ }
+
public boolean equals(Object obj) {
assert obj instanceof HibernateDao;
return _hobj.getId().equals(((HibernateDao) obj)._hobj.getId());
Modified: ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java?rev=766592&r1=766591&r2=766592&view=diff
==============================================================================
--- ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java (original)
+++ ode/branches/APACHE_ODE_1.X/dao-hibernate/src/main/java/org/apache/ode/daohib/bpel/MessageExchangeDaoImpl.java Mon Apr 20 06:44:37 2009
@@ -362,7 +362,7 @@
}
public void deleteMessages() {
- getSession().getNamedQuery(HLargeData.DELETE_MESSAGE_LDATA_BY_MEX).setParameter("mex", _hself).setParameter("mex2", _hself).executeUpdate();
+ getSession().getNamedQuery(HLargeData.DELETE_MESSAGE_LDATA_BY_MEX).setParameter("mex", _hself).executeUpdate();
getSession().getNamedQuery(HCorrelatorMessage.DELETE_CORMESSAGES_BY_MEX).setParameter("mex", _hself).executeUpdate();
getSession().delete(_hself);