You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC
svn commit: r573153 [6/9] - in /ode/trunk: ./
axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/
bpel-api/src/main/java/org/apache/ode/bpel/explang/
bpel-api/src/main/java/org/apache/ode/bpel/iapi/
bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java Wed Sep 5 22:46:42 2007
@@ -359,7 +359,7 @@
// INSTANCE ACTIONS
//
public InstanceInfoDocument fault(Long iid, QName faultname, Element faultData) {
- // TODO: Implement
+ // TODO: implement me!
return genInstanceInfoDocument(iid);
}
@@ -397,7 +397,7 @@
return null;
for (ActivityRecoveryDAO recovery : instance.getActivityRecoveries()) {
if (recovery.getActivityId() == aid) {
- BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId());
+ BpelProcess process = _server.getBpelProcess(instance.getProcess().getProcessId());
if (process != null) {
process.recoverActivity(instance, recovery.getChannel(), aid, action, null);
break;
@@ -480,7 +480,7 @@
public ActivityExtInfoListDocument getExtensibilityElements(QName pid, Integer[] aids) {
ActivityExtInfoListDocument aeild = ActivityExtInfoListDocument.Factory.newInstance();
TActivitytExtInfoList taeil = aeild.addNewActivityExtInfoList();
- OProcess oprocess = _server._engine.getOProcess(pid);
+ OProcess oprocess = _server.getOProcess(pid);
for (int aid : aids) {
OBase obase = oprocess.getChild(aid);
@@ -516,7 +516,7 @@
*/
protected final DebuggerSupport getDebugger(QName procid) throws ManagementException {
- BpelProcess process = _server._engine._activeProcesses.get(procid);
+ BpelProcess process = _server.getBpelProcess(procid);
if (process == null)
throw new InvalidRequestException("The process \"" + procid + "\" is available.");
@@ -714,7 +714,7 @@
TDeploymentInfo depinfo = info.addNewDeploymentInfo();
depinfo.setPackage(pconf.getPackage());
- depinfo.setDocument(pconf.getBpelDocument());
+ //depinfo.setDocument(pconf.getBpelDocument());
depinfo.setDeployDate(toCalendar(pconf.getDeployDate()));
depinfo.setDeployer(pconf.getDeployer());
if (custom.includeInstanceSummary()) {
@@ -747,13 +747,13 @@
}
}
- OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId());
+ OProcess oprocess = _server.getOProcess(pconf.getProcessId());
if (custom.includeEndpoints() && oprocess != null) {
TEndpointReferences eprs = info.addNewEndpoints();
for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
if (oplink.hasPartnerRole() && oplink.initializePartnerRole) {
// TODO: this is very uncool.
- EndpointReference pepr = _server._engine._activeProcesses.get(pconf.getProcessId())
+ EndpointReference pepr = _server.getBpelProcess(pconf.getProcessId())
.getInitialPartnerRoleEPR(oplink);
if (pepr != null) {
@@ -765,7 +765,6 @@
}
}
- // TODO: add documents to the above data structure.
}
/**
@@ -1234,8 +1233,7 @@
else if ("version".equals(orderKey))
c = new Comparator<ProcessConf>() {
public int compare(ProcessConf o1, ProcessConf o2) {
- // TODO: implement version comparisons.
- return 0;
+ return (int) (o1.getVersion() - o2.getVersion());
}
};
else if ("deployed".equals(orderKey))
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Sep 5 22:46:42 2007
@@ -59,8 +59,24 @@
return _jobDetail;
}
+ public String toString() {
+ return "WorkEvent" + _jobDetail;
+ }
+
public enum Type {
- TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL
+ TIMER,
+
+ RESUME,
+
+ /** Response from partner (i.e. the result of a partner-role invoke) has been received. */
+ PARTNER_RESPONSE,
+
+ MATCHER,
+
+ /** Invoke a "my role" operation (i.e. implemented by the process). */
+ MYROLE_INVOKE,
+
+ MYROLE_INVOKE_ASYNC_RESPONSE
}
public String getChannel() {
@@ -99,16 +115,6 @@
public void setCorrelationKey(CorrelationKey ckey) {
_jobDetail.put("ckey", ckey == null ? null : ckey.toCanonicalString());
- }
-
- public void setInMem(boolean inmem) {
- _jobDetail.put("inmem", inmem);
- }
-
- public boolean isInMem() {
- Boolean bool = (Boolean) _jobDetail.get("inmem");
- if (bool == null) return false;
- else return bool;
}
public void setProcessId(QName pid) {
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java Wed Sep 5 22:46:42 2007
@@ -20,7 +20,7 @@
import javax.xml.namespace.QName;
-import org.apache.ode.bpel.iapi.Message;
+import org.w3c.dom.Element;
/**
* Exception thrown by {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -32,9 +32,9 @@
private static final long serialVersionUID = 1L;
private QName _faultName;
- private Message _faultData;
+ private Element _faultData;
- public FaultMessageExchangeException(String errmsg, QName faultName, Message faultData) {
+ public FaultMessageExchangeException(String errmsg, QName faultName, Element faultData) {
super(errmsg);
_faultName = faultName;
@@ -45,7 +45,7 @@
return _faultName;
}
- public Message getFaultData() {
+ public Element getFaultData() {
return _faultData;
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java Wed Sep 5 22:46:42 2007
@@ -18,10 +18,7 @@
*/
package org.apache.ode.bpel.intercept;
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
/**
* Helper for invoking the appropriate {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -34,35 +31,28 @@
private final String _name;
// Closures anyone?
- /** Invoke {@link MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, InterceptorContext)} */
+ /** Invoke {@link MessageExchangeInterceptor#onProcessInvoked(InterceptorEvent)} */
public static final InterceptorInvoker __onProcessInvoked= new InterceptorInvoker("onProcessInvoked") {
- public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx)
throws FailMessageExchangeException, FaultMessageExchangeException {
- i.onProcessInvoked((MyRoleMessageExchange) mex, ictx);
- }
- };
-
- /** Invoke {@link MessageExchangeInterceptor#onBpelServerInvoked(MyRoleMessageExchange, InterceptorContext)} */
- public static final InterceptorInvoker __onBpelServerInvoked = new InterceptorInvoker("onBpelServerInvoked") {
- public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
- throws FailMessageExchangeException, FaultMessageExchangeException {
- i.onBpelServerInvoked((MyRoleMessageExchange) mex, ictx);
+ i.onProcessInvoked(ictx);
}
};
- /** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorContext)} */
+
+ /** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(InterceptorEvent)} */
public static final InterceptorInvoker __onPartnerInvoked = new InterceptorInvoker("onPartnerInvoked") {
- public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx)
throws FailMessageExchangeException, FaultMessageExchangeException {
- i.onPartnerInvoked((PartnerRoleMessageExchange) mex, ictx);
+ i.onPartnerInvoked(ictx);
}
};
- /** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorContext)} */
+ /** Invoke {@link MessageExchangeInterceptor#onNewInstanceInvoked(InterceptorEvent)} */
public static final InterceptorInvoker __onNewInstanceInvoked = new InterceptorInvoker("onNewInstanceInvoked") {
- public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
+ public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx)
throws FailMessageExchangeException, FaultMessageExchangeException {
- i.onNewInstanceInvoked((MyRoleMessageExchange) mex, ictx);
+ i.onNewInstanceInvoked(ictx);
}
};
@@ -71,7 +61,7 @@
_name = name;
}
- public abstract void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
+ public abstract void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx)
throws FailMessageExchangeException, FaultMessageExchangeException;
public String toString() {
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java Wed Sep 5 22:46:42 2007
@@ -19,14 +19,14 @@
package org.apache.ode.bpel.intercept;
import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
/**
- * Hook into the BPEL server that enables intercepting of message exchange
- * invocation.
+ * Hook into the BPEL server that enables intercepting of partner/server invocations. This interface operates at
+ * a level that is a bit lower than the IAPI, as it allows access to internal engine data structures. Caution should
+ * be used when implementing interceptors.
*
* @author Maciej Szefler
*
@@ -40,7 +40,7 @@
* @param mex
* message exchange
*/
- void onBpelServerInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onBpelServerInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -50,7 +50,7 @@
* @param mex
* message exchange
*/
- void onProcessInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onProcessInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -61,7 +61,7 @@
* @param mex
* message exchange
*/
- void onNewInstanceInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+ void onNewInstanceInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
/**
@@ -71,17 +71,30 @@
* @param mex
* message exchange
*/
- void onPartnerInvoked(PartnerRoleMessageExchange mex, InterceptorContext ic)
+ void onPartnerInvoked(InterceptorEvent ic)
throws FailMessageExchangeException, FaultMessageExchangeException;
- public interface InterceptorContext {
+ /**
+ * Representation of an intercept event.
+ *
+ * @author Maciej Szefler <mszefler at gmail dot com>
+ *
+ */
+ public interface InterceptorEvent {
+ /** Get the connection to the data store. */
BpelDAOConnection getConnection();
-
+
+ /** Get the DB representation of the process. */
ProcessDAO getProcessDAO();
+ /** Get the process configuration. */
ProcessConf getProcessConf();
+
+ /** Get the database representation of the message exchange. */
+ MessageExchangeDAO getMessageExchangeDAO();
+
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java Wed Sep 5 22:46:42 2007
@@ -18,8 +18,6 @@
*/
package org.apache.ode.bpel.intercept;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
/**
* No-Op implementation of the
@@ -31,25 +29,20 @@
*/
public class NoOpInterceptor implements MessageExchangeInterceptor {
- public void onBpelServerInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws FailMessageExchangeException,
- FaultMessageExchangeException {
- }
-
- public void onProcessInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws FailMessageExchangeException,
- FaultMessageExchangeException {
- }
-
- public void onPartnerInvoked(PartnerRoleMessageExchange mex,
- InterceptorContext ic) throws FailMessageExchangeException,
- FaultMessageExchangeException {
- }
-
- public void onNewInstanceInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws FailMessageExchangeException,
- FaultMessageExchangeException {
+ public void onBpelServerInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
- }
+ public void onNewInstanceInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
+
+ public void onPartnerInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
+
+ public void onProcessInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+
+ }
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java Wed Sep 5 22:46:42 2007
@@ -18,12 +18,12 @@
*/
package org.apache.ode.bpel.intercept;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.w3c.dom.Node;
-import org.w3c.dom.Text;
+import java.util.Map;
import javax.xml.namespace.QName;
-import java.util.Map;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
/**
* An example of a simple interceptor providing a "throttling" capability - that is an
@@ -36,8 +36,7 @@
private static final QName PROP_MAX_INSTANCES = new QName("urn:org.apache.ode.bpel.intercept", "maxInstances");
@Override
- public void onNewInstanceInvoked(MyRoleMessageExchange mex,
- InterceptorContext ic) throws FailMessageExchangeException {
+ public void onNewInstanceInvoked(InterceptorEvent ic) throws FailMessageExchangeException {
int maxInstances;
try {
maxInstances = Integer.valueOf(getSimpleProperty(PROP_MAX_INSTANCES, ic));
@@ -56,7 +55,7 @@
* @param ic interceptor context
* @return value of the property, or <code>null</code> if not set
*/
- private String getSimpleProperty(QName propertyName, InterceptorContext ic) {
+ private String getSimpleProperty(QName propertyName, InterceptorEvent ic) {
Map<QName, Node> props = ic.getProcessConf().getProperties();
for (Map.Entry<QName, Node> prop : props.entrySet()) {
if (prop.getKey().equals(propertyName))
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java Wed Sep 5 22:46:42 2007
@@ -22,6 +22,7 @@
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.Scheduler;
+import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import java.util.HashMap;
import java.util.Map;
@@ -33,14 +34,14 @@
public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
private static final Map<QName, ProcessDaoImpl> __StateStore = new HashMap<QName, ProcessDaoImpl>();
- private Scheduler _scheduler;
+ private TransactionManager _txm;
- public BpelDAOConnectionFactoryImpl(Scheduler sched) {
- _scheduler = sched;
+ public BpelDAOConnectionFactoryImpl(TransactionManager txm) {
+ _txm = txm;
}
public BpelDAOConnection getConnection() {
- return new BpelDAOConnectionImpl(__StateStore, _scheduler);
+ return new BpelDAOConnectionImpl(__StateStore, _txm);
}
/**
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Wed Sep 5 22:46:42 2007
@@ -18,6 +18,22 @@
*/
package org.apache.ode.bpel.memdao;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.BpelEventFilter;
@@ -30,25 +46,10 @@
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.utils.ISO8601DateParser;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.UnaryFunction;
-import javax.xml.namespace.QName;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ConcurrentHashMap;
-
-
/**
* A very simple, in-memory implementation of the {@link BpelDAOConnection} interface.
*/
@@ -56,17 +57,21 @@
private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
public static long TIME_TO_LIVE = 10*60*1000;
- private Scheduler _scheduler;
+ private TransactionManager _txm;
+
private Map<QName, ProcessDaoImpl> _store;
+
private List<BpelEvent> _events = new LinkedList<BpelEvent>();
- private static Map<String,MessageExchangeDAO> _mexStore = Collections.synchronizedMap(new HashMap<String,MessageExchangeDAO>());
- protected static Map<String, Long> _mexAge = new ConcurrentHashMap<String, Long>();
+
+ private final List<MessageExchangeDAOImpl> _mexList = new LinkedList<MessageExchangeDAOImpl>();
+ private final Map<String, MessageExchangeDAOImpl> _mexStore = new HashMap<String, MessageExchangeDAOImpl>();
+
private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
private static volatile long _lastRemoval = 0;
- BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, Scheduler scheduler) {
+ BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, TransactionManager txm) {
_store = store;
- _scheduler = scheduler;
+ _txm = txm;
}
public ProcessDAO getProcess(QName processId) {
@@ -74,8 +79,8 @@
}
public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
- ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid,version);
- _store.put(pid,process);
+ ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
+ _store.put(pid, process);
return process;
}
@@ -89,13 +94,12 @@
}
public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
- if(filter.getLimit()==0) {
+ if (filter.getLimit() == 0) {
return Collections.EMPTY_LIST;
}
List<ProcessInstanceDAO> matched = new ArrayList<ProcessInstanceDAO>();
// Selecting
- selectionCompleted:
- for (ProcessDaoImpl proc : _store.values()) {
+ selectionCompleted: for (ProcessDaoImpl proc : _store.values()) {
boolean pmatch = true;
if (filter.getNameFilter() != null
&& !equalsOrWildcardMatch(filter.getNameFilter(), proc.getProcessId().getLocalPart()))
@@ -111,9 +115,11 @@
if (filter.getStatusFilter() != null) {
boolean statusMatch = false;
for (Short status : filter.convertFilterState()) {
- if (inst.getState() == status.byteValue()) statusMatch = true;
+ if (inst.getState() == status.byteValue())
+ statusMatch = true;
}
- if (!statusMatch) match = false;
+ if (!statusMatch)
+ match = false;
}
if (filter.getStartedDateFilter() != null
&& !dateMatch(filter.getStartedDateFilter(), inst.getCreateTime(), filter))
@@ -122,25 +128,25 @@
&& !dateMatch(filter.getLastActiveDateFilter(), inst.getLastActiveTime(), filter))
match = false;
-// if (filter.getPropertyValuesFilter() != null) {
-// for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
-// boolean entryMatched = false;
-// for (ProcessPropertyDAO prop : proc.getProperties()) {
-// if (prop.getName().equals(propEntry.getKey())
-// && (propEntry.getValue().equals(prop.getMixedContent())
-// || propEntry.getValue().equals(prop.getSimpleContent()))) {
-// entryMatched = true;
-// }
-// }
-// if (!entryMatched) {
-// match = false;
-// }
-// }
-// }
+ // if (filter.getPropertyValuesFilter() != null) {
+ // for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
+ // boolean entryMatched = false;
+ // for (ProcessPropertyDAO prop : proc.getProperties()) {
+ // if (prop.getName().equals(propEntry.getKey())
+ // && (propEntry.getValue().equals(prop.getMixedContent())
+ // || propEntry.getValue().equals(prop.getSimpleContent()))) {
+ // entryMatched = true;
+ // }
+ // }
+ // if (!entryMatched) {
+ // match = false;
+ // }
+ // }
+ // }
if (match) {
matched.add(inst);
- if(matched.size()==filter.getLimit()) {
+ if (matched.size() == filter.getLimit()) {
break selectionCompleted;
}
}
@@ -153,9 +159,10 @@
Collections.sort(matched, new Comparator<ProcessInstanceDAO>() {
public int compare(ProcessInstanceDAO o1, ProcessInstanceDAO o2) {
- for (String orderKey: orders) {
+ for (String orderKey : orders) {
int result = compareInstanceUsingKey(orderKey, o1, o2);
- if (result != 0) return result;
+ if (result != 0)
+ return result;
}
return 0;
}
@@ -175,37 +182,65 @@
throw new UnsupportedOperationException("Can't query process configuration using a transient DAO.");
}
- public MessageExchangeDAO createMessageExchange(char dir) {
- final String id = Long.toString(counter.getAndIncrement());
- MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir,id);
- long now = System.currentTimeMillis();
- _mexStore.put(id,mex);
- _mexAge.put(id, now);
-
- if (now > _lastRemoval + (TIME_TO_LIVE/10)) {
- _lastRemoval = now;
- Object[] oldMexs = _mexAge.keySet().toArray();
- for (int i=oldMexs.length-1; i>0; i--) {
- String oldMex = (String) oldMexs[i];
- Long age = _mexAge.get(oldMex);
- if (age != null && now-age > TIME_TO_LIVE) {
- removeMessageExchange(oldMex);
- _mexAge.remove(oldMex);
- }
- }
+ public MessageExchangeDAO createMessageExchange(final String mexId, char dir) {
+ MessageExchangeDAOImpl mex = new MessageExchangeDAOImpl(dir, mexId);
+ mex.createTime = new Date();
+
+ // FIXME: Why is this necessary? We should explicitly remove these things -mbs
+
+ synchronized (_mexStore) {
+ _mexStore.put(mexId, mex);
+ _mexList.add(mex);
}
+
+ cleanupDeadWood();
+
// Removing right away on rollback
onRollback(new Runnable() {
public void run() {
- removeMessageExchange(id);
- _mexAge.remove(id);
+ synchronized (_mexStore) {
+ MessageExchangeDAOImpl mexdao = _mexStore.remove(mexId);
+
+ if (mexdao != null)
+ _mexList.remove(mexdao);
+ }
}
});
return mex;
}
+
+
+ /**
+ * Remove old message exchanges from the Mex store.
+ *
+ */
+ private void cleanupDeadWood() {
+ long now = System.currentTimeMillis();
+
+ if (now > _lastRemoval + (TIME_TO_LIVE/10)) {
+ _lastRemoval = now;
+
+ synchronized (_mexStore) {
+ LinkedList trash = new LinkedList<MessageExchangeDAOImpl>();
+ for (MessageExchangeDAOImpl mexdao : _mexList) {
+ long createtime = mexdao._createTime.getTime();
+ if (now-createtime> TIME_TO_LIVE) {
+ trash.add(mexdao);
+ } else
+ break;
+ }
+
+ _mexList.removeAll(trash);
+ _mexStore.values().removeAll(trash);
+ }
+ }
+
+
+ }
+
public MessageExchangeDAO getMessageExchange(String mexid) {
return _mexStore.get(mexid);
}
@@ -217,7 +252,8 @@
String orderKey = key;
if (key.startsWith("+") || key.startsWith("-")) {
orderKey = key.substring(1, key.length());
- if (key.startsWith("-")) ascending = false;
+ if (key.startsWith("-"))
+ ascending = false;
}
ProcessDAO process1 = getProcess(instanceDAO1.getProcess().getProcessId());
ProcessDAO process2 = getProcess(instanceDAO2.getProcess().getProcessId());
@@ -231,11 +267,11 @@
s1 = process1.getProcessId().getNamespaceURI();
s2 = process2.getProcessId().getNamespaceURI();
} else if ("version".equals(orderKey)) {
- s1 = ""+process1.getVersion();
- s2 = ""+process2.getVersion();
+ s1 = "" + process1.getVersion();
+ s2 = "" + process2.getVersion();
} else if ("status".equals(orderKey)) {
- s1 = ""+instanceDAO1.getState();
- s2 = ""+instanceDAO2.getState();
+ s1 = "" + instanceDAO1.getState();
+ s2 = "" + instanceDAO2.getState();
} else if ("started".equals(orderKey)) {
s1 = ISO8601DateParser.format(instanceDAO1.getCreateTime());
s2 = ISO8601DateParser.format(instanceDAO2.getCreateTime());
@@ -243,62 +279,71 @@
s1 = ISO8601DateParser.format(instanceDAO1.getLastActiveTime());
s2 = ISO8601DateParser.format(instanceDAO2.getLastActiveTime());
}
- if (ascending) return s1.compareTo(s2);
- else return s2.compareTo(s1);
+ if (ascending)
+ return s1.compareTo(s2);
+ else
+ return s2.compareTo(s1);
}
private boolean equalsOrWildcardMatch(String s1, String s2) {
- if (s1 == null || s2 == null) return false;
- if (s1.equals(s2)) return true;
+ if (s1 == null || s2 == null)
+ return false;
+ if (s1.equals(s2))
+ return true;
if (s1.endsWith("*")) {
- if (s2.startsWith(s1.substring(0, s1.length() - 1))) return true;
+ if (s2.startsWith(s1.substring(0, s1.length() - 1)))
+ return true;
}
if (s2.endsWith("*")) {
- if (s1.startsWith(s2.substring(0, s2.length() - 1))) return true;
+ if (s1.startsWith(s2.substring(0, s2.length() - 1)))
+ return true;
}
return false;
}
- public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
+ public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
boolean match = true;
for (String ddf : dateFilters) {
String isoDate = ISO8601DateParser.format(instanceDate);
String critDate = Filter.getDateWithoutOp(ddf);
if (ddf.startsWith("=")) {
- if (!isoDate.startsWith(critDate)) match = false;
+ if (!isoDate.startsWith(critDate))
+ match = false;
} else if (ddf.startsWith("<=")) {
- if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0) match = false;
+ if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0)
+ match = false;
} else if (ddf.startsWith(">=")) {
- if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0) match = false;
+ if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0)
+ match = false;
} else if (ddf.startsWith("<")) {
- if (isoDate.compareTo(critDate) > 0) match = false;
+ if (isoDate.compareTo(critDate) > 0)
+ match = false;
} else if (ddf.startsWith(">")) {
- if (isoDate.compareTo(critDate) < 0) match = false;
+ if (isoDate.compareTo(critDate) < 0)
+ match = false;
}
}
return match;
}
-
public ScopeDAO getScope(Long siidl) {
for (ProcessDaoImpl process : _store.values()) {
for (ProcessInstanceDAO instance : process._instances.values()) {
- if (instance.getScope(siidl) != null) return instance.getScope(siidl);
+ if (instance.getScope(siidl) != null)
+ return instance.getScope(siidl);
}
}
return null;
}
-
public void insertBpelEvent(BpelEvent event, ProcessDAO processConfiguration, ProcessInstanceDAO instance) {
_events.add(event);
}
-
public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
// TODO : Provide more correct implementation:
ArrayList<Date> dates = new ArrayList<Date>();
- CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent,Date>() {
+ CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent, Date>() {
public Date apply(BpelEvent x) {
return x.getTimestamp();
}
@@ -306,7 +351,6 @@
return dates;
}
-
public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
// TODO : Provide a more correct (filtering) implementation:
return _events;
@@ -316,35 +360,39 @@
* @see org.apache.ode.bpel.dao.BpelDAOConnection#instanceQuery(String)
*/
public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
- //TODO
+ // TODO
throw new UnsupportedOperationException();
}
- static void removeMessageExchange(String mexId) {
- // Cleaning up mex
- if (__log.isDebugEnabled()) __log.debug("Removing mex " + mexId + " from memory store.");
- MessageExchangeDAO mex = _mexStore.remove(mexId);
- if (mex == null)
- __log.warn("Couldn't find mex " + mexId + " for cleanup.");
- _mexAge.remove(mexId);
- }
-
public void defer(final Runnable runnable) {
- _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- }
- public void beforeCompletion() {
- runnable.run();
- }
- });
+ try {
+ _txm.getTransaction().registerSynchronization(new Synchronization() {
+ public void afterCompletion(int status) {
+ }
+
+ public void beforeCompletion() {
+ runnable.run();
+ }
+ });
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
+
+
public void onRollback(final Runnable runnable) {
- _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
- public void afterCompletion(boolean success) {
- if (!success) runnable.run();
- }
- public void beforeCompletion() {
- }
- });
+ try {
+ _txm.getTransaction().registerSynchronization(new Synchronization() {
+ public void afterCompletion(int status) {
+ if (status != Status.STATUS_COMMITTED) runnable.run();
+ }
+
+ public void beforeCompletion() {}
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
+
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java Wed Sep 5 22:46:42 2007
@@ -19,20 +19,17 @@
package org.apache.ode.bpel.memdao;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.utils.DOMUtils;
import org.w3c.dom.Element;
-import javax.xml.namespace.QName;
-
public class MessageDAOImpl extends DaoBaseImpl implements MessageDAO {
private QName type;
private Element data;
- private MessageExchangeDAO messageExchange;
- public MessageDAOImpl(MessageExchangeDAO messageExchange) {
- this.messageExchange = messageExchange;
+ public MessageDAOImpl() {
}
public void setType(QName type) {
@@ -54,8 +51,5 @@
return data;
}
- public MessageExchangeDAO getMessageExchange() {
- return messageExchange;
- }
-
+
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Wed Sep 5 22:46:42 2007
@@ -19,46 +19,54 @@
package org.apache.ode.bpel.memdao;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.w3c.dom.Element;
-import javax.xml.namespace.QName;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO {
- private String messageExchangeId;
- private MessageDAO response;
- private Date createTime;
- private MessageDAO request;
- private String operation;
- private QName portType;
- private String status;
- private int partnerLinkModelId;
- private String correlationId;
- private String pattern;
- private Element ePR;
- private Element callbackEPR;
- private String channel;
- private boolean propagateTransactionFlag;
- private QName fault;
- private String faultExplanation;
- private String correlationStatus;
- private ProcessDAO process;
- private ProcessInstanceDAO instance;
- private char direction;
- private QName callee;
- private Properties properties = new Properties();
- private PartnerLinkDAOImpl _plink;
- private String pipedMessageExchangeId;
+ String messageExchangeId;
+ MessageDAO response;
+ Date createTime;
+ MessageDAO request;
+ String operation;
+ QName portType;
+ Status status;
+ int partnerLinkModelId;
+ String correlationId;
+ String pattern;
+ Element ePR;
+ String channel;
+ QName fault;
+ String faultExplanation;
+ String correlationStatus;
+ ProcessDAO process;
+ ProcessInstanceDAO instance;
+ char direction;
+ QName callee;
+ Properties properties = new Properties();
+ PartnerLinkDAOImpl _plink;
+ InvocationStyle _istyle;
+ String _pipedExchange;
+ FailureType _failureType;
+ long _timeout;
+ AckType _ackType;
+ QName _pipedPID;
public MessageExchangeDAOImpl(char direction, String messageEchangeId){
this.direction = direction;
@@ -94,17 +102,17 @@
}
- public void setStatus(String status) {
+ public void setStatus(Status status) {
this.status = status;
}
- public String getStatus() {
+ public Status getStatus() {
return status;
}
public MessageDAO createMessage(QName type) {
- MessageDAO messageDAO = new MessageDAOImpl(this);
+ MessageDAO messageDAO = new MessageDAOImpl();
messageDAO.setType(type);
return messageDAO;
}
@@ -128,11 +136,11 @@
}
- public String getCorrelationId() {
+ public String getPartnersKey() {
return correlationId;
}
- public void setCorrelationId(String correlationId) {
+ public void setPartnersKey(String correlationId) {
this.correlationId = correlationId;
}
@@ -156,15 +164,7 @@
return ePR;
}
- public void setCallbackEPR(Element epr) {
- this.callbackEPR = epr;
-
- }
-
- public Element getCallbackEPR() {
- return callbackEPR;
- }
-
+
public String getPattern() {
return pattern;
}
@@ -177,10 +177,6 @@
this.channel = string;
}
- public boolean getPropagateTransactionFlag() {
- return propagateTransactionFlag;
- }
-
public QName getFault() {
return fault;
}
@@ -198,7 +194,6 @@
}
-
public void setCorrelationStatus(String cstatus) {
this.correlationStatus = cstatus;
}
@@ -263,13 +258,6 @@
return retVal;
}
- public String getPipedMessageExchangeId() {
- return pipedMessageExchangeId;
- }
-
- public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
- this.pipedMessageExchangeId = pipedMessageExchangeId;
- }
public void release() {
instance = null;
@@ -277,10 +265,60 @@
_plink = null;
request = null;
response = null;
- BpelDAOConnectionImpl.removeMessageExchange(getMessageExchangeId());
}
public String toString() {
return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")";
+ }
+
+ public InvocationStyle getInvocationStyle() {
+ return _istyle;
+ }
+
+ public String getPipedMessageExchangeId() {
+ return _pipedExchange;
+ }
+
+ public void setFailureType(FailureType failureType) {
+ _failureType = failureType;
+ }
+
+ public FailureType getFailureType() {
+ return _failureType;
+ }
+
+ public void setInvocationStyle(InvocationStyle invocationStyle) {
+ _istyle = invocationStyle;
+
+ }
+
+ public void setPipedMessageExchangeId(String pipedMexId) {
+ _pipedExchange = pipedMexId;
+
+ }
+
+ public long getTimeout() {
+ return _timeout;
+ }
+
+ public void setTimeout(long timeout) {
+ _timeout = timeout;
+ }
+
+ public AckType getAckType() {
+ return _ackType;
+ }
+
+ public void setAckType(AckType ackType) {
+ _ackType = ackType;
+ }
+
+ public QName getPipedPID() {
+ return _pipedPID;
+ }
+
+ public void setPipedPID(QName pipedPid) {
+ _pipedPID = pipedPid;
+
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Wed Sep 5 22:46:42 2007
@@ -22,6 +22,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -52,10 +55,10 @@
protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer, PartnerLinkDAO>();
private Map<QName, ProcessDaoImpl> _store;
private BpelDAOConnectionImpl _conn;
- private int _executionCount = 0;
private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>();
private static volatile long _lastRemoval = 0;
+
private String _guid;
public ProcessDaoImpl(BpelDAOConnectionImpl conn, Map<QName, ProcessDaoImpl> store,
@@ -136,7 +139,6 @@
}
});
- _executionCount++;
return newInstance;
}
@@ -204,8 +206,7 @@
}
public int getNumInstances() {
- // Instances are removed after execution, using a counter instead
- return _executionCount;
+ return _instances.size();
}
public ProcessInstanceDAO getInstanceWithLock(Long iid) {
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Wed Sep 5 22:46:42 2007
@@ -45,33 +45,52 @@
import java.util.Set;
/**
- * A very simple, in-memory implementation of the {@link ProcessInstanceDAO}
- * interface.
+ * A very simple, in-memory implementation of the {@link ProcessInstanceDAO} interface.
*/
public class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO {
private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList();
private short _previousState;
+
private short _state;
+
private Long _instanceId;
+
private ProcessDaoImpl _processDao;
+
private Object _soup;
+
private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
+
private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, List<ScopeDAO>>();
+
private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>();
+
private ScopeDAO _rootScope;
+
private FaultDAO _fault;
+
private CorrelatorDAO _instantiatingCorrelator;
+
private BpelDAOConnection _conn;
+
private int _failureCount;
+
private Date _failureDateTime;
+
private Map<String, ActivityRecoveryDAO> _activityRecoveries = new HashMap<String, ActivityRecoveryDAO>();
// TODO: Remove this, we should be using the main event store...
private List<ProcessInstanceEvent> _events = new ArrayList<ProcessInstanceEvent>();
+
private Date _lastActive;
+
private int _seq;
+ private byte[] _execState;
+
+ private int _execStateCount;
+
ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, CorrelatorDAO correlator) {
_state = 0;
_processDao = processDao;
@@ -125,11 +144,11 @@
* @see ProcessInstanceDAO#getExecutionState()
*/
public byte[] getExecutionState() {
- throw new IllegalStateException("In-memory instances are never serialized");
+ return _execState;
}
public void setExecutionState(byte[] bytes) {
- throw new IllegalStateException("In-memory instances are never serialized");
+ _execState = bytes;
}
public Object getSoup() {
@@ -314,7 +333,7 @@
}
public void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data,
- String[] actions, int retries) {
+ String[] actions, int retries) {
_activityRecoveries
.put(channel, new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries));
_failureCount = _activityRecoveries.size();
@@ -347,7 +366,7 @@
private int _retries;
ActivityRecoveryDAOImpl(String channel, long activityId, String reason, Date dateTime, Element details, String[] actions,
- int retries) {
+ int retries) {
_activityId = activityId;
_channel = channel;
_reason = reason;
@@ -404,5 +423,13 @@
public String toString() {
return "mem.instance(type=" + _processDao.getType() + " iid=" + _instanceId + ")";
+ }
+
+ public int getExecutionStateCounter() {
+ return _execStateCount;
+ }
+
+ public void setExecutionStateCounter(int stateCounter) {
+ _execStateCount = stateCounter;
}
}
Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java Wed Sep 5 22:46:42 2007
@@ -46,13 +46,12 @@
}
FaultData fault = null;
- // TODO: Check for fault without message.
try {
- Node msg = getBpelRuntimeContext()
+ Node msg = oreply.variable == null ? null : getBpelRuntimeContext()
.fetchVariableData(_scopeFrame.resolve(oreply.variable), false);
- assert msg instanceof Element;
+ assert msg == null || msg instanceof Element; // note msg can be null for faults
for (Iterator i = oreply.initCorrelations.iterator(); i.hasNext(); ) {
OScope.CorrelationSet cset = (OScope.CorrelationSet) i.next();
@@ -64,7 +63,7 @@
getBpelRuntimeContext()
.reply(_scopeFrame.resolve(oreply.partnerLink), oreply.operation.getName(),
oreply.messageExchangeId, (Element)msg,
- (oreply.fault != null) ? oreply.fault : null);
+ oreply.fault);
} catch (FaultException e) {
__log.error(e);
fault = createFault(e.getQName(), oreply);
Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Wed Sep 5 22:46:42 2007
@@ -18,21 +18,35 @@
*/
package org.apache.ode.bpel.runtime;
+import java.io.File;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
import org.apache.ode.il.EmbeddedGeronimoFactory;
import org.apache.ode.il.MockScheduler;
@@ -44,36 +58,31 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+class MockBpelServer {
+ BpelServerImpl _server;
-class MockBpelServer {
+ ProcessStoreImpl _store;
+
+ TransactionManager _txManager;
+
+ Database _database;
+
+ DataSource _dataSource;
+
+ SchedulerWrapper _scheduler;
+
+ BpelDAOConnectionFactory _daoCF;
+
+ EndpointReferenceContext _eprContext;
- BpelServerImpl _server;
- ProcessStoreImpl _store;
- TransactionManager _txManager;
- Database _database;
- DataSource _dataSource;
- SchedulerWrapper _scheduler;
- BpelDAOConnectionFactory _daoCF;
- EndpointReferenceContext _eprContext;
- MessageExchangeContext _mexContext;
- BindingContext _bindContext;
- HashMap<String, QName> _activated = new HashMap();
- HashMap _endpoints = new HashMap();
+ MessageExchangeContext _mexContext;
+
+ BindingContext _bindContext;
+
+ HashMap<String, QName> _activated = new HashMap<String, QName>();
+
+ HashMap<String, EndpointReference> _endpoints = new HashMap<String, EndpointReference>();
public MockBpelServer() {
try {
@@ -85,10 +94,10 @@
if (_daoCF == null)
throw new RuntimeException("No DAO");
_server.setDaoConnectionFactory(_daoCF);
- _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
if (_scheduler == null)
throw new RuntimeException("No scheduler");
- _store = new ProcessStoreImpl(_dataSource,"jpa", true);
+ _store = new ProcessStoreImpl(_dataSource, "jpa", true);
+ _server.setTransactionManager(_txManager);
_server.setScheduler(_scheduler);
_server.setEndpointReferenceContext(createEndpointReferenceContext());
_server.setMessageExchangeContext(createMessageExchangeContext());
@@ -104,52 +113,34 @@
public Collection<QName> deploy(File deploymentUnitDirectory) {
Collection<QName> pids = _store.deploy(deploymentUnitDirectory);
- for (QName pid: pids)
+ for (QName pid : pids)
_server.register(_store.getProcessConfiguration(pid));
return pids;
}
public void invoke(QName serviceName, String opName, Element body) throws Exception {
- try {
- String messageId = new GUID().toString();
- MyRoleMessageExchange mex;
+ String messageId = new GUID().toString();
+ MyRoleMessageExchange mex;
+
+ mex = _server.createMessageExchange(InvocationStyle.UNRELIABLE, serviceName, opName, "" + messageId);
+ if (mex.getOperation() == null)
+ throw new Exception("Did not find operation " + opName + " on service " + serviceName);
+ Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
+ Element wrapper = body.getOwnerDocument().createElementNS("", "main");
+ wrapper.appendChild(body);
+ Element message = body.getOwnerDocument().createElementNS("", "message");
+ message.appendChild(wrapper);
+ request.setMessage(message);
+ mex.setRequest(request);
+ mex.invokeBlocking();
+ mex.complete();
- _txManager.begin();
- mex = _server.getEngine().createMessageExchange("" + messageId, serviceName, opName);
- if (mex.getOperation() == null)
- throw new Exception("Did not find operation " + opName + " on service " + serviceName);
- Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
- Element wrapper = body.getOwnerDocument().createElementNS("", "main");
- wrapper.appendChild(body);
- Element message = body.getOwnerDocument().createElementNS("", "message");
- message.appendChild(wrapper);
- request.setMessage(message);
- mex.invoke(request);
- mex.complete();
- _txManager.commit();
- } catch (Exception except) {
- _txManager.rollback();
- throw except;
- }
}
public TransactionManager getTransactionManager() {
return _txManager;
}
- public void waitForBlocking() {
- try {
- long delay = 1000;
- while (true) {
- // Be warned: ugly hack and not safe for slow CPUs.
- long cutoff = System.currentTimeMillis() - delay;
- if (_scheduler._nextSchedule < cutoff)
- break;
- Thread.sleep(delay);
- }
- } catch (InterruptedException except) { }
- }
-
public void shutdown() throws Exception {
_server.stop();
_scheduler.stop();
@@ -205,17 +196,47 @@
_eprContext = new EndpointReferenceContext() {
public EndpointReference resolveEndpointReference(Element element) {
String service = DOMUtils.getChildCharacterData(element);
- return (EndpointReference)_endpoints.get(service);
+ return (EndpointReference) _endpoints.get(service);
+ }
+
+ public EndpointReference convertEndpoint(QName qName, Element element) {
+ return null;
}
- public EndpointReference convertEndpoint(QName qName, Element element) { return null; }
};
return _eprContext;
}
protected MessageExchangeContext createMessageExchangeContext() {
- _mexContext = new MessageExchangeContext() {
- public void invokePartner(PartnerRoleMessageExchange mex) { }
- public void onAsyncReply(MyRoleMessageExchange myRoleMex) { }
+ _mexContext = new MessageExchangeContext() {
+
+ public void onMyRoleMessageExchangeStateChanged(MyRoleMessageExchange myRoleMex) {
+ }
+
+ public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void invokePartnerUnreliable(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+ // TODO Auto-generated method stub
+
+ }
+
};
return _mexContext;
}
@@ -225,12 +246,14 @@
public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
final Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
- EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+ EndpointReference.SERVICE_REF_QNAME.getLocalPart());
serviceRef.appendChild(doc.createTextNode(myRoleEndpoint.serviceName.toString()));
doc.appendChild(serviceRef);
_activated.put(myRoleEndpoint.toString(), processId);
return new EndpointReference() {
- public Document toXML() { return doc; }
+ public Document toXML() {
+ return doc;
+ }
};
}
@@ -239,12 +262,12 @@
}
public PartnerRoleChannel createPartnerRoleChannel(QName processId, PortType portType,
- final Endpoint initialPartnerEndpoint) {
+ final Endpoint initialPartnerEndpoint) {
final EndpointReference epr = new EndpointReference() {
public Document toXML() {
Document doc = DOMUtils.newDocument();
Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
- EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+ EndpointReference.SERVICE_REF_QNAME.getLocalPart());
serviceRef.appendChild(doc.createTextNode(initialPartnerEndpoint.serviceName.toString()));
doc.appendChild(serviceRef);
return doc;
@@ -252,36 +275,32 @@
};
_endpoints.put(initialPartnerEndpoint.serviceName.toString(), epr);
return new PartnerRoleChannel() {
- public EndpointReference getInitialEndpointReference() { return epr; }
- public void close() { };
+ public EndpointReference getInitialEndpointReference() {
+ return epr;
+ }
+
+ public void close() {
+ };
};
}
};
return _bindContext;
}
-
private class SchedulerWrapper implements Scheduler {
MockScheduler _scheduler;
- long _nextSchedule;
+
+ long _nextSchedule;
SchedulerWrapper(BpelServerImpl server, TransactionManager txManager, DataSource dataSource) {
- ExecutorService executorService = Executors.newCachedThreadPool();
_scheduler = new MockScheduler(_txManager);
- _scheduler.setExecutorSvc(executorService);
_scheduler.setJobProcessor(server);
}
- public String schedulePersistedJob(Map<String,Object>jobDetail,Date when) throws ContextException {
+ public String schedulePersistedJob(Map<String, Object> jobDetail, Date when) throws ContextException {
String jobId = _scheduler.schedulePersistedJob(jobDetail, when);
- _nextSchedule = when == null ? System.currentTimeMillis() : when.getTime();
- return jobId;
- }
-
- public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail) throws ContextException {
- String jobId = _scheduler.scheduleVolatileJob(transacted, jobDetail);
- _nextSchedule = System.currentTimeMillis();
+ _nextSchedule = when == null ? System.currentTimeMillis() : when.getTime();
return jobId;
}
@@ -289,30 +308,32 @@
_scheduler.cancelJob(jobId);
}
- public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
- return _scheduler.execTransaction(transaction);
+ public void start() {
+ _scheduler.start();
}
- public <T> Future<T> execIsolatedTransaction(Callable<T> transaction) throws Exception, ContextException {
- return _scheduler.execIsolatedTransaction(transaction);
+ public void stop() {
+ _scheduler.stop();
}
- public boolean isTransacted() {
- return _scheduler.isTransacted();
+ public void shutdown() {
+ _scheduler.shutdown();
}
- public void start() { _scheduler.start(); }
- public void stop() { _scheduler.stop(); }
- public void shutdown() { _scheduler.shutdown(); }
+ public void setJobProcessor(JobProcessor processor) throws ContextException {
+ _scheduler.setJobProcessor(processor);
- public void registerSynchronizer(Synchronizer synch) throws ContextException {
- _scheduler.registerSynchronizer(synch);
}
- public void setJobProcessor(JobProcessor processor) throws ContextException {
- _scheduler.setJobProcessor(processor);
+ public void jobCompleted(String jobId) {
+ _scheduler.jobCompleted(jobId);
}
+ }
+
+ public void waitForBlocking() {
+ // TODO Auto-generated method stub
+
}
}
Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java Wed Sep 5 22:46:42 2007
@@ -116,6 +116,5 @@
_processQName = new QName(NAMESPACE, process);
_server.invoke(_processQName, "instantiate",
DOMUtils.newDocument().createElementNS(NAMESPACE, "tns:RequestElement"));
- _server.waitForBlocking();
}
}
Modified: ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java (original)
+++ ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java Wed Sep 5 22:46:42 2007
@@ -18,17 +18,36 @@
*/
package org.apache.ode.test;
-import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.xml.namespace.QName;
+
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
+import org.apache.ode.bpel.iapi.InvocationStyle;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.iapi.ProcessStoreEvent;
import org.apache.ode.bpel.iapi.ProcessStoreListener;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
import org.apache.ode.il.MockScheduler;
@@ -41,28 +60,9 @@
import org.junit.Before;
import org.w3c.dom.Element;
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
public abstract class BPELTestAbstract {
- private static final String SHOW_EVENTS_ON_CONSOLE = "no";
-
+ private static final String SHOW_EVENTS_ON_CONSOLE = "no";
+
protected BpelServerImpl _server;
protected ProcessStore store;
@@ -89,6 +89,8 @@
/** What's actually been deployed. */
private List<Deployment> _deployed;
+ private MockTransactionManager _txm;
+
@Before
public void setUp() throws Exception {
_failures = new CopyOnWriteArrayList<Failure>();
@@ -99,39 +101,24 @@
_deployed = new ArrayList<Deployment>();
if (Boolean.getBoolean("org.apache.ode.test.persistent")) {
- emf = Persistence.createEntityManagerFactory("ode-unit-test-embedded");
- em = emf.createEntityManager();
- _cf = new BPELDAOConnectionFactoryImpl();
- _server.setDaoConnectionFactory(_cf);
- scheduler = new MockScheduler() {
- @Override
- public void begin() {
- super.begin();
- em.getTransaction().begin();
- }
-
- @Override
- public void commit() {
- super.commit();
- em.getTransaction().commit();
- }
- @Override
- public void rollback() {
- super.rollback();
- em.getTransaction().rollback();
- }
+ _server.setDaoConnectionFactory(_cf);
+ _txm = new MockTransactionManager();
- };
+ BPELDAOConnectionFactoryImpl cf = new BPELDAOConnectionFactoryImpl();
+ cf.setTransactionManager(_txm);
+ // cf.setDataSource(datasource);
+ scheduler = new MockScheduler(_txm);
} else {
- scheduler = new MockScheduler();
- _cf = new BpelDAOConnectionFactoryImpl(scheduler);
+ _txm = new MockTransactionManager();
+ scheduler = new MockScheduler(_txm);
+ _cf = new BpelDAOConnectionFactoryImpl(_txm);
_server.setDaoConnectionFactory(_cf);
}
- _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(scheduler));
_server.setScheduler(scheduler);
_server.setBindingContext(new BindingContextImpl());
_server.setMessageExchangeContext(mexContext);
+ _server.setTransactionManager(_txm);
scheduler.setJobProcessor(_server);
store = new ProcessStoreImpl(null, "jpa", true);
store.registerListener(new ProcessStoreListener() {
@@ -147,7 +134,7 @@
}
});
_server.setConfigProperties(getConfigProperties());
- _server.registerBpelEventListener(new DebugBpelEventListener());
+ // _server.registerBpelEventListener(new DebugBpelEventListener());
_server.init();
_server.start();
}
@@ -162,19 +149,18 @@
System.err.println("Error undeploying " + d);
}
}
-
if (em != null)
em.close();
if (emf != null)
emf.close();
-
+
_server.stop();
_failures = null;
_deployed = null;
_deployments = null;
_invocations = null;
-
+
}
protected void negative(String deployDir) throws Throwable {
@@ -244,22 +230,24 @@
}
}
- protected Invocation addInvoke(String id, QName target, String operation, String request, String responsePattern) throws Exception {
+ protected Invocation addInvoke(String id, QName target, String operation, String request, String responsePattern)
+ throws Exception {
Invocation inv = new Invocation(id);
inv.target = target;
inv.operation = operation;
inv.request = DOMUtils.stringToDOM(request);
- inv.expectedStatus = null;
if (responsePattern != null) {
- inv.expectedFinalStatus = MessageExchange.Status.RESPONSE;
+ inv.expectedFinalStatus = AckType.RESPONSE;
+
inv.expectedResponsePattern = Pattern.compile(responsePattern, Pattern.DOTALL);
- }
+ } else
+ inv.expectedFinalStatus = AckType.ONEWAY;
_invocations.add(inv);
return inv;
}
-
+
protected void go() throws Exception {
try {
doDeployments();
@@ -271,23 +259,22 @@
protected void checkFailure() {
StringBuffer sb = new StringBuffer("Failure report:\n");
- for (Failure failure : _failures) {
+ for (Failure failure : _failures) {
sb.append(failure);
sb.append('\n');
}
- if (_failures.size() != 0) {
- System.err.println(sb.toString());
+ if (_failures.size() != 0) {
+ System.err.println(sb.toString());
Assert.fail(sb.toString());
- }
+ }
}
-
protected Deployment deploy(String location) {
Deployment deployment = new Deployment(makeDeployDir(location));
doDeployment(deployment);
return deployment;
}
-
+
protected void doDeployments() {
for (Deployment d : _deployments)
doDeployment(d);
@@ -303,7 +290,7 @@
try {
procs = store.deploy(d.deployDir);
-
+
_deployed.add(d);
} catch (Exception ex) {
if (d.expectedException == null) {
@@ -311,10 +298,9 @@
failure(d, "DEPLOY: Unexpected exception: " + ex, ex);
} else if (!d.expectedException.isAssignableFrom(ex.getClass())) {
ex.printStackTrace();
- failure(d, "DEPLOY: Wrong exception; expected " + d.expectedException + " but got " + ex.getClass(), ex);
+ failure(d, "DEPLOY: Wrong exception; expected " + d.expectedException + " but got " + ex.getClass(), ex);
}
-
return;
}
@@ -342,7 +328,7 @@
failure(d, "Undeployment failed.", ex);
}
}
-
+
_deployments.clear();
}
@@ -352,6 +338,7 @@
store.undeploy(d.deployDir);
}
}
+
protected void doInvokes() throws Exception {
ArrayList<Thread> testThreads = new ArrayList<Thread>();
for (Invocation i : _invocations) {
@@ -374,6 +361,7 @@
private void failure(Object where, String message, Exception ex) {
Failure f = new Failure(where, message, ex);
_failures.add(f);
+ ex.printStackTrace();
Assert.fail(f.toString());
}
@@ -394,28 +382,27 @@
Assert.fail("Resource not found: " + deployxml);
}
try {
- return new File(deployxmlurl.toURI().getPath()).getParentFile();
- } catch (URISyntaxException e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- return null;
- }
+ return new File(deployxmlurl.toURI().getPath()).getParentFile();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ return null;
+ }
}
/**
- * Override this to provide configuration properties for Ode extensions
- * like BpelEventListeners.
+ * Override this to provide configuration properties for Ode extensions like BpelEventListeners.
*
* @return
*/
protected Properties getConfigProperties() {
- // could also return null, returning an empty properties
- // object is more fail-safe.
- Properties p = new Properties();
- p.setProperty("debugeventlistener.dumpToStdOut", SHOW_EVENTS_ON_CONSOLE);
- return p;
+ // could also return null, returning an empty properties
+ // object is more fail-safe.
+ Properties p = new Properties();
+ p.setProperty("debugeventlistener.dumpToStdOut", SHOW_EVENTS_ON_CONSOLE);
+ return p;
}
-
+
protected static class Failure {
Object where;
@@ -442,7 +429,7 @@
public String toString() {
StringBuffer sbuf = new StringBuffer(where + ": " + msg);
if (ex != null) {
- sbuf.append("; got exception msg: " + ex.getMessage());
+ sbuf.append("; got exception msg: " + ex.getMessage());
}
if (actual != null)
sbuf.append("; got " + actual + ", expected " + expected);
@@ -499,11 +486,8 @@
/** If non-null, expect an exception of this class (or subclass) on invoke. */
public Class expectedInvokeException = null;
- /** If non-null, expecte this status right after invoke. */
- public MessageExchange.Status expectedStatus = null;
-
/** If non-null, expect this status after response received. */
- public MessageExchange.Status expectedFinalStatus = MessageExchange.Status.COMPLETED_OK;
+ public AckType expectedFinalStatus = AckType.RESPONSE;
/** If non-null, expect this correlation status right after invoke. */
public CorrelationStatus expectedCorrelationStatus = null;
@@ -550,21 +534,17 @@
} catch (Exception ex) {
}
- scheduler.begin();
try {
- mex = _server.getEngine().createMessageExchange(new GUID().toString(), _invocation.target, _invocation.operation);
- mexContext.clearCurrentResponse();
+ mex = _server.createMessageExchange(InvocationStyle.UNRELIABLE, _invocation.target, _invocation.operation,
+ new GUID().toString());
Message request = mex.createMessage(_invocation.requestType);
request.setMessage(_invocation.request);
_invocation.invokeTime = System.currentTimeMillis();
- running = mex.invoke(request);
+ mex.setRequest(request);
+ mex.invokeBlocking();
- Status status = mex.getStatus();
CorrelationStatus cstatus = mex.getCorrelationStatus();
- if (_invocation.expectedStatus != null && !status.equals(_invocation.expectedStatus))
- failure(_invocation, "Unexpected message exchange status", _invocation.expectedStatus, status);
-
if (_invocation.expectedCorrelationStatus != null && !cstatus.equals(_invocation.expectedCorrelationStatus))
failure(_invocation, "Unexpected correlation status", _invocation.expectedCorrelationStatus, cstatus);
@@ -575,19 +555,13 @@
failure(_invocation, "Unexpected invocation exception.", _invocation.expectedInvokeException, ex.getClass());
return;
- } finally {
- scheduler.commit();
}
- if (isFailed())
- return;
+ if (mex.getStatus() != Status.ACK)
+ failure(_invocation, "No ACK status", Status.ACK.toString(), mex.getStatus().toString());
- try {
- running.get(_invocation.maximumWaitMs, TimeUnit.MILLISECONDS);
- } catch (Exception ex) {
- failure(_invocation, "Exception on future object.", ex);
+ if (isFailed())
return;
- }
long ctime = System.currentTimeMillis();
long itime = ctime - _invocation.invokeTime;
@@ -600,32 +574,29 @@
if (isFailed())
return;
- scheduler.begin();
- try {
- Status finalstat = mex.getStatus();
- if (_invocation.expectedFinalStatus != null && !_invocation.expectedFinalStatus.equals(finalstat))
- if (finalstat.equals(Status.FAULT)) {
- failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, "FAULT: "
- + mex.getFault() + " | " + mex.getFaultExplanation());
- } else {
- failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, finalstat);
+ AckType finalstat = mex.getAckType();
+ if (_invocation.expectedFinalStatus != null && _invocation.expectedFinalStatus != finalstat) {
+ if (finalstat.equals(AckType.FAULT)) {
+ failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, "FAULT: "
+ + mex.getFault() + " | " + mex.getFaultExplanation());
+ } else {
+ failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, finalstat);
+
+ if (_invocation.expectedFinalCorrelationStatus != null
+ && !_invocation.expectedFinalCorrelationStatus.equals(mex.getCorrelationStatus())) {
+ failure(_invocation, "Unexpected final correlation status", _invocation.expectedFinalCorrelationStatus, mex
+ .getCorrelationStatus());
+ }
+ if (_invocation.expectedResponsePattern != null) {
+ if (mex.getResponse() == null)
+ failure(_invocation, "Expected response, but got none.", null);
+ String responseStr = DOMUtils.domToString(mex.getResponse().getMessage());
+ Matcher matcher = _invocation.expectedResponsePattern.matcher(responseStr);
+ if (!matcher.matches())
+ failure(_invocation, "Response does not match expected pattern", _invocation.expectedResponsePattern,
+ responseStr);
}
-
- if (_invocation.expectedFinalCorrelationStatus != null
- && !_invocation.expectedFinalCorrelationStatus.equals(mex.getCorrelationStatus())) {
- failure(_invocation, "Unexpected final correlation status", _invocation.expectedFinalCorrelationStatus, mex
- .getCorrelationStatus());
- }
- if (_invocation.expectedResponsePattern != null) {
- if (mex.getResponse() == null)
- failure(_invocation, "Expected response, but got none.", null);
- String responseStr = DOMUtils.domToString(mex.getResponse().getMessage());
- Matcher matcher = _invocation.expectedResponsePattern.matcher(responseStr);
- if (!matcher.matches())
- failure(_invocation, "Response does not match expected pattern", _invocation.expectedResponsePattern, responseStr);
}
- } finally {
- scheduler.commit();
}
}
}