You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by ms...@apache.org on 2007/09/06 07:46:57 UTC

svn commit: r573153 [6/9] - in /ode/trunk: ./ axis2/src/main/java/org/apache/ode/axis2/ bpel-api/src/ bpel-api/src/main/java/org/apache/ode/bpel/explang/ bpel-api/src/main/java/org/apache/ode/bpel/iapi/ bpel-api/src/main/java/org/apache/ode/bpel/pmapi/...

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/ProcessAndInstanceManagementImpl.java Wed Sep  5 22:46:42 2007
@@ -359,7 +359,7 @@
     // INSTANCE ACTIONS
     //
     public InstanceInfoDocument fault(Long iid, QName faultname, Element faultData) {
-        // TODO: Implement
+        // TODO: implement me!
         return genInstanceInfoDocument(iid);
     }
 
@@ -397,7 +397,7 @@
                         return null;
                     for (ActivityRecoveryDAO recovery : instance.getActivityRecoveries()) {
                         if (recovery.getActivityId() == aid) {
-                            BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId());
+                            BpelProcess process = _server.getBpelProcess(instance.getProcess().getProcessId());
                             if (process != null) {
                                 process.recoverActivity(instance, recovery.getChannel(), aid, action, null);
                                 break;
@@ -480,7 +480,7 @@
     public ActivityExtInfoListDocument getExtensibilityElements(QName pid, Integer[] aids) {
         ActivityExtInfoListDocument aeild = ActivityExtInfoListDocument.Factory.newInstance();
         TActivitytExtInfoList taeil = aeild.addNewActivityExtInfoList();
-        OProcess oprocess = _server._engine.getOProcess(pid);
+        OProcess oprocess = _server.getOProcess(pid);
 
         for (int aid : aids) {
             OBase obase = oprocess.getChild(aid);
@@ -516,7 +516,7 @@
      */
     protected final DebuggerSupport getDebugger(QName procid) throws ManagementException {
 
-        BpelProcess process = _server._engine._activeProcesses.get(procid);
+        BpelProcess process = _server.getBpelProcess(procid);
         if (process == null)
             throw new InvalidRequestException("The process \"" + procid + "\" is available.");
 
@@ -714,7 +714,7 @@
 
         TDeploymentInfo depinfo = info.addNewDeploymentInfo();
         depinfo.setPackage(pconf.getPackage());
-        depinfo.setDocument(pconf.getBpelDocument());
+        //depinfo.setDocument(pconf.getBpelDocument());
         depinfo.setDeployDate(toCalendar(pconf.getDeployDate()));
         depinfo.setDeployer(pconf.getDeployer());
         if (custom.includeInstanceSummary()) {
@@ -747,13 +747,13 @@
             }
         }
 
-        OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId());
+        OProcess oprocess = _server.getOProcess(pconf.getProcessId());
         if (custom.includeEndpoints() && oprocess != null) {
             TEndpointReferences eprs = info.addNewEndpoints();
             for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
                 if (oplink.hasPartnerRole() && oplink.initializePartnerRole) {
                     // TODO: this is very uncool.
-                    EndpointReference pepr = _server._engine._activeProcesses.get(pconf.getProcessId())
+                    EndpointReference pepr = _server.getBpelProcess(pconf.getProcessId())
                             .getInitialPartnerRoleEPR(oplink);
 
                     if (pepr != null) {
@@ -765,7 +765,6 @@
             }
         }
 
-        // TODO: add documents to the above data structure.
     }
 
     /**
@@ -1234,8 +1233,7 @@
                     else if ("version".equals(orderKey))
                         c = new Comparator<ProcessConf>() {
                             public int compare(ProcessConf o1, ProcessConf o2) {
-                                // TODO: implement version comparisons.
-                                return 0;
+                                return (int) (o1.getVersion() - o2.getVersion());
                             }
                         };
                     else if ("deployed".equals(orderKey))

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/WorkEvent.java Wed Sep  5 22:46:42 2007
@@ -59,8 +59,24 @@
         return _jobDetail;
     }
 
+    public String toString() {
+        return "WorkEvent" + _jobDetail;
+    }
+    
     public enum Type {
-        TIMER, RESUME, INVOKE_RESPONSE, MATCHER, INVOKE_INTERNAL
+        TIMER, 
+        
+        RESUME, 
+        
+        /** Response from partner (i.e. the result of a partner-role invoke) has been received. */
+        PARTNER_RESPONSE, 
+        
+        MATCHER, 
+        
+        /** Invoke a "my role" operation (i.e. implemented by the process). */
+        MYROLE_INVOKE, 
+        
+        MYROLE_INVOKE_ASYNC_RESPONSE
     }
 
     public String getChannel() {
@@ -99,16 +115,6 @@
     
     public void setCorrelationKey(CorrelationKey ckey) {
         _jobDetail.put("ckey", ckey == null ? null : ckey.toCanonicalString());
-    }
-
-    public void setInMem(boolean inmem) {
-        _jobDetail.put("inmem", inmem);
-    }
-
-    public boolean isInMem() {
-        Boolean bool = (Boolean) _jobDetail.get("inmem");
-        if (bool == null) return false;
-        else return bool;
     }
 
     public void setProcessId(QName pid) {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/FaultMessageExchangeException.java Wed Sep  5 22:46:42 2007
@@ -20,7 +20,7 @@
 
 import javax.xml.namespace.QName;
 
-import org.apache.ode.bpel.iapi.Message;
+import org.w3c.dom.Element;
 
 /**
  * Exception thrown by {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -32,9 +32,9 @@
 	private static final long serialVersionUID = 1L;
 
 	private QName _faultName;
-	private Message _faultData;
+	private Element _faultData;
 
-	public FaultMessageExchangeException(String errmsg, QName faultName, Message faultData) {
+	public FaultMessageExchangeException(String errmsg, QName faultName, Element faultData) {
 		super(errmsg);
 		
 		_faultName = faultName;
@@ -45,7 +45,7 @@
 		return _faultName;
 	}
 	
-	public Message getFaultData() {
+	public Element getFaultData() {
 		return _faultData;
 	}
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/InterceptorInvoker.java Wed Sep  5 22:46:42 2007
@@ -18,10 +18,7 @@
  */
 package org.apache.ode.bpel.intercept;
 
-import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
-import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorContext;
+import org.apache.ode.bpel.intercept.MessageExchangeInterceptor.InterceptorEvent;
 
 /**
  * Helper for invoking the appropriate {@link org.apache.ode.bpel.intercept.MessageExchangeInterceptor}
@@ -34,35 +31,28 @@
 	private final String _name;
 	// Closures anyone? 
 	
-	/** Invoke {@link MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, InterceptorContext)} */
+	/** Invoke {@link MessageExchangeInterceptor#onProcessInvoked(MyRoleMessageExchange, InterceptorEvent)} */
 	public static final InterceptorInvoker  __onProcessInvoked= new InterceptorInvoker("onProcessInvoked") {
-		public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx) 
+		public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx) 
 			throws FailMessageExchangeException, FaultMessageExchangeException {
-			i.onProcessInvoked((MyRoleMessageExchange) mex, ictx);
-		}
-	};
-	
-	/** Invoke {@link MessageExchangeInterceptor#onBpelServerInvoked(MyRoleMessageExchange, InterceptorContext)} */
-	public static final InterceptorInvoker __onBpelServerInvoked = new InterceptorInvoker("onBpelServerInvoked") {
-		public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx) 
-			throws FailMessageExchangeException, FaultMessageExchangeException {
-			i.onBpelServerInvoked((MyRoleMessageExchange) mex, ictx);
+			i.onProcessInvoked(ictx);
 		}
 	};
 
-	/** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorContext)} */
+
+	/** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorEvent)} */
 	public static final InterceptorInvoker __onPartnerInvoked = new InterceptorInvoker("onPartnerInvoked") {
-		public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx) 
+		public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx) 
 			throws FailMessageExchangeException, FaultMessageExchangeException {
-			i.onPartnerInvoked((PartnerRoleMessageExchange) mex, ictx);
+			i.onPartnerInvoked(ictx);
 		}
 	};
 
-	/** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorContext)} */
+	/** Invoke {@link MessageExchangeInterceptor#onPartnerInvoked(PartnerRoleMessageExchange, InterceptorEvent)} */
 	public static final InterceptorInvoker __onNewInstanceInvoked = new InterceptorInvoker("onNewInstanceInvoked") {
-		public void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx) 
+		public void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx) 
 			throws FailMessageExchangeException, FaultMessageExchangeException {
-			i.onNewInstanceInvoked((MyRoleMessageExchange) mex, ictx);
+			i.onNewInstanceInvoked(ictx);
 		}
 	};
 
@@ -71,7 +61,7 @@
 		_name = name;
 	}
 	
-	public abstract void invoke(MessageExchangeInterceptor i, MessageExchange mex, InterceptorContext ictx)
+	public abstract void invoke(MessageExchangeInterceptor i, InterceptorEvent ictx)
 		throws FailMessageExchangeException, FaultMessageExchangeException;
 	
 	public String toString() {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/MessageExchangeInterceptor.java Wed Sep  5 22:46:42 2007
@@ -19,14 +19,14 @@
 package org.apache.ode.bpel.intercept;
 
 import org.apache.ode.bpel.dao.BpelDAOConnection;
+import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.ProcessConf;
 
 /**
- * Hook into the BPEL server that enables intercepting of message exchange
- * invocation.
+ * Hook into the BPEL server that enables intercepting of parntner/server invocations. This interface operates at 
+ * a level that is a bit lower than the IAPI, as it allows access to internal engine datastructures. Caution should
+ * be used when implementing interceptors. 
  * 
  * @author Maciej Szefler
  * 
@@ -40,7 +40,7 @@
      * @param mex
      *            message exchange
      */
-    void onBpelServerInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onBpelServerInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -50,7 +50,7 @@
      * @param mex
      *            message exchange
      */
-    void onProcessInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onProcessInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -61,7 +61,7 @@
      * @param mex
      *            message exchange
      */
-    void onNewInstanceInvoked(MyRoleMessageExchange mex, InterceptorContext ic)
+    void onNewInstanceInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
     /**
@@ -71,17 +71,30 @@
      * @param mex
      *            message exchange
      */
-    void onPartnerInvoked(PartnerRoleMessageExchange mex, InterceptorContext ic)
+    void onPartnerInvoked(InterceptorEvent ic)
         throws FailMessageExchangeException, FaultMessageExchangeException;
 
 
-    public interface InterceptorContext {
+    /**
+     * Representation of an intercept event. 
+     * 
+     * @author Maciej Szefler <mszefler at gmail dot com>
+     *
+     */
+    public interface InterceptorEvent {
 
+        /** Get the connection to the data store. */
         BpelDAOConnection getConnection();
-
+        
+        /** Get the DB representation of the process. */
         ProcessDAO getProcessDAO();
 
+        /** Get the process configuration. */
         ProcessConf getProcessConf();
+        
+        /** Get the database representation of the message exchange. */
+        MessageExchangeDAO getMessageExchangeDAO();
+        
 
     }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/NoOpInterceptor.java Wed Sep  5 22:46:42 2007
@@ -18,8 +18,6 @@
  */
 package org.apache.ode.bpel.intercept;
 
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 
 /**
  * No-Op implementation of the
@@ -31,25 +29,20 @@
  */
 public class NoOpInterceptor implements MessageExchangeInterceptor {
 
-	public void onBpelServerInvoked(MyRoleMessageExchange mex,
-			InterceptorContext ic) throws FailMessageExchangeException,
-			FaultMessageExchangeException {
-	}
-
-	public void onProcessInvoked(MyRoleMessageExchange mex,
-			InterceptorContext ic) throws FailMessageExchangeException,
-			FaultMessageExchangeException {
-	}
-
-	public void onPartnerInvoked(PartnerRoleMessageExchange mex,
-			InterceptorContext ic) throws FailMessageExchangeException,
-			FaultMessageExchangeException {
-	}
-
-	public void onNewInstanceInvoked(MyRoleMessageExchange mex,
-			InterceptorContext ic) throws FailMessageExchangeException,
-			FaultMessageExchangeException {
+    public void onBpelServerInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
 
-	}
+    public void onNewInstanceInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
+
+    public void onPartnerInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+
+    }
+
+    public void onProcessInvoked(InterceptorEvent ic) throws FailMessageExchangeException, FaultMessageExchangeException {
+        
+    }
 
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/intercept/ThrottlingInterceptor.java Wed Sep  5 22:46:42 2007
@@ -18,12 +18,12 @@
  */
 package org.apache.ode.bpel.intercept;
 
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.w3c.dom.Node;
-import org.w3c.dom.Text;
+import java.util.Map;
 
 import javax.xml.namespace.QName;
-import java.util.Map;
+
+import org.w3c.dom.Node;
+import org.w3c.dom.Text;
 
 /**
  * An example of a  simple interceptor providing a "throttling"  capability - that is an 
@@ -36,8 +36,7 @@
     private static final QName PROP_MAX_INSTANCES = new QName("urn:org.apache.ode.bpel.intercept", "maxInstances");
 
     @Override
-    public void onNewInstanceInvoked(MyRoleMessageExchange mex,
-                                     InterceptorContext ic) throws FailMessageExchangeException {
+    public void onNewInstanceInvoked(InterceptorEvent ic) throws FailMessageExchangeException {
         int maxInstances;
         try {
             maxInstances = Integer.valueOf(getSimpleProperty(PROP_MAX_INSTANCES, ic));
@@ -56,7 +55,7 @@
      * @param ic interceptor context
      * @return value of the property, or <code>null</code> if not set
      */
-    private String getSimpleProperty(QName propertyName, InterceptorContext ic) {
+    private String getSimpleProperty(QName propertyName, InterceptorEvent ic) {
         Map<QName, Node> props =  ic.getProcessConf().getProperties();
         for (Map.Entry<QName, Node> prop : props.entrySet()) {
             if (prop.getKey().equals(propertyName))

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionFactoryImpl.java Wed Sep  5 22:46:42 2007
@@ -22,6 +22,7 @@
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.Scheduler;
 
+import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 import java.util.HashMap;
 import java.util.Map;
@@ -33,14 +34,14 @@
 public class BpelDAOConnectionFactoryImpl implements BpelDAOConnectionFactory {
     private static final Map<QName, ProcessDaoImpl> __StateStore = new HashMap<QName, ProcessDaoImpl>();
 
-    private Scheduler _scheduler;
+    private TransactionManager _txm;
 
-    public BpelDAOConnectionFactoryImpl(Scheduler sched) {
-        _scheduler = sched;
+    public BpelDAOConnectionFactoryImpl(TransactionManager txm) {
+        _txm = txm;
     }
 
     public BpelDAOConnection getConnection() {
-        return new BpelDAOConnectionImpl(__StateStore, _scheduler);
+        return new BpelDAOConnectionImpl(__StateStore, _txm);
     }
 
     /**

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Wed Sep  5 22:46:42 2007
@@ -18,6 +18,22 @@
  */
 package org.apache.ode.bpel.memdao;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.BpelEventFilter;
@@ -30,25 +46,10 @@
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
 import org.apache.ode.bpel.dao.ScopeDAO;
 import org.apache.ode.bpel.evt.BpelEvent;
-import org.apache.ode.bpel.iapi.Scheduler;
 import org.apache.ode.utils.ISO8601DateParser;
 import org.apache.ode.utils.stl.CollectionsX;
 import org.apache.ode.utils.stl.UnaryFunction;
 
-import javax.xml.namespace.QName;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 /**
  * A very simple, in-memory implementation of the {@link BpelDAOConnection} interface.
  */
@@ -56,17 +57,21 @@
     private static final Log __log = LogFactory.getLog(BpelDAOConnectionImpl.class);
     public static long TIME_TO_LIVE = 10*60*1000;
 
-    private Scheduler _scheduler;
+    private TransactionManager _txm;
+
     private Map<QName, ProcessDaoImpl> _store;
+
     private List<BpelEvent> _events = new LinkedList<BpelEvent>();
-    private static Map<String,MessageExchangeDAO> _mexStore = Collections.synchronizedMap(new HashMap<String,MessageExchangeDAO>());
-    protected static Map<String, Long> _mexAge = new ConcurrentHashMap<String, Long>();
+
+    private final List<MessageExchangeDAOImpl> _mexList = new LinkedList<MessageExchangeDAOImpl>();
+    private final Map<String, MessageExchangeDAOImpl> _mexStore = new HashMap<String, MessageExchangeDAOImpl>();
+
     private static AtomicLong counter = new AtomicLong(Long.MAX_VALUE / 2);
     private static volatile long _lastRemoval = 0;
 
-    BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, Scheduler scheduler) {
+    BpelDAOConnectionImpl(Map<QName, ProcessDaoImpl> store, TransactionManager txm) {
         _store = store;
-        _scheduler = scheduler;
+        _txm = txm;
     }
 
     public ProcessDAO getProcess(QName processId) {
@@ -74,8 +79,8 @@
     }
 
     public ProcessDAO createProcess(QName pid, QName type, String guid, long version) {
-        ProcessDaoImpl process = new ProcessDaoImpl(this,_store,pid,type, guid,version);
-        _store.put(pid,process);
+        ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version);
+        _store.put(pid, process);
         return process;
     }
 
@@ -89,13 +94,12 @@
     }
 
     public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) {
-        if(filter.getLimit()==0) {
+        if (filter.getLimit() == 0) {
             return Collections.EMPTY_LIST;
         }
         List<ProcessInstanceDAO> matched = new ArrayList<ProcessInstanceDAO>();
         // Selecting
-        selectionCompleted:
-        for (ProcessDaoImpl proc : _store.values()) {
+        selectionCompleted: for (ProcessDaoImpl proc : _store.values()) {
             boolean pmatch = true;
             if (filter.getNameFilter() != null
                     && !equalsOrWildcardMatch(filter.getNameFilter(), proc.getProcessId().getLocalPart()))
@@ -111,9 +115,11 @@
                     if (filter.getStatusFilter() != null) {
                         boolean statusMatch = false;
                         for (Short status : filter.convertFilterState()) {
-                            if (inst.getState() == status.byteValue()) statusMatch = true;
+                            if (inst.getState() == status.byteValue())
+                                statusMatch = true;
                         }
-                        if (!statusMatch) match = false;
+                        if (!statusMatch)
+                            match = false;
                     }
                     if (filter.getStartedDateFilter() != null
                             && !dateMatch(filter.getStartedDateFilter(), inst.getCreateTime(), filter))
@@ -122,25 +128,25 @@
                             && !dateMatch(filter.getLastActiveDateFilter(), inst.getLastActiveTime(), filter))
                         match = false;
 
-//                    if (filter.getPropertyValuesFilter() != null) {
-//                        for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
-//                            boolean entryMatched = false;
-//                            for (ProcessPropertyDAO prop : proc.getProperties()) {
-//                                if (prop.getName().equals(propEntry.getKey())
-//                                        && (propEntry.getValue().equals(prop.getMixedContent())
-//                                        || propEntry.getValue().equals(prop.getSimpleContent()))) {
-//                                    entryMatched = true;
-//                                }
-//                            }
-//                            if (!entryMatched) {
-//                                match = false;
-//                            }
-//                        }
-//                    }
+                    // if (filter.getPropertyValuesFilter() != null) {
+                    // for (Map.Entry propEntry : filter.getPropertyValuesFilter().entrySet()) {
+                    // boolean entryMatched = false;
+                    // for (ProcessPropertyDAO prop : proc.getProperties()) {
+                    // if (prop.getName().equals(propEntry.getKey())
+                    // && (propEntry.getValue().equals(prop.getMixedContent())
+                    // || propEntry.getValue().equals(prop.getSimpleContent()))) {
+                    // entryMatched = true;
+                    // }
+                    // }
+                    // if (!entryMatched) {
+                    // match = false;
+                    // }
+                    // }
+                    // }
 
                     if (match) {
                         matched.add(inst);
-                        if(matched.size()==filter.getLimit()) {
+                        if (matched.size() == filter.getLimit()) {
                             break selectionCompleted;
                         }
                     }
@@ -153,9 +159,10 @@
 
             Collections.sort(matched, new Comparator<ProcessInstanceDAO>() {
                 public int compare(ProcessInstanceDAO o1, ProcessInstanceDAO o2) {
-                    for (String orderKey: orders) {
+                    for (String orderKey : orders) {
                         int result = compareInstanceUsingKey(orderKey, o1, o2);
-                        if (result != 0) return result;
+                        if (result != 0)
+                            return result;
                     }
                     return 0;
                 }
@@ -175,37 +182,65 @@
         throw new UnsupportedOperationException("Can't query process configuration using a transient DAO.");
     }
 
-    public MessageExchangeDAO createMessageExchange(char dir) {
-        final String id = Long.toString(counter.getAndIncrement());
-        MessageExchangeDAO mex = new MessageExchangeDAOImpl(dir,id);
-        long now = System.currentTimeMillis();
-        _mexStore.put(id,mex);
-        _mexAge.put(id, now);
-
-        if (now > _lastRemoval + (TIME_TO_LIVE/10)) {
-            _lastRemoval = now;
-            Object[] oldMexs = _mexAge.keySet().toArray();
-            for (int i=oldMexs.length-1; i>0; i--) {
-                String oldMex = (String) oldMexs[i];
-                Long age = _mexAge.get(oldMex);
-                if (age != null && now-age > TIME_TO_LIVE) {
-                    removeMessageExchange(oldMex);
-                    _mexAge.remove(oldMex);
-                }
-            }
+    public MessageExchangeDAO createMessageExchange(final String mexId, char dir) {
+        MessageExchangeDAOImpl mex = new MessageExchangeDAOImpl(dir, mexId);
+        mex.createTime  = new Date();
+        
+        // FIXME: Why is this necessary? We should explicitly remove these thigs -mbs
+        
+        synchronized (_mexStore) {
+            _mexStore.put(mexId, mex);
+            _mexList.add(mex);
         }
 
+
+        cleanupDeadWood();
+        
         // Removing right away on rollback
         onRollback(new Runnable() {
             public void run() {
-                removeMessageExchange(id);
-                _mexAge.remove(id);
+                synchronized (_mexStore) {
+                    MessageExchangeDAOImpl mexdao = _mexStore.remove(mexId);
+                    
+                    if (mexdao != null) 
+                        _mexList.remove(mexdao);
+                }
             }
         });
 
         return mex;
     }
 
+
+    
+    /**
+     * Remove old message exchanges from the Mex store.
+     * 
+     */
+    private void cleanupDeadWood() {
+        long now = System.currentTimeMillis();
+        
+        if (now  > _lastRemoval + (TIME_TO_LIVE/10)) {
+            _lastRemoval = now;
+            
+            synchronized (_mexStore) {
+                LinkedList trash = new LinkedList<MessageExchangeDAOImpl>();
+                for (MessageExchangeDAOImpl mexdao : _mexList) {
+                    long createtime = mexdao._createTime.getTime();
+                    if (now-createtime> TIME_TO_LIVE) {
+                        trash.add(mexdao);
+                    } else 
+                        break;
+                }
+                
+                _mexList.removeAll(trash);
+                _mexStore.values().removeAll(trash);
+            }
+        }
+
+        
+    }
+
     public MessageExchangeDAO getMessageExchange(String mexid) {
         return _mexStore.get(mexid);
     }
@@ -217,7 +252,8 @@
         String orderKey = key;
         if (key.startsWith("+") || key.startsWith("-")) {
             orderKey = key.substring(1, key.length());
-            if (key.startsWith("-")) ascending = false;
+            if (key.startsWith("-"))
+                ascending = false;
         }
         ProcessDAO process1 = getProcess(instanceDAO1.getProcess().getProcessId());
         ProcessDAO process2 = getProcess(instanceDAO2.getProcess().getProcessId());
@@ -231,11 +267,11 @@
             s1 = process1.getProcessId().getNamespaceURI();
             s2 = process2.getProcessId().getNamespaceURI();
         } else if ("version".equals(orderKey)) {
-            s1 = ""+process1.getVersion();
-            s2 = ""+process2.getVersion();
+            s1 = "" + process1.getVersion();
+            s2 = "" + process2.getVersion();
         } else if ("status".equals(orderKey)) {
-            s1 = ""+instanceDAO1.getState();
-            s2 = ""+instanceDAO2.getState();
+            s1 = "" + instanceDAO1.getState();
+            s2 = "" + instanceDAO2.getState();
         } else if ("started".equals(orderKey)) {
             s1 = ISO8601DateParser.format(instanceDAO1.getCreateTime());
             s2 = ISO8601DateParser.format(instanceDAO2.getCreateTime());
@@ -243,62 +279,71 @@
             s1 = ISO8601DateParser.format(instanceDAO1.getLastActiveTime());
             s2 = ISO8601DateParser.format(instanceDAO2.getLastActiveTime());
         }
-        if (ascending) return s1.compareTo(s2);
-        else return s2.compareTo(s1);
+        if (ascending)
+            return s1.compareTo(s2);
+        else
+            return s2.compareTo(s1);
     }
 
     private boolean equalsOrWildcardMatch(String s1, String s2) {
-        if (s1 == null || s2 == null) return false;
-        if (s1.equals(s2)) return true;
+        if (s1 == null || s2 == null)
+            return false;
+        if (s1.equals(s2))
+            return true;
         if (s1.endsWith("*")) {
-            if (s2.startsWith(s1.substring(0, s1.length() - 1))) return true;
+            if (s2.startsWith(s1.substring(0, s1.length() - 1)))
+                return true;
         }
         if (s2.endsWith("*")) {
-            if (s1.startsWith(s2.substring(0, s2.length() - 1))) return true;
+            if (s1.startsWith(s2.substring(0, s2.length() - 1)))
+                return true;
         }
         return false;
     }
 
-    public boolean dateMatch(List<String> dateFilters, Date instanceDate,  InstanceFilter filter) {
+    public boolean dateMatch(List<String> dateFilters, Date instanceDate, InstanceFilter filter) {
         boolean match = true;
         for (String ddf : dateFilters) {
             String isoDate = ISO8601DateParser.format(instanceDate);
             String critDate = Filter.getDateWithoutOp(ddf);
             if (ddf.startsWith("=")) {
-                if (!isoDate.startsWith(critDate)) match = false;
+                if (!isoDate.startsWith(critDate))
+                    match = false;
             } else if (ddf.startsWith("<=")) {
-                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0) match = false;
+                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) > 0)
+                    match = false;
             } else if (ddf.startsWith(">=")) {
-                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0) match = false;
+                if (!isoDate.startsWith(critDate) && isoDate.compareTo(critDate) < 0)
+                    match = false;
             } else if (ddf.startsWith("<")) {
-                if (isoDate.compareTo(critDate) > 0) match = false;
+                if (isoDate.compareTo(critDate) > 0)
+                    match = false;
             } else if (ddf.startsWith(">")) {
-                if (isoDate.compareTo(critDate) < 0) match = false;
+                if (isoDate.compareTo(critDate) < 0)
+                    match = false;
             }
         }
         return match;
     }
 
-
     public ScopeDAO getScope(Long siidl) {
         for (ProcessDaoImpl process : _store.values()) {
             for (ProcessInstanceDAO instance : process._instances.values()) {
-                if (instance.getScope(siidl) != null) return instance.getScope(siidl);
+                if (instance.getScope(siidl) != null)
+                    return instance.getScope(siidl);
             }
         }
         return null;
     }
 
-
     public void insertBpelEvent(BpelEvent event, ProcessDAO processConfiguration, ProcessInstanceDAO instance) {
         _events.add(event);
     }
 
-
     public List<Date> bpelEventTimelineQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         // TODO : Provide more correct implementation:
         ArrayList<Date> dates = new ArrayList<Date>();
-        CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent,Date>() {
+        CollectionsX.transform(dates, _events, new UnaryFunction<BpelEvent, Date>() {
             public Date apply(BpelEvent x) {
                 return x.getTimestamp();
             }
@@ -306,7 +351,6 @@
         return dates;
     }
 
-
     public List<BpelEvent> bpelEventQuery(InstanceFilter ifilter, BpelEventFilter efilter) {
         // TODO : Provide a more correct (filtering) implementation:
         return _events;
@@ -316,35 +360,39 @@
      * @see org.apache.ode.bpel.dao.BpelDAOConnection#instanceQuery(String)
      */
     public Collection<ProcessInstanceDAO> instanceQuery(String expression) {
-        //TODO
+        // TODO
         throw new UnsupportedOperationException();
     }
 
-    static void removeMessageExchange(String mexId) {
-        // Cleaning up mex
-        if (__log.isDebugEnabled()) __log.debug("Removing mex " + mexId + " from memory store.");
-        MessageExchangeDAO mex = _mexStore.remove(mexId);
-        if (mex == null)
-            __log.warn("Couldn't find mex " + mexId + " for cleanup.");
-        _mexAge.remove(mexId);
-    }
-
     public void defer(final Runnable runnable) {
-        _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-            public void afterCompletion(boolean success) {
-            }
-            public void beforeCompletion() {
-                runnable.run();
-            }
-        });
+        try {
+            _txm.getTransaction().registerSynchronization(new Synchronization() {
+                public void afterCompletion(int status) {
+                }
+
+                public void beforeCompletion() {
+                    runnable.run();
+                }
+            });
+       
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
+    
+    
     public void onRollback(final Runnable runnable) {
-        _scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
-            public void afterCompletion(boolean success) {
-                if (!success) runnable.run();
-            }
-            public void beforeCompletion() {
-            }
-        });
+        try {
+            _txm.getTransaction().registerSynchronization(new Synchronization() {
+                public void afterCompletion(int status) {
+                    if (status != Status.STATUS_COMMITTED) runnable.run();
+                }
+                
+                public void beforeCompletion() {}
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
+    
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageDAOImpl.java Wed Sep  5 22:46:42 2007
@@ -19,20 +19,17 @@
 
 package org.apache.ode.bpel.memdao;
 
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.MessageDAO;
-import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.utils.DOMUtils;
 import org.w3c.dom.Element;
 
-import javax.xml.namespace.QName;
-
 public class MessageDAOImpl extends DaoBaseImpl implements MessageDAO {
 	private QName type;
 	private Element data;
-	private MessageExchangeDAO messageExchange;
 	
-	public MessageDAOImpl(MessageExchangeDAO messageExchange) {
-		this.messageExchange = messageExchange;
+	public MessageDAOImpl() {
 	}
 
 	public void setType(QName type) {
@@ -54,8 +51,5 @@
 		return data;
 	}
 
-	public MessageExchangeDAO getMessageExchange() {
-		return messageExchange;
-	}
-
+	
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/MessageExchangeDAOImpl.java Wed Sep  5 22:46:42 2007
@@ -19,46 +19,54 @@
 
 package org.apache.ode.bpel.memdao;
 
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.MessageDAO;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.PartnerLinkDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
+import org.apache.ode.bpel.iapi.InvocationStyle;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.w3c.dom.Element;
 
-import javax.xml.namespace.QName;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-
 public class MessageExchangeDAOImpl extends DaoBaseImpl implements MessageExchangeDAO {
 
-    private String messageExchangeId;
-	private MessageDAO response;
-	private Date createTime;
-	private MessageDAO request;
-	private String operation;
-	private QName portType;
-	private String status;
-	private int partnerLinkModelId;
-	private String correlationId;
-	private String pattern;
-	private Element ePR;
-	private Element callbackEPR;
-	private String channel;
-	private boolean propagateTransactionFlag;
-	private QName fault;
-    private String faultExplanation;
-    private String correlationStatus;
-	private ProcessDAO process;
-	private ProcessInstanceDAO instance;
-	private char direction;
-	private QName callee;
-	private Properties properties = new Properties();
-    private PartnerLinkDAOImpl _plink;
-    private String pipedMessageExchangeId;
+    String messageExchangeId;
+	MessageDAO response;
+	Date createTime;
+	MessageDAO request;
+	String operation;
+	QName portType;
+	Status status;
+	int partnerLinkModelId;
+	String correlationId;
+	String pattern;
+	Element ePR;
+	String channel;
+	QName fault;
+    String faultExplanation;
+    String correlationStatus;
+	ProcessDAO process;
+	ProcessInstanceDAO instance;
+	char direction;
+	QName callee;
+	Properties properties = new Properties();
+    PartnerLinkDAOImpl _plink;
+    InvocationStyle _istyle;
+    String _pipedExchange;
+    FailureType _failureType;
+    long _timeout;
+    AckType _ackType;
+    QName _pipedPID;
 
 	public MessageExchangeDAOImpl(char direction, String messageEchangeId){
 		this.direction = direction;
@@ -94,17 +102,17 @@
 
 	}
 
-	public void setStatus(String status) {
+	public void setStatus(Status status) {
 		this.status = status;
 
 	}
 
-	public String getStatus() {
+	public Status getStatus() {
 		return status;
 	}
 
 	public MessageDAO createMessage(QName type) {
-		MessageDAO messageDAO = new MessageDAOImpl(this);
+		MessageDAO messageDAO = new MessageDAOImpl();
 		messageDAO.setType(type);
 		return messageDAO;
 	}
@@ -128,11 +136,11 @@
 
 	}
 
-	public String getCorrelationId() {
+	public String getPartnersKey() {
 		return correlationId;
 	}
 
-	public void setCorrelationId(String correlationId) {
+	public void setPartnersKey(String correlationId) {
 		this.correlationId = correlationId;
 
 	}
@@ -156,15 +164,7 @@
 		return ePR;
 	}
 
-	public void setCallbackEPR(Element epr) {
-		this.callbackEPR = epr;
-
-	}
-
-	public Element getCallbackEPR() {
-		return callbackEPR;
-	}
-
+	
 	public String getPattern() {
 		return pattern;
 	}
@@ -177,10 +177,6 @@
 		this.channel = string;
 	}
 
-	public boolean getPropagateTransactionFlag() {
-		return propagateTransactionFlag;
-	}
-
 	public QName getFault() {
 		return fault;
 	}
@@ -198,7 +194,6 @@
     }
 
 
-
     public void setCorrelationStatus(String cstatus) {
 		this.correlationStatus = cstatus;
 	}
@@ -263,13 +258,6 @@
         return retVal;
     }
 
-    public String getPipedMessageExchangeId() {
-        return pipedMessageExchangeId;
-    }
-
-    public void setPipedMessageExchangeId(String pipedMessageExchangeId) {
-        this.pipedMessageExchangeId = pipedMessageExchangeId;
-    }
 
     public void release() {
         instance = null;
@@ -277,10 +265,60 @@
         _plink = null;
         request = null;
         response = null;
-        BpelDAOConnectionImpl.removeMessageExchange(getMessageExchangeId());
     }
 
     public String toString() {
         return "mem.mex(direction=" + direction + " id=" + messageExchangeId + ")";
+    }
+
+    public InvocationStyle getInvocationStyle() {
+        return _istyle;
+    }
+
+    public String getPipedMessageExchangeId() {
+        return _pipedExchange;
+    }
+
+    public void setFailureType(FailureType failureType) {
+        _failureType = failureType;
+    }
+
+    public FailureType getFailureType() {
+        return _failureType;
+    }
+    
+    public void setInvocationStyle(InvocationStyle invocationStyle) {
+        _istyle = invocationStyle;
+        
+    }
+
+    public void setPipedMessageExchangeId(String pipedMexId) {
+        _pipedExchange = pipedMexId;
+        
+    }
+
+    public long getTimeout() {
+        return _timeout;
+    }
+
+    public void setTimeout(long timeout) {
+        _timeout = timeout;
+    }
+
+    public AckType getAckType() {
+        return _ackType;
+    }
+
+    public void setAckType(AckType ackType) {
+        _ackType = ackType;
+    }
+
+    public QName getPipedPID() {
+        return _pipedPID;
+    }
+
+    public void setPipedPID(QName pipedPid) {
+        _pipedPID = pipedPid;
+        
     }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Wed Sep  5 22:46:42 2007
@@ -22,6 +22,9 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -52,10 +55,10 @@
     protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer, PartnerLinkDAO>();
     private Map<QName, ProcessDaoImpl> _store;
     private BpelDAOConnectionImpl _conn;
-    private int _executionCount = 0;
     private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>();
     private static volatile long _lastRemoval = 0;
 
+
     private String _guid;
 
     public ProcessDaoImpl(BpelDAOConnectionImpl conn, Map<QName, ProcessDaoImpl> store,
@@ -136,7 +139,6 @@
             }
         });
 
-        _executionCount++;
         return newInstance;
     }
 
@@ -204,8 +206,7 @@
     }
 
     public int getNumInstances() {
-        // Instances are removed after execution, using a counter instead
-        return _executionCount;
+        return _instances.size();
     }
 
     public ProcessInstanceDAO getInstanceWithLock(Long iid) {

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/memdao/ProcessInstanceDaoImpl.java Wed Sep  5 22:46:42 2007
@@ -45,33 +45,52 @@
 import java.util.Set;
 
 /**
- * A very simple, in-memory implementation of the {@link ProcessInstanceDAO}
- * interface.
+ * A very simple, in-memory implementation of the {@link ProcessInstanceDAO} interface.
  */
 public class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO {
     private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList();
 
     private short _previousState;
+
     private short _state;
+
     private Long _instanceId;
+
     private ProcessDaoImpl _processDao;
+
     private Object _soup;
+
     private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
+
     private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, List<ScopeDAO>>();
+
     private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>();
+
     private ScopeDAO _rootScope;
+
     private FaultDAO _fault;
+
     private CorrelatorDAO _instantiatingCorrelator;
+
     private BpelDAOConnection _conn;
+
     private int _failureCount;
+
     private Date _failureDateTime;
+
     private Map<String, ActivityRecoveryDAO> _activityRecoveries = new HashMap<String, ActivityRecoveryDAO>();
 
     // TODO: Remove this, we should be using the main event store...
     private List<ProcessInstanceEvent> _events = new ArrayList<ProcessInstanceEvent>();
+
     private Date _lastActive;
+
     private int _seq;
 
+    private byte[] _execState;
+
+    private int _execStateCount;
+
     ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, CorrelatorDAO correlator) {
         _state = 0;
         _processDao = processDao;
@@ -125,11 +144,11 @@
      * @see ProcessInstanceDAO#getExecutionState()
      */
     public byte[] getExecutionState() {
-        throw new IllegalStateException("In-memory instances are never serialized");
+        return _execState;
     }
 
     public void setExecutionState(byte[] bytes) {
-        throw new IllegalStateException("In-memory instances are never serialized");
+        _execState = bytes;
     }
 
     public Object getSoup() {
@@ -314,7 +333,7 @@
     }
 
     public void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data,
-                                       String[] actions, int retries) {
+            String[] actions, int retries) {
         _activityRecoveries
                 .put(channel, new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries));
         _failureCount = _activityRecoveries.size();
@@ -347,7 +366,7 @@
         private int _retries;
 
         ActivityRecoveryDAOImpl(String channel, long activityId, String reason, Date dateTime, Element details, String[] actions,
-                                int retries) {
+                int retries) {
             _activityId = activityId;
             _channel = channel;
             _reason = reason;
@@ -404,5 +423,13 @@
 
     public String toString() {
         return "mem.instance(type=" + _processDao.getType() + " iid=" + _instanceId + ")";
+    }
+
+    public int getExecutionStateCounter() {
+        return _execStateCount;
+    }
+
+    public void setExecutionStateCounter(int stateCounter) {
+        _execStateCount = stateCounter;
     }
 }

Modified: ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java (original)
+++ ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/runtime/REPLY.java Wed Sep  5 22:46:42 2007
@@ -46,13 +46,12 @@
         }
         FaultData fault = null;
 
-        // TODO: Check for fault without message.
 
         try {
-            Node msg = getBpelRuntimeContext()
+            Node msg = oreply.variable == null ? null : getBpelRuntimeContext()
                     .fetchVariableData(_scopeFrame.resolve(oreply.variable), false);
 
-            assert msg instanceof Element;
+            assert msg == null || msg instanceof Element; // note msg can be null for faults 
 
             for (Iterator i = oreply.initCorrelations.iterator(); i.hasNext(); ) {
                 OScope.CorrelationSet cset = (OScope.CorrelationSet) i.next();
@@ -64,7 +63,7 @@
             getBpelRuntimeContext()
                     .reply(_scopeFrame.resolve(oreply.partnerLink), oreply.operation.getName(),
                             oreply.messageExchangeId, (Element)msg,
-                            (oreply.fault != null) ? oreply.fault : null);
+                            oreply.fault);
         } catch (FaultException e) {
             __log.error(e);
             fault = createFault(e.getQName(), oreply);

Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/MockBpelServer.java Wed Sep  5 22:46:42 2007
@@ -18,21 +18,35 @@
  */
 package org.apache.ode.bpel.runtime;
 
+import java.io.File;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactoryJDBC;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.iapi.BindingContext;
+import org.apache.ode.bpel.iapi.BpelEngineException;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Endpoint;
 import org.apache.ode.bpel.iapi.EndpointReference;
 import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchangeContext;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.PartnerRoleChannel;
 import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
 import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.il.EmbeddedGeronimoFactory;
 import org.apache.ode.il.MockScheduler;
@@ -44,36 +58,31 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+class MockBpelServer {
 
+    BpelServerImpl _server;
 
-class MockBpelServer {
+    ProcessStoreImpl _store;
+
+    TransactionManager _txManager;
+
+    Database _database;
+
+    DataSource _dataSource;
+
+    SchedulerWrapper _scheduler;
+
+    BpelDAOConnectionFactory _daoCF;
+
+    EndpointReferenceContext _eprContext;
 
-    BpelServerImpl            _server;
-    ProcessStoreImpl          _store;
-    TransactionManager        _txManager;
-    Database                  _database;
-    DataSource                _dataSource;
-    SchedulerWrapper          _scheduler;
-    BpelDAOConnectionFactory  _daoCF;
-    EndpointReferenceContext  _eprContext;
-    MessageExchangeContext    _mexContext;
-    BindingContext            _bindContext;
-    HashMap<String, QName>    _activated = new HashMap();
-    HashMap                   _endpoints = new HashMap();
+    MessageExchangeContext _mexContext;
+
+    BindingContext _bindContext;
+
+    HashMap<String, QName> _activated = new HashMap<String, QName>();
+
+    HashMap<String, EndpointReference> _endpoints = new HashMap<String, EndpointReference>();
 
     public MockBpelServer() {
         try {
@@ -85,10 +94,10 @@
             if (_daoCF == null)
                 throw new RuntimeException("No DAO");
             _server.setDaoConnectionFactory(_daoCF);
-            _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler));
             if (_scheduler == null)
                 throw new RuntimeException("No scheduler");
-            _store = new ProcessStoreImpl(_dataSource,"jpa", true);
+            _store = new ProcessStoreImpl(_dataSource, "jpa", true);
+            _server.setTransactionManager(_txManager);
             _server.setScheduler(_scheduler);
             _server.setEndpointReferenceContext(createEndpointReferenceContext());
             _server.setMessageExchangeContext(createMessageExchangeContext());
@@ -104,52 +113,34 @@
 
     public Collection<QName> deploy(File deploymentUnitDirectory) {
         Collection<QName> pids = _store.deploy(deploymentUnitDirectory);
-        for (QName pid: pids)
+        for (QName pid : pids)
             _server.register(_store.getProcessConfiguration(pid));
         return pids;
     }
 
     public void invoke(QName serviceName, String opName, Element body) throws Exception {
-        try {
-            String messageId = new GUID().toString();
-            MyRoleMessageExchange mex;
+        String messageId = new GUID().toString();
+        MyRoleMessageExchange mex;
+
+        mex = _server.createMessageExchange(InvocationStyle.UNRELIABLE, serviceName, opName, "" + messageId);
+        if (mex.getOperation() == null)
+            throw new Exception("Did not find operation " + opName + " on service " + serviceName);
+        Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
+        Element wrapper = body.getOwnerDocument().createElementNS("", "main");
+        wrapper.appendChild(body);
+        Element message = body.getOwnerDocument().createElementNS("", "message");
+        message.appendChild(wrapper);
+        request.setMessage(message);
+        mex.setRequest(request);
+        mex.invokeBlocking();
+        mex.complete();
 
-            _txManager.begin();
-            mex = _server.getEngine().createMessageExchange("" + messageId, serviceName, opName);
-            if (mex.getOperation() == null)
-                throw new Exception("Did not find operation " + opName + " on service " + serviceName);
-            Message request = mex.createMessage(mex.getOperation().getInput().getMessage().getQName());
-            Element wrapper = body.getOwnerDocument().createElementNS("", "main");
-            wrapper.appendChild(body);
-            Element message = body.getOwnerDocument().createElementNS("", "message");
-            message.appendChild(wrapper);
-            request.setMessage(message);
-            mex.invoke(request);
-            mex.complete();
-            _txManager.commit();
-        } catch (Exception except) {
-              _txManager.rollback();
-              throw except;
-        }
     }
 
     public TransactionManager getTransactionManager() {
         return _txManager;
     }
 
-    public void waitForBlocking() {
-        try {
-            long delay = 1000;
-            while (true) {
-                // Be warned: ugly hack and not safe for slow CPUs.
-                long cutoff = System.currentTimeMillis() - delay;
-                if (_scheduler._nextSchedule < cutoff)
-                    break;
-                Thread.sleep(delay);
-            }
-        } catch (InterruptedException except) { }
-    }
-
     public void shutdown() throws Exception {
         _server.stop();
         _scheduler.stop();
@@ -205,17 +196,47 @@
         _eprContext = new EndpointReferenceContext() {
             public EndpointReference resolveEndpointReference(Element element) {
                 String service = DOMUtils.getChildCharacterData(element);
-                return (EndpointReference)_endpoints.get(service);
+                return (EndpointReference) _endpoints.get(service);
+            }
+
+            public EndpointReference convertEndpoint(QName qName, Element element) {
+                return null;
             }
-            public EndpointReference convertEndpoint(QName qName, Element element) { return null; }
         };
         return _eprContext;
     }
 
     protected MessageExchangeContext createMessageExchangeContext() {
-       _mexContext =  new MessageExchangeContext() {
-            public void invokePartner(PartnerRoleMessageExchange mex) { }
-            public void onAsyncReply(MyRoleMessageExchange myRoleMex) { }
+        _mexContext = new MessageExchangeContext() {
+           
+            public void onMyRoleMessageExchangeStateChanged(MyRoleMessageExchange myRoleMex) {
+            }
+
+            public void cancel(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+
+            }
+
+            public Set<InvocationStyle> getSupportedInvocationStyle(PartnerRoleChannel prc, EndpointReference partnerEpr) {
+                // TODO Auto-generated method stub
+                return null;
+            }
+
+            public void invokePartnerUnreliable(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+
+            }
+
+            public void invokePartnerReliable(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+
+            }
+
+            public void invokePartnerTransacted(PartnerRoleMessageExchange mex) throws ContextException {
+                // TODO Auto-generated method stub
+
+            }
+
         };
         return _mexContext;
     }
@@ -225,12 +246,14 @@
             public EndpointReference activateMyRoleEndpoint(QName processId, Endpoint myRoleEndpoint) {
                 final Document doc = DOMUtils.newDocument();
                 Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
-                    EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+                        EndpointReference.SERVICE_REF_QNAME.getLocalPart());
                 serviceRef.appendChild(doc.createTextNode(myRoleEndpoint.serviceName.toString()));
                 doc.appendChild(serviceRef);
                 _activated.put(myRoleEndpoint.toString(), processId);
                 return new EndpointReference() {
-                    public Document toXML() { return doc; }
+                    public Document toXML() {
+                        return doc;
+                    }
                 };
             }
 
@@ -239,12 +262,12 @@
             }
 
             public PartnerRoleChannel createPartnerRoleChannel(QName processId, PortType portType,
-                                                               final Endpoint initialPartnerEndpoint) {
+                    final Endpoint initialPartnerEndpoint) {
                 final EndpointReference epr = new EndpointReference() {
                     public Document toXML() {
                         Document doc = DOMUtils.newDocument();
                         Element serviceRef = doc.createElementNS(EndpointReference.SERVICE_REF_QNAME.getNamespaceURI(),
-                            EndpointReference.SERVICE_REF_QNAME.getLocalPart());
+                                EndpointReference.SERVICE_REF_QNAME.getLocalPart());
                         serviceRef.appendChild(doc.createTextNode(initialPartnerEndpoint.serviceName.toString()));
                         doc.appendChild(serviceRef);
                         return doc;
@@ -252,36 +275,32 @@
                 };
                 _endpoints.put(initialPartnerEndpoint.serviceName.toString(), epr);
                 return new PartnerRoleChannel() {
-                    public EndpointReference getInitialEndpointReference() { return epr; }
-                    public void close() { };
+                    public EndpointReference getInitialEndpointReference() {
+                        return epr;
+                    }
+
+                    public void close() {
+                    };
                 };
             }
         };
         return _bindContext;
     }
 
-
     private class SchedulerWrapper implements Scheduler {
 
         MockScheduler _scheduler;
-        long                _nextSchedule;
+
+        long _nextSchedule;
 
         SchedulerWrapper(BpelServerImpl server, TransactionManager txManager, DataSource dataSource) {
-            ExecutorService executorService = Executors.newCachedThreadPool();
             _scheduler = new MockScheduler(_txManager);
-            _scheduler.setExecutorSvc(executorService);
             _scheduler.setJobProcessor(server);
         }
 
-        public String schedulePersistedJob(Map<String,Object>jobDetail,Date when) throws ContextException {
+        public String schedulePersistedJob(Map<String, Object> jobDetail, Date when) throws ContextException {
             String jobId = _scheduler.schedulePersistedJob(jobDetail, when);
-            _nextSchedule = when == null ?  System.currentTimeMillis() : when.getTime();
-            return jobId;
-        }
-
-        public String scheduleVolatileJob(boolean transacted, Map<String,Object> jobDetail) throws ContextException {
-            String jobId = _scheduler.scheduleVolatileJob(transacted, jobDetail);
-            _nextSchedule = System.currentTimeMillis();
+            _nextSchedule = when == null ? System.currentTimeMillis() : when.getTime();
             return jobId;
         }
 
@@ -289,30 +308,32 @@
             _scheduler.cancelJob(jobId);
         }
 
-        public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
-            return _scheduler.execTransaction(transaction);
+        public void start() {
+            _scheduler.start();
         }
 
-        public <T> Future<T> execIsolatedTransaction(Callable<T> transaction) throws Exception, ContextException {
-            return _scheduler.execIsolatedTransaction(transaction);
+        public void stop() {
+            _scheduler.stop();
         }
 
-        public boolean isTransacted() {
-            return _scheduler.isTransacted();
+        public void shutdown() {
+            _scheduler.shutdown();
         }
 
-        public void start() { _scheduler.start(); }
-        public void stop() { _scheduler.stop(); }
-        public void shutdown() { _scheduler.shutdown(); }
+        public void setJobProcessor(JobProcessor processor) throws ContextException {
+            _scheduler.setJobProcessor(processor);
 
-        public void registerSynchronizer(Synchronizer synch) throws ContextException {
-            _scheduler.registerSynchronizer(synch);
         }
 
-        public void setJobProcessor(JobProcessor processor) throws ContextException {
-            _scheduler.setJobProcessor(processor);
+        public void jobCompleted(String jobId) {
+            _scheduler.jobCompleted(jobId);
 
         }
+    }
+
+    public void waitForBlocking() {
+        // TODO Auto-generated method stub
+        
     }
 
 }

Modified: ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java (original)
+++ ode/trunk/bpel-runtime/src/test/java/org/apache/ode/bpel/runtime/ProcessManagementTest.java Wed Sep  5 22:46:42 2007
@@ -116,6 +116,5 @@
         _processQName = new QName(NAMESPACE, process);
         _server.invoke(_processQName, "instantiate",
                        DOMUtils.newDocument().createElementNS(NAMESPACE, "tns:RequestElement"));
-        _server.waitForBlocking();
     }
 }

Modified: ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
URL: http://svn.apache.org/viewvc/ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java?rev=573153&r1=573152&r2=573153&view=diff
==============================================================================
--- ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java (original)
+++ ode/trunk/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java Wed Sep  5 22:46:42 2007
@@ -18,17 +18,36 @@
  */
 package org.apache.ode.test;
 
-import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.xml.namespace.QName;
+
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
+import org.apache.ode.bpel.iapi.InvocationStyle;
 import org.apache.ode.bpel.iapi.Message;
 import org.apache.ode.bpel.iapi.MessageExchange;
-import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.ProcessStore;
 import org.apache.ode.bpel.iapi.ProcessStoreEvent;
 import org.apache.ode.bpel.iapi.ProcessStoreListener;
+import org.apache.ode.bpel.iapi.MessageExchange.AckType;
+import org.apache.ode.bpel.iapi.MessageExchange.Status;
+import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.dao.jpa.BPELDAOConnectionFactoryImpl;
 import org.apache.ode.il.MockScheduler;
@@ -41,28 +60,9 @@
 import org.junit.Before;
 import org.w3c.dom.Element;
 
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.persistence.Persistence;
-import javax.xml.namespace.QName;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
 public abstract class BPELTestAbstract {
-	private static final String SHOW_EVENTS_ON_CONSOLE = "no";
-	
+    private static final String SHOW_EVENTS_ON_CONSOLE = "no";
+
     protected BpelServerImpl _server;
 
     protected ProcessStore store;
@@ -89,6 +89,8 @@
     /** What's actually been deployed. */
     private List<Deployment> _deployed;
 
+    private MockTransactionManager _txm;
+
     @Before
     public void setUp() throws Exception {
         _failures = new CopyOnWriteArrayList<Failure>();
@@ -99,39 +101,24 @@
         _deployed = new ArrayList<Deployment>();
 
         if (Boolean.getBoolean("org.apache.ode.test.persistent")) {
-            emf = Persistence.createEntityManagerFactory("ode-unit-test-embedded");
-            em = emf.createEntityManager();
-            _cf = new BPELDAOConnectionFactoryImpl();
-            _server.setDaoConnectionFactory(_cf);
-            scheduler = new MockScheduler() {
-                @Override
-                public void begin() {
-                    super.begin();
-                    em.getTransaction().begin();
-                }
-
-                @Override
-                public void commit() {
-                    super.commit();
-                    em.getTransaction().commit();
-                }
 
-                @Override
-                public void rollback() {
-                    super.rollback();
-                    em.getTransaction().rollback();
-                }
+            _server.setDaoConnectionFactory(_cf);
+            _txm = new MockTransactionManager();
 
-            };
+            BPELDAOConnectionFactoryImpl cf = new BPELDAOConnectionFactoryImpl();
+            cf.setTransactionManager(_txm);
+            // cf.setDataSource(datasource);
+            scheduler = new MockScheduler(_txm);
         } else {
-            scheduler = new MockScheduler();
-            _cf = new BpelDAOConnectionFactoryImpl(scheduler);
+            _txm = new MockTransactionManager();
+            scheduler = new MockScheduler(_txm);
+            _cf = new BpelDAOConnectionFactoryImpl(_txm);
             _server.setDaoConnectionFactory(_cf);
         }
-        _server.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(scheduler));
         _server.setScheduler(scheduler);
         _server.setBindingContext(new BindingContextImpl());
         _server.setMessageExchangeContext(mexContext);
+        _server.setTransactionManager(_txm);
         scheduler.setJobProcessor(_server);
         store = new ProcessStoreImpl(null, "jpa", true);
         store.registerListener(new ProcessStoreListener() {
@@ -147,7 +134,7 @@
             }
         });
         _server.setConfigProperties(getConfigProperties());
-        _server.registerBpelEventListener(new DebugBpelEventListener());
+        // _server.registerBpelEventListener(new DebugBpelEventListener());
         _server.init();
         _server.start();
     }
@@ -162,19 +149,18 @@
                 System.err.println("Error undeploying " + d);
             }
         }
-        
 
         if (em != null)
             em.close();
         if (emf != null)
             emf.close();
-        
+
         _server.stop();
         _failures = null;
         _deployed = null;
         _deployments = null;
         _invocations = null;
-        
+
     }
 
     protected void negative(String deployDir) throws Throwable {
@@ -244,22 +230,24 @@
         }
     }
 
-    protected Invocation addInvoke(String id, QName target, String operation, String request, String responsePattern) throws Exception {
+    protected Invocation addInvoke(String id, QName target, String operation, String request, String responsePattern)
+            throws Exception {
 
         Invocation inv = new Invocation(id);
         inv.target = target;
         inv.operation = operation;
         inv.request = DOMUtils.stringToDOM(request);
-        inv.expectedStatus = null;
         if (responsePattern != null) {
-            inv.expectedFinalStatus = MessageExchange.Status.RESPONSE;
+            inv.expectedFinalStatus = AckType.RESPONSE;
+
             inv.expectedResponsePattern = Pattern.compile(responsePattern, Pattern.DOTALL);
-        }
+        } else
+            inv.expectedFinalStatus = AckType.ONEWAY;
 
         _invocations.add(inv);
         return inv;
     }
-    
+
     protected void go() throws Exception {
         try {
             doDeployments();
@@ -271,23 +259,22 @@
 
     protected void checkFailure() {
         StringBuffer sb = new StringBuffer("Failure report:\n");
-    	for (Failure failure : _failures) {
+        for (Failure failure : _failures) {
             sb.append(failure);
             sb.append('\n');
         }
-    	if (_failures.size() != 0) {
-        	System.err.println(sb.toString());
+        if (_failures.size() != 0) {
+            System.err.println(sb.toString());
             Assert.fail(sb.toString());
-    	}
+        }
     }
 
-    
     protected Deployment deploy(String location) {
         Deployment deployment = new Deployment(makeDeployDir(location));
         doDeployment(deployment);
         return deployment;
     }
-    
+
     protected void doDeployments() {
         for (Deployment d : _deployments)
             doDeployment(d);
@@ -303,7 +290,7 @@
 
         try {
             procs = store.deploy(d.deployDir);
-            
+
             _deployed.add(d);
         } catch (Exception ex) {
             if (d.expectedException == null) {
@@ -311,10 +298,9 @@
                 failure(d, "DEPLOY: Unexpected exception: " + ex, ex);
             } else if (!d.expectedException.isAssignableFrom(ex.getClass())) {
                 ex.printStackTrace();
-                failure(d, "DEPLOY: Wrong exception; expected " + d.expectedException + " but got " + ex.getClass(), ex);                
+                failure(d, "DEPLOY: Wrong exception; expected " + d.expectedException + " but got " + ex.getClass(), ex);
             }
 
-
             return;
         }
 
@@ -342,7 +328,7 @@
                 failure(d, "Undeployment failed.", ex);
             }
         }
-        
+
         _deployments.clear();
     }
 
@@ -352,6 +338,7 @@
             store.undeploy(d.deployDir);
         }
     }
+
     protected void doInvokes() throws Exception {
         ArrayList<Thread> testThreads = new ArrayList<Thread>();
         for (Invocation i : _invocations) {
@@ -374,6 +361,7 @@
     private void failure(Object where, String message, Exception ex) {
         Failure f = new Failure(where, message, ex);
         _failures.add(f);
+        ex.printStackTrace();
         Assert.fail(f.toString());
     }
 
@@ -394,28 +382,27 @@
             Assert.fail("Resource not found: " + deployxml);
         }
         try {
-			return new File(deployxmlurl.toURI().getPath()).getParentFile();
-		} catch (URISyntaxException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-			return null;
-		}
+            return new File(deployxmlurl.toURI().getPath()).getParentFile();
+        } catch (URISyntaxException e) {
+            e.printStackTrace();
+            Assert.fail(e.getMessage());
+            return null;
+        }
     }
 
     /**
-     * Override this to provide configuration properties for Ode extensions 
-     * like BpelEventListeners.
+     * Override this to provide configuration properties for Ode extensions like BpelEventListeners.
      * 
      * @return
      */
     protected Properties getConfigProperties() {
-    	// could also return null, returning an empty properties 
-    	// object is more fail-safe.
-    	Properties p = new Properties();
-    	p.setProperty("debugeventlistener.dumpToStdOut", SHOW_EVENTS_ON_CONSOLE);
-    	return p;
+        // could also return null, returning an empty properties
+        // object is more fail-safe.
+        Properties p = new Properties();
+        p.setProperty("debugeventlistener.dumpToStdOut", SHOW_EVENTS_ON_CONSOLE);
+        return p;
     }
-    
+
     protected static class Failure {
         Object where;
 
@@ -442,7 +429,7 @@
         public String toString() {
             StringBuffer sbuf = new StringBuffer(where + ": " + msg);
             if (ex != null) {
-            	sbuf.append("; got exception msg: " + ex.getMessage());
+                sbuf.append("; got exception msg: " + ex.getMessage());
             }
             if (actual != null)
                 sbuf.append("; got " + actual + ", expected " + expected);
@@ -499,11 +486,8 @@
         /** If non-null, expect an exception of this class (or subclass) on invoke. */
         public Class expectedInvokeException = null;
 
-        /** If non-null, expecte this status right after invoke. */
-        public MessageExchange.Status expectedStatus = null;
-
         /** If non-null, expect this status after response received. */
-        public MessageExchange.Status expectedFinalStatus = MessageExchange.Status.COMPLETED_OK;
+        public AckType expectedFinalStatus = AckType.RESPONSE;
 
         /** If non-null, expect this correlation status right after invoke. */
         public CorrelationStatus expectedCorrelationStatus = null;
@@ -550,21 +534,17 @@
             } catch (Exception ex) {
             }
 
-            scheduler.begin();
             try {
-                mex = _server.getEngine().createMessageExchange(new GUID().toString(), _invocation.target, _invocation.operation);
-                mexContext.clearCurrentResponse();
+                mex = _server.createMessageExchange(InvocationStyle.UNRELIABLE, _invocation.target, _invocation.operation,
+                        new GUID().toString());
 
                 Message request = mex.createMessage(_invocation.requestType);
                 request.setMessage(_invocation.request);
                 _invocation.invokeTime = System.currentTimeMillis();
-                running = mex.invoke(request);
+                mex.setRequest(request);
+                mex.invokeBlocking();
 
-                Status status = mex.getStatus();
                 CorrelationStatus cstatus = mex.getCorrelationStatus();
-                if (_invocation.expectedStatus != null && !status.equals(_invocation.expectedStatus))
-                    failure(_invocation, "Unexpected message exchange status", _invocation.expectedStatus, status);
-
                 if (_invocation.expectedCorrelationStatus != null && !cstatus.equals(_invocation.expectedCorrelationStatus))
                     failure(_invocation, "Unexpected correlation status", _invocation.expectedCorrelationStatus, cstatus);
 
@@ -575,19 +555,13 @@
                     failure(_invocation, "Unexpected invocation exception.", _invocation.expectedInvokeException, ex.getClass());
 
                 return;
-            } finally {
-                scheduler.commit();
             }
 
-            if (isFailed())
-                return;
+            if (mex.getStatus() != Status.ACK)
+                failure(_invocation, "No ACK status", Status.ACK.toString(), mex.getStatus().toString());
 
-            try {
-                running.get(_invocation.maximumWaitMs, TimeUnit.MILLISECONDS);
-            } catch (Exception ex) {
-                failure(_invocation, "Exception on future object.", ex);
+            if (isFailed())
                 return;
-            }
 
             long ctime = System.currentTimeMillis();
             long itime = ctime - _invocation.invokeTime;
@@ -600,32 +574,29 @@
             if (isFailed())
                 return;
 
-            scheduler.begin();
-            try {
-                Status finalstat = mex.getStatus();
-                if (_invocation.expectedFinalStatus != null && !_invocation.expectedFinalStatus.equals(finalstat))
-                    if (finalstat.equals(Status.FAULT)) {
-                    	failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, "FAULT: " 
-                    			+ mex.getFault() + " | " + mex.getFaultExplanation());
-                    } else {
-                    	failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, finalstat);
+            AckType finalstat = mex.getAckType();
+            if (_invocation.expectedFinalStatus != null && _invocation.expectedFinalStatus != finalstat) {
+                if (finalstat.equals(AckType.FAULT)) {
+                    failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, "FAULT: "
+                            + mex.getFault() + " | " + mex.getFaultExplanation());
+                } else {
+                    failure(_invocation, "Unexpected final message exchange status", _invocation.expectedFinalStatus, finalstat);
+
+                    if (_invocation.expectedFinalCorrelationStatus != null
+                            && !_invocation.expectedFinalCorrelationStatus.equals(mex.getCorrelationStatus())) {
+                        failure(_invocation, "Unexpected final correlation status", _invocation.expectedFinalCorrelationStatus, mex
+                                .getCorrelationStatus());
+                    }
+                    if (_invocation.expectedResponsePattern != null) {
+                        if (mex.getResponse() == null)
+                            failure(_invocation, "Expected response, but got none.", null);
+                        String responseStr = DOMUtils.domToString(mex.getResponse().getMessage());
+                        Matcher matcher = _invocation.expectedResponsePattern.matcher(responseStr);
+                        if (!matcher.matches())
+                            failure(_invocation, "Response does not match expected pattern", _invocation.expectedResponsePattern,
+                                    responseStr);
                     }
-
-                if (_invocation.expectedFinalCorrelationStatus != null
-                        && !_invocation.expectedFinalCorrelationStatus.equals(mex.getCorrelationStatus())) {
-                    failure(_invocation, "Unexpected final correlation status", _invocation.expectedFinalCorrelationStatus, mex
-                            .getCorrelationStatus());
-                }
-                if (_invocation.expectedResponsePattern != null) {
-                    if (mex.getResponse() == null)
-                        failure(_invocation, "Expected response, but got none.", null);
-                    String responseStr = DOMUtils.domToString(mex.getResponse().getMessage());
-                    Matcher matcher = _invocation.expectedResponsePattern.matcher(responseStr);
-                    if (!matcher.matches())
-                        failure(_invocation, "Response does not match expected pattern", _invocation.expectedResponsePattern, responseStr);
                 }
-            } finally {
-                scheduler.commit();
             }
         }
     }